Skip to content

Commit

Permalink
Add http source.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jul 12, 2023
1 parent 36cba7e commit e50f38b
Show file tree
Hide file tree
Showing 17 changed files with 789 additions and 55 deletions.
18 changes: 17 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ arc-swap = "1.6"
assert-json-diff = "2"
async-speed-limit = "0.4"
async-trait = "0.1"
async-compression = { version = "0.4.1", features = ["gzip", "futures-io"] }
atty = "0.2"
backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.21"
Expand Down Expand Up @@ -133,6 +134,7 @@ regex = "1.9.1"
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls",
"stream",
] }
rust-embed = "6.8.1"
serde = { version = "1.0", features = ["derive", "rc"] }
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-common/src/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum Protocol {
Azure,
File,
Grpc,
Https,
Http,
PostgreSQL,
Ram,
S3,
Expand All @@ -48,6 +50,8 @@ impl Protocol {
Protocol::Azure => "azure",
Protocol::File => "file",
Protocol::Grpc => "grpc",
Protocol::Https => "https",
Protocol::Http => "http",
Protocol::PostgreSQL => "postgresql",
Protocol::Ram => "ram",
Protocol::S3 => "s3",
Expand Down Expand Up @@ -105,6 +109,8 @@ impl FromStr for Protocol {
"azure" => Ok(Protocol::Azure),
"file" => Ok(Protocol::File),
"grpc" => Ok(Protocol::Grpc),
"https" => Ok(Protocol::Https),
"http" => Ok(Protocol::Http),
"pg" | "postgres" | "postgresql" => Ok(Protocol::PostgreSQL),
"ram" => Ok(Protocol::Ram),
"s3" => Ok(Protocol::S3),
Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value as JsonValue;
pub use source_config::{
load_source_config_from_user_config, FileSourceParams, KafkaSourceParams, KinesisSourceParams,
PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_INGEST_SOURCE_ID,
INGEST_API_SOURCE_ID,
load_source_config_from_user_config, FileSourceParams, HttpSourceParams, KafkaSourceParams,
KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig,
SourceInputFormat, SourceParams, TransformConfig, VecSourceParams, VoidSourceParams,
CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
};
use tracing::warn;

Expand Down Expand Up @@ -88,6 +88,7 @@ pub use crate::storage_config::{
SourceInputFormat,
SourceParams,
FileSourceParams,
HttpSourceParams,
KafkaSourceParams,
KinesisSourceParams,
PulsarSourceParams,
Expand Down
44 changes: 44 additions & 0 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl SourceConfig {
pub fn source_type(&self) -> &str {
match self.source_params {
SourceParams::File(_) => "file",
SourceParams::Http(_) => "http",
SourceParams::Kafka(_) => "kafka",
SourceParams::Kinesis(_) => "kinesis",
SourceParams::Vec(_) => "vec",
Expand All @@ -103,6 +104,7 @@ impl SourceConfig {
pub fn params(&self) -> JsonValue {
match &self.source_params {
SourceParams::File(params) => serde_json::to_value(params),
SourceParams::Http(params) => serde_json::to_value(params),
SourceParams::Kafka(params) => serde_json::to_value(params),
SourceParams::Kinesis(params) => serde_json::to_value(params),
SourceParams::Vec(params) => serde_json::to_value(params),
Expand Down Expand Up @@ -206,6 +208,8 @@ impl FromStr for SourceInputFormat {
pub enum SourceParams {
#[serde(rename = "file")]
File(FileSourceParams),
#[serde(rename = "http")]
Http(HttpSourceParams),
#[serde(rename = "kafka")]
Kafka(KafkaSourceParams),
#[serde(rename = "kinesis")]
Expand Down Expand Up @@ -476,6 +480,34 @@ impl TransformConfig {
}
}

/// User-supplied parameters for the HTTP source: which remote objects to ingest.
///
/// A config may provide an explicit list of URIs, a URI pattern, or both;
/// how the two interact is decided by the HTTP source implementation, not here.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct HttpSourceParams {
    /// Explicit list of URIs to fetch. Defaults to an empty list when absent.
    #[schema(value_type = Vec<String>)]
    #[serde(default)]
    pub uris: Vec<Uri>,
    /// Optional URI pattern from which the effective list of URIs is derived.
    // NOTE(review): pattern-expansion semantics are not visible in this file — confirm
    // against the HTTP source implementation.
    #[schema(value_type = Option<String>)]
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub uri_pattern: Option<Uri>,
}

/// Convenience constructors for [`HttpSourceParams`].
impl HttpSourceParams {
    /// Builds params from a single URI pattern; the explicit URI list stays empty.
    pub fn from_pattern(uri: Uri) -> Self {
        Self {
            uri_pattern: Some(uri),
            uris: Vec::new(),
        }
    }

    /// Builds params from an explicit list of URIs; no pattern is set.
    pub fn from_list(uris: Vec<Uri>) -> Self {
        Self {
            uri_pattern: None,
            uris,
        }
    }
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down Expand Up @@ -1136,4 +1168,16 @@ mod tests {
.unwrap();
assert_eq!(source_config.input_format, SourceInputFormat::PlainText);
}

/// Checks that `HttpSourceParams` deserializes both the explicit `uris` list
/// and the optional `uri_pattern` from a JSON source config.
// The body is fully synchronous (no `.await` anywhere), so a plain `#[test]`
// suffices — spinning up a tokio runtime with `#[tokio::test]` was pure overhead.
#[test]
fn test_http_source_config() {
    // Inline JSON config exercising both fields at once.
    let json_config = r#"{"uris":["http://localhost:9000/file.json"], "uri_pattern": "http://localhost:9000/a.json"}"#;
    assert_eq!(
        serde_json::from_str::<HttpSourceParams>(json_config).unwrap(),
        HttpSourceParams {
            uris: vec![Uri::from_well_formed("http://localhost:9000/file.json")],
            uri_pattern: Some(Uri::from_well_formed("http://localhost:9000/a.json")),
        }
    );
}
}
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl SourceConfigForSerialization {
)
}
}
SourceParams::Http(_) => {}
SourceParams::Kafka(_) | SourceParams::Kinesis(_) | SourceParams::Pulsar(_) => {
// TODO consider any validation opportunity
}
Expand All @@ -100,7 +101,7 @@ impl SourceConfigForSerialization {
| SourceParams::IngestCli => {}
}
match &self.source_params {
SourceParams::Kafka(_) => {}
SourceParams::Kafka(_) | SourceParams::Http(_) => {}
_ => {
if self.desired_num_pipelines > 1 || self.max_num_pipelines_per_indexer > 1 {
bail!("Quickwit currently supports multiple pipelines only for Kafka sources. Open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types.");
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ aws-sdk-kinesis = { workspace = true, optional = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
async-compression = { workspace = true }
async-trait = { workspace = true }
backoff = { workspace = true, optional = true }
byte-unit = { workspace = true }
Expand All @@ -25,6 +26,7 @@ fail = { workspace = true }
flume = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
libz-sys = { workspace = true, optional = true }
once_cell = { workspace = true }
Expand All @@ -33,6 +35,8 @@ openssl = { workspace = true, optional = true }
pulsar = { workspace = true, optional = true }
quickwit-query = { workspace = true }
rdkafka = { workspace = true, optional = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl IndexingPipeline {
.protect_future(quickwit_supported_sources().load_source(
Arc::new(SourceExecutionContext {
metastore: self.params.metastore.clone(),
index_uid: self.params.pipeline_id.index_uid.clone(),
pipeline_id: self.params.pipeline_id.clone(),
queues_dir_path: self.params.queues_dir_path.clone(),
source_config: self.params.source_config.clone(),
}),
Expand Down
25 changes: 22 additions & 3 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ mod tests {
use quickwit_proto::IndexUid;

use super::*;
use crate::models::IndexingPipelineId;
use crate::source::SourceActor;

#[tokio::test]
Expand All @@ -190,10 +191,16 @@ mod tests {
let params = FileSourceParams::file("data/test_corpus.json");

let metastore = metastore_for_test();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
source_id: "test-file-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
};
let file_source = FileSourceFactory::typed_create_source(
SourceExecutionContext::for_test(
metastore,
IndexUid::new("test-index"),
pipeline_id,
PathBuf::from("./queues"),
SourceConfig {
source_id: "test-file-source".to_string(),
Expand Down Expand Up @@ -256,10 +263,16 @@ mod tests {
.to_string();

let metastore = metastore_for_test();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
source_id: "test-file-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
};
let source = FileSourceFactory::typed_create_source(
SourceExecutionContext::for_test(
metastore,
IndexUid::new("test-index"),
pipeline_id,
PathBuf::from("./queues"),
SourceConfig {
source_id: "test-file-source".to_string(),
Expand Down Expand Up @@ -346,10 +359,16 @@ mod tests {
checkpoint.try_apply_delta(checkpoint_delta).unwrap();

let metastore = metastore_for_test();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
source_id: "test-file-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
};
let source = FileSourceFactory::typed_create_source(
SourceExecutionContext::for_test(
metastore,
IndexUid::new("test-index"),
pipeline_id,
PathBuf::from("./queues"),
SourceConfig {
source_id: "test-file-source".to_string(),
Expand Down
Loading

0 comments on commit e50f38b

Please sign in to comment.