Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC for ssd cache #3723

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f28c81c
SSD Cache prototype
imotov Jul 3, 2023
1d4dad2
Fix test_cache_storage_factory and cleanup
imotov Aug 8, 2023
44379e3
Clean up format
imotov Aug 9, 2023
ab5dd9b
Remove unnecessary dependencies
imotov Aug 9, 2023
4406858
Start cache storage service refactoring
imotov Aug 9, 2023
d66f6c8
Start cache storage controller cleanup
imotov Aug 10, 2023
14be853
Update quickwit/quickwit-control-plane/src/cache_storage_controller.rs
imotov Aug 12, 2023
028154b
Update quickwit/quickwit-control-plane/src/cache_storage_controller.rs
imotov Aug 12, 2023
859adb9
Update quickwit/quickwit-control-plane/src/cache_storage_controller.rs
imotov Aug 12, 2023
686d73d
Refactor cache storage service configuration
imotov Aug 11, 2023
46cc9eb
Start refactoring cache storage
imotov Aug 11, 2023
3ee114c
Fix format
imotov Aug 12, 2023
d324652
Fix build after main branch merge
imotov Aug 14, 2023
3a5198c
Add proper loading and deletion logic to cache storage
imotov Aug 16, 2023
090d6bb
Add cache storage communication flowchart.
imotov Aug 16, 2023
1636889
Enable cache storage stats
imotov Aug 17, 2023
3422581
Add cache cleanup when an index is deleted
imotov Aug 17, 2023
13a350f
Fix build after rebase
imotov Aug 17, 2023
5475b8e
Fix MetastoreError reference
imotov Aug 18, 2023
84db9e2
Fix cache storage grpc server
imotov Aug 19, 2023
93acedb
Fix format
imotov Aug 19, 2023
bd28308
Minor code cleanups.
fulmicoton Aug 21, 2023
d0f1818
Fix accesing cache storage that got broken during cleanup
imotov Aug 21, 2023
03b4328
Fix cached_splits_registry's test_basic_workflow test
imotov Aug 22, 2023
664e339
First step in refactoring of storage configuration
imotov Aug 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"
members = [
"quickwit-actors",
"quickwit-aws",
"quickwit-cache-storage",
"quickwit-cli",
"quickwit-cluster",
"quickwit-codegen",
Expand Down Expand Up @@ -205,6 +206,7 @@ azure_storage_blobs = { version = "0.13.0", default-features = false, features =

quickwit-actors = { version = "0.6.3", path = "./quickwit-actors" }
quickwit-aws = { version = "0.6.3", path = "./quickwit-aws" }
quickwit-cache-storage = { version = "0.6.3", path = "./quickwit-cache-storage" }
quickwit-cluster = { version = "0.6.3", path = "./quickwit-cluster" }
quickwit-codegen = { version = "0.6.3", path = "./quickwit-codegen" }
quickwit-codegen-example = { version = "0.6.3", path = "./quickwit-codegen/example" }
Expand Down
47 changes: 47 additions & 0 deletions quickwit/quickwit-cache-storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[package]
name = "quickwit-cache-storage"
version = "0.6.3"
authors = ["Quickwit, Inc. <[email protected]>"]
edition = "2021"
license = "AGPL-3.0-or-later" # For a commercial license, contact [email protected]
description = "Quickwit's cache storage implementation"
repository = "https://github.com/quickwit-oss/quickwit"
homepage = "https://quickwit.io/"
documentation = "https://quickwit.io/docs/"

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
mockall = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-aws = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
proptest = { workspace = true }
tokio = { workspace = true }
tracing-subscriber = { workspace = true }

quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }

[build-dependencies]
quickwit-codegen = { workspace = true }

[features]
testsuite = [
"mockall",
]
40 changes: 40 additions & 0 deletions quickwit/quickwit-cache-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Cache Storage

Cache Storage provides a local storage cache for a subset of splits. The high-level interactions between components responsible for the local storage cache are depicted in the following diagram:

```mermaid
flowchart
subgraph MetastoreNode
Metastore --> MetastoreEventPublisher
MetastoreEventPublisher --MetastoreEvent::PublishSplit--> ControlPlaneEventSubscriber
ControlPlaneEventSubscriber --NotifySplitsChangeRequest--> ControlPlaneServiceClient
end

subgraph ControlPlaneNode
ControlPlaneServiceClient --NotifySplitsChangeRequest--> ControlPlaneService
ControlPlaneService --CacheUpdateRequest--> CacheStorageController
CacheStorageServicePool --CacheUpdateRequest--> CacheStorageController
CacheStorageController --NotifySplitsChangeRequest--> CacheStorageServiceClient
end

subgraph SearchNode
Metastore <-. list_all_metadata/list_splits .-> CacheStorageController
CacheStorageServiceClient --NotifySplitsChangeRequest--> CacheStorageService
CacheStorageService --update_split_cache--> CacheStorageFactory
CacheStorageFactory --bulk_update--> CachedSplitRegistry
CacheStorageFactory --resolve-->CacheStorage
CachedSplitRegistry --> CacheStorage
end

cluster[Cluster/ChitChat] --cluster_change_stream-->CacheStorageServicePool

```

The allocation of splits is controlled by the `CacheStorageController`, which resides on the control plane nodes. It receives notifications about newly published splits from the `Metastore` via the `ControlPlaneService`, as well as information about available search nodes from the `Cluster` via the cluster change stream.


On each notification, the `CacheStorageController` requests a list of all indices. It then iterates through this list, requesting the available splits and allocating them to the nodes available for caching (search nodes that have a cache configured).

All nodes whose list of cached splits has changed are notified via a `NotifySplitsChangeRequest`, which contains the full list of splits that the node is supposed to have allocated locally.

On the search nodes the `NotifySplitsChangeRequest` is handled by `CacheStorageService` that discovers the local `CacheStorageFactory` and notifies it about the changes. The `CacheStorageFactory` maintains a list of splits allocated locally, performs split downloads and clean up in `CachedSplitRegistry`. The `CacheStorageFactory` also makes the `CachedSplitRegistry` available to `CacheStorage`. For all read requests the `CacheStorage` checks if splits are available in the `CachedSplitRegistry` and services them from the local cache if possible.
187 changes: 187 additions & 0 deletions quickwit/quickwit-cache-storage/src/cache_storage_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;

use anyhow::anyhow;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Universe};
use quickwit_common::uri::Uri;
use quickwit_config::NodeConfig;
use quickwit_proto::cache_storage::{
CacheStorageServiceClient, NotifySplitsChangeRequest, NotifySplitsChangeResponse,
SplitsChangeNotification,
};
use quickwit_proto::metastore::MetastoreError;
use quickwit_proto::{ServiceError, ServiceErrorCode};
use quickwit_storage::{
CacheStorageCounters, CachedSplitRegistry, StorageError, StorageResolver, StorageResolverError,
};
use thiserror::Error;
use tracing::{debug, error, info};

