From b10eca96885bd0615f2ac2a8a9fceb7906f83552 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 29 Sep 2022 10:34:55 +0300 Subject: [PATCH] remove mpsc::channel --- src/server.rs | 18 ++++-------------- src/service.rs | 29 +++++++++++------------------ 2 files changed, 15 insertions(+), 32 deletions(-) diff --git a/src/server.rs b/src/server.rs index cd1377b..13b96b7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,35 +3,25 @@ use fast_socks5::{ Result, }; use std::future::Future; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{self, AsyncRead, AsyncWrite}; use tokio::select; use tokio::task; use tokio_stream::{Stream, StreamExt}; use tokio_util::sync::CancellationToken; -use std::sync::mpsc::Sender; - use async_stream::stream; use crate::config::Config; use crate::config::PasswordAuth; -pub fn server_executor( - cfg: Config, - token: CancellationToken, - shutdown_tx: Sender<()>, -) -> std::io::Result<()> { +pub fn server_executor(cfg: Config, token: CancellationToken) -> io::Result<()> { tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? - .block_on(async { - let result = spawn_socks5_server(cfg, token).await; - shutdown_tx.send(()).unwrap(); - result - }) + .block_on(async { spawn_socks5_server(cfg, token).await }) } -pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> std::io::Result<()> { +pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> io::Result<()> { let mut server_config = fast_socks5::server::Config::default(); server_config.set_request_timeout(cfg.request_timeout); server_config.set_skip_auth(cfg.skip_auth); diff --git a/src/service.rs b/src/service.rs index 41c9529..e785e5c 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,6 +1,6 @@ use tokio_util::sync::CancellationToken; -use std::{ffi::OsString, sync::mpsc, thread, time::Duration}; +use std::{ffi::OsString, thread, time::Duration}; use windows_service::{ define_windows_service, service::{ @@ -168,10 +168,9 @@ pub fn my_service_main(_arguments: Vec) { } pub fn run_service() -> std::result::Result<(), String> { - // Create a channel to be able to poll a stop event from the service worker loop. - let (shutdown_tx, shutdown_rx) = mpsc::channel(); - - let shutdown_tx1 = shutdown_tx.clone(); + // Create a cancellation token to be able to cancell server + let control_token = CancellationToken::new(); + let server_token = control_token.child_token(); // Define system service event handler that will be receiving service events. let event_handler = move |control_event| -> ServiceControlHandlerResult { @@ -182,8 +181,8 @@ pub fn run_service() -> std::result::Result<(), String> { // Handle stop ServiceControl::Stop => { - log::debug!("stop signal from system"); - shutdown_tx1.send(()).unwrap(); + log::info!("service stop event received"); + control_token.cancel(); ServiceControlHandlerResult::NoError } @@ -203,17 +202,10 @@ pub fn run_service() -> std::result::Result<(), String> { let cfg = Config::get(); log::info!("start with config: {:#?}", cfg); - let token = CancellationToken::new(); - let child_token = token.child_token(); - let server_handle = std::thread::spawn(move || server_executor(cfg, child_token, shutdown_tx)); + let result = std::thread::spawn(move || server_executor(cfg, server_token)).join(); - shutdown_rx.recv().unwrap(); // wait for shutdown signal - log::info!("service stop"); + log::info!("server thread stoped"); - // stop server - token.cancel(); - - let result = server_handle.join(); if let Err(e) = result { log::error!("server panic: {:#?}", e); status_handle @@ -222,8 +214,7 @@ pub fn run_service() -> std::result::Result<(), String> { return Err("server panic".into()); } - let result = result.unwrap(); - if let Err(e) = result { + if let Err(e) = result.unwrap() { log::error!("server error: {:#?}", e); status_handle .set_service_status(ServiceStatus::stopped_with_error(2)) @@ -235,5 +226,7 @@ pub fn run_service() -> std::result::Result<(), String> { status_handle .set_service_status(ServiceStatus::stopped()) .str_err()?; + + log::info!("service stoped"); Ok(()) }