Skip to content

Commit

Permalink
add a new visitor instead of using unpack function
Browse files Browse the repository at this point in the history
  • Loading branch information
rawdaGastan committed Sep 16, 2024
1 parent edac067 commit d56b5af
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fl-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ thiserror = "1.0.63"
hostname-validator = "1.1.1"
walkdir = "2.5.0"
sha256 = "1.5.0"
async-trait = "0.1.53"
12 changes: 6 additions & 6 deletions fl-server/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn sign_in_handler(
Extension(cfg): Extension<config::Config>,
Json(user_data): Json<SignInBody>,
) -> impl IntoResponse {
let user = match get_user_by_username(cfg.users, &user_data.username) {
let user = match get_user_by_username(&cfg.users, &user_data.username) {
Some(user) => user,
None => {
return Err(ResponseError::Unauthorized(
Expand All @@ -70,17 +70,17 @@ pub async fn sign_in_handler(
));
}

let token = encode_jwt(user.username, cfg.jwt_secret, cfg.jwt_expire_hours)
let token = encode_jwt(user.username.clone(), cfg.jwt_secret, cfg.jwt_expire_hours)
.map_err(|_| ResponseError::InternalServerError)?;

Ok(ResponseResult::SignedIn(SignInResponse {
access_token: token,
}))
}

pub fn get_user_by_username(users: Vec<User>, username: &str) -> Option<User> {
pub fn get_user_by_username<'a>(users: &'a [User], username: &str) -> Option<&'a User> {
let user = users.iter().find(|u| u.username == username)?;
Some(user.clone())
Some(user)
}

pub fn encode_jwt(
Expand Down Expand Up @@ -138,7 +138,7 @@ pub async fn authorize(
}
};

let current_user = match get_user_by_username(cfg.users, &token_data.claims.username) {
let current_user = match get_user_by_username(&cfg.users, &token_data.claims.username) {
Some(user) => user,
None => {
return Err(ResponseError::Unauthorized(
Expand All @@ -147,6 +147,6 @@ pub async fn authorize(
}
};

req.extensions_mut().insert(current_user.username);
req.extensions_mut().insert(current_user.username.clone());
Ok(next.run(req).await)
}
4 changes: 2 additions & 2 deletions fl-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs, sync::Mutex};
use std::{collections::HashMap, fs, path::PathBuf, sync::Mutex};
use utoipa::ToSchema;

use crate::{auth, handlers};
Expand All @@ -13,7 +13,7 @@ pub struct Job {
#[derive(Debug, ToSchema)]
pub struct AppState {
pub jobs_state: Mutex<HashMap<String, handlers::FlistState>>,
pub flists_progress: Mutex<HashMap<String, f32>>,
pub flists_progress: Mutex<HashMap<PathBuf, f32>>,
}

#[derive(Debug, Default, Clone, Deserialize)]
Expand Down
129 changes: 47 additions & 82 deletions fl-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use axum_macros::debug_handler;
use std::{
collections::HashMap,
fs,
path::PathBuf,
sync::{mpsc, Arc},
};
use tokio::io;
use walkdir::WalkDir;

use bollard::auth::DockerCredentials;
use serde::{Deserialize, Serialize};
Expand All @@ -25,10 +24,7 @@ use crate::{
response::{FileInfo, ResponseError, ResponseResult},
serve_flists::visit_dir_one_level,
};
use rfs::{
cache,
fungi::{Reader, Writer},
};
use rfs::fungi::{Reader, Writer};
use utoipa::{OpenApi, ToSchema};
use uuid::Uuid;

Expand Down Expand Up @@ -58,7 +54,7 @@ pub struct FlistBody {

#[derive(Debug, Deserialize, Serialize, Clone, ToSchema)]
pub struct PreviewResponse {
pub content: Vec<String>,
pub content: Vec<PathBuf>,
pub metadata: String,
pub checksum: String,
}
Expand Down Expand Up @@ -124,37 +120,28 @@ pub async fn create_flist_handler(
}

let fl_name = docker_image.replace([':', '/'], "-") + ".fl";
let username_dir = format!("{}/{}", cfg.flist_dir, username);
let username_dir = std::path::Path::new(&cfg.flist_dir).join(username);
let fl_path = username_dir.join(&fl_name);

match flist_exists(std::path::Path::new(&username_dir), &fl_name).await {
Ok(exists) => {
if exists {
return Err(ResponseError::Conflict("flist already exists".to_string()));
}
}
Err(e) => {
log::error!("failed to check flist existence with error {:?}", e);
return Err(ResponseError::InternalServerError);
}
if fl_path.exists() {
return Err(ResponseError::Conflict("flist already exists".to_string()));
}

let created = fs::create_dir_all(&username_dir);
if created.is_err() {
log::error!(
"failed to create user flist directory `{}` with error {:?}",
"failed to create user flist directory `{:?}` with error {:?}",
&username_dir,
created.err()
);
return Err(ResponseError::InternalServerError);
}

let fl_path: String = format!("{}/{}", username_dir, fl_name);

let meta = match Writer::new(&fl_path).await {
Ok(writer) => writer,
Err(err) => {
log::error!(
"failed to create a new writer for flist `{}` with error {}",
"failed to create a new writer for flist `{:?}` with error {}",
fl_path,
err
);
Expand Down Expand Up @@ -257,7 +244,7 @@ pub async fn create_flist_handler(
state.jobs_state.lock().unwrap().insert(
job.id.clone(),
FlistState::Created(format!(
"flist {}:{}/{} is created successfully",
"flist {}:{}/{:?} is created successfully",
cfg.host, cfg.port, fl_path
)),
);
Expand Down Expand Up @@ -397,7 +384,7 @@ pub async fn preview_flist_handler(
Err(err) => return Err(ResponseError::BadRequest(err.to_string())),
};

let content = match unpack_flist(&fl_path).await {
let content = match get_flist_content(&fl_path).await {
Ok(paths) => paths,
Err(_) => return Err(ResponseError::InternalServerError),
};
Expand All @@ -421,20 +408,6 @@ pub async fn preview_flist_handler(
}))
}

async fn flist_exists(dir_path: &std::path::Path, flist_name: &String) -> io::Result<bool> {
let mut dir = tokio::fs::read_dir(dir_path).await?;

while let Some(child) = dir.next_entry().await? {
let file_name = child.file_name().to_string_lossy().to_string();

if file_name.eq(flist_name) {
return Ok(true);
}
}

Ok(false)
}

async fn validate_flist_path(
users: Vec<User>,
flist_dir: &String,
Expand Down Expand Up @@ -466,7 +439,7 @@ async fn validate_flist_path(
}

// validate username
match get_user_by_username(users, parts[1]) {
match get_user_by_username(&users, parts[1]) {
Some(_) => (),
None => {
return Err(anyhow::anyhow!(
Expand All @@ -493,23 +466,20 @@ async fn validate_flist_path(
}

// validate flist existence
let username_dir = format!("{}/{}", parts[0], parts[1]);
match flist_exists(std::path::Path::new(&username_dir), &fl_name).await {
Ok(exists) => {
if !exists {
return Err(anyhow::anyhow!("flist '{}' doesn't exist", fl_path));
}
}
Err(e) => {
log::error!("failed to check flist existence with error {:?}", e);
return Err(anyhow::anyhow!("Internal server error"));
}
if !std::path::Path::new(parts[0])
.join(parts[1])
.join(&fl_name)
.exists()
{
return Err(anyhow::anyhow!("flist '{}' doesn't exist", fl_path));
}

Ok(())
}

async fn unpack_flist(fl_path: &String) -> Result<Vec<std::string::String>, Error> {
async fn get_flist_content(fl_path: &String) -> Result<Vec<PathBuf>, Error> {
let mut visitor = ReadVisitor::default();

let meta = match Reader::new(&fl_path).await {
Ok(reader) => reader,
Err(err) => {
Expand All @@ -522,43 +492,38 @@ async fn unpack_flist(fl_path: &String) -> Result<Vec<std::string::String>, Erro
}
};

let router = match rfs::store::get_router(&meta).await {
Ok(r) => r,
Err(err) => {
log::error!("failed to get router with error {}", err);
return Err(anyhow::anyhow!("Internal server error"));
}
};

let cache = cache::Cache::new(String::from("/tmp/cache"), router);
let tmp_target = match tempdir::TempDir::new("target") {
Ok(dir) => dir,
match meta.walk(&mut visitor).await {
Ok(()) => return Ok(visitor.into_inner()),
Err(err) => {
log::error!("failed to create tmp dir with error {}", err);
log::error!(
"failed to walk through metadata for flist `{}` with error {}",
fl_path,
err
);
return Err(anyhow::anyhow!("Internal server error"));
}
};
let tmp_target_path = tmp_target.path().to_owned();
}

match rfs::unpack(&meta, &cache, &tmp_target_path, false).await {
Ok(_) => (),
Err(err) => {
log::error!("failed to unpack flist {} with error {}", fl_path, err);
return Err(anyhow::anyhow!("Internal server error"));
}
};
#[derive(Default)]
struct ReadVisitor {
inner: Vec<PathBuf>,
}

let mut paths = Vec::new();
for file in WalkDir::new(tmp_target_path.clone())
.into_iter()
.filter_map(|file| file.ok())
{
let path = file.path().to_string_lossy().to_string();
match path.strip_prefix(&tmp_target_path.to_string_lossy().to_string()) {
Some(p) => paths.push(p.to_string()),
None => return Err(anyhow::anyhow!("Internal server error")),
};
impl ReadVisitor {
pub fn into_inner(self) -> Vec<PathBuf> {
self.inner
}
}

Ok(paths)
#[async_trait::async_trait]
impl rfs::fungi::meta::WalkVisitor for ReadVisitor {
async fn visit(
&mut self,
path: &std::path::Path,
_node: &rfs::fungi::meta::Inode,
) -> rfs::fungi::meta::Result<rfs::fungi::meta::Walk> {
self.inner.push(path.to_path_buf());
Ok(rfs::fungi::meta::Walk::Continue)
}
}
6 changes: 1 addition & 5 deletions fl-server/src/serve_flists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ pub async fn visit_dir_one_level(

let mut progress = 0.0;
if is_file {
match state.flists_progress.lock().unwrap().get(&format!(
"{}/{}",
path.to_string_lossy().to_string(),
name
)) {
match state.flists_progress.lock().unwrap().get(&path.join(&name)) {
Some(p) => progress = p.to_owned(),
None => progress = 100.0,
}
Expand Down

0 comments on commit d56b5af

Please sign in to comment.