Skip to content

Commit

Permalink
Merge pull request #3 from adrien3d/refacto/databases
Browse files Browse the repository at this point in the history
Refacto/databases
  • Loading branch information
adrien3d authored Dec 21, 2023
2 parents 3bd4182 + 1b23cca commit 9ae09e6
Show file tree
Hide file tree
Showing 9 changed files with 1,900 additions and 574 deletions.
2,194 changes: 1,674 additions & 520 deletions Cargo.lock

Large diffs are not rendered by default.

68 changes: 33 additions & 35 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,39 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix = "0.13.0"
actix-cors = "0.6.4"
actix-http = "3.4.0"
actix-identity = "0.6.0"
actix-service = "2.0.2"
actix-web = "4.3.1"
actix-web-actors = "4.2.0"
anyhow = "1.0.71"
async-stream = "0.3.5"
bson = "2.7.0"
chrono = "0.4.26"
env_logger = "0.10.0"
envfile = "0.2.1"
envoption = "0.2.1"
failure = "0.1.8"
futures = "0.3.28"
futures-util = "0.3.28"
json = "0.12.4"
jsonwebtoken = "9.1.0"
log = "0.4.19"
mongodb = "2.6.0"
num_enum = "0.7.0"
pretty_env_logger = "0.5.0"
rc = "0.1.1"
rsntp = "3.0.2"
rust-argon2 = "2.0.0"
serde = { version = "1.0.166", features = ["derive"] }
smallvec = "1.10.0"
stoppable_thread = "0.2.1"
strum = "0.25.0"
strum_macros = "0.25.3"
thiserror = "1.0.40"
tokio = "1.33.0"
tracing = "0.1.37"
actix = "~0.13"
actix-cors = "~0.6"
actix-http = "~3.4"
actix-identity = "~0.6"
actix-service = "~2.0"
actix-web = "~4.4"
actix-web-actors = "~4.2"
aws-config = "~1.1"
aws-sdk-sesv2 = "~1.8"
anyhow = "~1.0"
async-stream = "~0.3"
async-trait = "~0.1"
bson = "~2.8"
chrono = "~0.4"
env_logger = "~0.10"
envfile = "~0.2"
envoption = "~0.2"
futures = "~0.3"
json = "~0.12"
jsonwebtoken = "~9.2"
log = "~0.4"
mongodb = "~2.8"
num_enum = "~0.7"
pretty_env_logger = "~0.5"
rc = "~0.1"
rsntp = "~3.0"
rust-argon2 = "~2.0"
serde = { version = "~1.0", features = ["derive"] }
sqlx = { version = "~0.7", features = [ "runtime-tokio", "tls-native-tls", "postgres" ] }
stoppable_thread = "~0.2"
thiserror = "~1.0"
tokio = { version = "~1.35", features = ["full"] }

[dev-dependencies]
env_logger = "~0.10"
test-log = "~0.2"
test-log = "~0.2"
102 changes: 102 additions & 0 deletions src/drivers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::services::emails;
use anyhow::bail;
use async_trait::async_trait;
use mongodb::{error::ErrorKind, Client as mgoClient};
// use sqlx::postgres::PgPool;

use crate::models::users::{self, User};

const DB_NAME: &str = "base-api";

pub mod mongo;
pub mod postgre;

#[derive(Debug)]
pub struct GenericDatabaseStatus {
pub kind: String,
pub is_connected: bool,
pub migrations_performed: bool,
}

#[derive(Debug)]
pub struct MongoDatabase {
pub status: GenericDatabaseStatus,
pub client: Option<mgoClient>,
}

// pub struct PostgreDatabase {
// status: GenericDatabaseStatus,
// client: PgPool,
// }

#[async_trait]
pub trait GenericDatabase {
fn new() -> Self;
async fn connect(&mut self, uri: &str) -> anyhow::Result<&Self>;
fn migrate(&self, reference: &str) -> Self;
}

