server_executor

This commit is contained in:
Dmitry Belyaev 2022-09-24 19:49:44 +03:00
parent 80b5fb67b9
commit 7f6ddb0562
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3
2 changed files with 20 additions and 19 deletions

View File

@ -5,12 +5,11 @@ use tokio_util::sync::CancellationToken;
mod config; mod config;
mod server; mod server;
use crate::config::Config; use crate::config::Config;
use crate::server::spawn_socks5_server; use crate::server::server_executor;
use flexi_logger::{AdaptiveFormat, Age, Cleanup, Criterion, Duplicate, FileSpec, Logger, Naming}; use flexi_logger::{AdaptiveFormat, Age, Cleanup, Criterion, Duplicate, FileSpec, Logger, Naming};
#[tokio::main] fn main() {
async fn main() {
Logger::try_with_str("info") Logger::try_with_str("info")
.unwrap() .unwrap()
.log_to_file(FileSpec::default()) .log_to_file(FileSpec::default())
@ -25,21 +24,15 @@ async fn main() {
.start_with_specfile("logspec.toml") .start_with_specfile("logspec.toml")
.unwrap(); .unwrap();
let cfg = tokio::task::spawn_blocking(Config::get) let cfg = Config::get();
.await
.expect("get config");
log::info!("cfg: {:#?}", cfg); log::info!("cfg: {:#?}", cfg);
let token = CancellationToken::new(); let token = CancellationToken::new();
let child_token = token.child_token(); let child_token = token.child_token();
let handle = std::thread::spawn(move || server_executor(cfg, child_token));
let (r, _) = tokio::join!( std::thread::sleep(std::time::Duration::from_secs(10));
spawn_socks5_server(cfg, child_token),
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
token.cancel(); token.cancel();
})
);
r.unwrap(); handle.join().unwrap();
} }

View File

@ -12,6 +12,16 @@ use tokio_util::sync::CancellationToken;
use crate::config::Config; use crate::config::Config;
use crate::config::PasswordAuth; use crate::config::PasswordAuth;
pub fn server_executor(cfg: Config, token: CancellationToken) {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
spawn_socks5_server(cfg, token).await.unwrap();
})
}
pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> Result<()> { pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> Result<()> {
let mut server_config = fast_socks5::server::Config::default(); let mut server_config = fast_socks5::server::Config::default();
server_config.set_request_timeout(cfg.request_timeout); server_config.set_request_timeout(cfg.request_timeout);
@ -34,8 +44,7 @@ pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> Resul
log::info!("Listen for socks connections @ {}", &cfg.listen_addr); log::info!("Listen for socks connections @ {}", &cfg.listen_addr);
// Standard TCP loop while let Some(socket_res) = check_cancelled(incoming.next(), token.child_token()).await {
while let Some(socket_res) = or_chancel(incoming.next(), token.child_token()).await {
match socket_res { match socket_res {
Ok(socket) => { Ok(socket) => {
let child_token = token.child_token(); let child_token = token.child_token();
@ -50,13 +59,13 @@ pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> Resul
Ok(()) Ok(())
} }
async fn or_chancel<F, R>(future: F, token: CancellationToken) -> Option<R> async fn check_cancelled<F, R>(future: F, token: CancellationToken) -> Option<R>
where where
F: Future<Output = Option<R>>, F: Future<Output = Option<R>>,
{ {
select! { select! {
_ = token.cancelled() => { _ = token.cancelled() => {
log::error!("canceled"); log::error!("accept canceled");
None None
} }
res = future => { res = future => {
@ -71,7 +80,6 @@ where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
{ {
tokio::spawn(async move { tokio::spawn(async move {
// Wait for either cancellation or a very long time
let result = select! { let result = select! {
_ = token.cancelled() => { _ = token.cancelled() => {
Err("Client connection canceled".to_string()) Err("Client connection canceled".to_string())