Skip to content

Commit

Permalink
feat: experiencing the capabilities of the Tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
okqin committed Jul 30, 2024
1 parent a3cf3cf commit 94e2a17
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ rand = "0.8.5"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.120"
strum = { version = "0.26.3", features = ["derive"] }
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "rt"] }
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "rt", "fs"] }
68 changes: 68 additions & 0 deletions examples/minginx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::sync::Arc;

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info, level_filters::LevelFilter};
use tracing_subscriber::{
fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt, Layer,
};

#[derive(Debug, Serialize, Deserialize)]
struct Config {
listen_addr: String,
upstream_addr: String,
}

fn resolve_config() -> Result<Config> {
let config = Config {
listen_addr: "0.0.0.0:9090".to_string(),
upstream_addr: "127.0.0.1:8080".to_string(),
};
Ok(config)
}

#[tokio::main]
async fn main() -> Result<()> {
let console = tracing_subscriber::fmt::Layer::new()
.with_span_events(FmtSpan::CLOSE)
.with_filter(LevelFilter::INFO);

tracing_subscriber::registry().with(console).init();

let config = resolve_config()?;
let config = Arc::new(config);
info!("linten_addr: {}", config.listen_addr);
info!("upstream_addr: {}", config.upstream_addr);

let listener = TcpListener::bind(&config.listen_addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
info!("Accepted connection from: {}", addr);
let config = Arc::clone(&config);
tokio::spawn(async move {
match TcpStream::connect(&config.upstream_addr).await {
Ok(upstream) => {
if let Err(e) = proxy(stream, upstream).await {
error!("Error proxy data: {:?}", e);
}
}
Err(e) => {
error!("Error connect to upstream: {:?}", e);
}
}
Ok::<(), anyhow::Error>(())
});
}
}

async fn proxy(mut client: TcpStream, mut upstream: TcpStream) -> Result<()> {
let (mut client_reader, mut client_writer) = client.split();
let (mut upstream_reader, mut upstream_writer) = upstream.split();

let client_to_upstream = tokio::io::copy(&mut client_reader, &mut upstream_writer);
let upstream_to_client = tokio::io::copy(&mut upstream_reader, &mut client_writer);

tokio::try_join!(client_to_upstream, upstream_to_client)?;
Ok(())
}
32 changes: 32 additions & 0 deletions examples/tokio1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::{thread, time::Duration};
use tokio::{fs, runtime::Builder, time::sleep};

// tokio runtime example for
fn main() {
let handle = thread::spawn(|| {
let rt = Builder::new_current_thread().enable_all().build().unwrap();

rt.spawn(async {
println!("Future 1");
let content = fs::read_to_string("Cargo.toml").await.unwrap();
println!("Content length: {}", content.len());
});

rt.spawn(async {
let name = "Future 2";
println!("{}", name);
let ret = expensive_blocking_task(name);
println!("result: {}", ret);
});

rt.block_on(async {
sleep(Duration::from_millis(1000)).await;
});
});
handle.join().unwrap();
}

fn expensive_blocking_task(name: &str) -> String {
thread::sleep(Duration::from_millis(900));
format!("{} done", name)
}
37 changes: 37 additions & 0 deletions examples/tokio2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Passing messages between asynchronous and synchronous threads.
use anyhow::Result;
use std::{thread, time::Duration};
use tokio::sync::mpsc;

fn worker(mut rx: mpsc::Receiver<String>) -> thread::JoinHandle<()> {
thread::spawn(move || {
while let Some(msg) = rx.blocking_recv() {
let ret = expensive_blocking_task(&msg);
println!("result: {}", ret);
}
})
}

fn expensive_blocking_task(name: &str) -> String {
thread::sleep(Duration::from_millis(900));
format!("{} done", name)
}

#[tokio::main]
async fn main() -> Result<()> {
let (tx, rx) = mpsc::channel(32);
let handle = worker(rx);

tokio::spawn(async move {
let mut number = 0;
loop {
number += 1;
println!("Sending message {number}");
tx.send(format!("message {number}")).await?;
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
});
handle.join().unwrap();
Ok(())
}
3 changes: 3 additions & 0 deletions http.rest
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ Content-Type: application/json
{
"skills": ["Rust", "TypeScript", "Python"]
}

### proxy test
GET http://127.0.0.1:9090

0 comments on commit 94e2a17

Please sign in to comment.