Skip to content

Commit

Permalink
support progress for conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
rawdaGastan committed Jul 28, 2024
1 parent 230b0d3 commit 1862904
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docker2fl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ toml = "0.4.2"
clap = { version = "4.2", features = ["derive"] }
serde = { version = "1.0.159" , features = ["derive"] }
tokio-async-drop = "0.1.0"
walkdir = "2.5.0"
110 changes: 77 additions & 33 deletions docker2fl/src/docker2fl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ use bollard::container::{
};
use bollard::image::{CreateImageOptions, RemoveImageOptions};
use bollard::Docker;
use std::sync::mpsc::Sender;
use walkdir::WalkDir;

use anyhow::{Context, Result};
use futures_util::stream::StreamExt;
use serde_json::json;
use std::collections::HashMap;
use std::default::Default;
use std::fs;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::Command;
use tokio_async_drop::tokio_async_drop;

use rfs::fungi::Writer;
use rfs::store::Store;

use uuid::Uuid;

struct DockerInfo {
image_name: String,
container_name: String,
Expand All @@ -43,46 +43,90 @@ impl Drop for DockerInfo {
}
}

pub async fn convert<S: Store>(
#[derive(Clone)]
pub struct DockerImageToFlist {
meta: Writer,
store: S,
image_name: &str,
image_name: String,
credentials: Option<DockerCredentials>,
) -> Result<()> {
#[cfg(unix)]
let docker = Docker::connect_with_socket_defaults().context("failed to create docker")?;
docker_tmp_dir_path: PathBuf,
files_count: usize,
}

let container_name = Uuid::new_v4().to_string();
impl DockerImageToFlist {
pub fn new(
meta: Writer,
image_name: String,
credentials: Option<DockerCredentials>,
docker_tmp_dir_path: PathBuf,
) -> Self {
DockerImageToFlist {
meta,
image_name,
credentials,
docker_tmp_dir_path,
files_count: 0,
}
}

let docker_tmp_dir = tempdir::TempDir::new(&container_name)?;
let docker_tmp_dir_path = docker_tmp_dir.path();
pub fn files_count(&self) -> usize {
self.files_count
}

let docker_info = DockerInfo {
image_name: image_name.to_owned(),
container_name,
docker,
};
pub async fn prepare(&mut self) -> Result<()> {
#[cfg(unix)]
let docker = Docker::connect_with_socket_defaults().context("failed to create docker")?;

extract_image(
&docker_info.docker,
&docker_info.image_name,
&docker_info.container_name,
docker_tmp_dir_path,
credentials,
)
.await
.context("failed to extract docker image to a directory")?;
log::info!(
"docker image '{}' is extracted successfully",
docker_info.image_name
);
let container_file = Path::file_stem(self.docker_tmp_dir_path.as_path()).unwrap();
let container_name = container_file.to_str().unwrap().to_owned();

let docker_info = DockerInfo {
image_name: self.image_name.to_owned(),
container_name,
docker,
};

extract_image(
&docker_info.docker,
&docker_info.image_name,
&docker_info.container_name,
&self.docker_tmp_dir_path,
self.credentials.clone(),
)
.await
.context("failed to extract docker image to a directory")?;
log::info!(
"docker image '{}' is extracted successfully",
docker_info.image_name
);

self.files_count = WalkDir::new(self.docker_tmp_dir_path.as_path())
.into_iter()
.count();

Ok(())
}

rfs::pack(meta, store, docker_tmp_dir_path, true)
pub async fn pack<S: Store>(&mut self, store: S, sender: Option<Sender<i32>>) -> Result<()> {
rfs::pack(
self.meta.clone(),
store,
&self.docker_tmp_dir_path,
true,
sender,
)
.await
.context("failed to pack flist")?;

log::info!("flist has been created successfully");
Ok(())
Ok(())
}

pub async fn convert<S: Store>(&mut self, store: S, sender: Option<Sender<i32>>) -> Result<()> {
self.prepare().await?;
self.pack(store, sender).await?;

log::info!("flist has been created successfully");
Ok(())
}
}

async fn extract_image(
Expand Down
9 changes: 8 additions & 1 deletion docker2fl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use bollard::auth::DockerCredentials;
use clap::{ArgAction, Parser};
use rfs::fungi;
use rfs::store::parse_router;
use uuid::Uuid;

mod docker2fl;

Expand Down Expand Up @@ -87,7 +88,13 @@ async fn main() -> Result<()> {
let meta = fungi::Writer::new(&fl_name).await?;
let store = parse_router(&opts.store).await?;

let res = docker2fl::convert(meta, store, &docker_image, credentials).await;
let container_name = Uuid::new_v4().to_string();
let docker_tmp_dir = tempdir::TempDir::new(&container_name).unwrap();
let docker_tmp_dir_path = docker_tmp_dir.path().to_owned();

let mut docker_to_fl =
docker2fl::DockerImageToFlist::new(meta, docker_image, credentials, docker_tmp_dir_path);
let res = docker_to_fl.convert(store, None).await;

// remove the file created with the writer if fl creation failed
if res.is_err() {
Expand Down
1 change: 1 addition & 0 deletions fl-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ utoipa-swagger-ui = { version = "7", features = ["axum"] }
utoipa-redoc = { version = "4", features = ["axum"] }
thiserror = "1.0.63"
hostname-validator = "1.1.1"
walkdir = "2.5.0"
54 changes: 52 additions & 2 deletions fl-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use axum::{
Extension, Json,
};
use axum_macros::debug_handler;
use std::{collections::HashMap, fs, sync::Arc};
use std::{
collections::HashMap,
fs,
sync::{mpsc, Arc},
};
use tokio::io;

use bollard::auth::DockerCredentials;
Expand Down Expand Up @@ -48,11 +52,18 @@ pub struct FlistInputs {
pub enum FlistState {
Accepted(String),
Started(String),
InProgress(FlistStateInfo),
Created(String),
Failed(String),
NotExists(String),
}

#[derive(Debug, Clone, Serialize)]
pub struct FlistStateInfo {
msg: String,
progress: usize,
}

#[utoipa::path(
get,
path = "/v1/api",
Expand Down Expand Up @@ -197,7 +208,45 @@ pub async fn create_flist_handler(
FlistState::Started(format!("flist '{}' is started", fl_name)),
);

let res = docker2fl::convert(meta, store, &docker_image, credentials).await;
let container_name = Uuid::new_v4().to_string();
let docker_tmp_dir = tempdir::TempDir::new(&container_name).unwrap();
let docker_tmp_dir_path = docker_tmp_dir.path().to_owned();

let (tx, rx) = mpsc::channel();
let mut docker_to_fl = docker2fl::DockerImageToFlist::new(
meta,
docker_image,
credentials,
docker_tmp_dir_path.clone(),
);

let res = docker_to_fl.prepare().await;
// remove the file created with the writer if fl creation failed
if res.is_err() {
let _ = tokio::fs::remove_file(&fl_path).await;
state.jobs_state.lock().unwrap().insert(
job.id.clone(),
FlistState::Failed(format!("flist preparing '{}' has failed", fl_name)),
);
return;
}

let files_count = docker_to_fl.files_count();
tokio::spawn(async move {
for _ in 0..files_count {
let step = rx.recv().unwrap() as usize;
// state.jobs_state.lock().unwrap().insert(
// job.id.clone(),
// FlistState::InProgress(FlistStateInfo {
// msg: "flist is in progress".to_string(),
// progress: step / files_count * 100,
// }),
// ); //TODO:
log::info!("val '{}'", step / files_count * 100);
}
});

let res = docker_to_fl.pack(store, Some(tx)).await;

// remove the file created with the writer if fl creation failed
if res.is_err() {
Expand All @@ -206,6 +255,7 @@ pub async fn create_flist_handler(
job.id.clone(),
FlistState::Failed(format!("flist '{}' has failed", fl_name)),
);
return;
}

state.jobs_state.lock().unwrap().insert(
Expand Down
2 changes: 1 addition & 1 deletion rfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod test {
store.add(0x00, 0x7f, store0);
store.add(0x80, 0xff, store1);

pack(writer, store, &source, false).await.unwrap();
pack(writer, store, &source, false, None).await.unwrap();

println!("packing complete");
// recreate the stores for reading.
Expand Down
14 changes: 13 additions & 1 deletion rfs/src/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::ffi::OsString;
use std::fs::Metadata;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use workers::WorkerPool;

Expand All @@ -26,6 +27,7 @@ pub async fn pack<P: Into<PathBuf>, S: Store>(
store: S,
root: P,
strip_password: bool,
sender: Option<Sender<i32>>,
) -> Result<()> {
use tokio::fs;

Expand Down Expand Up @@ -70,12 +72,13 @@ pub async fn pack<P: Into<PathBuf>, S: Store>(
&writer,
&mut pool,
Item(0, root, OsString::from("/"), meta),
sender.clone(),
)
.await?;

while !list.is_empty() {
let dir = list.pop_back().unwrap();
pack_one(&mut list, &writer, &mut pool, dir).await?;
pack_one(&mut list, &writer, &mut pool, dir, sender.clone()).await?;
}

pool.close().await;
Expand All @@ -102,6 +105,7 @@ async fn pack_one<S: Store>(
writer: &Writer,
pool: &mut WorkerPool<Uploader<S>>,
Item(parent, path, name, meta): Item,
sender: Option<Sender<i32>>,
) -> Result<()> {
use std::os::unix::fs::MetadataExt;
use tokio::fs;
Expand Down Expand Up @@ -174,6 +178,14 @@ async fn pack_one<S: Store>(
worker
.send((child_ino, child_path))
.context("failed to schedule file upload")?;

if sender.is_some() {
sender
.clone()
.unwrap()
.send(1)
.context("failed to send progress")?;
}
}
Ok(())
}
Expand Down

0 comments on commit 1862904

Please sign in to comment.