From 32ea58c70eb91592bbf2e503cf0ee2e2ef7b4f63 Mon Sep 17 00:00:00 2001 From: Dave Jones Date: Mon, 28 Nov 2022 16:22:03 -0600 Subject: [PATCH] v0.2.1-rc1 --- README.md | 43 +++++---- podping/Cargo.lock | 4 +- podping/Cargo.toml | 2 +- podping/databases/auth.db | Bin 0 -> 16384 bytes podping/{ => databases}/queue.db | Bin 1241088 -> 1241088 bytes podping/dbif/Cargo.toml | 2 +- podping/dbif/src/lib.rs | 154 ++++++++++++++++++++++--------- podping/src/main.rs | 14 ++- 8 files changed, 152 insertions(+), 67 deletions(-) create mode 100644 podping/databases/auth.db rename podping/{ => databases}/queue.db (99%) diff --git a/README.md b/README.md index 504b044..8e73a4b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Podping.cloud -Podping.cloud is the hosted front-end to the [Podping](https://github.com/Podcastindex-org/podping) notification system. It stands in front of the back-end writer(s) -to provide a more friendly HTTP based API. +Podping.cloud is the hosted front-end to the [Podping](https://github.com/Podcastindex-org/podping) notification +system. It stands in front of the back-end writer(s) to provide a more friendly HTTP based API.
@@ -14,6 +14,16 @@ accepts GET requests like so: GET https://podping.cloud/?url=https://feeds.example.org/podcast/rss ``` +You can also append 2 additional parameters (`reason` and/or `medium`): + +```http +GET https://podping.cloud/?url=https://feeds.example.org/livestream/rss&reason=live&medium=music +``` + +If `reason` is not present, the default is "update". If `medium` is not present, the default is "podcast". A full +explanation of these options and what they mean +is [here](https://github.com/Podcastindex-org/podping-hivewriter#podping-reasons). + The next component is one or more back-end writers that connect to the front-end over a ZMQ socket. Currently, the only back-end writer is the [hive-writer](https://github.com/Podcastindex-org/podping-hivewriter), a python script that listens on localhost port `9999` for incoming events. When it receives an event, it attempts to write that event as @@ -31,13 +41,14 @@ The front-end accepts GET requests and does a few things: 4. Saves the url into the `queue.db` sqlite database in the `queue` table. 5. Returns `200` to the sending publisher. -A separate thread runs in a loop every 3 seconds as a queue checker and does the following: +A separate thread runs in a loop as a queue checker and does the following: -1. Checks the `queue.db` database and fetches 10 feeds at a time in FIFO order. +1. Checks the `queue.db` database and fetches up to 1000 feeds at a time in FIFO order ("live" reason is prioritized). 2. Checks the ZEROMQ tcp socket to the `hive-writer` listener on port `9999`. -3. Sends the url string to `hive-writer` socket for processing and waits for "OK" to be returned. -4. If "OK" is returned from the hive writer python script, the url is removed from `queue.db`. -5. If "ERR" is returned or an exception is raised, another attempt is made on the next cycle. +3. Construct one or more `Ping` objects in protocol buffers to send over the socket to the writer(s). +4. Sends the `Ping` objects to `hive-writer` socket for processing and waits for success or error to be returned. +5. If success is returned from the writer, the url is removed from `queue.db`. +6. If an error is returned or an exception is raised, another attempt is made after 180 seconds. There is a dummy auth token in the `auth.db` that is ready to use for testing. The token value is: @@ -66,19 +77,13 @@ PODPING_RUNAS_USER="podping" ./target/release/podping ## Running a Full Podping.cloud Node -First clone this repo. - -Make sure that libzmq-dev is installed: - -`apt-get install libzmq3-dev` - -Build and launch podping like so: - -`cd podping && crate run` - -Then launch hive-write like this: +The best way to run a podping.cloud node is with docker compose. There is a [docker] folder for this. Just clone this +repo, switch to the `docker` folder and issue `docker compose up`. It is expected that the database files will live in +a directory called `/data`. If this directory doesn't exist, you will need to create it. -`python3 ./hive-writer/hive-writer.py` +Initially, the `auth.db` and `queue.db` will be blank. You will need to populate the "publishers" table in the +`auth.db` file to have a funcional system. See the example files in the `databases` directory in this repo for an +example of the format for publisher token records.
diff --git a/podping/Cargo.lock b/podping/Cargo.lock index 98c36bc..5efb9c2 100644 --- a/podping/Cargo.lock +++ b/podping/Cargo.lock @@ -184,7 +184,7 @@ dependencies = [ [[package]] name = "dbif" -version = "0.1.1" +version = "0.1.2" dependencies = [ "rusqlite", ] @@ -687,7 +687,7 @@ checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" [[package]] name = "podping" -version = "0.2.1-beta9" +version = "0.2.1" dependencies = [ "async-trait", "bytes 0.5.6", diff --git a/podping/Cargo.toml b/podping/Cargo.toml index c41e78e..13f55ec 100644 --- a/podping/Cargo.toml +++ b/podping/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "podping" -version = "0.2.1-beta9" +version = "0.2.1" authors = ["Dave Jones"] edition = "2021" build = "build.rs" diff --git a/podping/databases/auth.db b/podping/databases/auth.db new file mode 100644 index 0000000000000000000000000000000000000000..078687bb068641ce1936055db3c260ce1d09d815 GIT binary patch literal 16384 zcmeI%O-lkn7zgkfR}zAbdOdml^Kw#_FgE!&=o0@1~q z4dE6`6*2EXrU$vA>5@}5xlyU?(_^5dR-J)wrg6o!s4rraec}l_o#qoe8hjYGz22y+ zg3+=T`!UV(Ic3O~eYHB740ZQKuDk<;7U`hn`vlN}lYtO>VXFhlQi#lhd>5{@?uc zM}hzZAOHafKmY;|fB*y_009Uu}T9`5QX8HyLWGPlhwp@k${DrMX=vOkwrAONG~Zv6hy=~$Y5b%;|PnLmDpGf z7LtdFr0^|lJX`q$;pgL=#X2t5@oCpBOC6qsWl5u`HhbHfCv`G*zdT37CR%MGm0Qn0 zbfi)?J61XM$v4*~$~ZJpiOoipb5~rOTo%uV*;RN?yZ`yuO=U9jK0Tv5l^Y*c{9;nG z%P&l(dEf&g+W#z$AN`<*+lCIbAcFw;y;ToZF*r!TLka;}kUv^yRX62mr|U*6HeYwRjg@&-&OVJ?m&NM343zosZzHE=G1qb?gb1r-d-ets oGDZZA6h4BZvnOXI6k71mhK7#-AtJ;`&_NdlDS8iQ`}?~Te*=d#umAu6 diff --git a/podping/dbif/Cargo.toml b/podping/dbif/Cargo.toml index 5915812..f0b06ef 100644 --- a/podping/dbif/Cargo.toml +++ b/podping/dbif/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dbif" -version = "0.1.1" +version = "0.1.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/podping/dbif/src/lib.rs b/podping/dbif/src/lib.rs index 3fa82da..d247446 100644 --- a/podping/dbif/src/lib.rs +++ b/podping/dbif/src/lib.rs @@ -4,13 +4,13 @@ use std::fmt; use std::str::FromStr; -//Globals ---------------------------------------------------------------------------------------------------- +//Globals -------------------------------------------------------------------------------------------------------------- const SQLITE_FILE_AUTH: &str = "/data/auth.db"; const SQLITE_FILE_QUEUE: &str = "/data/queue.db"; const PING_BATCH_SIZE: u64 = 1000; -//Structs & Enums -------------------------------------------------------------------------------------------- +//Structs & Enums ------------------------------------------------------------------------------------------------------ #[derive(Debug)] struct HydraError(String); @@ -37,18 +37,19 @@ impl FromStr for Reason { type Err = (); fn from_str(input: &str) -> Result { match input.to_lowercase().as_str() { - "update" => Ok(Reason::Update), - "live" => Ok(Reason::Live), + "update" => Ok(Reason::Update), + "live" => Ok(Reason::Live), "liveend" => Ok(Reason::LiveEnd), - _ => Ok(Reason::Update), + _ => Ok(Reason::Update), } } } + impl fmt::Display for Reason { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Reason::Update => write!(f, "update"), - Reason::Live => write!(f, "live"), + Reason::Update => write!(f, "update"), + Reason::Live => write!(f, "live"), Reason::LiveEnd => write!(f, "liveend"), } } @@ -71,44 +72,46 @@ pub enum Medium { Blog, BlogL, } + impl FromStr for Medium { type Err = (); fn from_str(input: &str) -> Result { match input.to_lowercase().as_str() { - "podcast" => Ok(Medium::Podcast), + "podcast" => Ok(Medium::Podcast), "podcastl" => Ok(Medium::PodcastL), - "music" => Ok(Medium::Music), - "musicl" => Ok(Medium::MusicL), - "video" => Ok(Medium::Video), - "videol" => Ok(Medium::VideoL), - "film" => Ok(Medium::Film), - "filml" => Ok(Medium::FilmL), - "audiobook" => Ok(Medium::Audiobook), - "audiobookl" => Ok(Medium::AudiobookL), - "newsletter" => Ok(Medium::Newsletter), + "music" => Ok(Medium::Music), + "musicl" => Ok(Medium::MusicL), + "video" => Ok(Medium::Video), + "videol" => Ok(Medium::VideoL), + "film" => Ok(Medium::Film), + "filml" => Ok(Medium::FilmL), + "audiobook" => Ok(Medium::Audiobook), + "audiobookl" => Ok(Medium::AudiobookL), + "newsletter" => Ok(Medium::Newsletter), "newsletterl" => Ok(Medium::NewsletterL), - "blog" => Ok(Medium::Blog), + "blog" => Ok(Medium::Blog), "blogl" => Ok(Medium::BlogL), - _ => Ok(Medium::Podcast), + _ => Ok(Medium::Podcast), } } } + impl fmt::Display for Medium { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Medium::Podcast => write!(f, "podcast"), + Medium::Podcast => write!(f, "podcast"), Medium::PodcastL => write!(f, "podcastl"), - Medium::Music => write!(f, "music"), - Medium::MusicL => write!(f, "musicl"), - Medium::Video => write!(f, "video"), - Medium::VideoL => write!(f, "videol"), - Medium::Film => write!(f, "film"), - Medium::FilmL => write!(f, "filml"), - Medium::Audiobook => write!(f, "audiobook"), - Medium::AudiobookL => write!(f, "audiobookl"), - Medium::Newsletter => write!(f, "newsletter"), + Medium::Music => write!(f, "music"), + Medium::MusicL => write!(f, "musicl"), + Medium::Video => write!(f, "video"), + Medium::VideoL => write!(f, "videol"), + Medium::Film => write!(f, "film"), + Medium::FilmL => write!(f, "filml"), + Medium::Audiobook => write!(f, "audiobook"), + Medium::AudiobookL => write!(f, "audiobookl"), + Medium::Newsletter => write!(f, "newsletter"), Medium::NewsletterL => write!(f, "newsletterl"), - Medium::Blog => write!(f, "blog"), + Medium::Blog => write!(f, "blog"), Medium::BlogL => write!(f, "blogl"), } } @@ -131,11 +134,76 @@ pub struct PingRow { } -//Functions ---- +//Functions ------------------------------------------------------------------------------------------------------------ + +//Connect to the database at the given file location +fn connect_to_database(filepath: &str) -> Result> { + if let Ok(conn) = Connection::open(filepath) { + Ok(conn) + } else { + return Err( + Box::new( + HydraError(format!("Could not open a database file at: [{}].", filepath).into()) + ) + ); + } +} + +//Create or update database files if needed +pub fn create_databases() -> Result> { + + //Create the publishers table + let mut conn = connect_to_database(SQLITE_FILE_AUTH)?; + match conn.execute( + "CREATE TABLE IF NOT EXISTS publishers ( + name text, + authval text primary key + )", + [], + ) { + Ok(_) => { + println!("Publishers table is ready."); + } + Err(e) => { + eprintln!("{}", e); + return Err( + Box::new( + HydraError(format!("Failed to create database publishers table: [{}].", SQLITE_FILE_AUTH).into()) + ) + ); + } + } + + //Create the queue table + conn = connect_to_database(SQLITE_FILE_QUEUE)?; + match conn.execute( + "CREATE TABLE IF NOT EXISTS queue ( + url text primary key, + createdon integer, + reason text, + medium text, + inflight bool + )", + [], + ) { + Ok(_) => { + println!("Queue table is ready."); + Ok(true) + } + Err(e) => { + eprintln!("{}", e); + return Err( + Box::new( + HydraError(format!("Failed to create database queue table: [{}].", SQLITE_FILE_QUEUE).into()) + ) + ); + } + } +} //Returns a vector of Publisher structs from the auth db or an Error pub fn get_publishers() -> Result, Box> { - let conn = Connection::open(SQLITE_FILE_AUTH)?; + let conn = connect_to_database(SQLITE_FILE_AUTH)?; let mut pubs: Vec = Vec::new(); let mut stmt = conn.prepare("SELECT name \ @@ -156,8 +224,8 @@ pub fn get_publishers() -> Result, Box> { } //Returns a vector of Ping structs from the queue or an Error -pub fn get_pings_from_queue(with_in_flight:bool) -> Result, Box> { - let conn = Connection::open(SQLITE_FILE_QUEUE)?; +pub fn get_pings_from_queue(with_in_flight: bool) -> Result, Box> { + let conn = connect_to_database(SQLITE_FILE_QUEUE)?; let mut pings: Vec = Vec::new(); //With in flights also? @@ -204,7 +272,7 @@ pub fn get_pings_from_queue(with_in_flight:bool) -> Result, Box Result> { - let conn = Connection::open(SQLITE_FILE_QUEUE)?; + let conn = connect_to_database(SQLITE_FILE_QUEUE)?; match conn.execute("INSERT INTO queue (url, createdon, reason, medium, inflight) \ VALUES (?1, ?2, ?3, ?4 , 0)", @@ -221,8 +289,8 @@ pub fn add_ping_to_queue(ping: &Ping) -> Result> { Err(_e) => { match ping.reason { Reason::Live | Reason::LiveEnd => { - return update_ping_in_queue(&ping) - }, + return update_ping_in_queue(&ping); + } _ => return Err(Box::new(HydraError(format!("URL already in queue: [{}].", ping.url).into()))) } } @@ -231,7 +299,7 @@ pub fn add_ping_to_queue(ping: &Ping) -> Result> { //Change the info for a ping by it's url. Returns Ok(true/false) or an Error pub fn update_ping_in_queue(ping: &Ping) -> Result> { - let conn = Connection::open(SQLITE_FILE_QUEUE)?; + let conn = connect_to_database(SQLITE_FILE_QUEUE)?; match conn.execute("UPDATE queue \ @@ -259,7 +327,7 @@ pub fn update_ping_in_queue(ping: &Ping) -> Result> { //Marks a ping record as inflight. Returns Ok(true/false) or an Error pub fn set_ping_as_inflight(ping: &Ping) -> Result> { - let conn = Connection::open(SQLITE_FILE_QUEUE)?; + let conn = connect_to_database(SQLITE_FILE_QUEUE)?; match conn.execute("UPDATE queue \ SET inflight = 1 \ @@ -280,7 +348,7 @@ pub fn set_ping_as_inflight(ping: &Ping) -> Result> { //Adds a url to the queue. Takes a Ping struct as input. Returns Ok(true/false) or an Error pub fn reset_pings_in_flight() -> Result> { - let conn = Connection::open(SQLITE_FILE_QUEUE)?; + let conn = connect_to_database(SQLITE_FILE_QUEUE)?; match conn.execute("UPDATE queue \ SET inflight = 0, \ @@ -301,7 +369,7 @@ pub fn reset_pings_in_flight() -> Result> { //Deletes a url from the queue. Takes a url as a String. Returns Ok(true/false) or an Error pub fn delete_ping_from_queue(url: String) -> Result> { - let conn = Connection::open(SQLITE_FILE_QUEUE)?; + let conn = connect_to_database(SQLITE_FILE_QUEUE)?; conn.execute( "DELETE FROM queue \ @@ -314,7 +382,7 @@ pub fn delete_ping_from_queue(url: String) -> Result> { //Returns the name of the publisher that corresponds with this authorization header or an Error pub fn check_auth(authstring: &str) -> Result> { - let conn = Connection::open(SQLITE_FILE_AUTH)?; + let conn = connect_to_database(SQLITE_FILE_AUTH)?; let mut tokens: Vec = Vec::new(); let mut stmt = conn.prepare("SELECT name \ @@ -342,7 +410,7 @@ pub fn check_auth(authstring: &str) -> Result> { //Returns the name of the publisher that corresponds with this hybrid authorization header or an Error pub fn check_auth_hybrid(authstring: &str) -> Result> { - let conn = Connection::open(SQLITE_FILE_AUTH)?; + let conn = connect_to_database(SQLITE_FILE_AUTH)?; let mut tokens: Vec = Vec::new(); let authstringparm = &authstring[0..22]; diff --git a/podping/src/main.rs b/podping/src/main.rs index 04eaacd..981ae9a 100644 --- a/podping/src/main.rs +++ b/podping/src/main.rs @@ -12,6 +12,7 @@ use std::thread; use std::time; use std::env; use std::panic; +use std::process; use capnp::data::Reader; use drop_root::set_user_group; use hyper::body::Buf; @@ -113,6 +114,17 @@ async fn main() { println!("Version: {}", version); println!("--------------------"); + //Make sure we have databases + match dbif::create_databases() { + Ok(_) => { + println!("Databases ready."); + }, + Err(e) => { + eprintln!("Database error: [{:#?}]", e); + process::exit(1); + } + } + //ZMQ socket version thread::spawn(move || { @@ -280,7 +292,7 @@ async fn main() { panic::set_hook(Box::new(move |panic_info| { // invoke the default handler and exit the process orig_hook(panic_info); - std::process::exit(1); + process::exit(1); })); let some_state = "state".to_string();