Skip to content

Commit

Permalink
Fix source path in Lambda distrib (#5327)
Browse files Browse the repository at this point in the history
* Fix source file path

* Expose Lambda tests in Makefile
  • Loading branch information
rdettai authored Aug 20, 2024
1 parent c59be63 commit 5d7a549
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 23 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ test-all: docker-compose-up
test-failpoints:
@$(MAKE) -C $(QUICKWIT_SRC) test-failpoints

test-lambda: DOCKER_SERVICES=localstack
test-lambda: docker-compose-up
@$(MAKE) -C $(QUICKWIT_SRC) test-lambda

# This will build and push all custom cross images for cross-compilation.
# You will need to login into Docker Hub with the `quickwit` account.
IMAGE_TAGS = x86_64-unknown-linux-gnu aarch64-unknown-linux-gnu x86_64-unknown-linux-musl aarch64-unknown-linux-musl
Expand Down Expand Up @@ -104,4 +108,3 @@ build-rustdoc:
.PHONY: build-ui
build-ui:
$(MAKE) -C $(QUICKWIT_SRC) build-ui

8 changes: 8 additions & 0 deletions quickwit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ test-all:
test-failpoints:
cargo nextest run --test failpoints --features fail/failpoints

test-lambda:
AWS_ACCESS_KEY_ID=ignored \
AWS_SECRET_ACCESS_KEY=ignored \
AWS_REGION=us-east-1 \
QW_S3_ENDPOINT=http://localhost:4566 \
QW_S3_FORCE_PATH_STYLE_ACCESS=1 \
cargo nextest run --all-features -p quickwit-lambda --retries 1

# TODO: to be replaced by https://github.com/quickwit-oss/quickwit/issues/237
TARGET ?= x86_64-unknown-linux-gnu
.PHONY: build
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ path = "src/bin/indexer.rs"
name = "searcher"
path = "src/bin/searcher.rs"

[features]
s3-localstack-tests = []

[dependencies]
anyhow = { workspace = true }
aws_lambda_events = "0.15.0"
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-lambda/src/indexer/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub const CONFIGURATION_TEMPLATE: &str = r#"
version: 0.8
node_id: lambda-indexer
cluster_id: lambda-ephemeral
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index}
data_dir: /tmp
"#;

Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-lambda/src/indexer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async fn indexer_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
let ingest_res = ingest(IngestArgs {
input_path: payload.uri()?,
input_format: quickwit_config::SourceInputFormat::Json,
overwrite: false,
vrl_script: None,
// TODO: instead of clearing the cache, we use a cache and set its max
// size with indexer_config.split_store_max_num_bytes
Expand Down
17 changes: 2 additions & 15 deletions quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use quickwit_config::{
load_index_config_from_user_config, ConfigFormat, IndexConfig, NodeConfig, SourceConfig,
SourceInputFormat, SourceParams, TransformConfig,
};
use quickwit_index_management::IndexService;
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline};
use quickwit_indexing::IndexingPipeline;
Expand Down Expand Up @@ -154,15 +153,14 @@ pub(super) async fn configure_source(
})
}

