diff --git a/clash_lib/src/app/dispatcher/dispatcher.rs b/clash_lib/src/app/dispatcher/dispatcher.rs index 3657a181..f09b5129 100644 --- a/clash_lib/src/app/dispatcher/dispatcher.rs +++ b/clash_lib/src/app/dispatcher/dispatcher.rs @@ -11,6 +11,7 @@ use crate::proxy::AnyInboundDatagram; use crate::session::Session; use futures::SinkExt; use futures::StreamExt; + use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::SocketAddr; @@ -77,7 +78,7 @@ impl Dispatcher { #[instrument(skip(lhs))] pub async fn dispatch_stream(&self, sess: Session, mut lhs: S) where - S: AsyncRead + AsyncWrite + Unpin + Send, + S: AsyncRead + AsyncWrite + Unpin + Send +'static, { let sess = if self.resolver.fake_ip_enabled() { match sess.destination { @@ -133,11 +134,11 @@ impl Dispatcher { { Ok(rhs) => { debug!("remote connection established {}", sess); - let mut rhs = + let rhs = TrackedStream::new(rhs, self.manager.clone(), sess.clone(), rule).await; match copy_buf_bidirectional_with_timeout( - &mut lhs, - &mut rhs, + lhs, + rhs, 4096, Duration::from_secs(10), Duration::from_secs(10), diff --git a/clash_lib/src/common/io.rs b/clash_lib/src/common/io.rs index b9e5fa59..185196ee 100644 --- a/clash_lib/src/common/io.rs +++ b/clash_lib/src/common/io.rs @@ -1,291 +1,510 @@ /// copy of https://github.com/eycorsican/leaf/blob/a77a1e497ae034f3a2a89c8628d5e7ebb2af47f0/leaf/src/common/io.rs -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +// use std::future::Future; +// use std::io; +// use std::pin::Pin; +// use std::task::{Context, Poll}; use std::time::Duration; -use futures::ready; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +// use futures::ready; +use futures::TryFutureExt; +use std::io::Error; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::task::JoinHandle; -#[derive(Debug)] -pub struct CopyBuffer { - read_done: bool, - need_flush: bool, - pos: usize, - cap: usize, - amt: u64, - buf: Box<[u8]>, -} +// #[derive(Debug)] +// pub struct CopyBuffer { +// read_done: bool, +// need_flush: bool, +// pos: usize, +// cap: usize, +// amt: u64, +// buf: Box<[u8]>, +// } -impl CopyBuffer { - #[allow(unused)] - pub fn new() -> Self { - Self { - read_done: false, - need_flush: false, - pos: 0, - cap: 0, - amt: 0, - buf: vec![0; 2 * 1024].into_boxed_slice(), - } - } +// impl CopyBuffer { +// #[allow(unused)] +// pub fn new() -> Self { +// Self { +// read_done: false, +// need_flush: false, +// pos: 0, +// cap: 0, +// amt: 0, +// buf: vec![0; 2 * 1024].into_boxed_slice(), +// } +// } - pub fn new_with_capacity(size: usize) -> Result { - let mut buf = Vec::new(); - buf.try_reserve(size).map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("new buffer failed: {}", e), - ) - })?; - buf.resize(size, 0); - Ok(Self { - read_done: false, - need_flush: false, - pos: 0, - cap: 0, - amt: 0, - buf: buf.into_boxed_slice(), - }) - } +// pub fn new_with_capacity(size: usize) -> Result { +// let mut buf = Vec::new(); +// buf.try_reserve(size).map_err(|e| { +// std::io::Error::new( +// std::io::ErrorKind::Other, +// format!("new buffer failed: {}", e), +// ) +// })?; +// buf.resize(size, 0); +// Ok(Self { +// read_done: false, +// need_flush: false, +// pos: 0, +// cap: 0, +// amt: 0, +// buf: buf.into_boxed_slice(), +// }) +// } - pub fn amount_transfered(&self) -> u64 { - self.amt - } +// pub fn amount_transfered(&self) -> u64 { +// self.amt +// } - pub fn poll_copy( - &mut self, - cx: &mut Context<'_>, - mut reader: Pin<&mut R>, - mut writer: Pin<&mut W>, - ) -> Poll> - where - R: AsyncRead + ?Sized, - W: AsyncWrite + ?Sized, - { - loop { - // If our buffer is empty, then we need to read some data to - // continue. - if self.pos == self.cap && !self.read_done { - let me = &mut *self; - let mut buf = ReadBuf::new(&mut me.buf); +// pub fn poll_copy( +// &mut self, +// cx: &mut Context<'_>, +// mut reader: Pin<&mut R>, +// mut writer: Pin<&mut W>, +// ) -> Poll> +// where +// R: AsyncRead + ?Sized, +// W: AsyncWrite + ?Sized, +// { +// loop { +// // If our buffer is empty, then we need to read some data to +// // continue. +// if self.pos == self.cap && !self.read_done { +// let me = &mut *self; +// let mut buf = ReadBuf::new(&mut me.buf); - match reader.as_mut().poll_read(cx, &mut buf) { - Poll::Ready(Ok(_)) => (), - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => { - // Try flushing when the reader has no progress to avoid deadlock - // when the reader depends on buffered writer. - if self.need_flush { - ready!(writer.as_mut().poll_flush(cx))?; - self.need_flush = false; - } +// match reader.as_mut().poll_read(cx, &mut buf) { +// Poll::Ready(Ok(_)) => (), +// Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), +// Poll::Pending => { +// // Try flushing when the reader has no progress to avoid deadlock +// // when the reader depends on buffered writer. +// if self.need_flush { +// ready!(writer.as_mut().poll_flush(cx))?; +// self.need_flush = false; +// } - return Poll::Pending; - } - } +// return Poll::Pending; +// } +// } - let n = buf.filled().len(); - if n == 0 { - self.read_done = true; - } else { - self.pos = 0; - self.cap = n; - } - } +// let n = buf.filled().len(); +// if n == 0 { +// self.read_done = true; +// } else { +// self.pos = 0; +// self.cap = n; +// } +// } - // If our buffer has some data, let's write it out! - while self.pos < self.cap { - let me = &mut *self; - let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?; - if i == 0 { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "write zero byte into writer", - ))); - } else { - self.pos += i; - self.amt += i as u64; - self.need_flush = true; - } - } +// // If our buffer has some data, let's write it out! +// while self.pos < self.cap { +// let me = &mut *self; +// let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?; +// if i == 0 { +// return Poll::Ready(Err(io::Error::new( +// io::ErrorKind::WriteZero, +// "write zero byte into writer", +// ))); +// } else { +// self.pos += i; +// self.amt += i as u64; +// self.need_flush = true; +// } +// } - // If pos larger than cap, this loop will never stop. - // In particular, user's wrong poll_write implementation returning - // incorrect written length may lead to thread blocking. - debug_assert!( - self.pos <= self.cap, - "writer returned length larger than input slice" - ); +// // If pos larger than cap, this loop will never stop. +// // In particular, user's wrong poll_write implementation returning +// // incorrect written length may lead to thread blocking. +// debug_assert!( +// self.pos <= self.cap, +// "writer returned length larger than input slice" +// ); - // If we've written all the data and we've seen EOF, flush out the - // data and finish the transfer. - if self.pos == self.cap && self.read_done { - ready!(writer.as_mut().poll_flush(cx))?; - return Poll::Ready(Ok(self.amt)); - } - } - } -} +// // If we've written all the data and we've seen EOF, flush out the +// // data and finish the transfer. +// if self.pos == self.cap && self.read_done { +// ready!(writer.as_mut().poll_flush(cx))?; +// return Poll::Ready(Ok(self.amt)); +// } +// } +// } +// } + +// enum TransferState { +// Running(CopyBuffer), +// ShuttingDown(u64), +// Done, +// } + +// struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> { +// a: &'a mut A, +// b: &'a mut B, +// a_to_b: TransferState, +// b_to_a: TransferState, +// a_to_b_count: u64, +// b_to_a_count: u64, +// a_to_b_delay: Option>>, +// b_to_a_delay: Option>>, +// a_to_b_timeout_duration: Duration, +// b_to_a_timeout_duration: Duration, +// } + +// impl<'a, A, B> Future for CopyBidirectional<'a, A, B> +// where +// A: AsyncRead + AsyncWrite + Unpin + ?Sized, +// B: AsyncRead + AsyncWrite + Unpin + ?Sized, +// { +// type Output = io::Result<(u64, u64)>; + +// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// // Unpack self into mut refs to each field to avoid borrow check issues. +// let CopyBidirectional { +// a, +// b, +// a_to_b, +// b_to_a, +// a_to_b_count, +// b_to_a_count, +// a_to_b_delay, +// b_to_a_delay, +// a_to_b_timeout_duration, +// b_to_a_timeout_duration, +// } = &mut *self; + +// let mut a = Pin::new(a); +// let mut b = Pin::new(b); + +// loop { +// match a_to_b { +// TransferState::Running(buf) => { +// let res = buf.poll_copy(cx, a.as_mut(), b.as_mut()); +// match res { +// Poll::Ready(Ok(count)) => { +// *a_to_b = TransferState::ShuttingDown(count); +// continue; +// } +// Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), +// Poll::Pending => { +// if let Some(delay) = a_to_b_delay { +// match delay.as_mut().poll(cx) { +// Poll::Ready(()) => { +// *a_to_b = +// TransferState::ShuttingDown(buf.amount_transfered()); +// continue; +// } +// Poll::Pending => (), +// } +// } +// } +// } +// } +// TransferState::ShuttingDown(count) => { +// let res = b.as_mut().poll_shutdown(cx); +// match res { +// Poll::Ready(Ok(())) => { +// *a_to_b_count += *count; +// *a_to_b = TransferState::Done; +// b_to_a_delay +// .replace(Box::pin(tokio::time::sleep(*b_to_a_timeout_duration))); +// continue; +// } +// Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), +// Poll::Pending => (), +// } +// } +// TransferState::Done => (), +// } -enum TransferState { - Running(CopyBuffer), - ShuttingDown(u64), +// match b_to_a { +// TransferState::Running(buf) => { +// let res = buf.poll_copy(cx, b.as_mut(), a.as_mut()); +// match res { +// Poll::Ready(Ok(count)) => { +// *b_to_a = TransferState::ShuttingDown(count); +// continue; +// } +// Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), +// Poll::Pending => { +// if let Some(delay) = b_to_a_delay { +// match delay.as_mut().poll(cx) { +// Poll::Ready(()) => { +// *b_to_a = +// TransferState::ShuttingDown(buf.amount_transfered()); +// continue; +// } +// Poll::Pending => (), +// } +// } +// } +// } +// } +// TransferState::ShuttingDown(count) => { +// let res = a.as_mut().poll_shutdown(cx); +// match res { +// Poll::Ready(Ok(())) => { +// *b_to_a_count += *count; +// *b_to_a = TransferState::Done; +// a_to_b_delay +// .replace(Box::pin(tokio::time::sleep(*a_to_b_timeout_duration))); +// continue; +// } +// Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), +// Poll::Pending => (), +// } +// } +// TransferState::Done => (), +// } + +// match (&a_to_b, &b_to_a) { +// (TransferState::Done, TransferState::Done) => break, +// _ => return Poll::Pending, +// } +// } + +// Poll::Ready(Ok((*a_to_b_count, *b_to_a_count))) +// } +// } + +// pub async fn copy_buf_bidirectional_with_timeout( +// a: &mut A, +// b: &mut B, +// size: usize, +// a_to_b_timeout_duration: Duration, +// b_to_a_timeout_duration: Duration, +// ) -> Result<(u64, u64), std::io::Error> +// where +// A: AsyncRead + AsyncWrite + Unpin + ?Sized, +// B: AsyncRead + AsyncWrite + Unpin + ?Sized, +// { +// CopyBidirectional { +// a, +// b, +// a_to_b: TransferState::Running(CopyBuffer::new_with_capacity(size)?), +// b_to_a: TransferState::Running(CopyBuffer::new_with_capacity(size)?), +// a_to_b_count: 0, +// b_to_a_count: 0, +// a_to_b_delay: None, +// b_to_a_delay: None, +// a_to_b_timeout_duration, +// b_to_a_timeout_duration, +// } +// .await +// } + +// #1 close the connection if no traffic can be read within the specified time +// pub async fn copy_buf_bidirectional_with_timeout( +// a: A, +// b: B, +// size: usize, +// a_to_b_timeout_duration: Duration, +// b_to_a_timeout_duration: Duration, +// ) -> Result<(u64, u64), std::io::Error> +// where +// A: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, +// B: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, +// { +// let (mut a_reader, mut a_writer) = tokio::io::split(a); +// let (mut b_reader, mut b_writer) = tokio::io::split(b); +// let a_to_b = tokio::spawn(async move { +// let mut upload_total_size = 0; +// let mut buf = Vec::new(); +// buf.resize(size, 0); +// loop { +// match tokio::time::timeout(a_to_b_timeout_duration, a_reader.read(&mut buf[..])).await { +// Ok(Ok(size)) => { +// if size == 0 { +// return Ok(upload_total_size); +// } +// match b_writer.write_all(&buf[..size]).await { +// Ok(_) => { +// upload_total_size += size; +// continue; +// } +// Err(e) => { +// return Err(e); +// } +// } +// } +// Ok(Err(e)) => { +// return Err(e); +// } +// Err(_) => { +// return Ok(upload_total_size); +// } +// } +// } +// }); +// let b_to_a = tokio::spawn(async move { +// let mut download_total_size = 0; +// let mut buf = Vec::new(); +// buf.resize(size, 0); +// loop { +// match tokio::time::timeout(b_to_a_timeout_duration, b_reader.read(&mut buf[..])).await { +// Ok(Ok(size)) => { +// if size == 0 { +// return Ok(download_total_size); +// } +// match a_writer.write_all(&buf[..size]).await { +// Ok(_) => { +// download_total_size += size; +// continue; +// } +// Err(e) => { +// return Err(e); +// } +// } +// } +// Ok(Err(e)) => { +// return Err(e); +// } +// Err(_) => { +// return Ok(download_total_size); +// } +// } +// } +// }); +// let up = match a_to_b.await { +// Ok(Ok(up)) => up, +// Ok(Err(e)) => { +// return Err(e); +// } +// Err(_) => { +// return Err(std::io::Error::new(std::io::ErrorKind::Other, "")); +// } +// }; +// let down = match b_to_a.await { +// Ok(Ok(up)) => up, +// Ok(Err(e)) => { +// return Err(e); +// } +// Err(_) => { +// return Err(std::io::Error::new(std::io::ErrorKind::Other, "")); +// } +// }; +// Ok((up as u64, down as u64)) +// } + +// #2 1:1 implement the original logic, that is, the other direction has a timeout requirement after one direction is ready +// if both directions are pending, no timeout limition will be imposed on them + +enum SyncMsg { + SetTimeOut, + Terminate(std::io::Error), Done, } -struct CopyBidirectional<'a, A: ?Sized, B: ?Sized> { - a: &'a mut A, - b: &'a mut B, - a_to_b: TransferState, - b_to_a: TransferState, - a_to_b_count: u64, - b_to_a_count: u64, - a_to_b_delay: Option>>, - b_to_a_delay: Option>>, +pub async fn copy_buf_bidirectional_with_timeout( + a: A, + b: B, + size: usize, a_to_b_timeout_duration: Duration, b_to_a_timeout_duration: Duration, -} - -impl<'a, A, B> Future for CopyBidirectional<'a, A, B> +) -> Result<(u64, u64), std::io::Error> where - A: AsyncRead + AsyncWrite + Unpin + ?Sized, - B: AsyncRead + AsyncWrite + Unpin + ?Sized, + A: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, + B: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, { - type Output = io::Result<(u64, u64)>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Unpack self into mut refs to each field to avoid borrow check issues. - let CopyBidirectional { - a, - b, - a_to_b, - b_to_a, - a_to_b_count, - b_to_a_count, - a_to_b_delay, - b_to_a_delay, - a_to_b_timeout_duration, - b_to_a_timeout_duration, - } = &mut *self; + let (a_reader, a_writer) = tokio::io::split(a); + let (b_reader, b_writer) = tokio::io::split(b); + let (tx_for_a_to_b, rx_for_a_to_b) = tokio::sync::mpsc::unbounded_channel(); + let (tx_for_b_to_a, rx_for_b_to_a) = tokio::sync::mpsc::unbounded_channel(); + let a_to_b = copy_from_lhs_to_rhs( + a_reader, + b_writer, + size, + a_to_b_timeout_duration, + rx_for_a_to_b, + tx_for_b_to_a, + ); + let b_to_a = copy_from_lhs_to_rhs( + b_reader, + a_writer, + size, + b_to_a_timeout_duration, + rx_for_b_to_a, + tx_for_a_to_b, + ); - let mut a = Pin::new(a); - let mut b = Pin::new(b); + let up = a_to_b + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "")) + .await??; + let down = b_to_a + .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "")) + .await??; + //dbg!(up,down); + Ok((up as u64, down as u64)) +} +fn copy_from_lhs_to_rhs( + mut lhs_reader: A, + mut rhs_writer: B, + buf_size: usize, + timeout: Duration, + mut msg_rx: UnboundedReceiver, + other_side_sender: UnboundedSender, +) -> JoinHandle> +where + A: AsyncRead + Unpin + Sized + Send + 'static, + B: AsyncWrite + Unpin + Sized + Send + 'static, +{ + tokio::spawn(async move { + let mut transferred_size = 0; + let mut buf = Vec::new(); + buf.resize(buf_size, 0); + let mut need_timeout = false; loop { - match a_to_b { - TransferState::Running(buf) => { - let res = buf.poll_copy(cx, a.as_mut(), b.as_mut()); - match res { - Poll::Ready(Ok(count)) => { - *a_to_b = TransferState::ShuttingDown(count); - continue; - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => { - if let Some(delay) = a_to_b_delay { - match delay.as_mut().poll(cx) { - Poll::Ready(()) => { - *a_to_b = - TransferState::ShuttingDown(buf.amount_transfered()); - continue; - } - Poll::Pending => (), - } - } + tokio::select! { + msg = async { + if need_timeout{ + tokio::time::sleep(timeout).await; + SyncMsg::Done + }else{ + match msg_rx.recv().await{ + Some(v)=>v, + None=>SyncMsg::SetTimeOut // the other direction is done } } - } - TransferState::ShuttingDown(count) => { - let res = b.as_mut().poll_shutdown(cx); - match res { - Poll::Ready(Ok(())) => { - *a_to_b_count += *count; - *a_to_b = TransferState::Done; - b_to_a_delay - .replace(Box::pin(tokio::time::sleep(*b_to_a_timeout_duration))); - continue; - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => (), + } =>{ + match msg{ + SyncMsg::SetTimeOut => { + need_timeout = true; + }, + SyncMsg::Terminate(e) =>{ + return Err(e); + }, + SyncMsg::Done => return Ok(transferred_size), } } - TransferState::Done => (), - } - - match b_to_a { - TransferState::Running(buf) => { - let res = buf.poll_copy(cx, b.as_mut(), a.as_mut()); - match res { - Poll::Ready(Ok(count)) => { - *b_to_a = TransferState::ShuttingDown(count); - continue; - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => { - if let Some(delay) = b_to_a_delay { - match delay.as_mut().poll(cx) { - Poll::Ready(()) => { - *b_to_a = - TransferState::ShuttingDown(buf.amount_transfered()); - continue; + r = lhs_reader.read(&mut buf[..]) =>{ + match r{ + Ok(size)=>{ + if size == 0{ + // peer has shutdown + match rhs_writer.shutdown().await{ + Ok(_)=>{ + let _ = other_side_sender.send(SyncMsg::SetTimeOut); + } + Err(e)=>{ + let _ = other_side_sender.send(SyncMsg::Terminate(e)); } - Poll::Pending => (), } + return Ok(transferred_size); } - } - } - } - TransferState::ShuttingDown(count) => { - let res = a.as_mut().poll_shutdown(cx); - match res { - Poll::Ready(Ok(())) => { - *b_to_a_count += *count; - *b_to_a = TransferState::Done; - a_to_b_delay - .replace(Box::pin(tokio::time::sleep(*a_to_b_timeout_duration))); + if let Err(e) = rhs_writer.write_all(&buf[..size]).await{ + return Err(e); + } + transferred_size += size; continue; } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => (), + Err(e)=>{ + return Err(e); + } } } - TransferState::Done => (), - } - - match (&a_to_b, &b_to_a) { - (TransferState::Done, TransferState::Done) => break, - _ => return Poll::Pending, - } + }; } - - Poll::Ready(Ok((*a_to_b_count, *b_to_a_count))) - } -} - -pub async fn copy_buf_bidirectional_with_timeout( - a: &mut A, - b: &mut B, - size: usize, - a_to_b_timeout_duration: Duration, - b_to_a_timeout_duration: Duration, -) -> Result<(u64, u64), std::io::Error> -where - A: AsyncRead + AsyncWrite + Unpin + ?Sized, - B: AsyncRead + AsyncWrite + Unpin + ?Sized, -{ - CopyBidirectional { - a, - b, - a_to_b: TransferState::Running(CopyBuffer::new_with_capacity(size)?), - b_to_a: TransferState::Running(CopyBuffer::new_with_capacity(size)?), - a_to_b_count: 0, - b_to_a_count: 0, - a_to_b_delay: None, - b_to_a_delay: None, - a_to_b_timeout_duration, - b_to_a_timeout_duration, - } - .await + }) } diff --git a/clash_lib/src/proxy/mixed/mod.rs b/clash_lib/src/proxy/mixed/mod.rs index 9cd2326c..949f7cd7 100644 --- a/clash_lib/src/proxy/mixed/mod.rs +++ b/clash_lib/src/proxy/mixed/mod.rs @@ -53,7 +53,7 @@ impl InboundListener for Listener { loop { let (socket, _) = listener.accept().await?; - let mut socket = apply_tcp_options(socket)?; + let socket = apply_tcp_options(socket)?; let mut p = [0; 1]; let n = socket.peek(&mut p).await?; @@ -75,7 +75,7 @@ impl InboundListener for Listener { }; tokio::spawn(async move { - socks::handle_tcp(&mut sess, &mut socket, dispatcher, authenticator).await + socks::handle_tcp(&mut sess, socket, dispatcher, authenticator).await }); } diff --git a/clash_lib/src/proxy/socks/inbound/mod.rs b/clash_lib/src/proxy/socks/inbound/mod.rs index 4b37c807..78faeb37 100644 --- a/clash_lib/src/proxy/socks/inbound/mod.rs +++ b/clash_lib/src/proxy/socks/inbound/mod.rs @@ -83,7 +83,7 @@ impl InboundListener for Listener { loop { let (socket, _) = listener.accept().await?; - let mut socket = apply_tcp_options(socket)?; + let socket = apply_tcp_options(socket)?; let mut sess = Session { network: Network::Tcp, @@ -97,7 +97,7 @@ impl InboundListener for Listener { let authenticator = self.authenticator.clone(); tokio::spawn(async move { - handle_tcp(&mut sess, &mut socket, dispatcher, authenticator).await + handle_tcp(&mut sess, socket, dispatcher, authenticator).await }); } } diff --git a/clash_lib/src/proxy/socks/inbound/stream.rs b/clash_lib/src/proxy/socks/inbound/stream.rs index d518c8ca..fbd5cf9a 100644 --- a/clash_lib/src/proxy/socks/inbound/stream.rs +++ b/clash_lib/src/proxy/socks/inbound/stream.rs @@ -19,7 +19,7 @@ use tracing::{instrument, trace, warn}; #[instrument(skip(s, dispatcher, authenticator))] pub async fn handle_tcp<'a>( sess: &'a mut Session, - s: &'a mut TcpStream, + mut s: TcpStream, dispatcher: Arc, authenticator: ThreadSafeAuthenticator, ) -> io::Result<()> { @@ -117,7 +117,7 @@ pub async fn handle_tcp<'a>( )); } - let dst = SocksAddr::read_from(s).await?; + let dst = SocksAddr::read_from(& mut s).await?; match buf[1] { socks_command::CONNECT => {