/// Errors produced by the cache storage service.
///
/// The first three variants wrap errors coming from other components and can
/// be created with `?` thanks to their `#[from]` conversions.
#[derive(Error, Debug)]
pub enum CacheStorageServiceError {
    /// Wraps a [`StorageResolverError`].
    #[error("Failed to resolve the storage `{0}`.")]
    StorageResolverError(#[from] StorageResolverError),
    /// Wraps a [`StorageError`].
    #[error("Storage error `{0}`.")]
    StorageError(#[from] StorageError),
    /// Wraps a [`MetastoreError`].
    #[error("Metastore error `{0}`.")]
    MetastoreError(#[from] MetastoreError),
    /// The service was given invalid parameters; the payload is a
    /// human-readable description of the problem.
    #[error("Invalid params `{0}`.")]
    InvalidParams(String),
}

impl ServiceError for CacheStorageServiceError {
    /// Maps each error variant to a service error code: invalid parameters
    /// are reported as a bad request, every wrapped error as an internal
    /// error.
    fn status_code(&self) -> ServiceErrorCode {
        match self {
            Self::InvalidParams(_) => ServiceErrorCode::BadRequest,
            Self::StorageResolverError(_)
            | Self::StorageError(_)
            | Self::MetastoreError(_) => ServiceErrorCode::Internal,
        }
    }
}

/// Actor that applies split change notifications to the local split cache.
pub struct CacheStorageService {
/// Identifier of the node; within this file it is only shown in the `Debug` output.
node_id: String,
/// Resolver handed to `CachedSplitRegistry::bulk_update` on every cache update.
storage_resolver: StorageResolver,
/// Registry obtained from `storage_resolver` in `new`; it receives the cache
/// updates and provides the counters exposed as the actor's observable state.
cached_split_registry: CachedSplitRegistry,
}

impl CacheStorageService {
    /// Builds a service for `node_id` on top of `storage_resolver`.
    ///
    /// The cached split registry is taken from the resolver. If the resolver
    /// does not provide one, a [`CacheStorageServiceError::InvalidParams`]
    /// wrapped in an `anyhow::Error` is returned.
    pub async fn new(
        node_id: String,
        storage_resolver: StorageResolver,
    ) -> anyhow::Result<CacheStorageService> {
        let cached_split_registry = storage_resolver.cached_split_registry().ok_or_else(|| {
            anyhow!(CacheStorageServiceError::InvalidParams(
                "The cache storage factory is not available.".to_string()
            ))
        })?;
        Ok(Self {
            node_id,
            storage_resolver,
            cached_split_registry,
        })
    }

    /// Periodic supervision hook; currently does nothing and always succeeds.
    async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    /// Forwards the splits described by `notifications` to the cached split
    /// registry.
    ///
    /// Every notification is turned into a `(split_id, index_id, storage_uri)`
    /// tuple. If any `storage_uri` fails to parse, the error is returned as an
    /// `ActorExitStatus` and the registry is not updated at all.
    pub async fn update_split_cache(
        &self,
        notifications: Vec<SplitsChangeNotification>,
    ) -> Result<(), ActorExitStatus> {
        let splits = notifications
            .into_iter()
            .map(
                |notification| -> Result<(String, String, Uri), ActorExitStatus> {
                    let storage_uri: Uri = notification.storage_uri.parse()?;
                    Ok((notification.split_id, notification.index_id, storage_uri))
                },
            )
            .collect::<Result<Vec<_>, _>>()?;
        self.cached_split_registry
            .bulk_update(&self.storage_resolver, &splits)
            .await;
        Ok(())
    }
}

impl fmt::Debug for CacheStorageService {
    /// Formats the service as `CacheStorageService { node_id: .. }`; the
    /// storage resolver and the split registry are left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut debug_struct = f.debug_struct("CacheStorageService");
        debug_struct.field("node_id", &self.node_id);
        debug_struct.finish()
    }
}

/// Message the service sends to itself to drive its periodic supervision:
/// handling it re-schedules the same message after `quickwit_actors::HEARTBEAT`.
#[derive(Debug)]
struct SuperviseLoop;

#[async_trait]
impl Handler<SuperviseLoop> for CacheStorageService {
    type Reply = ();

    /// Runs one supervision pass, then schedules the next `SuperviseLoop`
    /// message to be delivered to this actor after one heartbeat.
    async fn handle(
        &mut self,
        _: SuperviseLoop,
        actor_ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        self.handle_supervise().await?;
        let heartbeat = *quickwit_actors::HEARTBEAT;
        actor_ctx.schedule_self_msg(heartbeat, SuperviseLoop).await;
        Ok(())
    }
}

#[async_trait]
impl Handler<NotifySplitsChangeRequest> for CacheStorageService {
    type Reply = quickwit_proto::cache_storage::Result<NotifySplitsChangeResponse>;

    /// Applies the split changes carried by `request` to the local split
    /// cache and replies with an empty `NotifySplitsChangeResponse`.
    ///
    /// NOTE(review): a failure of `update_split_cache` is propagated with `?`
    /// as an `ActorExitStatus` instead of being returned inside the reply.
    /// Presumably this stops the actor on a malformed request; confirm that
    /// this is intended.
    async fn handle(
        &mut self,
        request: NotifySplitsChangeRequest,
        _: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus> {
        debug!("Split change notification: updating the split cache.");
        self.update_split_cache(request.splits_change).await?;
        Ok(Ok(NotifySplitsChangeResponse {}))
    }
}

#[async_trait]
impl Actor for CacheStorageService {
type ObservableState = CacheStorageCounters;

fn observable_state(&self) -> Self::ObservableState {
self.cached_split_registry.counters()
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.handle(SuperviseLoop, ctx).await
}
}

/// Starts the cache storage service if cache storage is enabled in `config`.
///
/// Returns `Ok(None)` when `config.is_cache_storage_enabled()` is false.
/// Otherwise a [`CacheStorageService`] is created for `config.node_id`,
/// spawned in `universe`, and a client built from its mailbox is returned.
///
/// # Errors
///
/// Returns the error of [`CacheStorageService::new`] if the service cannot be
/// created.
pub async fn start_cache_storage_service(
    universe: &Universe,
    config: &NodeConfig,
    storage_resolver: StorageResolver,
) -> anyhow::Result<Option<CacheStorageServiceClient>> {
    if !config.is_cache_storage_enabled() {
        return Ok(None);
    }
    info!("Starting cache storage service.");
    let cache_storage_service =
        CacheStorageService::new(config.node_id.clone(), storage_resolver).await?;
    // Spawn the cache storage service actor. Only its mailbox is kept; the
    // second value returned by `spawn` is discarded.
    let (cache_storage_service_mailbox, _) =
        universe.spawn_builder().spawn(cache_storage_service);
    let cache_storage_service_client =
        CacheStorageServiceClient::from_mailbox(cache_storage_service_mailbox);
    Ok(Some(cache_storage_service_client))
}
// Placeholder test module: the single test below has an empty body and
// exercises no behavior of the cache storage service yet.
#[cfg(test)]
mod tests {
// TODO: replace with real tests of the cache storage service.
#[tokio::test]
async fn test() {}
}
22 changes: 22 additions & 0 deletions quickwit/quickwit-cache-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod cache_storage_service;

pub use crate::cache_storage_service::{start_cache_storage_service, CacheStorageService};
Loading
Loading