Skip to content

Commit

Permalink
Merge torrust#913: Minor UDP server refactorings
Browse files Browse the repository at this point in the history
f06976e docs: update some UDP server comments (Jose Celano)
89bb735 refactor: reorganize UDP server mod (Jose Celano)
c121bf2 refactor: rename UDP server types (Jose Celano)
336e0e6 refactor: reorganize mod to extract new submods (Jose Celano)
61fb4b2 refactor: move active requests logic to ActiveRequest type (Jose Celano)
35b6c84 refactor: simplify UDP server receiver (Jose Celano)
a5e2baf refactor: extract method (Jose Celano)
b4b4515 refactor: extract const for logging targets (Jose Celano)
0388e1d refactor: extract consts for logging targets (Jose Celano)
16ae4fd refactor: rename vars and extract constructor (Jose Celano)
7ff0cd2 refactor: rename var (Jose Celano)
0e3678d refactor: rename Socket to BoundSocket and fix format errors" (Jose Celano)
9b3b75b fix: log message (Jose Celano)

Pull request description:

  This PR only includes some minor changes I've proposed in this [PR](torrust#873), and some refactorings.

ACKs for top commit:
  josecelano:
    ACK f06976e

Tree-SHA512: a7a4ea14077c2ce6df0b80b222952d0c6c6588f1df50f78d01198ea0ab12ce3ca74923caed6601994e955880e2afe3d9432f87ae74f383362fae452e367ad359
  • Loading branch information
josecelano committed Jun 25, 2024
2 parents b7bcd96 + f06976e commit eb9f997
Show file tree
Hide file tree
Showing 29 changed files with 873 additions and 725 deletions.
9 changes: 5 additions & 4 deletions src/bootstrap/jobs/health_check_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use torrust_tracker_configuration::HealthCheckApi;
use tracing::info;

use super::Started;
use crate::servers::health_check_api::server;
use crate::servers::health_check_api::{server, HEALTH_CHECK_API_LOG_TARGET};
use crate::servers::logging::STARTED_ON;
use crate::servers::registar::ServiceRegistry;
use crate::servers::signals::Halted;

Expand All @@ -44,18 +45,18 @@ pub async fn start_job(config: &HealthCheckApi, register: ServiceRegistry) -> Jo

// Run the API server
let join_handle = tokio::spawn(async move {
info!(target: "HEALTH CHECK API", "Starting on: {protocol}://{}", bind_addr);
info!(target: HEALTH_CHECK_API_LOG_TARGET, "Starting on: {protocol}://{}", bind_addr);

let handle = server::start(bind_addr, tx_start, rx_halt, register);

if let Ok(()) = handle.await {
info!(target: "HEALTH CHECK API", "Stopped server running on: {protocol}://{}", bind_addr);
info!(target: HEALTH_CHECK_API_LOG_TARGET, "Stopped server running on: {protocol}://{}", bind_addr);
}
});

// Wait until the server sends the started message
match rx_start.await {
Ok(msg) => info!(target: "HEALTH CHECK API", "Started on: {protocol}://{}", msg.address),
Ok(msg) => info!(target: HEALTH_CHECK_API_LOG_TARGET, "{STARTED_ON}: {protocol}://{}", msg.address),
Err(e) => panic!("the Health Check API server was dropped: {e}"),
}

Expand Down
12 changes: 7 additions & 5 deletions src/bootstrap/jobs/udp_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use tracing::debug;

use crate::core;
use crate::servers::registar::ServiceRegistrationForm;
use crate::servers::udp::server::{Launcher, UdpServer};
use crate::servers::udp::server::spawner::Spawner;
use crate::servers::udp::server::Server;
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