impl MongoDatabase {
pub async fn seed_user(&self, user: User) -> anyhow::Result<&Self> {
match &self.client {
Some(client) => {
let collection = client.database(DB_NAME).collection(users::REPOSITORY_NAME);
match collection.insert_one(user.clone(), None).await {
Ok(_) => {
let _ = emails::send_email_with_aws_ses(&user.email, "Welcome", "Message")
.await;
Ok(self)
}
Err(error) => {
match *error.kind {
ErrorKind::Write(write_error) => {
match write_error {
mongodb::error::WriteFailure::WriteError(e) => {
if e.code != 11000 {
log::warn!("seed_user WriteFailure::WriteError: {e:?}");
}
},
_ => bail!("Unknown writeConcernError while seed_user: {write_error:?}"),
}
Ok(self)
}
_ => bail!("Unknown errorKind while seed_user: {error}"),
}
}
}
}
None => bail!("seed_user unable to get client"),
}
}

// pub fn aggregate(&self) {
// log::info!("MongoDatabase aggregate");
// }
}

#[async_trait]
impl GenericDatabase for MongoDatabase {
fn new() -> Self {
MongoDatabase {
status: GenericDatabaseStatus {
kind: "mongo".to_string(),
is_connected: false,
migrations_performed: false,
},
client: None,
}
}

async fn connect(&mut self, uri: &str) -> anyhow::Result<&Self> {
log::info!("Connecting to MongoDB with uri:{uri}");
let mongo_db_client = mgoClient::with_uri_str(uri).await?;
self.client = Some(mongo_db_client);
self.status.is_connected = true;
// MongoDatabase { status: (), client: mongo_db_client }
Ok(self)
}

fn migrate(&self, _reference: &str) -> Self {
todo!()
}
}
1 change: 1 addition & 0 deletions src/drivers/mongo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions src/drivers/postgre.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

57 changes: 42 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod configuration;
mod controllers;
mod drivers;
mod middlewares;
mod models;
mod services;
Expand All @@ -19,8 +20,9 @@ use tokio::sync::broadcast::Sender;

use crate::{
controllers::authentication::AuthState,
drivers::{GenericDatabase, MongoDatabase},
middlewares::authorization::AuthenticateMiddlewareFactory,
models::users::{self, User},
models::users::User,
services::ntp,
};

