Skip to content

Commit

Permalink
Send state updates to web client using web socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Virv12 committed Dec 18, 2023
1 parent cf81b09 commit e854b47
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 89 deletions.
88 changes: 64 additions & 24 deletions pixie-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ use std::{net::Ipv4Addr, sync::Arc};

use anyhow::Result;
use axum::{
extract::{self, Path},
extract::{
self,
ws::{self, Message},
Path,
},
http::StatusCode,
response::IntoResponse,
routing::get,
Router,
};
use macaddr::MacAddr6;

use pixie_shared::HttpConfig;
use pixie_shared::{HttpConfig, WsUpdate};
use tokio::net::TcpListener;
use tower_http::{
compression::CompressionLayer, services::ServeDir, trace::TraceLayer,
Expand Down Expand Up @@ -137,29 +141,68 @@ async fn image(
}
}

async fn get_config(extract::State(state): extract::State<Arc<State>>) -> String {
serde_json::to_string(&state.config).unwrap()
}

async fn get_hostmap(extract::State(state): extract::State<Arc<State>>) -> String {
serde_json::to_string(&state.hostmap).unwrap()
}

async fn get_units(extract::State(state): extract::State<Arc<State>>) -> String {
let units = state.units.borrow();
serde_json::to_string(&*units).unwrap()
}

async fn get_images(extract::State(state): extract::State<Arc<State>>) -> String {
let image_stats = state.image_stats.borrow();
serde_json::to_string(&*image_stats).unwrap()
}

async fn gc(extract::State(state): extract::State<Arc<State>>) -> String {
state.gc_chunks().await.unwrap();
"".to_owned()
}

async fn ws(
extract::State(state): extract::State<Arc<State>>,
ws: extract::ws::WebSocketUpgrade,
) -> axum::response::Response {
ws.on_upgrade(move |mut socket| async move {
let msg = WsUpdate::Config(state.config.clone());
let msg = serde_json::to_string(&msg).unwrap();
let msg = Message::Text(msg);
socket.send(msg).await.unwrap();

let msg = WsUpdate::HostMap(state.hostmap.clone());
let msg = serde_json::to_string(&msg).unwrap();
let msg = Message::Text(msg);
socket.send(msg).await.unwrap();

let mut units_rx = state.units.subscribe();
units_rx.mark_changed();

let mut image_rx = state.image_stats.subscribe();
image_rx.mark_changed();

'main_loop: loop {
tokio::select! {
ret = units_rx.changed() => {
ret.unwrap();
let msg = {
let units = units_rx.borrow_and_update();
let msg = WsUpdate::Units(units.clone());
let msg = serde_json::to_string(&msg).unwrap();
ws::Message::Text(msg)
};
socket.send(msg).await.unwrap();
}
ret = image_rx.changed() => {
ret.unwrap();
let msg = {
let image_stats = image_rx.borrow_and_update();
let msg = WsUpdate::ImageStats(image_stats.clone());
let msg = serde_json::to_string(&msg).unwrap();
ws::Message::Text(msg)
};
socket.send(msg).await.unwrap();
}
packet = socket.recv() => {
let packet = packet.unwrap().unwrap();
match packet {
Message::Close(_) => {
break 'main_loop;
}
_ => {}
}
}
};
}
})
}

pub async fn main(state: Arc<State>) -> Result<()> {
let HttpConfig {
listen_on,
Expand All @@ -169,10 +212,7 @@ pub async fn main(state: Arc<State>) -> Result<()> {
let admin_path = state.storage_dir.join("admin");

let router = Router::new()
.route("/admin/config", get(get_config))
.route("/admin/hostmap", get(get_hostmap))
.route("/admin/units", get(get_units))
.route("/admin/images", get(get_images))
.route("/admin/ws", get(ws))
.route("/admin/gc", get(gc))
.route("/admin/action/:unit/:action", get(action))
.route("/admin/image/:unit/:image", get(image))
Expand Down
12 changes: 11 additions & 1 deletion pixie-shared/src/bijection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ where
U: PartialEq,
{
pub fn new() -> Self {
Bijection(Vec::new())
Self(Vec::new())
}

pub fn get_by_first(&self, t: &T) -> Option<&U> {
Expand All @@ -27,6 +27,10 @@ where
pub fn iter(&self) -> impl Iterator<Item = &(T, U)> {
self.0.iter()
}

pub fn as_slice(&self) -> &[(T, U)] {
self.0.as_slice()
}
}

impl<T, U> PartialEq for Bijection<T, U>
Expand Down Expand Up @@ -56,3 +60,9 @@ impl<T, U> IntoIterator for Bijection<T, U> {
self.0.into_iter()
}
}

impl<T, U> Default for Bijection<T, U> {
fn default() -> Self {
Self(Vec::new())
}
}
14 changes: 13 additions & 1 deletion pixie-shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use blake3::OUT_LEN;
use serde::{Deserialize, Serialize};

#[cfg(feature = "std")]
use std::net::SocketAddrV4;
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddrV4},
};

pub mod bijection;
pub use bijection::Bijection;
Expand Down Expand Up @@ -117,6 +120,15 @@ pub enum TcpRequest {
ActionComplete,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg(feature = "std")]
pub enum WsUpdate {
Config(config::Config),
HostMap(HashMap<Ipv4Addr, String>),
Units(Vec<Unit>),
ImageStats(ImageStat),
}

#[cfg(feature = "std")]
pub mod config;

Expand Down
2 changes: 2 additions & 0 deletions pixie-web/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pixie-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ name = "pixie-web"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3.29"
gloo-timers = { version = "0.2.6", features = ["futures"] }
itertools = "0.10.5"
macaddr = "1.0.1"
pixie-shared = { version = "0.1.0", path = "../pixie-shared", features = ["macaddr", "std"] }
reqwasm = "0.5.0"
serde = "1.0.193"
serde_json = "1.0.108"
sycamore = { version = "0.8.2", features = ["sycamore-futures", "futures", "suspense"] }
wasm-bindgen = "0.2.89"
web-sys = "0.3.66"
web-sys = { version = "0.3.66", features = ["Location"] }
Loading

0 comments on commit e854b47

Please sign in to comment.