diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile index 00642e125f4..786253d98f8 100644 --- a/distribution/lambda/Makefile +++ b/distribution/lambda/Makefile @@ -1,5 +1,7 @@ .SILENT: .ONESHELL: +SHELL := bash +.SHELLFLAGS := -eu -o pipefail -c check-env: ifndef CDK_ACCOUNT @@ -9,6 +11,8 @@ ifndef CDK_REGION $(error CDK_REGION is undefined) endif +export INDEX_ID=hdfs-logs + build: cargo lambda build --manifest-path=../../quickwit/Cargo.toml -p quickwit-lambda --release @@ -16,9 +20,8 @@ init: build check-env cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION deploy: build check-env - cdk deploy - -export EXAMPLE_FILE=hdfs-logs-multitenants-10000.json + cdk deploy --parameters quickwitIndexId=$$INDEX_ID + python -c 'import cli; cli.upload_index_config()' upload-src-file: check-env python -c 'import cli; cli.upload_src_file()' @@ -28,22 +31,3 @@ invoke-indexer: check-env invoke-searcher: check-env python -c 'import cli; cli.invoke_searcher()' - -index-creation-instruction: deploy - export AWS_REGION=$$CDK_REGION - BUCKET_NAME=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='index-store-bucket-name'].OutputValue" --output text) - echo "\n=> start an interactive quickwit shell" - echo "docker run --entrypoint bash -it quickwit/quickwit" - echo "\n=> inside the interactive shell, configure the AWS credentials" - echo "export AWS_ACCESS_KEY_ID=..." - echo "export AWS_SECRET_ACCESS_KEY=..." - echo "export AWS_SESSION_TOKEN=..." - echo "\n=> then configure and start the server" - echo "apt update && apt install curl -y" - echo "curl -o hdfs_logs_index_config.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/hdfs-logs/index-config.yaml" - echo "export AWS_REGION=$$CDK_REGION" - echo "echo \"metastore_uri: s3://$$BUCKET_NAME\" >> config/quickwit.yaml" - echo "echo \"default_index_root_uri: s3://$$BUCKET_NAME\" >> config/quickwit.yaml" - echo "quickwit run &" - echo "\n=> once the server has started, create the index" - echo "quickwit index create --index-config hdfs_logs_index_config.yaml" diff --git a/distribution/lambda/cli.py b/distribution/lambda/cli.py index b135714b178..ebd040d2c86 100644 --- a/distribution/lambda/cli.py +++ b/distribution/lambda/cli.py @@ -2,15 +2,21 @@ These functions are wrapped by the Makefile for convenience.""" -import os -import boto3 import base64 import http.client +import os +import time +from urllib.parse import urlparse + +import boto3 region = os.environ["CDK_REGION"] +index_id = os.environ["INDEX_ID"] + stack_name = "LambdaStack" example_bucket = "quickwit-datasets-public.s3.amazonaws.com" example_file = "hdfs-logs-multitenants-10000.json" +index_config_path = "../../config/tutorials/hdfs-logs/index-config.yaml" session = boto3.Session(region_name=region) @@ -29,7 +35,7 @@ def _get_cloudformation_output_value(export_name: str) -> str: exit(1) -def _format_lambda_output(lambda_resp): +def _format_lambda_output(lambda_resp, duration=None): if "FunctionError" in lambda_resp and lambda_resp["FunctionError"] != "": print("\n## FUNCTION ERROR:") print(lambda_resp["FunctionError"]) @@ -37,12 +43,25 @@ def _format_lambda_output(lambda_resp): print(base64.b64decode(lambda_resp["LogResult"]).decode()) print("\n## RESPONSE:") print(lambda_resp["Payload"].read().decode()) + if duration is not None: + print("\n## TOTAL INVOCATION DURATION:") + print(duration) + + +def upload_index_config(): + target_uri = _get_cloudformation_output_value("index-config-uri") + print(f"upload src file to {target_uri}") + target_uri_parsed = urlparse(target_uri, allow_fragments=False) + with open(index_config_path, "rb") as f: + session.client("s3").put_object( + Bucket=target_uri_parsed.netloc, Body=f, Key=target_uri_parsed.path[1:] + ) def upload_src_file(): bucket_name = _get_cloudformation_output_value("index-store-bucket-name") - source_uri = f"s3://{bucket_name}/{example_file}" - print(f"upload src file to {source_uri}") + target_uri = f"s3://{bucket_name}/{example_file}" + print(f"upload src file to {target_uri}") conn = http.client.HTTPSConnection(example_bucket) conn.request("GET", f"/{example_file}") response = conn.getresponse() @@ -61,22 +80,24 @@ def invoke_indexer(): bucket_name = _get_cloudformation_output_value("index-store-bucket-name") source_uri = f"s3://{bucket_name}/{example_file}" print(f"src_file: {source_uri}") + invoke_start = time.time() resp = session.client("lambda").invoke( FunctionName=function_name, InvocationType="RequestResponse", LogType="Tail", Payload=f"""{{ "source_uri": "{source_uri}" }}""", ) - _format_lambda_output(resp) + _format_lambda_output(resp, time.time() - invoke_start) def invoke_searcher(): function_name = _get_cloudformation_output_value("searcher-function-name") print(f"searcher function name: {function_name}") + invoke_start = time.time() resp = session.client("lambda").invoke( FunctionName=function_name, InvocationType="RequestResponse", LogType="Tail", Payload="{}", ) - _format_lambda_output(resp) + _format_lambda_output(resp, time.time() - invoke_start) diff --git a/distribution/lambda/stacks/indexer_service.py b/distribution/lambda/stacks/indexer_service.py index 1a7562fe3d4..2aee568ab6f 100644 --- a/distribution/lambda/stacks/indexer_service.py +++ b/distribution/lambda/stacks/indexer_service.py @@ -1,10 +1,17 @@ from constructs import Construct from aws_cdk import aws_lambda, aws_s3, Duration, CfnOutput +import os class IndexerService(Construct): def __init__( - self, scope: Construct, construct_id: str, store_bucket: aws_s3.Bucket, **kwargs + self, + scope: Construct, + construct_id: str, + store_bucket: aws_s3.Bucket, + index_id: str, + index_config_uri: str, + **kwargs ) -> None: super().__init__(scope, construct_id, **kwargs) @@ -17,6 +24,8 @@ def __init__( environment={ "INDEX_BUCKET": store_bucket.bucket_name, "METASTORE_BUCKET": store_bucket.bucket_name, + "INDEX_ID": index_id, + "INDEX_CONFIG_URI": index_config_uri, }, timeout=Duration.seconds(30), ) diff --git a/distribution/lambda/stacks/lambda_stack.py b/distribution/lambda/stacks/lambda_stack.py index 263f2f1a373..4e9a1034f97 100644 --- a/distribution/lambda/stacks/lambda_stack.py +++ b/distribution/lambda/stacks/lambda_stack.py @@ -1,4 +1,4 @@ -from aws_cdk import Stack, aws_s3, CfnOutput +from aws_cdk import Stack, aws_s3, CfnOutput, CfnParameter from constructs import Construct from . import indexer_service, searcher_service @@ -8,9 +8,30 @@ class LambdaStack(Stack): def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) + index_id_param = CfnParameter( + self, + "quickwitIndexId", + type="String", + description="The ID of the Quickwit index", + ) + bucket = aws_s3.Bucket(self, "index-store") - indexer_service.IndexerService(self, "IndexerService", store_bucket=bucket) - searcher_service.SearcherService(self, "SearcherService", store_bucket=bucket) + + index_config_uri = f"s3://{bucket.bucket_name}/index-conf/{index_id_param.value_as_string}.yaml" + + indexer_service.IndexerService( + self, + "IndexerService", + store_bucket=bucket, + index_id=index_id_param.value_as_string, + index_config_uri=index_config_uri, + ) + searcher_service.SearcherService( + self, + "SearcherService", + store_bucket=bucket, + index_id=index_id_param.value_as_string, + ) CfnOutput( self, @@ -18,3 +39,10 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: value=bucket.bucket_name, export_name="index-store-bucket-name", ) + + CfnOutput( + self, + "index-config-uri", + value=index_config_uri, + export_name="index-config-uri", + ) diff --git a/distribution/lambda/stacks/searcher_service.py b/distribution/lambda/stacks/searcher_service.py index 1077d05d882..354a5e19681 100644 --- a/distribution/lambda/stacks/searcher_service.py +++ b/distribution/lambda/stacks/searcher_service.py @@ -4,7 +4,12 @@ class SearcherService(Construct): def __init__( - self, scope: Construct, construct_id: str, store_bucket: aws_s3.Bucket, **kwargs + self, + scope: Construct, + construct_id: str, + store_bucket: aws_s3.Bucket, + index_id: str, + **kwargs ) -> None: super().__init__(scope, construct_id, **kwargs) @@ -17,6 +22,7 @@ def __init__( environment={ "INDEX_BUCKET": store_bucket.bucket_name, "METASTORE_BUCKET": store_bucket.bucket_name, + "INDEX_ID": index_id, }, timeout=Duration.seconds(30), ) diff --git a/quickwit/quickwit-lambda/src/bin/indexer.rs b/quickwit/quickwit-lambda/src/bin/indexer.rs index 8d916b15366..077162c0cc6 100644 --- a/quickwit/quickwit-lambda/src/bin/indexer.rs +++ b/quickwit/quickwit-lambda/src/bin/indexer.rs @@ -32,7 +32,8 @@ pub async fn handler(event: LambdaEvent) -> Result { return Err(anyhow::anyhow!("Missing source_uri").into()); }; let ingest_res = ingest(IngestArgs { - index_id: String::from("hdfs-logs"), + index_config_uri: std::env::var("INDEX_CONFIG_URI")?, + index_id: std::env::var("INDEX_ID")?, input_path: PathBuf::from(source_uri), input_format: quickwit_config::SourceInputFormat::Json, overwrite: true, diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs index c31634b030a..9033252553d 100644 --- a/quickwit/quickwit-lambda/src/bin/searcher.rs +++ b/quickwit/quickwit-lambda/src/bin/searcher.rs @@ -24,7 +24,7 @@ use tracing::{debug, error}; pub async fn handler(_event: LambdaEvent) -> Result { let ingest_res = search(SearchArgs { - index_id: String::from("hdfs-logs"), + index_id: std::env::var("INDEX_ID")?, query: String::new(), aggregation: None, max_hits: 10, diff --git a/quickwit/quickwit-lambda/src/ingest.rs b/quickwit/quickwit-lambda/src/ingest.rs index 7987df8a799..49002ed3b31 100644 --- a/quickwit/quickwit-lambda/src/ingest.rs +++ b/quickwit/quickwit-lambda/src/ingest.rs @@ -19,9 +19,9 @@ use std::collections::HashSet; use std::num::NonZeroUsize; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; -use anyhow::bail; +use anyhow::{bail, Context}; use chitchat::transport::ChannelTransport; use chitchat::FailureDetectorConfig; use quickwit_actors::Universe; @@ -30,10 +30,11 @@ use quickwit_cli::{run_index_checklist, start_actor_runtimes}; use quickwit_cluster::{Cluster, ClusterMember}; use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimesConfig; +use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; use quickwit_config::{ - IndexerConfig, NodeConfig, SourceConfig, SourceInputFormat, SourceParams, TransformConfig, - CLI_INGEST_SOURCE_ID, + load_index_config_from_user_config, ConfigFormat, IndexConfig, IndexerConfig, NodeConfig, + SourceConfig, SourceInputFormat, SourceParams, TransformConfig, CLI_INGEST_SOURCE_ID, }; use quickwit_index_management::{clear_cache_directory, IndexService}; use quickwit_indexing::actors::{IndexingService, MergePipelineId}; @@ -41,6 +42,8 @@ use quickwit_indexing::models::{ DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, }; use quickwit_ingest::IngesterPool; +use quickwit_proto::metastore::MetastoreError; +use quickwit_storage::StorageResolver; use tracing::{debug, info}; use crate::utils::load_node_config; @@ -54,6 +57,7 @@ data_dir: /tmp #[derive(Debug, Eq, PartialEq)] pub struct IngestArgs { + pub index_config_uri: String, pub index_id: String, pub input_path: PathBuf, pub input_format: SourceInputFormat, @@ -84,6 +88,36 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { Ok(cluster) } +/// TODO refactor with `dir_and_filename` in file source +pub fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { + let dir_uri: Uri = filepath + .parent() + .context("Parent directory could not be resolved")? + .to_str() + .context("Path cannot be turned to string")? + .parse()?; + let file_name = filepath + .file_name() + .context("Path does not appear to be a file")?; + Ok((dir_uri, file_name.as_ref())) +} + +async fn load_index_config( + resolver: &StorageResolver, + config_uri: &str, + default_index_root_uri: &Uri, +) -> anyhow::Result { + let (dir, file) = dir_and_filename(&Path::new(config_uri))?; + let index_config_storage = resolver.resolve(&dir).await?; + let bytes = index_config_storage.get_all(file).await?; + let index_config = load_index_config_from_user_config( + ConfigFormat::Yaml, + bytes.as_slice(), + default_index_root_uri, + )?; + Ok(index_config) +} + pub async fn ingest(args: IngestArgs) -> anyhow::Result { debug!(args=?args, "lambda-ingest"); let (config, storage_resolver, metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?; @@ -101,13 +135,42 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { transform_config, input_format: args.input_format, }; - run_index_checklist( + + let checklist_result = run_index_checklist( &*metastore, &storage_resolver, &args.index_id, Some(&source_config), ) - .await?; + .await; + if let Err(e) = checklist_result { + let is_not_found = e.downcast_ref().is_some_and(|meta_error| match meta_error { + MetastoreError::NotFound(_) => true, + _ => false, + }); + if !is_not_found { + bail!(e); + } + info!( + index_id = args.index_id, + index_config_uri = args.index_config_uri, + "Index not found, creating it" + ); + let index_config = load_index_config( + &storage_resolver, + &args.index_config_uri, + &config.default_index_root_uri, + ) + .await?; + if index_config.index_id != args.index_id { + bail!( + "Expected index ID was {} but config file had {}", + args.index_id, + index_config.index_id, + ); + } + metastore.create_index(index_config).await?; + } if args.overwrite { let index_service = IndexService::new(metastore.clone(), storage_resolver.clone());