Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

H-3385: Allow updating the data type cache in the database #5285

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion apps/hash-graph/bins/cli/src/subcommand/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod completions;
mod migrate;
mod reindex_cache;
mod server;
mod snapshot;
#[cfg(feature = "test-server")]
Expand All @@ -21,7 +22,10 @@ pub use self::{
snapshot::{SnapshotArgs, snapshot},
type_fetcher::{TypeFetcherArgs, type_fetcher},
};
use crate::error::{GraphError, HealthcheckError};
use crate::{
error::{GraphError, HealthcheckError},
subcommand::reindex_cache::{ReindexCacheArgs, reindex_cache},
};

/// Subcommand for the program.
#[derive(Debug, clap::Subcommand)]
Expand All @@ -36,6 +40,11 @@ pub enum Subcommand {
Completions(CompletionsArgs),
/// Snapshot API for the database.
Snapshot(SnapshotArgs),
/// Re-indexes the cache.
///
/// This is only needed if the backend was changed in an uncommon way such as schemas being
/// updated in place. This is a rare operation and should be avoided if possible.
ReindexCache(ReindexCacheArgs),
/// Test server
#[cfg(feature = "test-server")]
TestServer(TestServerArgs),
Expand Down Expand Up @@ -69,6 +78,7 @@ impl Subcommand {
Ok(())
}
Self::Snapshot(args) => block_on(snapshot(args), tracing_config),
Self::ReindexCache(args) => block_on(reindex_cache(args), tracing_config),
#[cfg(feature = "test-server")]
Self::TestServer(args) => block_on(test_server(args), tracing_config),
}
Expand Down
71 changes: 71 additions & 0 deletions apps/hash-graph/bins/cli/src/subcommand/reindex_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use authorization::NoAuthorization;
use clap::Parser;
use error_stack::{Report, Result, ResultExt, ensure};
use graph::store::{
DataTypeStore, DatabaseConnectionInfo, DatabasePoolConfig, PostgresStorePool, StorePool,
};
use tokio_postgres::NoTls;

use crate::error::GraphError;

// Arguments for the `reindex-cache` subcommand.
//
// NOTE: plain `//` comments are used on the struct and flattened fields on
// purpose — `#[clap(about)]` and derive-based help would pick up `///` doc
// comments and change the rendered CLI help text.
#[derive(Debug, Parser)]
#[clap(version, author, about, long_about = None)]
pub struct ReindexCacheArgs {
    // Connection parameters (host, port, user, …) for the target database.
    #[clap(flatten)]
    pub db_info: DatabaseConnectionInfo,

    // Connection-pool sizing/behavior options used when creating the pool.
    #[clap(flatten)]
    pub pool_config: DatabasePoolConfig,

    // Flags selecting which cache(s) to re-index; see `ReindexOperations`.
    #[clap(flatten)]
    pub operations: ReindexOperations,
}

// Flag set choosing which derived caches to rebuild. Rendered under the
// "Reindex operations" heading in `--help`. At least one flag must be set
// for the subcommand to do anything (enforced in `reindex_cache`).
//
// NOTE: `//` comments are used deliberately where `///` would alter clap's
// generated help output; the field's `///` line below IS the help text and
// is kept verbatim.
#[derive(Debug, Parser)]
#[clap(version, author, about, long_about = None, next_help_heading = Some("Reindex operations"))]
pub struct ReindexOperations {
    /// Reindex data types cache
    #[clap(long)]
    pub data_types: bool,
}