/// It starts a new UDP server with the provided configuration.
///
Expand All @@ -29,14 +31,14 @@ use crate::servers::udp::server::{Launcher, UdpServer};
pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>, form: ServiceRegistrationForm) -> JoinHandle<()> {
let bind_to = config.bind_address;

let server = UdpServer::new(Launcher::new(bind_to))
let server = Server::new(Spawner::new(bind_to))
.start(tracker, form)
.await
.expect("it should be able to start the udp tracker");

tokio::spawn(async move {
debug!(target: "UDP TRACKER", "Wait for launcher (UDP service) to finish ...");
debug!(target: "UDP TRACKER", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());
debug!(target: UDP_TRACKER_LOG_TARGET, "Wait for launcher (UDP service) to finish ...");
debug!(target: UDP_TRACKER_LOG_TARGET, "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());

assert!(
!server.state.halt_task.is_closed(),
Expand All @@ -49,6 +51,6 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>, form: S
.await
.expect("it should be able to join to the udp tracker task");

debug!(target: "UDP TRACKER", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());
debug!(target: UDP_TRACKER_LOG_TARGET, "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());
})
}
14 changes: 8 additions & 6 deletions src/console/ci/e2e/logs_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
use regex::Regex;
use serde::{Deserialize, Serialize};

use crate::servers::health_check_api::HEALTH_CHECK_API_LOG_TARGET;
use crate::servers::http::HTTP_TRACKER_LOG_TARGET;
use crate::servers::logging::STARTED_ON;
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

const INFO_LOG_LEVEL: &str = "INFO";
const UDP_TRACKER_LOG_TARGET: &str = "UDP TRACKER";
const HTTP_TRACKER_LOG_TARGET: &str = "HTTP TRACKER";
const HEALTH_CHECK_API_LOG_TARGET: &str = "HEALTH CHECK API";

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct RunningServices {
Expand Down Expand Up @@ -64,9 +66,9 @@ impl RunningServices {
let mut http_trackers: Vec<String> = Vec::new();
let mut health_checks: Vec<String> = Vec::new();

let udp_re = Regex::new(r"Started on: udp://([0-9.]+:[0-9]+)").unwrap();
let http_re = Regex::new(r"Started on: (https?://[0-9.]+:[0-9]+)").unwrap(); // DevSkim: ignore DS137138
let health_re = Regex::new(r"Started on: (https?://[0-9.]+:[0-9]+)").unwrap(); // DevSkim: ignore DS137138
let udp_re = Regex::new(&format!("{STARTED_ON}: {}", r"udp://([0-9.]+:[0-9]+)")).unwrap();
let http_re = Regex::new(&format!("{STARTED_ON}: {}", r"(https?://[0-9.]+:[0-9]+)")).unwrap(); // DevSkim: ignore DS137138
let health_re = Regex::new(&format!("{STARTED_ON}: {}", r"(https?://[0-9.]+:[0-9]+)")).unwrap(); // DevSkim: ignore DS137138
let ansi_escape_re = Regex::new(r"\x1b\[[0-9;]*m").unwrap();

for line in logs.lines() {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ pub mod bootstrap;
pub mod console;
pub mod core;
pub mod servers;
pub mod shared;
pub mod shared;

#[macro_use]
extern crate lazy_static;
Expand Down
6 changes: 4 additions & 2 deletions src/servers/apis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ pub mod routes;
pub mod server;
pub mod v1;

use serde::{Deserialize, Serialize};

pub const API_LOG_TARGET: &str = "API";

/// The info hash URL path parameter.
///
/// Some API endpoints require an info hash as a path parameter.
Expand All @@ -169,8 +173,6 @@ pub mod v1;
#[derive(Deserialize)]
pub struct InfoHashParam(pub String);

use serde::{Deserialize, Serialize};

/// The version of the HTTP Api.
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Debug)]
pub enum Version {
Expand Down
5 changes: 3 additions & 2 deletions src/servers/apis/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::v1;
use super::v1::context::health_check::handlers::health_check_handler;
use super::v1::middlewares::auth::State;
use crate::core::Tracker;
use crate::servers::apis::API_LOG_TARGET;

const TIMEOUT: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -60,7 +61,7 @@ pub fn router(tracker: Arc<Tracker>, access_tokens: Arc<AccessTokens>) -> Router
.unwrap_or_default();

tracing::span!(
target: "API",
target: API_LOG_TARGET,
tracing::Level::INFO, "request", method = %method, uri = %uri, request_id = %request_id);
})
.on_response(|response: &Response, latency: Duration, _span: &Span| {
Expand All @@ -73,7 +74,7 @@ pub fn router(tracker: Arc<Tracker>, access_tokens: Arc<AccessTokens>) -> Router
let latency_ms = latency.as_millis();

tracing::span!(
target: "API",
target: API_LOG_TARGET,
tracing::Level::INFO, "response", latency = %latency_ms, status = %status_code, request_id = %request_id);
}),
)
Expand Down
10 changes: 6 additions & 4 deletions src/servers/apis/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use tracing::{debug, error, info};
use super::routes::router;
use crate::bootstrap::jobs::Started;
use crate::core::Tracker;
use crate::servers::apis::API_LOG_TARGET;
use crate::servers::custom_axum_server::{self, TimeoutAcceptor};
use crate::servers::logging::STARTED_ON;
use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm};
use crate::servers::signals::{graceful_shutdown, Halted};

Expand Down Expand Up @@ -121,11 +123,11 @@ impl ApiServer<Stopped> {
let launcher = self.state.launcher;

let task = tokio::spawn(async move {
debug!(target: "API", "Starting with launcher in spawned task ...");
debug!(target: API_LOG_TARGET, "Starting with launcher in spawned task ...");

let _task = launcher.start(tracker, access_tokens, tx_start, rx_halt).await;

debug!(target: "API", "Started with launcher in spawned task");
debug!(target: API_LOG_TARGET, "Started with launcher in spawned task");

launcher
});
Expand Down Expand Up @@ -231,7 +233,7 @@ impl Launcher {
let tls = self.tls.clone();
let protocol = if tls.is_some() { "https" } else { "http" };

info!(target: "API", "Starting on {protocol}://{}", address);
info!(target: API_LOG_TARGET, "Starting on {protocol}://{}", address);

let running = Box::pin(async {
match tls {
Expand All @@ -250,7 +252,7 @@ impl Launcher {
}
});

info!(target: "API", "Started on {protocol}://{}", address);
info!(target: API_LOG_TARGET, "{STARTED_ON} {protocol}://{}", address);

tx_start
.send(Started { address })
Expand Down
2 changes: 2 additions & 0 deletions src/servers/health_check_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ pub mod handlers;
pub mod resources;
pub mod responses;
pub mod server;

pub const HEALTH_CHECK_API_LOG_TARGET: &str = "HEALTH CHECK API";
7 changes: 4 additions & 3 deletions src/servers/health_check_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tracing::{debug, Level, Span};

use crate::bootstrap::jobs::Started;
use crate::servers::health_check_api::handlers::health_check_handler;
use crate::servers::health_check_api::HEALTH_CHECK_API_LOG_TARGET;
use crate::servers::registar::ServiceRegistry;
use crate::servers::signals::{graceful_shutdown, Halted};

Expand Down Expand Up @@ -56,7 +57,7 @@ pub fn start(
.unwrap_or_default();

tracing::span!(
target: "HEALTH CHECK API",
target: HEALTH_CHECK_API_LOG_TARGET,
tracing::Level::INFO, "request", method = %method, uri = %uri, request_id = %request_id);
})
.on_response(|response: &Response, latency: Duration, _span: &Span| {
Expand All @@ -69,7 +70,7 @@ pub fn start(
let latency_ms = latency.as_millis();

tracing::span!(
target: "HEALTH CHECK API",
target: HEALTH_CHECK_API_LOG_TARGET,
tracing::Level::INFO, "response", latency = %latency_ms, status = %status_code, request_id = %request_id);
}),
)
Expand All @@ -80,7 +81,7 @@ pub fn start(

let handle = Handle::new();

debug!(target: "HEALTH CHECK API", "Starting service with graceful shutdown in a spawned task ...");
debug!(target: HEALTH_CHECK_API_LOG_TARGET, "Starting service with graceful shutdown in a spawned task ...");

tokio::task::spawn(graceful_shutdown(
handle.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/servers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ pub mod percent_encoding;
pub mod server;
pub mod v1;

pub const HTTP_TRACKER_LOG_TARGET: &str = "HTTP TRACKER";

/// The version of the HTTP tracker.
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Debug)]
pub enum Version {
Expand Down
6 changes: 4 additions & 2 deletions src/servers/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use super::v1::routes::router;
use crate::bootstrap::jobs::Started;
use crate::core::Tracker;
use crate::servers::custom_axum_server::{self, TimeoutAcceptor};
use crate::servers::http::HTTP_TRACKER_LOG_TARGET;
use crate::servers::logging::STARTED_ON;
use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm};
use crate::servers::signals::{graceful_shutdown, Halted};

Expand Down Expand Up @@ -55,7 +57,7 @@ impl Launcher {
let tls = self.tls.clone();
let protocol = if tls.is_some() { "https" } else { "http" };

info!(target: "HTTP TRACKER", "Starting on: {protocol}://{}", address);
info!(target: HTTP_TRACKER_LOG_TARGET, "Starting on: {protocol}://{}", address);

let app = router(tracker, address);

Expand All @@ -76,7 +78,7 @@ impl Launcher {
}
});

info!(target: "HTTP TRACKER", "Started on: {protocol}://{}", address);
info!(target: HTTP_TRACKER_LOG_TARGET, "{STARTED_ON}: {protocol}://{}", address);

tx_start
.send(Started { address })
Expand Down
5 changes: 3 additions & 2 deletions src/servers/http/v1/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing::{Level, Span};

use super::handlers::{announce, health_check, scrape};
use crate::core::Tracker;
use crate::servers::http::HTTP_TRACKER_LOG_TARGET;

const TIMEOUT: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -56,7 +57,7 @@ pub fn router(tracker: Arc<Tracker>, server_socket_addr: SocketAddr) -> Router {
.unwrap_or_default();

tracing::span!(
target:"HTTP TRACKER",
target: HTTP_TRACKER_LOG_TARGET,
tracing::Level::INFO, "request", server_socket_addr= %server_socket_addr, method = %method, uri = %uri, request_id = %request_id);
})
.on_response(move |response: &Response, latency: Duration, _span: &Span| {
Expand All @@ -69,7 +70,7 @@ pub fn router(tracker: Arc<Tracker>, server_socket_addr: SocketAddr) -> Router {
let latency_ms = latency.as_millis();

tracing::span!(
target: "HTTP TRACKER",
target: HTTP_TRACKER_LOG_TARGET,
tracing::Level::INFO, "response", server_socket_addr= %server_socket_addr, latency = %latency_ms, status = %status_code, request_id = %request_id);
}),
)
Expand Down
29 changes: 29 additions & 0 deletions src/servers/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/// This is the prefix used in logs to identify a started service.
///
/// For example:
///
/// ```text
/// 2024-06-25T12:36:25.025312Z INFO UDP TRACKER: Started on: udp://0.0.0.0:6969
/// 2024-06-25T12:36:25.025445Z INFO HTTP TRACKER: Started on: http://0.0.0.0:7070
/// 2024-06-25T12:36:25.025527Z INFO API: Started on http://0.0.0.0:1212
/// 2024-06-25T12:36:25.025580Z INFO HEALTH CHECK API: Started on: http://127.0.0.1:1313
/// ```
pub const STARTED_ON: &str = "Started on";

/*
todo: we should use a field fot the URL.
For example, instead of:
```
2024-06-25T12:36:25.025312Z INFO UDP TRACKER: Started on: udp://0.0.0.0:6969
```
We should use something like:
```
2024-06-25T12:36:25.025312Z INFO UDP TRACKER started_at_url=udp://0.0.0.0:6969
```
*/
1 change: 1 addition & 0 deletions src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod apis;
pub mod custom_axum_server;
pub mod health_check_api;
pub mod http;
pub mod logging;
pub mod registar;
pub mod signals;
pub mod udp;
6 changes: 3 additions & 3 deletions src/servers/udp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use uuid::Uuid;
use zerocopy::network_endian::I32;

use super::connection_cookie::{check, from_connection_id, into_connection_id, make};
use super::UdpRequest;
use super::RawRequest;
use crate::core::{statistics, ScrapeData, Tracker};
use crate::servers::udp::error::Error;
use crate::servers::udp::logging::{log_bad_request, log_error_response, log_request, log_response};
Expand All @@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS;
/// - Delegating the request to the correct handler depending on the request type.
///
/// It will return an `Error` response if the request is invalid.
pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc<Tracker>, addr: SocketAddr) -> Response {
debug!("Handling Packets: {udp_request:?}");

let start_time = Instant::now();
Expand Down Expand Up @@ -304,7 +304,7 @@ fn handle_error(e: &Error, transaction_id: TransactionId) -> Response {
pub struct RequestId(Uuid);

impl RequestId {
fn make(_request: &UdpRequest) -> RequestId {
fn make(_request: &RawRequest) -> RequestId {
RequestId(Uuid::new_v4())
}
}
Expand Down
Loading

0 comments on commit eb9f997

Please sign in to comment.