From d47b6bd74a94565bdcfa0dbb13492a0bfb37ce8a Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 10 Jan 2024 11:02:09 +0800 Subject: [PATCH] feat: Introduce Google Cloud Storage Implementation (#4344) Introduce native support for GCS using OpenDAL Closes #4236 Signed-off-by: Xuanwo --- .github/workflows/coverage.yml | 5 + docker-compose.yml | 13 ++ quickwit/Cargo.lock | 141 +++++++++++- quickwit/Cargo.toml | 3 + quickwit/quickwit-common/src/uri.rs | 60 ++++- quickwit/quickwit-config/src/lib.rs | 4 +- .../quickwit-config/src/storage_config.rs | 68 +++++- quickwit/quickwit-storage/Cargo.toml | 12 + quickwit/quickwit-storage/src/lib.rs | 7 + .../src/opendal_storage/base.rs | 217 ++++++++++++++++++ .../opendal_storage/google_cloud_storage.rs | 158 +++++++++++++ .../src/opendal_storage/mod.rs | 26 +++ .../quickwit-storage/src/storage_resolver.rs | 18 ++ .../tests/google_cloud_storage.rs | 49 ++++ 14 files changed, 772 insertions(+), 9 deletions(-) create mode 100644 quickwit/quickwit-storage/src/opendal_storage/base.rs create mode 100644 quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs create mode 100644 quickwit/quickwit-storage/src/opendal_storage/mod.rs create mode 100644 quickwit/quickwit-storage/tests/google_cloud_storage.rs diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index dca91d9871d..3b8a1a914b1 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -146,6 +146,11 @@ jobs: - name: Run Azurite service run: DOCKER_SERVICES=azurite make docker-compose-up + # GitHub Actions does not allow services to be started with a custom command, + # so we are running fake gcs server as a container manually. + - name: Run Fake GCS Server service + run: DOCKER_SERVICES=fake-gcs-server make docker-compose-up + - name: Run Pulsar service run: DOCKER_SERVICES=pulsar make docker-compose-up diff --git a/docker-compose.yml b/docker-compose.yml index 884cd1e83f1..f0f35963f4f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -148,6 +148,18 @@ services: - azurite_data:/data command: azurite --blobHost 0.0.0.0 --loose + fake-gcs-server: + image: fsouza/fake-gcs-server:${FAKE_GCS_SERVER_VERSION:-1.47.7} + container_name: fake-gcs-server + ports: + - "${MAP_HOST_FAKE_GCS_SERVER:-127.0.0.1}:4443:4443" # Blob store port + profiles: + - all + - fake-gcs-server + volumes: + - fake_gcs_server_data:/data/sample-bucket + command: -scheme http + grafana: image: grafana/grafana-oss:${GRAFANA_VERSION:-9.4.7} container_name: grafana @@ -223,3 +235,4 @@ volumes: localstack_data: postgres_data: azurite_data: + fake_gcs_server_data: diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 58befb0bde0..2e0c3225789 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -258,6 +258,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compat" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-compression" version = "0.4.5" @@ -806,7 +819,7 @@ dependencies = [ "log", "paste", "pin-project", - "quick-xml", + "quick-xml 0.29.0", "rand 0.8.5", "reqwest", "rustc_version", @@ -874,6 +887,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -2224,6 +2249,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flagset" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a7e408202050813e6f1d9addadcaafef3dca7530c7ddfb005d4081cce6779" + [[package]] name = "flate2" version = "1.0.28" @@ -2499,7 +2530,7 @@ dependencies = [ "google-cloud-metadata", "google-cloud-token", "home", - "jsonwebtoken", + "jsonwebtoken 8.3.0", "reqwest", "serde", "serde_json", @@ -2521,7 +2552,7 @@ dependencies = [ "google-cloud-metadata", "google-cloud-token", "home", - "jsonwebtoken", + "jsonwebtoken 8.3.0", "reqwest", "serde", "serde_json", @@ -3203,13 +3234,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.5", - "pem", + "pem 1.1.1", "ring 0.16.20", "serde", "serde_json", "simple_asn1", ] +[[package]] +name = "jsonwebtoken" +version = "9.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4" +dependencies = [ + "base64 0.21.5", + "js-sys", + "pem 3.0.3", + "ring 0.17.5", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "keccak" version = "0.1.4" @@ -4138,6 +4184,38 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "opendal" +version = "0.44.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66" +dependencies = [ + "anyhow", + "async-compat", + "async-trait", + "backon", + "base64 0.21.5", + "bytes", + "chrono", + "flagset", + "futures", + "getrandom 0.2.11", + "http", + "log", + "md-5", + "once_cell", + "parking_lot", + "percent-encoding", + "pin-project", + "quick-xml 0.30.0", + "reqsign", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "openidconnect" version = "2.5.1" @@ -4485,6 +4563,16 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "pem" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" +dependencies = [ + "base64 0.21.5", + "serde", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5104,7 +5192,7 @@ dependencies = [ "nom", "oauth2", "openidconnect", - "pem", + "pem 1.1.1", "prost", "prost-build", "prost-derive", @@ -5137,6 +5225,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quickwit-actors" version = "0.7.0" @@ -6036,6 +6134,7 @@ dependencies = [ "md5", "mockall", "once_cell", + "opendal", "proptest", "quickwit-aws", "quickwit-common", @@ -6043,6 +6142,8 @@ dependencies = [ "quickwit-proto", "rand 0.8.5", "regex", + "reqsign", + "reqwest", "serde", "serde_json", "tantivy", @@ -6327,6 +6428,36 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reqsign" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.5", + "chrono", + "form_urlencoded", + "getrandom 0.2.11", + "hex", + "hmac", + "home", + "http", + "jsonwebtoken 9.2.0", + "log", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "reqwest", + "rsa", + "serde", + "serde_json", + "sha1", + "sha2", + "tokio", +] + [[package]] name = "reqwest" version = "0.11.22" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 6141b122f6c..92b82686235 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -247,6 +247,9 @@ azure_storage_blobs = { version = "0.13.0", default-features = false, features = "enable_reqwest_rustls", ] } +opendal = { version = "0.44", default-features = false } +reqsign = { version = "0.14", default-features = false } + quickwit-actors = { version = "0.7.0", path = "./quickwit-actors" } quickwit-aws = { version = "0.7.0", path = "./quickwit-aws" } quickwit-cluster = { version = "0.7.0", path = "./quickwit-cluster" } diff --git a/quickwit/quickwit-common/src/uri.rs b/quickwit/quickwit-common/src/uri.rs index 1f9d2f3e399..78959afc674 100644 --- a/quickwit/quickwit-common/src/uri.rs +++ b/quickwit/quickwit-common/src/uri.rs @@ -42,6 +42,7 @@ pub enum Protocol { PostgreSQL = 5, Ram = 6, S3 = 7, + Google = 8, } impl Protocol { @@ -54,6 +55,7 @@ impl Protocol { Protocol::PostgreSQL => "postgresql", Protocol::Ram => "ram", Protocol::S3 => "s3", + Protocol::Google => "gs", } } @@ -66,7 +68,7 @@ impl Protocol { } pub fn is_object_storage(&self) -> bool { - matches!(&self, Protocol::Azure | Protocol::S3) + matches!(&self, Protocol::Azure | Protocol::S3 | Protocol::Google) } pub fn is_database(&self) -> bool { @@ -92,6 +94,7 @@ impl FromStr for Protocol { "pg" | "postgres" | "postgresql" => Ok(Protocol::PostgreSQL), "ram" => Ok(Protocol::Ram), "s3" => Ok(Protocol::S3), + "gs" => Ok(Protocol::Google), _ => bail!("unknown URI protocol `{protocol}`"), } } @@ -185,6 +188,9 @@ impl Uri { if protocol == Protocol::Azure && path.components().count() < 3 { return None; } + if protocol == Protocol::Google && path.components().count() < 2 { + return None; + } let parent_path = path.parent()?; Some(Self { @@ -210,6 +216,9 @@ impl Uri { if self.protocol() == Protocol::Azure && path.components().count() < 3 { return None; } + if self.protocol() == Protocol::Google && path.components().count() < 2 { + return None; + } path.file_name().map(Path::new) } @@ -473,6 +482,14 @@ mod tests { Uri::from_str("azure://account/container/homer/docs/../dognuts").unwrap(), "azure://account/container/homer/docs/../dognuts" ); + assert_eq!( + Uri::from_str("gs://bucket/docs/dognuts").unwrap(), + "gs://bucket/docs/dognuts" + ); + assert_eq!( + Uri::from_str("gs://bucket/homer/docs/../dognuts").unwrap(), + "gs://bucket/homer/docs/../dognuts" + ); assert_eq!( Uri::from_str("actor://localhost:7281/an-actor-id").unwrap(), "actor://localhost:7281/an-actor-id" @@ -495,6 +512,10 @@ mod tests { Uri::for_test("azure://account/bucket/key").protocol(), Protocol::Azure ); + assert_eq!( + Uri::for_test("gs://bucket/key").protocol(), + Protocol::Google + ); assert_eq!( Uri::for_test("postgres://localhost:5432/metastore").protocol(), Protocol::PostgreSQL @@ -547,6 +568,10 @@ mod tests { .unwrap(), "azure://account/container/key" ); + assert_eq!( + Uri::for_test("gs://bucket").join("key").unwrap(), + "gs://bucket/key" + ); Uri::for_test("s3://bucket/").join("/key").unwrap_err(); Uri::for_test("azure://account/container/") .join("/key") @@ -620,6 +645,24 @@ mod tests { .unwrap(), "azure://account/container/foo" ); + assert!(Uri::for_test("gs://bucket").parent().is_none()); + assert!(Uri::for_test("gs://bucket/").parent().is_none()); + assert_eq!( + Uri::for_test("gs://bucket/foo").parent().unwrap(), + "gs://bucket" + ); + assert_eq!( + Uri::for_test("gs://bucket/foo/").parent().unwrap(), + "gs://bucket" + ); + assert_eq!( + Uri::for_test("gs://bucket/foo/bar").parent().unwrap(), + "gs://bucket/foo" + ); + assert_eq!( + Uri::for_test("gs://bucket/foo/bar/").parent().unwrap(), + "gs://bucket/foo" + ); } #[test] @@ -676,6 +719,16 @@ mod tests { .unwrap(), Path::new("foo"), ); + assert!(Uri::for_test("gs://bucket").file_name().is_none()); + assert!(Uri::for_test("gs://bucket/").file_name().is_none()); + assert_eq!( + Uri::for_test("gs://bucket/foo").file_name().unwrap(), + Path::new("foo"), + ); + assert_eq!( + Uri::for_test("gs://bucket/foo/").file_name().unwrap(), + Path::new("foo"), + ); } #[test] @@ -700,6 +753,7 @@ mod tests { assert!(Uri::for_test("azure://account/container/foo.json") .filepath() .is_none()); + assert!(Uri::for_test("gs://bucket/").filepath().is_none()); } #[test] @@ -712,6 +766,10 @@ mod tests { Uri::for_test("azure://account/container/key").as_redacted_str(), "azure://account/container/key" ); + assert_eq!( + Uri::for_test("gs://bucket/key").as_redacted_str(), + "gs://bucket/key" + ); assert_eq!( Uri::for_test("postgres://localhost:5432/metastore").as_redacted_str(), "postgresql://localhost:5432/metastore" diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 0bcde6528c8..13fe40c4976 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -69,8 +69,8 @@ pub use crate::node_config::{ }; use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig}; pub use crate::storage_config::{ - AzureStorageConfig, FileStorageConfig, RamStorageConfig, S3StorageConfig, StorageBackend, - StorageBackendFlavor, StorageConfig, StorageConfigs, + AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, + S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, }; #[derive(utoipa::OpenApi)] diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index ea5ace0d9a0..fe06fa6117e 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -37,6 +37,8 @@ pub enum StorageBackend { Ram, /// Amazon S3 or S3-compatible storage S3, + /// Google Cloud Storage + Google, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] @@ -114,6 +116,15 @@ impl StorageConfigs { }) } + pub fn find_google(&self) -> Option<&GoogleCloudStorageConfig> { + self.0 + .iter() + .find_map(|storage_config| match storage_config { + StorageConfig::Google(google_storage_config) => Some(google_storage_config), + _ => None, + }) + } + pub fn find_file(&self) -> Option<&FileStorageConfig> { self.0 .iter() @@ -157,13 +168,14 @@ pub enum StorageConfig { File(FileStorageConfig), Ram(RamStorageConfig), S3(S3StorageConfig), + Google(GoogleCloudStorageConfig), } impl StorageConfig { pub fn redact(&mut self) { match self { Self::Azure(azure_storage_config) => azure_storage_config.redact(), - Self::File(_) | Self::Ram(_) => {} + Self::File(_) | Self::Ram(_) | Self::Google(_) => {} Self::S3(s3_storage_config) => s3_storage_config.redact(), } } @@ -195,6 +207,13 @@ impl StorageConfig { _ => None, } } + + pub fn as_google(&self) -> Option<&GoogleCloudStorageConfig> { + match self { + Self::Google(google_cloud_storage_config) => Some(google_cloud_storage_config), + _ => None, + } + } } impl From for StorageConfig { @@ -221,6 +240,12 @@ impl From for StorageConfig { } } +impl From for StorageConfig { + fn from(google_cloud_storage_config: GoogleCloudStorageConfig) -> Self { + Self::Google(google_cloud_storage_config) + } +} + impl StorageConfig { pub fn backend(&self) -> StorageBackend { match self { @@ -228,6 +253,7 @@ impl StorageConfig { Self::File(_) => StorageBackend::File, Self::Ram(_) => StorageBackend::Ram, Self::S3(_) => StorageBackend::S3, + Self::Google(_) => StorageBackend::Google, } } } @@ -374,6 +400,27 @@ pub struct FileStorageConfig; #[serde(deny_unknown_fields)] pub struct RamStorageConfig; +#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct GoogleCloudStorageConfig { + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub credential_path: Option, +} + +impl GoogleCloudStorageConfig { + pub const GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH_ENV_VAR: &'static str = + "QW_GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH"; + + /// Attempts to find the credential path in the environment variable + /// `QW_GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH` or the config. + pub fn resolve_credential_path(&self) -> Option { + env::var(Self::GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH_ENV_VAR) + .ok() + .or_else(|| self.credential_path.clone()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -533,6 +580,25 @@ mod tests { } } + #[test] + fn test_storage_google_config_serde() { + { + let google_cloud_storage_config_yaml = r#" + credential_path: /path/to/credential.json + "#; + let google_cloud_storage_config: GoogleCloudStorageConfig = + serde_yaml::from_str(google_cloud_storage_config_yaml).unwrap(); + + let expected_google_cloud_storage_config = GoogleCloudStorageConfig { + credential_path: Some("/path/to/credential.json".to_string()), + }; + assert_eq!( + google_cloud_storage_config, + expected_google_cloud_storage_config + ); + } + } + #[test] fn test_storage_s3_config_serde() { { diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index c798ffc3ffe..7672e758ef8 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -50,6 +50,10 @@ quickwit-common = { workspace = true } quickwit-config = { workspace = true } quickwit-proto = { workspace = true } +opendal = { workspace = true, optional = true } +reqsign = { workspace = true, optional = true } +reqwest = { workspace = true, optional = true } + [dev-dependencies] mockall = { workspace = true } proptest = { workspace = true } @@ -70,11 +74,19 @@ azure = [ "azure_storage/enable_reqwest_rustls", "azure_storage_blobs/enable_reqwest_rustls", ] +google = [ + "dep:opendal", + "opendal/services-gcs" +] ci-test = [] integration-testsuite = [ "azure", "azure_core/azurite_workaround", "azure_storage_blobs/azurite_workaround", + "google", + "dep:reqsign", + "reqsign/services-google", + "dep:reqwest" ] testsuite = [ "mockall", diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index e159a97f1b7..dbeee5134a1 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -43,8 +43,11 @@ pub use self::storage::Storage; mod bundle_storage; mod error; + mod local_file_storage; mod object_storage; +#[cfg(feature = "google")] +mod opendal_storage; mod payload; mod prefix_storage; mod ram_storage; @@ -71,6 +74,10 @@ pub use self::object_storage::{AzureBlobStorage, AzureBlobStorageFactory}; pub use self::object_storage::{ MultiPartPolicy, S3CompatibleObjectStorage, S3CompatibleObjectStorageFactory, }; +#[cfg(any(feature = "google", feature = "integration-testsuite"))] +pub use self::opendal_storage::new_emulated_google_cloud_storage; +#[cfg(feature = "google")] +pub use self::opendal_storage::GoogleCloudStorageFactory; pub use self::ram_storage::{RamStorage, RamStorageBuilder}; pub use self::split::{SplitPayload, SplitPayloadBuilder}; #[cfg(any(test, feature = "testsuite"))] diff --git a/quickwit/quickwit-storage/src/opendal_storage/base.rs b/quickwit/quickwit-storage/src/opendal_storage/base.rs new file mode 100644 index 00000000000..ae3276ea5bb --- /dev/null +++ b/quickwit/quickwit-storage/src/opendal_storage/base.rs @@ -0,0 +1,217 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::fmt; +use std::ops::Range; +use std::path::{Path, PathBuf}; + +use async_trait::async_trait; +use bytesize::ByteSize; +use opendal::Operator; +use quickwit_common::uri::Uri; +use tokio::io::{AsyncRead, AsyncWriteExt}; + +use crate::storage::SendableAsync; +use crate::{ + BulkDeleteError, DeleteFailure, OwnedBytes, PutPayload, Storage, StorageError, + StorageErrorKind, StorageResolverError, StorageResult, +}; + +/// OpenDAL based storage implementation. +/// # TODO +/// +/// - Implement REQUEST_SEMAPHORE to control the concurrency. +/// - Implement STORAGE_METRICS for metrics. +/// - Add multipart_policy to control write at once or via multiple. +#[derive(Clone)] +pub struct OpendalStorage { + uri: Uri, + op: Operator, +} + +impl fmt::Debug for OpendalStorage { + fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter + .debug_struct("OpendalStorage") + .field("operator", &self.op.info()) + .finish() + } +} + +impl OpendalStorage { + /// Create a new google cloud storage. + pub fn new_google_cloud_storage( + uri: Uri, + cfg: opendal::services::Gcs, + ) -> Result { + let op = Operator::new(cfg)?.finish(); + Ok(Self { uri, op }) + } +} + +#[async_trait] +impl Storage for OpendalStorage { + async fn check_connectivity(&self) -> anyhow::Result<()> { + self.op.check().await?; + Ok(()) + } + + /// # TODO + /// + /// We can implement something like `multipart_policy` determine whether to use copy. + /// If the payload is small enough, we can call `op.write()` at once. + async fn put(&self, path: &Path, payload: Box) -> StorageResult<()> { + let path = path.as_os_str().to_string_lossy(); + let mut payload_reader = payload.byte_stream().await?.into_async_read(); + + let mut storage_writer = self + .op + .writer_with(&path) + .buffer(ByteSize::mb(8).as_u64() as usize) + .await?; + tokio::io::copy(&mut payload_reader, &mut storage_writer).await?; + storage_writer.close().await?; + + Ok(()) + } + + async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> { + let path = path.as_os_str().to_string_lossy(); + let mut storage_reader = self.op.reader(&path).await?; + tokio::io::copy(&mut storage_reader, output).await?; + output.flush().await?; + Ok(()) + } + + async fn get_slice(&self, path: &Path, range: Range) -> StorageResult { + let path = path.as_os_str().to_string_lossy(); + let range = range.start as u64..range.end as u64; + let storage_content = self.op.read_with(&path).range(range).await?; + + Ok(OwnedBytes::new(storage_content)) + } + + async fn get_slice_stream( + &self, + path: &Path, + range: Range, + ) -> StorageResult> { + let path = path.as_os_str().to_string_lossy(); + let range = range.start as u64..range.end as u64; + let storage_reader = self.op.reader_with(&path).range(range).await?; + + Ok(Box::new(storage_reader)) + } + + async fn get_all(&self, path: &Path) -> StorageResult { + let path = path.as_os_str().to_string_lossy(); + let storage_content = self.op.read(&path).await?; + + Ok(OwnedBytes::new(storage_content)) + } + + async fn delete(&self, path: &Path) -> StorageResult<()> { + let path = path.as_os_str().to_string_lossy(); + self.op.delete(&path).await?; + + Ok(()) + } + + async fn bulk_delete<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> { + // The mock service we used in integration testsuite doesn't support bucket delete. + // Let's fallback to delete one by one in this case. + #[cfg(feature = "integration-testsuite")] + { + let storage_info = self.op.info(); + if storage_info.name() == "sample-bucket" + && storage_info.scheme() == opendal::Scheme::Gcs + { + let mut bulk_error = BulkDeleteError::default(); + for (index, path) in paths.iter().enumerate() { + let result = self.op.delete(&path.as_os_str().to_string_lossy()).await; + if let Err(err) = result { + let storage_error_kind = err.kind(); + let storage_error: StorageError = err.into(); + bulk_error.failures.insert( + PathBuf::from(&path), + DeleteFailure { + code: Some(storage_error_kind.to_string()), + message: Some(storage_error.to_string()), + error: Some(storage_error.clone()), + }, + ); + bulk_error.error = Some(storage_error); + for path in paths[index..].iter() { + bulk_error.unattempted.push(PathBuf::from(&path)) + } + break; + } else { + bulk_error.successes.push(PathBuf::from(&path)) + } + } + + return if bulk_error.error.is_some() { + Err(bulk_error) + } else { + Ok(()) + }; + } + } + + let paths: Vec = paths + .iter() + .map(|path| path.as_os_str().to_string_lossy().to_string()) + .collect(); + + // OpenDAL will check the services' capability internally. + self.op.remove(paths).await.map_err(|err| BulkDeleteError { + error: Some(err.into()), + ..BulkDeleteError::default() + })?; + + Ok(()) + } + + async fn file_num_bytes(&self, path: &Path) -> StorageResult { + let path = path.as_os_str().to_string_lossy(); + let meta = self.op.stat(&path).await?; + Ok(meta.content_length()) + } + + fn uri(&self) -> &Uri { + &self.uri + } +} + +impl From for StorageError { + fn from(err: opendal::Error) -> Self { + match err.kind() { + opendal::ErrorKind::NotFound => StorageErrorKind::NotFound.with_error(err), + opendal::ErrorKind::PermissionDenied => StorageErrorKind::Unauthorized.with_error(err), + opendal::ErrorKind::ConfigInvalid => StorageErrorKind::Service.with_error(err), + _ => StorageErrorKind::Io.with_error(err), + } + } +} + +impl From for StorageResolverError { + fn from(err: opendal::Error) -> Self { + StorageResolverError::InvalidConfig(err.to_string()) + } +} diff --git a/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs b/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs new file mode 100644 index 00000000000..2824f1b2a6a --- /dev/null +++ b/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs @@ -0,0 +1,158 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use once_cell::sync::OnceCell; +use quickwit_common::uri::Uri; +use quickwit_config::{GoogleCloudStorageConfig, StorageBackend}; +use regex::Regex; + +use super::OpendalStorage; +use crate::debouncer::DebouncedStorage; +use crate::{Storage, StorageFactory, StorageResolverError}; + +/// Google cloud storage resolver. +pub struct GoogleCloudStorageFactory { + storage_config: GoogleCloudStorageConfig, +} + +impl GoogleCloudStorageFactory { + /// Create a new google cloud storage factory via config. + pub fn new(storage_config: GoogleCloudStorageConfig) -> Self { + Self { storage_config } + } +} + +#[async_trait] +impl StorageFactory for GoogleCloudStorageFactory { + fn backend(&self) -> StorageBackend { + StorageBackend::Google + } + + async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError> { + let storage = from_uri(&self.storage_config, uri)?; + Ok(Arc::new(DebouncedStorage::new(storage))) + } +} + +/// Creates an emulated storage for testing. +#[cfg(feature = "integration-testsuite")] +pub fn new_emulated_google_cloud_storage( + uri: &Uri, +) -> Result { + let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri"); + + let mut cfg = opendal::services::Gcs::default(); + cfg.bucket(&bucket); + cfg.root(&root.to_string_lossy()); + // The default port for the fake gcs server is 4443. + cfg.endpoint("http://127.0.0.1:4443"); + + #[derive(Debug)] + struct DummyTokenLoader; + #[async_trait] + impl reqsign::GoogleTokenLoad for DummyTokenLoader { + async fn load(&self, _: reqwest::Client) -> anyhow::Result> { + Ok(Some(reqsign::GoogleToken::new( + "dummy", + 86400, + "https://www.googleapis.com/auth/devstorage.full_control", + ))) + } + } + cfg.customed_token_loader(Box::new(DummyTokenLoader)); + + let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?; + Ok(store) +} + +fn from_uri( + google_cloud_storage_config: &GoogleCloudStorageConfig, + uri: &Uri, +) -> Result { + let credential_path = google_cloud_storage_config + .resolve_credential_path() + .ok_or_else(|| { + let message = format!( + "could not find Google credential path in environment variable `{}` or storage \ + config", + GoogleCloudStorageConfig::GOOGLE_CLOUD_STORAGE_CREDENTIAL_PATH_ENV_VAR + ); + StorageResolverError::InvalidConfig(message) + })?; + let (bucket_name, prefix) = parse_google_uri(uri).ok_or_else(|| { + let message = format!("failed to extract bucket name from google URI: {uri}"); + StorageResolverError::InvalidUri(message) + })?; + + let mut cfg = opendal::services::Gcs::default(); + cfg.credential_path(&credential_path); + cfg.bucket(&bucket_name); + cfg.root(&prefix.to_string_lossy()); + + let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?; + Ok(store) +} + +fn parse_google_uri(uri: &Uri) -> Option<(String, PathBuf)> { + // Ex: gs://bucket/prefix. + static URI_PTN: OnceCell = OnceCell::new(); + + let captures = URI_PTN + .get_or_init(|| { + Regex::new(r"gs(\+[^:]+)?://(?P[^/]+)(/(?P.*))?$") + .expect("The regular expression should compile.") + }) + .captures(uri.as_str())?; + + let bucket = captures.name("bucket")?.as_str().to_string(); + let prefix = captures + .name("prefix") + .map(|prefix_match| PathBuf::from(prefix_match.as_str())) + .unwrap_or_default(); + Some((bucket, prefix)) +} + +#[cfg(test)] +mod tests { + use quickwit_common::uri::Uri; + + use super::parse_google_uri; + + #[test] + fn test_parse_google_uri() { + assert!(parse_google_uri(&Uri::for_test("gs://")).is_none()); + + let (bucket, prefix) = parse_google_uri(&Uri::for_test("gs://test-bucket")).unwrap(); + assert_eq!(bucket, "test-bucket"); + assert!(prefix.to_str().unwrap().is_empty()); + + let (bucket, prefix) = parse_google_uri(&Uri::for_test("gs://test-bucket/")).unwrap(); + assert_eq!(bucket, "test-bucket"); + assert!(prefix.to_str().unwrap().is_empty()); + + let (bucket, prefix) = + parse_google_uri(&Uri::for_test("gs://test-bucket/indexes")).unwrap(); + assert_eq!(bucket, "test-bucket"); + assert_eq!(prefix.to_str().unwrap(), "indexes"); + } +} diff --git a/quickwit/quickwit-storage/src/opendal_storage/mod.rs b/quickwit/quickwit-storage/src/opendal_storage/mod.rs new file mode 100644 index 00000000000..ad6e2369c8c --- /dev/null +++ b/quickwit/quickwit-storage/src/opendal_storage/mod.rs @@ -0,0 +1,26 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod base; +use base::OpendalStorage; + +mod google_cloud_storage; +#[cfg(feature = "integration-testsuite")] +pub use google_cloud_storage::new_emulated_google_cloud_storage; +pub use google_cloud_storage::GoogleCloudStorageFactory; diff --git a/quickwit/quickwit-storage/src/storage_resolver.rs b/quickwit/quickwit-storage/src/storage_resolver.rs index 5eda31f3753..9a6d2dd1abf 100644 --- a/quickwit/quickwit-storage/src/storage_resolver.rs +++ b/quickwit/quickwit-storage/src/storage_resolver.rs @@ -29,6 +29,8 @@ use crate::local_file_storage::LocalFileStorageFactory; use crate::ram_storage::RamStorageFactory; #[cfg(feature = "azure")] use crate::AzureBlobStorageFactory; +#[cfg(feature = "google")] +use crate::GoogleCloudStorageFactory; use crate::{S3CompatibleObjectStorageFactory, Storage, StorageFactory, StorageResolverError}; /// Returns the [`Storage`] instance associated with the protocol of a URI. The actual creation of @@ -58,6 +60,7 @@ impl StorageResolver { Protocol::File => StorageBackend::File, Protocol::Ram => StorageBackend::Ram, Protocol::S3 => StorageBackend::S3, + Protocol::Google => StorageBackend::Google, _ => { let message = format!( "Quickwit does not support {} as a storage backend", @@ -109,6 +112,21 @@ impl StorageResolver { "Quickwit was compiled without the `azure` feature.", )) } + #[cfg(feature = "google")] + { + builder = builder.register(GoogleCloudStorageFactory::new( + storage_configs.find_google().cloned().unwrap_or_default(), + )); + } + #[cfg(not(feature = "google"))] + { + use crate::storage_factory::UnsupportedStorage; + + builder = builder.register(UnsupportedStorage::new( + StorageBackend::Google, + "Quickwit was compiled without the `google` feature.", + )) + } builder .build() .expect("Storage factory and config backends should match.") diff --git a/quickwit/quickwit-storage/tests/google_cloud_storage.rs b/quickwit/quickwit-storage/tests/google_cloud_storage.rs new file mode 100644 index 00000000000..535c8055ec1 --- /dev/null +++ b/quickwit/quickwit-storage/tests/google_cloud_storage.rs @@ -0,0 +1,49 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// This file is an integration test that assumes that a connection +// to Fake GCS Server (the emulated google cloud storage environment) + +#[cfg(feature = "integration-testsuite")] +#[tokio::test] +#[cfg_attr(not(feature = "ci-test"), ignore)] +async fn google_cloud_storage_test_suite() -> anyhow::Result<()> { + use std::str::FromStr; + + use anyhow::Context; + use quickwit_common::uri::Uri; + use quickwit_storage::new_emulated_google_cloud_storage; + let _ = tracing_subscriber::fmt::try_init(); + + let mut object_storage = + new_emulated_google_cloud_storage(&Uri::from_str("gs://sample-bucket")?)?; + quickwit_storage::storage_test_suite(&mut object_storage).await?; + + let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str( + "gs://sample-bucket/integration-tests/test-azure-compatible-storage", + )?)?; + quickwit_storage::storage_test_single_part_upload(&mut object_storage) + .await + .context("test single-part upload failed")?; + + quickwit_storage::storage_test_multi_part_upload(&mut object_storage) + .await + .context("test multipart upload failed")?; + Ok(()) +}