/// Entry point for the `reindex-cache` subcommand.
///
/// Validates the requested operations, connects to the database, and runs
/// each selected re-indexing step.
///
/// # Errors
///
/// - [`GraphError`] if no operation was requested, if the database connection
///   cannot be established or acquired, or if any re-indexing step fails.
pub async fn reindex_cache(args: ReindexCacheArgs) -> Result<(), GraphError> {
    // Fail fast: reject an empty request *before* opening any database
    // connection. (Previously this was checked only after connecting and
    // acquiring a store, doing pointless work on a guaranteed error path.)
    ensure!(
        args.operations.data_types,
        Report::new(GraphError).attach_printable(
            "No reindex operation was requested. See --help for more information."
        )
    );

    let pool = PostgresStorePool::new(&args.db_info, &args.pool_config, NoTls)
        .await
        .change_context(GraphError)
        .map_err(|report| {
            tracing::error!(error = ?report, "Failed to connect to database");
            report
        })?;

    let mut store = pool
        .acquire(NoAuthorization, None)
        .await
        .change_context(GraphError)
        .map_err(|report| {
            tracing::error!(error = ?report, "Failed to acquire database connection");
            report
        })?;

    // Each operation flag gets its own guarded step so that future flags can
    // be added alongside `data_types` without restructuring.
    if args.operations.data_types {
        DataTypeStore::reindex_cache(&mut store)
            .await
            .change_context(GraphError)
            .map_err(|report| {
                tracing::error!(error = ?report, "Failed to reindex data type cache");
                report
            })?;
    }

    Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use alloc::sync::Arc;
use std::collections::HashMap;

use async_trait::async_trait;
use authorization::{backend::ZanzibarBackend, schema::DataTypeRelationAndSubject};
use authorization::{
AuthorizationApi, backend::ZanzibarBackend, schema::DataTypeRelationAndSubject,
};
use error_stack::{Result, ResultExt};
use graph_types::ontology::{DataTypeId, DataTypeWithMetadata};
use hash_graph_store::filter::Filter;
use graph_types::ontology::DataTypeId;
use tokio_postgres::GenericClient;
use type_system::schema::OntologyTypeResolver;

use crate::{
snapshot::WriteBatch,
store::{
AsClient, InsertionError, PostgresStore,
crud::Read,
AsClient, DataTypeStore, InsertionError, PostgresStore,
postgres::query::rows::{DataTypeConversionsRow, DataTypeEmbeddingRow, DataTypeRow},
},
};
Expand All @@ -29,7 +27,7 @@ pub enum DataTypeRowBatch {
impl<C, A> WriteBatch<C, A> for DataTypeRowBatch
where
C: AsClient,
A: ZanzibarBackend + Send + Sync,
A: AuthorizationApi + ZanzibarBackend + Send + Sync,
{
async fn begin(postgres_client: &mut PostgresStore<C, A>) -> Result<(), InsertionError> {
postgres_client
Expand Down Expand Up @@ -151,33 +149,9 @@ where
.await
.change_context(InsertionError)?;

let mut ontology_type_resolver = OntologyTypeResolver::default();

let data_types = Read::<DataTypeWithMetadata>::read_vec(
postgres_client,
&Filter::All(Vec::new()),
None,
true,
)
.await
.change_context(InsertionError)?
.into_iter()
.map(|data_type| {
let data_type = Arc::new(data_type.schema);
ontology_type_resolver.add_open(Arc::clone(&data_type));
data_type
})
.collect::<Vec<_>>();

for data_type in &data_types {
let schema_metadata = ontology_type_resolver
.resolve_data_type_metadata(&data_type.id)
.change_context(InsertionError)?;

postgres_client
.insert_data_type_references(DataTypeId::from_url(&data_type.id), &schema_metadata)
.await?;
}
<PostgresStore<C, A> as DataTypeStore>::reindex_cache(postgres_client)
.await
.change_context(InsertionError)?;

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions apps/hash-graph/libs/graph/src/store/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,10 @@ where
.update_data_type_embeddings(actor_id, params)
.await
}

// Pure pass-through: re-indexing operates on stored data only, so the
// fetching layer delegates directly to the wrapped store.
async fn reindex_cache(&mut self) -> Result<(), UpdateError> {
    self.store.reindex_cache().await
}
}

impl<S, A> PropertyTypeStore for FetchingStore<S, A>
Expand Down
10 changes: 10 additions & 0 deletions apps/hash-graph/libs/graph/src/store/ontology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ pub trait DataTypeStore {

params: UpdateDataTypeEmbeddingParams<'_>,
) -> impl Future<Output = Result<(), UpdateError>> + Send;

/// Re-indexes the cache for data types.
///
/// Rebuilds derived data-type index data from the stored schemas. This is
/// only needed if the schema of a data type has changed in place without
/// bumping the version. This is a rare operation and should be avoided if
/// possible.
///
/// # Errors
///
/// - if re-indexing the cache fails.
fn reindex_cache(&mut self) -> impl Future<Output = Result<(), UpdateError>> + Send;
}

#[derive(Debug, Deserialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,56 @@ where

Ok(())
}

/// Rebuilds the data-type reference tables from the stored schemas, inside a
/// single transaction so readers never observe a partially-built index.
#[tracing::instrument(level = "info", skip(self))]
async fn reindex_cache(&mut self) -> Result<(), UpdateError> {
    tracing::info!("Reindexing data type cache");
    let transaction = self.transaction().await.change_context(UpdateError)?;

    // Drop the derived reference data first; it is rebuilt below from the
    // schemas, which remain the source of truth.
    transaction
        .as_client()
        .simple_query(
            "
                DELETE FROM data_type_inherits_from;
            ",
        )
        .await
        .change_context(UpdateError)?;

    let mut resolver = OntologyTypeResolver::default();

    // Load every stored data type, registering each schema with the resolver
    // before any resolution happens so cross-references between them resolve.
    let stored = Read::<DataTypeWithMetadata>::read_vec(
        &transaction,
        &Filter::All(Vec::new()),
        None,
        true,
    )
    .await
    .change_context(UpdateError)?;

    let mut schemas = Vec::with_capacity(stored.len());
    for data_type in stored {
        let schema = Arc::new(data_type.schema);
        resolver.add_open(Arc::clone(&schema));
        schemas.push(schema);
    }

    // Second pass: resolve each schema's metadata and re-insert its
    // reference rows.
    for schema in &schemas {
        let metadata = resolver
            .resolve_data_type_metadata(&schema.id)
            .change_context(UpdateError)?;

        transaction
            .insert_data_type_references(DataTypeId::from_url(&schema.id), &metadata)
            .await
            .change_context(UpdateError)?;
    }

    transaction.commit().await.change_context(UpdateError)?;

    Ok(())
}
}

#[derive(Debug, Copy, Clone)]
Expand Down
4 changes: 4 additions & 0 deletions tests/hash-graph-integration/postgres/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ impl<A: AuthorizationApi> DataTypeStore for DatabaseApi<'_, A> {
.update_data_type_embeddings(actor_id, params)
.await
}

// Test-harness wrapper: forwards straight to the underlying store so the
// integration tests exercise the real implementation.
async fn reindex_cache(&mut self) -> Result<(), UpdateError> {
    self.store.reindex_cache().await
}
}

impl<A: AuthorizationApi> PropertyTypeStore for DatabaseApi<'_, A> {
Expand Down
Loading