/// Check if the index exists, creating or overwriting it if necessary
/// Check if the index exists, creating it if necessary
///
/// If the index exists but without the Lambda source ([`LAMBDA_SOURCE_ID`]),
/// the source is added.
pub(super) async fn init_index_if_necessary(
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
default_index_root_uri: &Uri,
overwrite: bool,
source_config: &SourceConfig,
) -> anyhow::Result<IndexMetadata> {
let metadata_result = metastore
Expand All @@ -171,23 +169,12 @@ pub(super) async fn init_index_if_necessary(
let metadata = match metadata_result {
Ok(metadata_resp) => {
let current_metadata = metadata_resp.deserialize_index_metadata()?;
let mut metadata_changed = false;
if overwrite {
info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index");
let mut index_service =
IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&INDEX_ID).await?;
metadata_changed = true;
}
if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) {
let add_source_request = AddSourceRequest::try_from_source_config(
current_metadata.index_uid.clone(),
source_config,
)?;
metastore.add_source(add_source_request).await?;
metadata_changed = true;
}
if metadata_changed {
metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await?
Expand Down Expand Up @@ -305,7 +292,7 @@ pub(super) async fn spawn_pipelines(

/// Prune old Lambda file checkpoints if there are too many
///
/// Without pruning checkpoints accumulate indifinitely. This is particularly
/// Without pruning checkpoints accumulate indefinitely. This is particularly
/// problematic when indexing a lot of small files, as the metastore will grow
/// large even for a small index.
///
Expand Down
130 changes: 128 additions & 2 deletions quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::utils::load_node_config;
pub struct IngestArgs {
pub input_path: Uri,
pub input_format: SourceInputFormat,
pub overwrite: bool,
pub vrl_script: Option<String>,
pub clear_cache: bool,
}
Expand All @@ -65,7 +64,6 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
&mut metastore,
&storage_resolver,
&config.default_index_root_uri,
args.overwrite,
&source_config,
)
.await?;
Expand Down Expand Up @@ -123,3 +121,131 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
}
Ok(statistics)
}

#[cfg(all(test, feature = "s3-localstack-tests"))]
mod tests {
use std::path::PathBuf;
use std::str::FromStr;

use quickwit_common::new_coolid;
use quickwit_storage::StorageResolver;

use super::*;

async fn put_object(
storage_resolver: StorageResolver,
bucket: &str,
prefix: &str,
filename: &str,
data: Vec<u8>,
) -> Uri {
let src_location = format!("s3://{}/{}", bucket, prefix);
let storage_uri = Uri::from_str(&src_location).unwrap();
let storage = storage_resolver.resolve(&storage_uri).await.unwrap();
storage
.put(&PathBuf::from(filename), Box::new(data))
.await
.unwrap();
storage_uri.join(filename).unwrap()
}

#[tokio::test]
async fn test_ingest() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let bucket = "quickwit-integration-tests";
let prefix = new_coolid("lambda-ingest-test");
let storage_resolver = StorageResolver::unconfigured();

let index_config = br#"
version: 0.8
index_id: lambda-test
doc_mapping:
field_mappings:
- name: timestamp
type: datetime
input_formats:
- unix_timestamp
fast: true
timestamp_field: timestamp
"#;
let config_uri = put_object(
storage_resolver.clone(),
bucket,
&prefix,
"index-config.yaml",
index_config.to_vec(),
)
.await;

// TODO use dependency injection instead of lazy static for env configs
std::env::set_var("QW_LAMBDA_METASTORE_BUCKET", bucket);
std::env::set_var("QW_LAMBDA_INDEX_BUCKET", bucket);
std::env::set_var("QW_LAMBDA_METASTORE_PREFIX", &prefix);
std::env::set_var("QW_LAMBDA_INDEX_PREFIX", &prefix);
std::env::set_var("QW_LAMBDA_INDEX_CONFIG_URI", config_uri.as_str());
std::env::set_var("QW_LAMBDA_INDEX_ID", "lambda-test");

// first ingestion creates the index metadata
let test_data_1 = br#"{"timestamp": 1724140899, "field1": "value1"}"#;
let test_data_1_uri = put_object(
storage_resolver.clone(),
bucket,
&prefix,
"data.json",
test_data_1.to_vec(),
)
.await;

{
let args = IngestArgs {
input_path: test_data_1_uri.clone(),
input_format: SourceInputFormat::Json,
vrl_script: None,
clear_cache: true,
};
let stats = ingest(args).await?;
assert_eq!(stats.num_invalid_docs, 0);
assert_eq!(stats.num_docs, 1);
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

{
// ingesting the same data again is a no-op
let args = IngestArgs {
input_path: test_data_1_uri,
input_format: SourceInputFormat::Json,
vrl_script: None,
clear_cache: true,
};
let stats = ingest(args).await?;
assert_eq!(stats.num_invalid_docs, 0);
assert_eq!(stats.num_docs, 0);
}

{
// second ingestion should not fail when metadata already exists
let test_data = br#"{"timestamp": 1724149900, "field1": "value2"}"#;
let test_data_uri = put_object(
storage_resolver.clone(),
bucket,
&prefix,
"data2.json",
test_data.to_vec(),
)
.await;

let args = IngestArgs {
input_path: test_data_uri,
input_format: SourceInputFormat::Json,
vrl_script: None,
clear_cache: true,
};
let stats = ingest(args).await?;
assert_eq!(stats.num_invalid_docs, 0);
assert_eq!(stats.num_docs, 1);
}

Ok(())
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/src/indexer/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl IndexerEvent {
IndexerEvent::S3(event) => [
"s3://",
event.records[0].s3.bucket.name.as_ref().unwrap(),
"/",
event.records[0].s3.object.key.as_ref().unwrap(),
]
.join(""),
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-lambda/src/searcher/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
pub(crate) const CONFIGURATION_TEMPLATE: &str = r#"
version: 0.8
node_id: lambda-searcher
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index}
data_dir: /tmp
searcher:
partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M}
Expand Down

0 comments on commit 5d7a549

Please sign in to comment.