Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove *Store traits #17

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions cronback-services/src/api/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ impl From<AuthError> for ApiError {
}

pub struct Authenticator {
store: Box<dyn AuthStore + Send + Sync>,
store: AuthStore,
}

impl Authenticator {
pub fn new(store: Box<dyn AuthStore + Send + Sync>) -> Self {
pub fn new(store: AuthStore) -> Self {
Self { store }
}

Expand Down Expand Up @@ -235,7 +235,6 @@ mod tests {
use cronback_api_model::admin::CreateAPIkeyRequest;

use super::*;
use crate::api::auth_store::SqlAuthStore;
use crate::api::migrate_up;

#[test]
Expand Down Expand Up @@ -265,15 +264,15 @@ mod tests {
}

#[tokio::test]
async fn test_sql_auth_store() -> anyhow::Result<()> {
async fn test_auth_store() -> anyhow::Result<()> {
let db = Database::in_memory().await?;
migrate_up(&db).await?;
let store = SqlAuthStore::new(db);
let store = AuthStore::new(db);

let prj1 = ProjectId::generate();
let prj2 = ProjectId::generate();

let authenticator = Authenticator::new(Box::new(store));
let authenticator = Authenticator::new(store);

let key1 = authenticator
.gen_key(build_create_key_request("key1"), &prj1)
Expand Down
43 changes: 9 additions & 34 deletions cronback-services/src/api/auth_store.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,35 @@
use async_trait::async_trait;
use lib::prelude::*;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

use crate::api::db_model::{api_keys, ApiKey, ApiKeys};

pub type AuthStoreError = DatabaseError;

#[async_trait]
pub trait AuthStore {
async fn save_key(&self, key: ApiKey) -> Result<(), AuthStoreError>;

async fn get_key(
&self,
key: &str,
) -> Result<Option<ApiKey>, AuthStoreError>;

/// Returns true if the key got deleted, false if the key didn't exist
async fn delete_key(
&self,
key_id: &str,
project: &ValidShardedId<ProjectId>,
) -> Result<bool, AuthStoreError>;

async fn list_keys(
&self,
project: &ValidShardedId<ProjectId>,
) -> Result<Vec<ApiKey>, AuthStoreError>;
}

pub struct SqlAuthStore {
#[derive(Clone)]
pub struct AuthStore {
db: Database,
}

impl SqlAuthStore {
impl AuthStore {
pub fn new(db: Database) -> Self {
Self { db }
}
}

#[async_trait]
impl AuthStore for SqlAuthStore {
async fn save_key(&self, key: ApiKey) -> Result<(), AuthStoreError> {
pub async fn save_key(&self, key: ApiKey) -> Result<(), AuthStoreError> {
let active_model: api_keys::ActiveModel = key.into();
ApiKeys::insert(active_model).exec(&self.db.orm).await?;
Ok(())
}

async fn get_key(
pub async fn get_key(
&self,
key_id: &str,
) -> Result<Option<ApiKey>, AuthStoreError> {
let res = ApiKeys::find_by_id(key_id).one(&self.db.orm).await?;
Ok(res)
}

async fn delete_key(
pub async fn delete_key(
&self,
key_id: &str,
project: &ValidShardedId<ProjectId>,
Expand All @@ -67,7 +42,7 @@ impl AuthStore for SqlAuthStore {
Ok(res.rows_affected > 0)
}

async fn list_keys(
pub async fn list_keys(
&self,
project: &ValidShardedId<ProjectId>,
) -> Result<Vec<ApiKey>, AuthStoreError> {
Expand Down Expand Up @@ -104,10 +79,10 @@ mod tests {
}

#[tokio::test]
async fn test_sql_auth_store() -> anyhow::Result<()> {
async fn test_auth_store() -> anyhow::Result<()> {
let db = Database::in_memory().await?;
migrate_up(&db).await?;
let store = SqlAuthStore::new(db);
let store = AuthStore::new(db);

let owner1 = ProjectId::generate();
let owner2 = ProjectId::generate();
Expand Down
6 changes: 2 additions & 4 deletions cronback-services/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
use std::time::Instant;

use auth::Authenticator;
use auth_store::SqlAuthStore;
use auth_store::AuthStore;
use axum::extract::MatchedPath;
use axum::http::{Request, StatusCode};
use axum::middleware::{self, Next};
Expand Down Expand Up @@ -87,9 +87,7 @@ pub async fn start_api_server(
let shared_state = Arc::new(AppState {
_context: context.clone(),
config: config.clone(),
authenicator: Authenticator::new(Box::new(SqlAuthStore::new(
db.clone(),
))),
authenicator: Authenticator::new(AuthStore::new(db)),
scheduler_clients: Box::new(GrpcClientProvider::new(context.clone())),
dispatcher_clients: Box::new(GrpcClientProvider::new(context.clone())),
metadata_svc_clients: Box::new(GrpcClientProvider::new(
Expand Down
61 changes: 23 additions & 38 deletions cronback-services/src/dispatcher/attempt_store.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,43 @@
use async_trait::async_trait;
use lib::prelude::*;
#[cfg(test)]
use proto::common::PaginationIn;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter};

use super::db_model::{attempts, Attempt, Attempts};

pub type AttemptLogStoreError = DatabaseError;

#[async_trait]
pub trait AttemptLogStore {
async fn log_attempt(
&self,
attempt: Attempt,
) -> Result<(), AttemptLogStoreError>;
use sea_orm::ActiveModelTrait;
#[cfg(test)]
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};

async fn get_attempts_for_run(
&self,
project: &ValidShardedId<ProjectId>,
id: &RunId,
pagination: PaginationIn,
) -> Result<PaginatedResponse<Attempt>, AttemptLogStoreError>;
#[cfg(test)]
use super::db_model::Attempts;
use super::db_model::{attempts, Attempt};

async fn get_attempt(
&self,
project: &ValidShardedId<ProjectId>,
id: &AttemptId,
) -> Result<Option<Attempt>, AttemptLogStoreError>;
}
pub type AttemptStoreError = DatabaseError;

pub struct SqlAttemptLogStore {
#[derive(Clone)]
pub struct AttemptStore {
db: Database,
}

impl SqlAttemptLogStore {
impl AttemptStore {
pub fn new(db: Database) -> Self {
Self { db }
}
}

#[async_trait]
impl AttemptLogStore for SqlAttemptLogStore {
async fn log_attempt(
pub async fn log_attempt(
&self,
attempt: Attempt,
) -> Result<(), AttemptLogStoreError> {
) -> Result<(), AttemptStoreError> {
let active_model: attempts::ActiveModel = attempt.into();
active_model.insert(&self.db.orm).await?;
Ok(())
}

async fn get_attempts_for_run(
// Leaving this under cfg(test) until someone actually need it.
#[cfg(test)]
AhmedSoliman marked this conversation as resolved.
Show resolved Hide resolved
pub async fn get_attempts_for_run(
&self,
project: &ValidShardedId<ProjectId>,
id: &RunId,
pagination: PaginationIn,
) -> Result<PaginatedResponse<Attempt>, AttemptLogStoreError> {
) -> Result<PaginatedResponse<Attempt>, AttemptStoreError> {
let query = Attempts::find()
.filter(attempts::Column::RunId.eq(id.value()))
.filter(attempts::Column::ProjectId.eq(project.value()))
Expand All @@ -65,11 +48,13 @@ impl AttemptLogStore for SqlAttemptLogStore {
Ok(PaginatedResponse::paginate(res, &pagination))
}

async fn get_attempt(
// Leaving this under cfg(test) until someone actually need it.
#[cfg(test)]
pub async fn get_attempt(
&self,
project_id: &ValidShardedId<ProjectId>,
id: &AttemptId,
) -> Result<Option<Attempt>, AttemptLogStoreError> {
) -> Result<Option<Attempt>, AttemptStoreError> {
let res = Attempts::find_by_id((id.clone(), project_id.clone()))
.one(&self.db.orm)
.await?;
Expand Down Expand Up @@ -118,10 +103,10 @@ mod tests {
}

#[tokio::test]
async fn test_sql_trigger_store() -> anyhow::Result<()> {
async fn test_trigger_store() -> anyhow::Result<()> {
let db = Database::in_memory().await?;
migrate_up(&db).await?;
let store = SqlAttemptLogStore::new(db);
let store = AttemptStore::new(db);

let project = ProjectId::generate();
let project2 = ProjectId::generate();
Expand Down
27 changes: 13 additions & 14 deletions cronback-services/src/dispatcher/dispatch_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::debug_assert;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
Expand All @@ -10,7 +9,7 @@ use proto::dispatcher_svc;
use thiserror::Error;
use tracing::{error, info};

use super::attempt_store::AttemptLogStore;
use super::attempt_store::AttemptStore;
use super::db_model::runs::RunStatus;
use super::db_model::Run;
use super::run_store::{RunStore, RunStoreError};
Expand All @@ -24,15 +23,15 @@ pub enum DispatcherManagerError {

pub struct DispatchManager {
_cell_id: u32,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: AttemptStore,
run_store: RunStore,
}

impl DispatchManager {
pub fn new(
cell_id: u32,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: RunStore,
attempt_store: AttemptStore,
) -> Self {
Self {
_cell_id: cell_id,
Expand All @@ -57,8 +56,8 @@ impl DispatchManager {
tokio::spawn(
RunJob::from(
r,
Arc::clone(&self.run_store),
Arc::clone(&self.attempt_store),
self.run_store.clone(),
self.attempt_store.clone(),
)
.run(),
);
Expand All @@ -76,8 +75,8 @@ impl DispatchManager {

let run_job = RunJob::from(
run,
Arc::clone(&self.run_store),
Arc::clone(&self.attempt_store),
self.run_store.clone(),
self.attempt_store.clone(),
);

Ok(match mode {
Expand All @@ -97,15 +96,15 @@ impl DispatchManager {

pub struct RunJob {
pub run: Run,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: RunStore,
attempt_store: AttemptStore,
}

impl RunJob {
fn from(
run: Run,
run_store: Arc<dyn RunStore + Send + Sync>,
attempt_store: Arc<dyn AttemptLogStore + Send + Sync>,
run_store: RunStore,
attempt_store: AttemptStore,
) -> Self {
Self {
run,
Expand Down
6 changes: 2 additions & 4 deletions cronback-services/src/dispatcher/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use chrono::Utc;
use futures::TryFutureExt;
use lib::prelude::*;
Expand Down Expand Up @@ -27,14 +25,14 @@ pub(crate) struct DispatcherSvcHandler {
#[allow(unused)]
context: ServiceContext,
dispatch_manager: DispatchManager,
run_store: Arc<dyn RunStore + Send + Sync>,
run_store: RunStore,
}

impl DispatcherSvcHandler {
pub fn new(
context: ServiceContext,
dispatch_manager: DispatchManager,
run_store: Arc<dyn RunStore + Send + Sync>,
run_store: RunStore,
) -> Self {
Self {
context,
Expand Down
14 changes: 5 additions & 9 deletions cronback-services/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ mod retry;
mod run_store;
mod webhook_action;

use std::sync::Arc;

use attempt_store::{AttemptLogStore, SqlAttemptLogStore};
use attempt_store::AttemptStore;
use dispatch_manager::DispatchManager;
use lib::prelude::*;
use lib::{netutils, service};
use proto::dispatcher_svc::dispatcher_svc_server::DispatcherSvcServer;
use run_store::{RunStore, SqlRunStore};
use run_store::RunStore;
use sea_orm::TransactionTrait;
use sea_orm_migration::MigratorTrait;
use tracing::info;
Expand All @@ -41,16 +39,14 @@ pub async fn start_dispatcher_server(
let db = Database::connect(&config.dispatcher.database_uri).await?;
migrate_up(&db).await?;

let attempt_store: Arc<dyn AttemptLogStore + Send + Sync> =
Arc::new(SqlAttemptLogStore::new(db.clone()));
let attempt_store = AttemptStore::new(db.clone());

let run_store: Arc<dyn RunStore + Send + Sync> =
Arc::new(SqlRunStore::new(db));
let run_store = RunStore::new(db);

let dispatch_manager = DispatchManager::new(
config.dispatcher.cell_id,
run_store.clone(),
attempt_store.clone(),
attempt_store,
);
dispatch_manager.start().await?;

Expand Down
Loading