wip! backend: tasks shit

This commit is contained in:
2025-11-14 16:09:27 +03:00
parent 9a9f92ed95
commit aed6093a50
8 changed files with 341 additions and 61 deletions

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
lazy_static = "1.5.0"
rocket = { version = "0.5.1", features = ["json", "serde_json"] }
rocket_cors = "0.6.0"
tokio = { version = "1.44.1", features = ["fs", "process"] }

61
backend/src/common.rs Normal file
View File

@@ -0,0 +1,61 @@
use std::{env, path::Path};
use rocket::response::Responder;
#[derive(Responder)]
pub(crate) enum GenerationError {
#[response(status = 500, content_type = "json")]
InternalError(String),
#[response(status = 404, content_type = "json")]
DirectoryNotFoundError(String),
}
pub(crate) fn get_base_directory() -> String {
match env::var("GENERATION_BASE_DIRECTORY") {
Ok(directory) => directory,
Err(_) => "base/".into(),
}
}
pub(crate) async fn check_is_valid_directory(
directory: impl AsRef<Path> + Clone,
) -> Result<(), GenerationError> {
match tokio::fs::metadata(directory.clone()).await {
Ok(metadata) => {
if !metadata.is_dir() {
return Err(GenerationError::DirectoryNotFoundError(
"The specified directory is not valid".into(),
));
}
}
Err(_) => {
return Err(GenerationError::DirectoryNotFoundError(
"The specified directory is not exists".into(),
));
}
};
// Check if the directory contains a 'vars.bat' and 'template.ovpn' files
const REQUIRED_FILES: [&str; 2] = ["vars.bat", "template.ovpn"];
for file_name in REQUIRED_FILES.iter() {
let file_path = directory.as_ref().join(file_name);
match tokio::fs::metadata(&file_path).await {
Ok(metadata) => {
if !metadata.is_file() {
return Err(GenerationError::DirectoryNotFoundError(format!(
"The specified directory is not valid, invalid file: {}",
file_name
)));
}
}
Err(_) => {
return Err(GenerationError::DirectoryNotFoundError(format!(
"The specified directory is not valid, missing file: {}",
file_name
)));
}
};
}
Ok(())
}

View File