Expand All @@ -34,6 +36,8 @@ pub struct ProgramAppState {
pub ntp: Ntp,
/// MongoDB client
pub mongo_db_client: Client,
/// SQL pool
//pub sql_pool: sqlx::Pool<dyn Database>,
/// A channel for messages to the UI.
pub ui_sender_channel: Sender<Vec<u8>>,
}
Expand All @@ -50,23 +54,31 @@ async fn main() -> anyhow::Result<()> {
log::info!("NTP Time is:{instant}");

let uri = std::env::var("MONGODB_URI").unwrap_or_else(|_| "mongodb://localhost:27017".into());
let port: u16 = std::env::var("PORT")
.unwrap_or_else(|_| "8080".into())
.parse()
.unwrap();

log::info!("Connecting to DB");
let mongo_db_client = Client::with_uri_str(uri).await.expect("failed to connect");
log::info!("Connecting to MongoDB");
//let mongo_db_client = Client::with_uri_str(uri.clone()).await.expect("failed to connect mongo from main");

let mut mongo_db: MongoDatabase = GenericDatabase::new(); //::new_with_client(&mongo_db_client);
let mongo_db_client: Client;
//let res = mongo_db.connect(&uri.clone()).await;
match mongo_db.connect(&uri.clone()).await {
Ok(mongodb) => {
log::info!("successfully connected mongo with drivers");
match &mongodb.client {
Some(cl) => mongo_db_client = cl.clone(),
None => {
anyhow::bail!("Failed to get mongo.client")
}
}
}
Err(error) => {
anyhow::bail!("Failed to connect mongo with drivers: {:?}", error)
}
}
models::users::create_email_index(&mongo_db_client, DB_NAME).await;

log::info!("Server starting on port: {}", port);

let collection = mongo_db_client
.database(DB_NAME)
.collection(users::REPOSITORY_NAME);

let salt = std::env::var("SECRET_KEY").unwrap_or_else(|_| "0123".repeat(16));

let hashed_password =
argon2::hash_encoded("password".as_bytes(), salt.as_bytes(), &Config::original()).unwrap();
let admin_user = User {
Expand All @@ -78,21 +90,36 @@ async fn main() -> anyhow::Result<()> {
email: "[email protected]".to_string(),
password: hashed_password,
};
let _ = collection.insert_one(admin_user, None).await;

mongo_db.seed_user(admin_user).await?;

let auth_data = AuthState {
mongo_db: mongo_db_client.clone(),
admin_user: Some(ObjectId::new()),
};

// let pool = PgPool::connect("postgres://postgres:password@localhost/test").await?;
// let row: (i64,) = sqlx::query_as("SELECT $1")
// .bind(150_i64)
// .fetch_one(&pool).await?;
// assert_eq!(row.0, 150);

let (ui_sender_channel, _) = broadcast::channel(32);
let app_state = web::Data::new(ProgramAppState {
ntp,
//sql_pool: pool,
mongo_db_client,
ui_sender_channel,
});

let time_thread = app_state.ntp.start_time_thread(app_state.clone());

let port: u16 = std::env::var("PORT")
.unwrap_or_else(|_| "8080".into())
.parse()
.unwrap();
log::info!("Server starting on port: {}", port);

HttpServer::new(move || {
let cors = Cors::default()
.allow_any_origin()
Expand Down
43 changes: 42 additions & 1 deletion src/services/emails.rs
Original file line number Diff line number Diff line change
@@ -1 +1,42 @@
//https://crates.io/crates/aws-sdk-sesv2
use aws_config::BehaviorVersion;
use aws_sdk_sesv2::types::{Body, Content, Destination, EmailContent, Message};

pub async fn send_email_with_aws_ses(
dest: &str,
subject: &str,
message: &str,
) -> anyhow::Result<()> {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let client = aws_sdk_sesv2::Client::new(&config);

let mut destination: Destination = Destination::builder().build();
destination.to_addresses = Some(vec![dest.to_string()]);
let subject_content = Content::builder()
.data(subject)
.charset("UTF-8")
.build()
.expect("Building subject content");
let body_content = Content::builder()
.data(message)
.charset("UTF-8")
.build()
.expect("Building body content");
let body = Body::builder().text(body_content).build();

let msg = Message::builder()
.subject(subject_content)
.body(body)
.build();

let email_content = EmailContent::builder().simple(msg).build();

client
.send_email()
.from_email_address("[email protected]")
.destination(destination)
.content(email_content)
.send()
.await?;

Ok(())
}
5 changes: 4 additions & 1 deletion src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use actix_web::{
};
use mongodb::Client;

use crate::controllers::users::{create_user, get_user_by_email};
use crate::{
controllers::users::{create_user, get_user_by_email},
models::users,
};

use super::*;

Expand Down
3 changes: 1 addition & 2 deletions src/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use bson::Document;
use futures::Stream;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::time::{Duration, Instant};
use strum_macros::EnumIter;

use crate::{ProgramAppState, MAX_FRAME_SIZE};

Expand All @@ -19,7 +18,7 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

#[repr(u8)]
#[derive(Debug, IntoPrimitive, TryFromPrimitive, PartialEq, Eq, Clone, Copy, EnumIter)]
#[derive(Debug, IntoPrimitive, TryFromPrimitive, PartialEq, Eq, Clone, Copy)]
/// The kind of an incoming request via the websocket.
pub enum RequestId {
BasicCommand = 0,
Expand Down

0 comments on commit 9ae09e6

Please sign in to comment.