Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for group operations #60

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ thiserror = "1.0"
clap = "2.33"
git-version = "0.3.5"
command-group = "1.0.8"
async-recursion = "1.1.1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, please don't

Any recursion can be expanded to an iteration. But avoid async recursion because it dramatically consume memory and we don't want zinit to get killed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I would avoid recursion and definitely in async. The reason is each Future is a state machine that built during compile time. When u do recursion each state machine has a copy of itself, this means recursively this machine size has infinite size (unknown size in compile time) hence this is needed.

But I am 100% sure (even before i see where u need it) this can be rewritten in a way that doesn't need recursion

134 changes: 106 additions & 28 deletions src/app/api.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::zinit::{config, ZInit};
use crate::zinit::{self, config, ZInit, ZInitStatus};
use anyhow::{Context, Result};
use nix::sys::signal;
use serde::{Deserialize, Serialize};
use serde_json::{self as encoder, Value};
use std::collections::HashMap;
use std::env::current_dir;
use std::io::{self, ErrorKind};
use std::marker::Unpin;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use tokio::fs;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
use tokio::net::{UnixListener, UnixStream};

Expand All @@ -24,16 +27,30 @@ enum State {
Error,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase", untagged)]
pub enum Status {
Service(ServiceStatus),
Group(GroupStatus),
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct Status {
pub struct ServiceStatus {
pub name: String,
pub pid: u32,
pub state: String,
pub target: String,
pub after: HashMap<String, String>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub struct GroupStatus {
pub name: String,
pub services: Vec<ServiceStatus>,
}

pub struct Api {
zinit: ZInit,
socket: PathBuf,
Expand Down Expand Up @@ -149,17 +166,55 @@ impl Api {
let services = zinit.list().await?;
let mut map: HashMap<String, String> = HashMap::new();
for service in services {
let state = zinit.status(&service).await?;
map.insert(service, format!("{:?}", state.state));
if let ZInitStatus::Service(state) = zinit.status(&service).await? {
map.insert(service, format!("{:?}", state.state));
}
}

Ok(encoder::to_value(map)?)
}

async fn monitor<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> {
let (name, service) = config::load(format!("{}.yaml", name.as_ref()))
.context("failed to load service config")?;
zinit.monitor(name, service).await?;
match config::load(format!("{}.yaml", name.as_ref())) {
Ok((name, service)) => zinit.monitor(name, config::Entry::Service(service)).await?,
Err(e) => {
if let Some(err) = e.downcast_ref::<io::Error>() {
if err.kind() != ErrorKind::NotFound {
return Err(e.context("failed to load service config"));
}
} else {
return Err(e.context("failed to load service config"));
}
}
}
let canonical_path = fs::canonicalize(name.as_ref()).await?;
Comment on lines +180 to +189
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer instead of using downcast (although it's fine) that instead we create our concrete defined error type (using thiserror for example)

I am wondering if it's better if you do stat first of the file to see if it exists instead of what you do here.

I am wondering also what will happen if you have

- file: <service>.yaml 
- dir: <service>

what should take precedence ? should we monitor both, or only the group? do we give errors. etc..

let path = if !canonical_path.starts_with(current_dir()?) {
bail!("directory outside of zinit configuration directory")
} else {
canonical_path.strip_prefix(current_dir()?)?
};
let prefix = path.to_str().ok_or(anyhow!("invalid path name"))?;
match config::load_dir_with_prefix(path, prefix.to_string()) {
Ok(services) => {
for (k, v) in services {
if let Err(err) = zinit.monitor(&k, v).await {
error!("failed to monitor service {}: {}", k, err);
};
}
}
Err(e) => {
if let Some(err) = e.downcast_ref::<io::Error>() {
if err.kind() == ErrorKind::NotFound {
bail!(
"neither {}.yaml nor {} directory was found",
name.as_ref(),
name.as_ref()
)
}
}
Comment on lines +206 to +213
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see this patterns starting to get out of hand and I think we need to define our own errors. Or completely avoid this by doing checks of which file/dir we need to process.

return Err(e.context("failed to load service config"));
}
}
Ok(Value::Null)
}

Expand Down Expand Up @@ -206,30 +261,50 @@ impl Api {
}

async fn status<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> {
let status = zinit.status(&name).await?;

let result = Status {
name: name.as_ref().into(),
pid: status.pid.as_raw() as u32,
state: format!("{:?}", status.state),
target: format!("{:?}", status.target),
after: {
let mut after = HashMap::new();
for service in status.service.after {
let status = match zinit.status(&service).await {
Ok(dep) => dep.state,
Err(_) => crate::zinit::State::Unknown,
};
after.insert(service, format!("{:?}", status));
}
after
},
let result = match zinit.status(&name).await? {
ZInitStatus::Service(status) => {
Status::Service(zinit_status_to_service_status(name, zinit, status).await)
}
ZInitStatus::Group(group) => Status::Group(GroupStatus {
name: name.as_ref().into(),
services: {
let mut services = vec![];
for (name, status) in group.services {
services
.push(zinit_status_to_service_status(name, zinit.clone(), status).await)
}
services
},
}),
};

Ok(encoder::to_value(result)?)
}
}

async fn zinit_status_to_service_status<S: AsRef<str>>(
name: S,
zinit: ZInit,
status: zinit::ServiceStatus,
) -> ServiceStatus {
ServiceStatus {
name: name.as_ref().into(),
pid: status.pid.as_raw() as u32,
state: format!("{:?}", status.state),
target: format!("{:?}", status.target),
after: {
let mut after = HashMap::new();
for service in status.service.after {
if let Ok(ZInitStatus::Service(status)) = zinit.status(&service).await {
after.insert(service, format!("{:?}", status.state));
} else {
after.insert(service, format!("{:?}", crate::zinit::State::Unknown));
}
}
Comment on lines +297 to +301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was cleaner when we first get the status then format it. It's more readable before and also makes formatting in one place instead of 2

after
},
}
}

pub struct Client {
socket: PathBuf,
}
Expand Down Expand Up @@ -291,7 +366,8 @@ impl Client {
match filter {
None => tokio::io::copy(&mut con, &mut out).await?,
Some(filter) => {
let filter = format!("{}:", filter.as_ref());
let service_filter = format!("{}:", filter.as_ref());
let group_filter = format!("{}/", filter.as_ref());
let mut stream = BufStream::new(con);
loop {
let mut line = String::new();
Expand All @@ -303,7 +379,9 @@ impl Client {
}
}

if line[4..].starts_with(&filter) {
if line[4..].starts_with(&service_filter)
|| line[4..].starts_with(&group_filter)
{
let _ = out.write_all(line.as_bytes()).await;
}
}
Expand Down
25 changes: 21 additions & 4 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::time;

use self::api::Status;

fn logger(level: log::LevelFilter) -> Result<()> {
let logger = fern::Dispatch::new()
.format(|out, message, record| {
Expand Down Expand Up @@ -135,10 +137,25 @@ pub async fn restart(socket: &str, name: &str) -> Result<()> {
client.stop(name).await?;
//pull status
for _ in 0..20 {
let result = client.status(name).await?;
if result.pid == 0 && result.target == "Down" {
client.start(name).await?;
return Ok(());
match client.status(name).await? {
Status::Service(result) => {
if result.pid == 0 && result.target == "Down" {
client.start(name).await?;
return Ok(());
}
}
Status::Group(result) => {
let mut start = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think down is a better name for this variable. Since you assume this is true for all services. and if any service is not down or target is down but still running (pid != 0) then u set it to false.

Then check if down you call the start

for service in result.services {
if service.pid != 0 || service.target != "Down" {
start = false;
}
}
if start {
client.start(name).await?;
return Ok(());
}
}
}
time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand Down
Loading
Loading