@@ -12,6 +12,11 @@ use std::env;
use std::path::Path;
use std::path::PathBuf;
use crate::common::{get_base_directory, check_is_valid_directory, GenerationError};
pub(crate) mod tasks;
pub(crate) mod common;
#[derive(Deserialize)]
#[serde(crate = "rocket::serde")]
struct GenerationRequest<'r> {
@@ -25,64 +30,6 @@ struct GenerationRequest<'r> {
use_openssl: Option<bool>,
}
#[derive(Responder)]
enum GenerationError {
#[response(status = 500, content_type = "json")]
InternalError(String),
#[response(status = 404, content_type = "json")]
DirectoryNotFoundError(String),
}
fn get_base_directory() -> String {
match env::var("GENERATION_BASE_DIRECTORY") {
Ok(directory) => directory,
Err(_) => "base/".into(),
}
}
async fn check_is_valid_directory(
directory: impl AsRef<Path> + Clone,
) -> Result<(), GenerationError> {
match tokio::fs::metadata(directory.clone()).await {
Ok(metadata) => {
if !metadata.is_dir() {
return Err(GenerationError::DirectoryNotFoundError(
"The specified directory is not valid".into(),
));
}
}
Err(_) => {
return Err(GenerationError::DirectoryNotFoundError(
"The specified directory is not exists".into(),
));
}
};
// Check if the directory contains a 'vars.bat' and 'template.ovpn' files
const REQUIRED_FILES: [&str; 2] = ["vars.bat", "template.ovpn"];
for file_name in REQUIRED_FILES.iter() {
let file_path = directory.as_ref().join(file_name);
match tokio::fs::metadata(&file_path).await {
Ok(metadata) => {
if !metadata.is_file() {
return Err(GenerationError::DirectoryNotFoundError(format!(
"The specified directory is not valid, invalid file: {}",
file_name
)));
}
}
Err(_) => {
return Err(GenerationError::DirectoryNotFoundError(format!(
"The specified directory is not valid, missing file: {}",
file_name
)));
}
};
}
Ok(())
}
#[get("/get")]
async fn list_directories() -> Result<Json<Vec<String>>, status::Custom<String>> {
let mut reader = tokio::fs::read_dir(get_base_directory())
@@ -98,9 +45,10 @@ async fn list_directories() -> Result<Json<Vec<String>>, status::Custom<String>>
while let Ok(Some(entry)) = reader.next_entry().await {
let path = entry.path();
if check_is_valid_directory(&path).await.is_ok()
&& let Some(name) = path.file_name() {
directories.push(name.to_str().unwrap().to_string())
}
&& let Some(name) = path.file_name()
{
directories.push(name.to_str().unwrap().to_string())
}
}
Ok(Json(directories))

View File

@@ -0,0 +1,81 @@
use crate::tasks::repo::{Task, TasksRepository};
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
lazy_static! {
static ref LAST_ID: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
pub(crate) static ref REPO: Arc<Mutex<MemoryTasksRepository>> =
Arc::new(Mutex::new(MemoryTasksRepository::new())).clone();
}
pub(crate) fn get_repo() -> Arc<Mutex<MemoryTasksRepository>> {
REPO.clone()
}
pub(crate) struct MemoryTasksRepository {
pub tasks: Vec<Task>,
}
impl MemoryTasksRepository {
pub(crate) fn new() -> Self {
MemoryTasksRepository { tasks: Vec::new() }
}
fn find_task_index(&self, task_id: u32) -> Option<usize> {
for (index, task) in self.tasks.iter().enumerate() {
if task.id == task_id {
return Some(index);
}
}
None
}
}
impl TasksRepository for MemoryTasksRepository {
fn all_tasks(&self) -> Vec<Task> {
self.tasks.clone()
}
fn tasks_for(&self, directory_name: &str) -> Vec<Task> {
self.tasks
.iter()
.filter(|task| task.directory_name == directory_name)
.cloned()
.collect()
}
fn add_task(
&mut self,
directory_name: String,
config_names: Vec<String>,
use_openssl: Option<bool>,
) -> Task {
let id = *LAST_ID.lock().unwrap() + 1;
let task = Task::new(id, directory_name, config_names, use_openssl);
self.tasks.push(task.clone());
*LAST_ID.lock().unwrap() = id;
task
}
fn update_task(&mut self, task: &Task) {
if let Some(idx) = self.find_task_index(task.id) {
self.tasks[idx] = task.clone();
} else {
self.tasks.push(task.clone());
}
}
fn remove_task(&mut self, task_id: u32) {
if let Some(idx) = self.find_task_index(task_id) {
self.tasks.remove(idx);
}
}
fn get_task(&self, task_id: u32) -> Option<Task> {
self.find_task_index(task_id)
.map(|idx| self.tasks[idx].clone())
}
}

3
backend/src/tasks/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub(crate) mod memrepo;
pub(crate) mod repo;
pub(crate) mod worker;

84
backend/src/tasks/repo.rs Normal file
View File

@@ -0,0 +1,84 @@
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub(crate) enum TaskState {
#[default]
Pending,
Running,
Completed,
Failed {
errors: Vec<String>,
},
Cancelled,
}
#[derive(Clone, Debug)]
pub(crate) struct Task {
pub(crate) id: u32,
pub(crate) directory_name: String,
pub(crate) state: TaskState,
pub(crate) config_names: Vec<String>,
pub(crate) use_openssl: Option<bool>,
}
impl Task {
pub(crate) fn new(
id: u32,
directory_name: String,
config_names: Vec<String>,
use_openssl: Option<bool>,
) -> Self {
Self {
id,
directory_name,
state: TaskState::default(),
config_names,
use_openssl,
}
}
}
pub(crate) trait TasksRepository {
fn all_tasks(&self) -> Vec<Task>;
fn tasks_for(&self, directory_name: &str) -> Vec<Task>;
fn add_task(
&mut self,
directory_name: String,
config_names: Vec<String>,
use_openssl: Option<bool>,
) -> Task;
fn update_task(&mut self, task: &Task);
fn remove_task(&mut self, task_id: u32);
fn get_task(&self, task_id: u32) -> Option<Task>;
fn take_next(&mut self) -> Option<Task> {
if let Some(task) = self
.all_tasks()
.into_iter()
.find(|t| t.state == TaskState::Pending)
{
let mut task = task.clone();
task.state = TaskState::Running;
self.update_task(&task);
Some(task)
} else {
None
}
}
fn complete_task(&mut self, task_id: u32) {
if let Some(mut task) = self.get_task(task_id) {
task.state = TaskState::Completed;
self.update_task(&task);
}
}
fn fail_task(&mut self, task_id: u32, errors: Vec<String>) {
if let Some(mut task) = self.get_task(task_id) {
task.state = TaskState::Failed { errors };
self.update_task(&task);
}
}
fn cancel_task(&mut self, task_id: u32) {
if let Some(mut task) = self.get_task(task_id) {
task.state = TaskState::Cancelled;
self.update_task(&task);
}
}
}

101
backend/src/tasks/worker.rs Normal file
View File

@@ -0,0 +1,101 @@
use std::{env, path::Path};
use tokio::sync::mpsc;
use crate::{common::{check_is_valid_directory, get_base_directory}, tasks::{self, memrepo::get_repo, repo::{Task, TasksRepository}}};
pub(crate) async fn tasks_sender(tasks_tx: mpsc::Sender<Task>) {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
while let Some(task) = get_repo().lock().unwrap().take_next() {
if let Err(e) = tasks_tx.send(task).await {
eprintln!("Error sending task: {}", e);
};
tokio::time::sleep(tokio::time::Duration::from_millis(0)).await;
}
}
}
pub(crate) async fn tasks_updater(mut tasks_rx: mpsc::Receiver<Task>) {
while let Some(task) = tasks_rx.recv().await {
get_repo().lock().unwrap().update_task(&task);
tokio::time::sleep(tokio::time::Duration::from_millis(0)).await;
}
}
pub(crate) async fn task_runner(mut task: Task) -> Task {
let dir = Path::new(&get_base_directory()).join(&task.directory_name);
let dir_ok = check_is_valid_directory(dir.clone()).await;
if dir_ok.is_err() {
task.state = tasks::repo::TaskState::Failed{errors: task.config_names.clone()};
return task;
}
let generator_bin = env::var("GENERATOR_BIN").unwrap_or("peazyrsa".into());
let mut use_openssl = env::var("USE_OPENSSL").unwrap_or("no".into()) == "yes";
if let Some(req_use_openssl) = task.use_openssl {
use_openssl = req_use_openssl;
}
let mut openssl_bin = "".to_string();
if use_openssl {
openssl_bin = env::var("OPENSSL_BIN").unwrap_or("openssl".into());
}
let mut errors: Vec<String> = vec![];
for config_name in &task.config_names {
let mut cmd = tokio::process::Command::new(&generator_bin);
if use_openssl {
cmd.arg("--with-openssl").arg(&openssl_bin);
}
cmd.arg("-d").arg(&dir).arg(&config_name);
// execute the command and check error code
match cmd.status().await {
Ok(status) if status.success() => {
}
_ => {
errors.push(config_name.clone());
continue;
}
}
// check output file exists
if tokio::fs::metadata(dir.join("config").join(format!("{}.ovpn", &config_name))).await.is_err() {
errors.push(config_name.clone());
}
}
if errors.is_empty() {
task.state = tasks::repo::TaskState::Completed;
} else {
task.state = tasks::repo::TaskState::Failed {
errors,
};
}
task
}
pub(crate) async fn worker(mut tasks_rx: mpsc::Receiver<Task>, tasks_tx: mpsc::Sender<Task>) {
while let Some(task) = tasks_rx.recv().await {
let task = tokio::spawn(async move { task_runner(task).await }).await.unwrap();
if let Err(e) = tasks_tx.send(task).await {
eprintln!("Error sending task back: {}", e);
}
}
}
pub(crate) async fn start_worker() {
let (tasks_tx_sender, tasks_rx_sender) = mpsc::channel(100);
let (tasks_tx_updater, tasks_rx_updater) = mpsc::channel(100);
tokio::spawn(tasks_sender(tasks_tx_sender));
tokio::spawn(worker(tasks_rx_sender, tasks_tx_updater));
tokio::spawn(tasks_updater(tasks_rx_updater));
}