Skip to content

Commit

Permalink
dev: tracker client error enums
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Jun 29, 2024
1 parent ce971ed commit dc52103
Show file tree
Hide file tree
Showing 20 changed files with 769 additions and 549 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ tower-http = { version = "0", features = ["compression-full", "cors", "propagate
trace = "0"
tracing = "0"
tracing-subscriber = { version = "0.3.18", features = ["json"] }
url = "2"
url = {version = "2", features = ["serde"] }
uuid = { version = "1", features = ["v4"] }
zerocopy = "0.7.33"

Expand Down
5 changes: 5 additions & 0 deletions packages/configuration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod v1;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::Duration;

use camino::Utf8PathBuf;
use derive_more::Constructor;
Expand All @@ -20,6 +21,10 @@ use torrust_tracker_located_error::{DynError, LocatedError};
/// The maximum number of returned peers for a torrent.
pub const TORRENT_PEERS_LIMIT: usize = 74;

/// Default timeout for sending and receiving packets. And waiting for sockets
/// to be readable and writable.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

// Environment variables

/// The whole `tracker.toml` file content. It has priority over the config file.
Expand Down
2 changes: 1 addition & 1 deletion src/console/clients/checker/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub async fn run() -> Result<Vec<CheckResult>> {
console: console_printer,
};

Ok(service.run_checks().await)
service.run_checks().await.context("it should run the check tasks")
}

fn tracing_stdout_init(filter: LevelFilter) {
Expand Down
100 changes: 64 additions & 36 deletions src/console/clients/checker/checks/health.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,77 @@
use std::sync::Arc;
use std::time::Duration;

use reqwest::{Client as HttpClient, Url, Url as ServiceUrl};
use anyhow::Result;
use hyper::StatusCode;
use reqwest::{Client as HttpClient, Response};
use serde::Serialize;
use thiserror::Error;
use url::Url;

use super::structs::{CheckerOutput, Status};
use crate::console::clients::checker::service::{CheckError, CheckResult};
#[derive(Debug, Clone, Error, Serialize)]
#[serde(into = "String")]
pub enum Error {
#[error("Failed to Build a Http Client: {err:?}")]
ClientBuildingError { err: Arc<reqwest::Error> },
#[error("Heath check failed to get a response: {err:?}")]
ResponseError { err: Arc<reqwest::Error> },
#[error("Http check returned a non-success code: \"{code}\" with the response: \"{response:?}\"")]
UnsuccessfulResponse { code: StatusCode, response: Arc<Response> },
}

impl From<Error> for String {
fn from(value: Error) -> Self {
value.to_string()
}
}

#[derive(Debug, Clone, Serialize)]
pub struct Checks {
url: Url,
result: Result<String, Error>,
}

#[allow(clippy::missing_panics_doc)]
pub async fn run(health_checks: &Vec<ServiceUrl>, check_results: &mut Vec<CheckResult>) -> Vec<CheckerOutput> {
let mut health_checkers: Vec<CheckerOutput> = Vec::new();
pub async fn run(health_checks: Vec<Url>, timeout: Duration) -> Vec<Result<Checks, Checks>> {
let mut results = Vec::default();

for health_check_url in health_checks {
let mut health_checker = CheckerOutput {
url: health_check_url.to_string(),
status: Status {
code: String::new(),
message: String::new(),
},
tracing::debug!("Health checks ...");

for url in health_checks {
let result = match run_health_check(url.clone(), timeout).await {
Ok(response) => Ok(response.status().to_string()),
Err(err) => Err(err),
};
match run_health_check(health_check_url.clone()).await {
Ok(()) => {
check_results.push(Ok(()));
health_checker.status.code = "ok".to_string();
}
Err(err) => {
check_results.push(Err(err));
health_checker.status.code = "error".to_string();
health_checker.status.message = "Health API is failing.".to_string();
}

let check = Checks { url, result };

if check.result.is_err() {
results.push(Err(check));
} else {
results.push(Ok(check));
}
health_checkers.push(health_checker);
}
health_checkers

results
}

async fn run_health_check(url: Url) -> Result<(), CheckError> {
let client = HttpClient::builder().timeout(Duration::from_secs(5)).build().unwrap();
async fn run_health_check(url: Url, timeout: Duration) -> Result<Response, Error> {
let client = HttpClient::builder()
.timeout(timeout)
.build()
.map_err(|e| Error::ClientBuildingError { err: e.into() })?;

match client.get(url.clone()).send().await {
Ok(response) => {
if response.status().is_success() {
Ok(())
} else {
Err(CheckError::HealthCheckError { url })
}
}
Err(_) => Err(CheckError::HealthCheckError { url }),
let response = client
.get(url.clone())
.send()
.await
.map_err(|e| Error::ResponseError { err: e.into() })?;

if response.status().is_success() {
Ok(response)
} else {
Err(Error::UnsuccessfulResponse {
code: response.status(),
response: response.into(),
})
}
}
158 changes: 74 additions & 84 deletions src/console/clients/checker/checks/http.rs
Original file line number Diff line number Diff line change
@@ -1,97 +1,92 @@
use std::str::FromStr;
use std::str::FromStr as _;
use std::time::Duration;

use reqwest::Url as ServiceUrl;
use serde::Serialize;
use torrust_tracker_primitives::info_hash::InfoHash;
use tracing::debug;
use url::Url;

use super::structs::{CheckerOutput, Status};
use crate::console::clients::checker::service::{CheckError, CheckResult};
use crate::shared::bit_torrent::tracker::http::client::requests::announce::QueryBuilder;
use crate::console::clients::http::Error;
use crate::shared::bit_torrent::tracker::http::client::responses::announce::Announce;
use crate::shared::bit_torrent::tracker::http::client::responses::scrape;
use crate::shared::bit_torrent::tracker::http::client::{requests, Client};

#[allow(clippy::missing_panics_doc)]
pub async fn run(http_trackers: &Vec<ServiceUrl>, check_results: &mut Vec<CheckResult>) -> Vec<CheckerOutput> {
let mut http_checkers: Vec<CheckerOutput> = Vec::new();

for http_tracker in http_trackers {
let mut http_checker = CheckerOutput {
url: http_tracker.to_string(),
status: Status {
code: String::new(),
message: String::new(),
},
#[derive(Debug, Clone, Serialize)]
pub struct Checks {
url: Url,
results: Vec<(Check, Result<(), Error>)>,
}

#[derive(Debug, Clone, Serialize)]
pub enum Check {
Announce,
Scrape,
}

pub async fn run(http_trackers: Vec<Url>, timeout: Duration) -> Vec<Result<Checks, Checks>> {
let mut results = Vec::default();

tracing::debug!("HTTP trackers ...");

for ref url in http_trackers {
let mut checks = Checks {
url: url.clone(),
results: Vec::default(),
};

match check_http_announce(http_tracker).await {
Ok(()) => {
check_results.push(Ok(()));
http_checker.status.code = "ok".to_string();
}
Err(err) => {
check_results.push(Err(err));
http_checker.status.code = "error".to_string();
http_checker.status.message = "Announce is failing.".to_string();
}
// Announce
{
let check = check_http_announce(url, timeout).await.map(|_| ());

checks.results.push((Check::Announce, check));
}

match check_http_scrape(http_tracker).await {
Ok(()) => {
check_results.push(Ok(()));
http_checker.status.code = "ok".to_string();
}
Err(err) => {
check_results.push(Err(err));
http_checker.status.code = "error".to_string();
http_checker.status.message = "Scrape is failing.".to_string();
}
// Scrape
{
let check = check_http_scrape(url, timeout).await.map(|_| ());

checks.results.push((Check::Scrape, check));
}

if checks.results.iter().any(|f| f.1.is_err()) {
results.push(Err(checks));
} else {
results.push(Ok(checks));
}
http_checkers.push(http_checker);
}
http_checkers

results
}

async fn check_http_announce(tracker_url: &Url) -> Result<(), CheckError> {
async fn check_http_announce(url: &Url, timeout: Duration) -> Result<Announce, Error> {
let info_hash_str = "9c38422213e30bff212b30c360d26f9a02136422".to_string(); // # DevSkim: ignore DS173237
let info_hash = InfoHash::from_str(&info_hash_str).expect("a valid info-hash is required");

// todo: HTTP request could panic.For example, if the server is not accessible.
// We should change the client to catch that error and return a `CheckError`.
// Otherwise the checking process will stop. The idea is to process all checks
// and return a final report.
let Ok(client) = Client::new(tracker_url.clone()) else {
return Err(CheckError::HttpError {
url: (tracker_url.to_owned()),
});
};
let Ok(response) = client
.announce(&QueryBuilder::with_default_values().with_info_hash(&info_hash).query())
let client = Client::new(url.clone(), timeout).map_err(|err| Error::HttpClientError { err })?;

let response = client
.announce(
&requests::announce::QueryBuilder::with_default_values()
.with_info_hash(&info_hash)
.query(),
)
.await
else {
return Err(CheckError::HttpError {
url: (tracker_url.to_owned()),
});
};

if let Ok(body) = response.bytes().await {
if let Ok(_announce_response) = serde_bencode::from_bytes::<Announce>(&body) {
Ok(())
} else {
debug!("announce body {:#?}", body);
Err(CheckError::HttpError {
url: tracker_url.clone(),
})
}
} else {
Err(CheckError::HttpError {
url: tracker_url.clone(),
})
}
.map_err(|err| Error::HttpClientError { err })?;

let response = response.bytes().await.map_err(|e| Error::ResponseError { err: e.into() })?;

let response = serde_bencode::from_bytes::<Announce>(&response).map_err(|e| Error::ParseBencodeError {
data: response,
err: e.into(),
})?;

Ok(response)
}

async fn check_http_scrape(url: &Url) -> Result<(), CheckError> {
async fn check_http_scrape(url: &Url, timeout: Duration) -> Result<scrape::Response, Error> {
let info_hashes: Vec<String> = vec!["9c38422213e30bff212b30c360d26f9a02136422".to_string()]; // # DevSkim: ignore DS173237
let query = requests::scrape::Query::try_from(info_hashes).expect("a valid array of info-hashes is required");

Expand All @@ -100,21 +95,16 @@ async fn check_http_scrape(url: &Url) -> Result<(), CheckError> {
// Otherwise the checking process will stop. The idea is to process all checks
// and return a final report.

let Ok(client) = Client::new(url.clone()) else {
return Err(CheckError::HttpError { url: (url.to_owned()) });
};
let Ok(response) = client.scrape(&query).await else {
return Err(CheckError::HttpError { url: (url.to_owned()) });
};
let client = Client::new(url.clone(), timeout).map_err(|err| Error::HttpClientError { err })?;

if let Ok(body) = response.bytes().await {
if let Ok(_scrape_response) = scrape::Response::try_from_bencoded(&body) {
Ok(())
} else {
debug!("scrape body {:#?}", body);
Err(CheckError::HttpError { url: url.clone() })
}
} else {
Err(CheckError::HttpError { url: url.clone() })
}
let response = client.scrape(&query).await.map_err(|err| Error::HttpClientError { err })?;

let response = response.bytes().await.map_err(|e| Error::ResponseError { err: e.into() })?;

let response = scrape::Response::try_from_bencoded(&response).map_err(|e| Error::BencodeParseError {
data: response,
err: e.into(),
})?;

Ok(response)
}
Loading

0 comments on commit dc52103

Please sign in to comment.