Skip to content

Commit

Permalink
Remove *Store traits
Browse files Browse the repository at this point in the history
It was an unnecessary level of indirection, look are all those removed `Arc`

Test Plan: Tests
  • Loading branch information
AhmedSoliman committed Jul 25, 2023
1 parent 0751032 commit e2d4bbe
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 281 deletions.
11 changes: 5 additions & 6 deletions cronback-services/src/api/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ impl From<AuthError> for ApiError {
}

pub struct Authenticator {
store: Box<dyn AuthStore + Send + Sync>,
store: AuthStore,
}

impl Authenticator {
pub fn new(store: Box<dyn AuthStore + Send + Sync>) -> Self {
pub fn new(store: AuthStore) -> Self {
Self { store }
}

Expand Down Expand Up @@ -235,7 +235,6 @@ mod tests {
use cronback_api_model::admin::CreateAPIkeyRequest;

use super::*;
use crate::api::auth_store::SqlAuthStore;
use crate::api::migrate_up;

#[test]
Expand Down Expand Up @@ -265,15 +264,15 @@ mod tests {
}

#[tokio::test]
async fn test_sql_auth_store() -> anyhow::Result<()> {
async fn test_auth_store() -> anyhow::Result<()> {
let db = Database::in_memory().await?;
migrate_up(&db).await?;
let store = SqlAuthStore::new(db);
let store = AuthStore::new(db);

let prj1 = ProjectId::generate();
let prj2 = ProjectId::generate();

let authenticator = Authenticator::new(Box::new(store));
let authenticator = Authenticator::new(store);

let key1 = authenticator
.gen_key(build_create_key_request("key1"), &prj1)
Expand Down
43 changes: 9 additions & 34 deletions cronback-services/src/api/auth_store.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,35 @@
use async_trait::async_trait;
use lib::prelude::*;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

use crate::api::db_model::{api_keys, ApiKey, ApiKeys};

pub type AuthStoreError = DatabaseError;

#[async_trait]
pub trait AuthStore {
async fn save_key(&self, key: ApiKey) -> Result<(), AuthStoreError>;

async fn get_key(
&self,
key: &str,
) -> Result<Option<ApiKey>, AuthStoreError>;

/// Returns true if the key got deleted, false if the key didn't exist
async fn delete_key(
&self,
key_id: &str,
project: &ValidShardedId<ProjectId>,
) -> Result<bool, AuthStoreError>;

async fn list_keys(
&self,
project: &ValidShardedId<ProjectId>,
) -> Result<Vec<ApiKey>, AuthStoreError>;
}

pub struct SqlAuthStore {
#[derive(Clone)]
pub struct AuthStore {
db: Database,
}

impl SqlAuthStore {
impl AuthStore {
pub fn new(db: Database) -> Self {
Self { db }
}
}

#[async_trait]
impl AuthStore for SqlAuthStore {
async fn save_key(&self, key: ApiKey) -> Result<(), AuthStoreError> {
pub async fn save_key(&self, key: ApiKey) -> Result<(), AuthStoreError> {
let active_model: api_keys::ActiveModel = key.into();
ApiKeys::insert(active_model).exec(&self.db.orm).await?;
Ok(())
}

async fn get_key(
pub async fn get_key(
&self,
key_id: &str,
) -> Result<Option<ApiKey>, AuthStoreError> {
let res = ApiKeys::find_by_id(key_id).one(&self.db.orm).await?;
Ok(res)
}

async fn delete_key(
pub async fn delete_key(
&self,
key_id: &str,
project: &ValidShardedId<ProjectId>,
Expand All @@ -67,7 +42,7 @@ impl AuthStore for SqlAuthStore {
Ok(res.rows_affected > 0)
}

async fn list_keys(
pub async fn list_keys(
&self,
project: &ValidShardedId<ProjectId>,
) -> Result<Vec<ApiKey>, AuthStoreError> {
Expand Down Expand Up @@ -104,10 +79,10 @@ mod tests {
}

#[tokio::test]
async fn test_sql_auth_store() -> anyhow::Result<()> {
async fn test_auth_store() -> anyhow::Result<()> {
let db = Database::in_memory().await?;
migrate_up(&db).await?;
let store = SqlAuthStore::new(db);
let store = AuthStore::new(db);

let owner1 = ProjectId::generate();
let owner2 = ProjectId::generate();
Expand Down
6 changes: 2 additions & 4 deletions cronback-services/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
use std::time::Instant;

use auth::Authenticator;
use auth_store::SqlAuthStore;
use auth_store::AuthStore;
use axum::extract::MatchedPath;
use axum::http::{Request, StatusCode};
use axum::middleware::{self, Next};
Expand Down Expand Up @@ -87,9 +87,7 @@ pub async fn start_api_server(
let shared_state = Arc::new(AppState {
_context: context.clone(),
config: config.clone(),
authenicator: Authenticator::new(Box::new(SqlAuthStore::new(
db.clone(),
))),
authenicator: Authenticator::new(AuthStore::new(db)),
scheduler_clients: Box::new(GrpcClientProvider::new(context.clone())),
dispatcher_clients: Box::new(GrpcClientProvider::new(context.clone())),
metadata_svc_clients: Box::new(GrpcClientProvider::new(
Expand Down
61 changes: 23 additions & 38 deletions cronback-services/src/dispatcher/attempt_store.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,43 @@
use async_trait::async_trait;
use lib::prelude::*;
#[cfg(test)]
use proto::common::PaginationIn;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};

use super::db_model::{attempts, Attempt, Attempts};

pub type AttemptLogStoreError = DatabaseError;

#[async_trait]
pub trait AttemptLogStore {
async fn log_attempt(
&self,
attempt: Attempt,
) -> Result<(), AttemptLogStoreError>;
use sea_orm::ActiveModelTrait;
#[cfg(test)]
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

async fn get_attempts_for_run(
&self,
project: &ValidShardedId<ProjectId>,
id: &RunId,
pagination: PaginationIn,
) -> Result<PaginatedResponse<Attempt>, AttemptLogStoreError>;
#[cfg(test)]
use super::db_model::Attempts;
use super::db_model::{attempts, Attempt};

async fn get_attempt(
&self,
project: &ValidShardedId<ProjectId>,
id: &AttemptId,
) -> Result<Option<Attempt>, AttemptLogStoreError>;
}
pub type AttemptStoreError = DatabaseError;

pub struct SqlAttemptLogStore {
#[derive(Clone)]
pub struct AttemptStore {
db: Database,
}

impl SqlAttemptLogStore {
impl AttemptStore {
pub fn new(db: Database) -> Self {
Self { db }
}
}

#[async_trait]
impl AttemptLogStore for SqlAttemptLogStore {
async fn log_attempt(
pub async fn log_attempt(
&self,
attempt: Attempt,
) -> Result<(), AttemptLogStoreError> {
) -> Result<(), AttemptStoreError> {
let active_model: attempts::ActiveModel = attempt.into();
active_model.insert(&self.db.orm).await?;
Ok(())
}

async fn get_attempts_for_run(
// Leaving this under cfg(test) until someone actually need it.
#[cfg(test)]
pub async fn get_attempts_for_run(
&self,
project: &ValidShardedId<ProjectId>,
id: &RunId,
pagination: PaginationIn,
) -> Result<PaginatedResponse<Attempt>, AttemptLogStoreError> {
) -> Result<PaginatedResponse<Attempt>, AttemptStoreError> {
let query = Attempts::find()
.filter(attempts::Column::RunId.eq(id.value()))
.filter(attempts::Column::ProjectId.eq(project.value()))
Expand All @@ -65,11 +48,13 @@ impl AttemptLogStore for SqlAttemptLogStore {
Ok(PaginatedResponse::paginate(res, &pagination))
}

async fn get_attempt(
// Leaving this under cfg(test) until someone actually need it.
#[cfg(test)]
pub async fn get_attempt(
&self,
project_id: &ValidShardedId<ProjectId>,
id: &AttemptId,
) -> Result<Option<Attempt>, AttemptLogStoreError> {
) -> Result<Option<Attempt>, AttemptStoreError> {
let res = Attempts::find_by_id((id.clone(), project_id.clone()))
.one(&self.db.orm)
.await?;
Expand Down Expand Up @@ -118,10 +103,10 @@ mod tests {
}

#[tokio::test]
async fn test_sql_trigger_store() -> anyhow::Result<()> {
async fn test_trigger_store() -> anyhow::Result<()> {
let db = Database::in_memory().await?;
migrate_up(&db).await?;
let store = SqlAttemptLogStore::new(db);
let store = AttemptStore::new(db);

let project = ProjectId::generate();
let project2 = ProjectId::generate();
Expand Down
27 changes: 13 additions & 14 deletions cronback-services/src/dispatcher/dispatch_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::debug_assert;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
Expand All @@ -10,7 +9,7 @@ use proto::dispatcher_svc;
use thiserror::Error;
use tracing::{error, info};

use super::attempt_store::AttemptLogStore;
use super::attempt_store::AttemptStore;
use super::db_model::runs::RunStatus;
use super::db_model::Run;
use super::run_store::{RunStore, RunStoreError};
Expand All @@ -24,15 +23,15 @@ pub enum DispatcherManagerError {

pub struct DispatchManager {
_cell_id: u32,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: AttemptStore,
run_store: RunStore,
}

impl DispatchManager {
pub fn new(
cell_id: u32,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: RunStore,
attempt_store: AttemptStore,
) -> Self {
Self {
_cell_id: cell_id,
Expand All @@ -57,8 +56,8 @@ impl DispatchManager {
tokio::spawn(
RunJob::from(
r,
Arc::clone(&self.run_store),
Arc::clone(&self.attempt_store),
self.run_store.clone(),
self.attempt_store.clone(),
)
.run(),
);
Expand All @@ -76,8 +75,8 @@ impl DispatchManager {

let run_job = RunJob::from(
run,
Arc::clone(&self.run_store),
Arc::clone(&self.attempt_store),
self.run_store.clone(),
self.attempt_store.clone(),
);

Ok(match mode {
Expand All @@ -97,15 +96,15 @@ impl DispatchManager {

pub struct RunJob {
pub run: Run,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: RunStore,
attempt_store: AttemptStore,
}

impl RunJob {
fn from(
run: Run,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: RunStore,
attempt_store: AttemptStore,
) -> Self {
Self {
run,
Expand Down
6 changes: 2 additions & 4 deletions cronback-services/src/dispatcher/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use chrono::Utc;
use futures::TryFutureExt;
use lib::prelude::*;
Expand Down Expand Up @@ -27,14 +25,14 @@ pub(crate) struct DispatcherSvcHandler {
#[allow(unused)]
context: ServiceContext,
dispatch_manager: DispatchManager,
run_store: Arc<dyn RunStore + Send + Sync>,
run_store: RunStore,
}

impl DispatcherSvcHandler {
pub fn new(
context: ServiceContext,
dispatch_manager: DispatchManager,
run_store: Arc<dyn RunStore + Send + Sync>,
run_store: RunStore,
) -> Self {
Self {
context,
Expand Down
14 changes: 5 additions & 9 deletions cronback-services/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ mod retry;
mod run_store;
mod webhook_action;

use std::sync::Arc;

use attempt_store::{AttemptLogStore, SqlAttemptLogStore};
use attempt_store::AttemptStore;
use dispatch_manager::DispatchManager;
use lib::prelude::*;
use lib::{netutils, service};
use proto::dispatcher_svc::dispatcher_svc_server::DispatcherSvcServer;
use run_store::{RunStore, SqlRunStore};
use run_store::RunStore;
use sea_orm::TransactionTrait;
use sea_orm_migration::MigratorTrait;
use tracing::info;
Expand All @@ -41,16 +39,14 @@ pub async fn start_dispatcher_server(
let db = Database::connect(&config.dispatcher.database_uri).await?;
migrate_up(&db).await?;

let attempt_store: Arc<dyn AttemptLogStore + Send + Sync> =
Arc::new(SqlAttemptLogStore::new(db.clone()));
let attempt_store = AttemptStore::new(db.clone());

let run_store: Arc<dyn RunStore + Send + Sync> =
Arc::new(SqlRunStore::new(db));
let run_store = RunStore::new(db);

let dispatch_manager = DispatchManager::new(
config.dispatcher.cell_id,
run_store.clone(),
attempt_store.clone(),
attempt_store,
);
dispatch_manager.start().await?;

Expand Down
Loading

0 comments on commit e2d4bbe

Please sign in to comment.