remove mpsc::channel

This commit is contained in:
Dmitry Belyaev 2022-09-29 10:34:55 +03:00
parent b2f81ccc5b
commit b10eca9688
Signed by: b4tman
GPG Key ID: 41A00BF15EA7E5F3
2 changed files with 15 additions and 32 deletions

View File

@ -3,35 +3,25 @@ use fast_socks5::{
Result, Result,
}; };
use std::future::Future; use std::future::Future;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{self, AsyncRead, AsyncWrite};
use tokio::select; use tokio::select;
use tokio::task; use tokio::task;
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use std::sync::mpsc::Sender;
use async_stream::stream; use async_stream::stream;
use crate::config::Config; use crate::config::Config;
use crate::config::PasswordAuth; use crate::config::PasswordAuth;
pub fn server_executor( pub fn server_executor(cfg: Config, token: CancellationToken) -> io::Result<()> {
cfg: Config,
token: CancellationToken,
shutdown_tx: Sender<()>,
) -> std::io::Result<()> {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.build()? .build()?
.block_on(async { .block_on(async { spawn_socks5_server(cfg, token).await })
let result = spawn_socks5_server(cfg, token).await;
shutdown_tx.send(()).unwrap();
result
})
} }
pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> std::io::Result<()> { pub async fn spawn_socks5_server(cfg: Config, token: CancellationToken) -> io::Result<()> {
let mut server_config = fast_socks5::server::Config::default(); let mut server_config = fast_socks5::server::Config::default();
server_config.set_request_timeout(cfg.request_timeout); server_config.set_request_timeout(cfg.request_timeout);
server_config.set_skip_auth(cfg.skip_auth); server_config.set_skip_auth(cfg.skip_auth);

View File

@ -1,6 +1,6 @@
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use std::{ffi::OsString, sync::mpsc, thread, time::Duration}; use std::{ffi::OsString, thread, time::Duration};
use windows_service::{ use windows_service::{
define_windows_service, define_windows_service,
service::{ service::{
@ -168,10 +168,9 @@ pub fn my_service_main(_arguments: Vec<OsString>) {
} }
pub fn run_service() -> std::result::Result<(), String> { pub fn run_service() -> std::result::Result<(), String> {
// Create a channel to be able to poll a stop event from the service worker loop. // Create a cancellation token to be able to cancell server
let (shutdown_tx, shutdown_rx) = mpsc::channel(); let control_token = CancellationToken::new();
let server_token = control_token.child_token();
let shutdown_tx1 = shutdown_tx.clone();
// Define system service event handler that will be receiving service events. // Define system service event handler that will be receiving service events.
let event_handler = move |control_event| -> ServiceControlHandlerResult { let event_handler = move |control_event| -> ServiceControlHandlerResult {
@ -182,8 +181,8 @@ pub fn run_service() -> std::result::Result<(), String> {
// Handle stop // Handle stop
ServiceControl::Stop => { ServiceControl::Stop => {
log::debug!("stop signal from system"); log::info!("service stop event received");
shutdown_tx1.send(()).unwrap(); control_token.cancel();
ServiceControlHandlerResult::NoError ServiceControlHandlerResult::NoError
} }
@ -203,17 +202,10 @@ pub fn run_service() -> std::result::Result<(), String> {
let cfg = Config::get(); let cfg = Config::get();
log::info!("start with config: {:#?}", cfg); log::info!("start with config: {:#?}", cfg);
let token = CancellationToken::new(); let result = std::thread::spawn(move || server_executor(cfg, server_token)).join();
let child_token = token.child_token();
let server_handle = std::thread::spawn(move || server_executor(cfg, child_token, shutdown_tx));
shutdown_rx.recv().unwrap(); // wait for shutdown signal log::info!("server thread stoped");
log::info!("service stop");
// stop server
token.cancel();
let result = server_handle.join();
if let Err(e) = result { if let Err(e) = result {
log::error!("server panic: {:#?}", e); log::error!("server panic: {:#?}", e);
status_handle status_handle
@ -222,8 +214,7 @@ pub fn run_service() -> std::result::Result<(), String> {
return Err("server panic".into()); return Err("server panic".into());
} }
let result = result.unwrap(); if let Err(e) = result.unwrap() {
if let Err(e) = result {
log::error!("server error: {:#?}", e); log::error!("server error: {:#?}", e);
status_handle status_handle
.set_service_status(ServiceStatus::stopped_with_error(2)) .set_service_status(ServiceStatus::stopped_with_error(2))
@ -235,5 +226,7 @@ pub fn run_service() -> std::result::Result<(), String> {
status_handle status_handle
.set_service_status(ServiceStatus::stopped()) .set_service_status(ServiceStatus::stopped())
.str_err()?; .str_err()?;
log::info!("service stoped");
Ok(()) Ok(())
} }