diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile
index 2399c937182..00642e125f4 100644
--- a/distribution/lambda/Makefile
+++ b/distribution/lambda/Makefile
@@ -11,8 +11,6 @@ endif
build:
cargo lambda build --manifest-path=../../quickwit/Cargo.toml -p quickwit-lambda --release
- cp resources/indexer-config.yaml ../../quickwit/target/lambda/indexer/config.yaml
- cp resources/searcher-config.yaml ../../quickwit/target/lambda/searcher/config.yaml
init: build check-env
cdk bootstrap aws://$$CDK_ACCOUNT/$$CDK_REGION
@@ -23,32 +21,13 @@ deploy: build check-env
export EXAMPLE_FILE=hdfs-logs-multitenants-10000.json
upload-src-file: check-env
- export AWS_REGION=$$CDK_REGION
- bucket_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='index-store-bucket-name'].OutputValue" --output text)
- source_uri="s3://$$bucket_name/$$EXAMPLE_FILE"
- echo "upload src file to $$source_uri"
- curl https://quickwit-datasets-public.s3.amazonaws.com/$$EXAMPLE_FILE | aws s3 cp - $$source_uri
+ python -c 'import cli; cli.upload_src_file()'
invoke-indexer: check-env
- export AWS_REGION=$$CDK_REGION
- function_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='indexer-function-name'].OutputValue" --output text)
- bucket_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='index-store-bucket-name'].OutputValue" --output text)
- source_uri="s3://$$bucket_name/$$EXAMPLE_FILE"
- echo "indexer function name: $$function_name, src_file: $$source_uri"
- aws lambda invoke \
- --cli-binary-format raw-in-base64-out \
- --payload "{ \"source_uri\": \"$$source_uri\" }" \
- --function-name $$function_name \
- --log-type Tail /dev/null \
- | jq -r '.LogResult' \
- | base64 -d
+ python -c 'import cli; cli.invoke_indexer()'
invoke-searcher: check-env
- export AWS_REGION=$$CDK_REGION
- function_name=$$(aws cloudformation describe-stacks --stack-name LambdaStack --query "Stacks[0].Outputs[?ExportName=='searcher-function-name'].OutputValue" --output text)
- echo "searcher function name: $$function_name"
- aws lambda invoke --function-name $$function_name --log-type Tail /dev/null | jq -r '.LogResult' | base64 -d
-
+ python -c 'import cli; cli.invoke_searcher()'
index-creation-instruction: deploy
export AWS_REGION=$$CDK_REGION
diff --git a/distribution/lambda/cli.py b/distribution/lambda/cli.py
new file mode 100644
index 00000000000..b135714b178
--- /dev/null
+++ b/distribution/lambda/cli.py
@@ -0,0 +1,82 @@
+"""Helper scripts to test and explore the deployed infrastructure.
+
+These functions are wrapped by the Makefile for convenience."""
+
+import os
+import boto3
+import base64
+import http.client
+
+region = os.environ["CDK_REGION"]
+stack_name = "LambdaStack"
+example_bucket = "quickwit-datasets-public.s3.amazonaws.com"
+example_file = "hdfs-logs-multitenants-10000.json"
+
+session = boto3.Session(region_name=region)
+
+
+def _get_cloudformation_output_value(export_name: str) -> str:
+ client = session.client("cloudformation")
+ stacks = client.describe_stacks(StackName=stack_name)["Stacks"]
+ if len(stacks) != 1:
+ print(f"Stack {stack_name} not identified uniquely, found {stacks}")
+ outputs = stacks[0]["Outputs"]
+ for output in outputs:
+ if output["ExportName"] == export_name:
+ return output["OutputValue"]
+ else:
+ print(f"Export name {export_name} not found in stack {stack_name}")
+ exit(1)
+
+
+def _format_lambda_output(lambda_resp):
+ if "FunctionError" in lambda_resp and lambda_resp["FunctionError"] != "":
+ print("\n## FUNCTION ERROR:")
+ print(lambda_resp["FunctionError"])
+ print("\n## LOG TAIL:")
+ print(base64.b64decode(lambda_resp["LogResult"]).decode())
+ print("\n## RESPONSE:")
+ print(lambda_resp["Payload"].read().decode())
+
+
+def upload_src_file():
+ bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
+ source_uri = f"s3://{bucket_name}/{example_file}"
+ print(f"upload src file to {source_uri}")
+ conn = http.client.HTTPSConnection(example_bucket)
+ conn.request("GET", f"/{example_file}")
+ response = conn.getresponse()
+ if response.status != 200:
+ print(f"Failed to fetch dataset")
+ exit(1)
+ file_data = response.read()
+ session.client("s3").put_object(
+ Bucket=bucket_name, Body=file_data, Key=example_file
+ )
+
+
+def invoke_indexer():
+ function_name = _get_cloudformation_output_value("indexer-function-name")
+ print(f"indexer function name: {function_name}")
+ bucket_name = _get_cloudformation_output_value("index-store-bucket-name")
+ source_uri = f"s3://{bucket_name}/{example_file}"
+ print(f"src_file: {source_uri}")
+ resp = session.client("lambda").invoke(
+ FunctionName=function_name,
+ InvocationType="RequestResponse",
+ LogType="Tail",
+ Payload=f"""{{ "source_uri": "{source_uri}" }}""",
+ )
+ _format_lambda_output(resp)
+
+
+def invoke_searcher():
+ function_name = _get_cloudformation_output_value("searcher-function-name")
+ print(f"searcher function name: {function_name}")
+ resp = session.client("lambda").invoke(
+ FunctionName=function_name,
+ InvocationType="RequestResponse",
+ LogType="Tail",
+ Payload="{}",
+ )
+ _format_lambda_output(resp)
diff --git a/distribution/lambda/requirements-dev.txt b/distribution/lambda/requirements-dev.txt
index 17995349da4..31a750a0c8a 100644
--- a/distribution/lambda/requirements-dev.txt
+++ b/distribution/lambda/requirements-dev.txt
@@ -1,2 +1,3 @@
black==23.9.1
+boto3==1.28.59
pytest==6.2.5
diff --git a/distribution/lambda/resources/indexer-config.yaml b/distribution/lambda/resources/indexer-config.yaml
deleted file mode 100644
index ba433cac4e3..00000000000
--- a/distribution/lambda/resources/indexer-config.yaml
+++ /dev/null
@@ -1,5 +0,0 @@
-version: 0.6
-node_id: lambda-indexer
-metastore_uri: s3://${METASTORE_BUCKET}
-default_index_root_uri: s3://${INDEX_BUCKET}
-data_dir: /tmp
diff --git a/distribution/lambda/resources/searcher-config.yaml b/distribution/lambda/resources/searcher-config.yaml
deleted file mode 100644
index caed87ed0e9..00000000000
--- a/distribution/lambda/resources/searcher-config.yaml
+++ /dev/null
@@ -1,5 +0,0 @@
-version: 0.6
-node_id: lambda-searcher
-metastore_uri: s3://${METASTORE_BUCKET}
-default_index_root_uri: s3://${INDEX_BUCKET}
-data_dir: /tmp
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 3f9f9b36822..7e5f42bbedc 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5522,6 +5522,7 @@ dependencies = [
"quickwit-doc-mapper",
"quickwit-index-management",
"quickwit-indexing",
+ "quickwit-ingest",
"quickwit-metastore",
"quickwit-proto",
"quickwit-rest-client",
diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml
index 0bcf4d2690b..eab090c64fe 100644
--- a/quickwit/quickwit-lambda/Cargo.toml
+++ b/quickwit/quickwit-lambda/Cargo.toml
@@ -34,6 +34,7 @@ quickwit-index-management = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-indexing = { workspace = true }
+quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-rest-client = { workspace = true }
diff --git a/quickwit/quickwit-lambda/src/bin/indexer.rs b/quickwit/quickwit-lambda/src/bin/indexer.rs
index 2f8a7de1292..8d916b15366 100644
--- a/quickwit/quickwit-lambda/src/bin/indexer.rs
+++ b/quickwit/quickwit-lambda/src/bin/indexer.rs
@@ -17,13 +17,46 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use lambda_runtime::service_fn;
-use quickwit_lambda::{index_handler, setup_lambda_tracer};
+use std::path::PathBuf;
+
+use lambda_runtime::{service_fn, Error, LambdaEvent};
+use quickwit_lambda::{ingest, setup_lambda_tracer, IngestArgs};
+use serde_json::Value;
+use tracing::{error, info};
+
+pub async fn handler(event: LambdaEvent) -> Result {
+ let source_uri = if let Some(source_uri) = event.payload["source_uri"].as_str() {
+ source_uri
+ } else {
+ println!("Missing source_uri");
+ return Err(anyhow::anyhow!("Missing source_uri").into());
+ };
+ let ingest_res = ingest(IngestArgs {
+ index_id: String::from("hdfs-logs"),
+ input_path: PathBuf::from(source_uri),
+ input_format: quickwit_config::SourceInputFormat::Json,
+ overwrite: true,
+ vrl_script: None,
+ clear_cache: true,
+ })
+ .await;
+
+ match ingest_res {
+ Ok(stats) => {
+ info!(stats=?stats, "Indexing succeeded");
+ Ok(serde_json::to_value(stats)?)
+ }
+ Err(e) => {
+ error!(err=?e, "Indexing failed");
+ Err(anyhow::anyhow!("Indexing failed").into())
+ }
+ }
+}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_lambda_tracer()?;
- let func = service_fn(index_handler);
+ let func = service_fn(handler);
lambda_runtime::run(func)
.await
.map_err(|e| anyhow::anyhow!(e))
diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs
index ee42632f53e..c31634b030a 100644
--- a/quickwit/quickwit-lambda/src/bin/searcher.rs
+++ b/quickwit/quickwit-lambda/src/bin/searcher.rs
@@ -17,13 +17,41 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use lambda_runtime::service_fn;
-use quickwit_lambda::{search_handler, setup_lambda_tracer};
+use lambda_runtime::{service_fn, Error, LambdaEvent};
+use quickwit_lambda::{search, setup_lambda_tracer, SearchArgs};
+use serde_json::Value;
+use tracing::{debug, error};
+
+pub async fn handler(_event: LambdaEvent) -> Result {
+ let ingest_res = search(SearchArgs {
+ index_id: String::from("hdfs-logs"),
+ query: String::new(),
+ aggregation: None,
+ max_hits: 10,
+ start_offset: 0,
+ search_fields: None,
+ snippet_fields: None,
+ start_timestamp: None,
+ end_timestamp: None,
+ sort_by_field: None,
+ })
+ .await;
+ match ingest_res {
+ Ok(resp) => {
+ debug!(resp=?resp, "Search succeeded");
+ Ok(serde_json::to_value(resp)?)
+ }
+ Err(e) => {
+ error!(err=?e, "Search failed");
+ return Err(anyhow::anyhow!("Query failed").into());
+ }
+ }
+}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_lambda_tracer()?;
- let func = service_fn(search_handler);
+ let func = service_fn(handler);
lambda_runtime::run(func)
.await
.map_err(|e| anyhow::anyhow!(e))
diff --git a/quickwit/quickwit-lambda/src/ingest.rs b/quickwit/quickwit-lambda/src/ingest.rs
index b53c659d648..7987df8a799 100644
--- a/quickwit/quickwit-lambda/src/ingest.rs
+++ b/quickwit/quickwit-lambda/src/ingest.rs
@@ -17,37 +17,51 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-///////////////////////////////////////////////////////
-// NOTE: Archived in case we want to to fork the CLI //
-///////////////////////////////////////////////////////
-
use std::collections::HashSet;
-use std::io::IsTerminal;
use std::num::NonZeroUsize;
-use std::{env, io};
+use std::path::PathBuf;
-use anyhow::{bail, Context};
+use anyhow::bail;
+use chitchat::transport::ChannelTransport;
use chitchat::FailureDetectorConfig;
use quickwit_actors::Universe;
-use quickwit_cli::{
- run_index_checklist, start_actor_runtimes, tool::start_statistics_reporting_loop,
- tool::LocalIngestDocsArgs,
-};
-use quickwit_cluster::{ChannelTransport, Cluster, ClusterMember};
+use quickwit_cli::tool::start_statistics_reporting_loop;
+use quickwit_cli::{run_index_checklist, start_actor_runtimes};
+use quickwit_cluster::{Cluster, ClusterMember};
+use quickwit_common::pubsub::EventBroker;
use quickwit_common::runtimes::RuntimesConfig;
-use quickwit_common::uri::Uri;
use quickwit_config::service::QuickwitService;
use quickwit_config::{
- ConfigFormat, IndexerConfig, MetastoreConfigs, NodeConfig, SourceConfig, SourceParams,
- StorageConfigs, TransformConfig, CLI_INGEST_SOURCE_ID,
+ IndexerConfig, NodeConfig, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
+ CLI_INGEST_SOURCE_ID,
};
use quickwit_index_management::{clear_cache_directory, IndexService};
use quickwit_indexing::actors::{IndexingService, MergePipelineId};
-use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline};
-use quickwit_metastore::MetastoreResolver;
-use quickwit_storage::{load_file, StorageResolver};
+use quickwit_indexing::models::{
+ DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
+};
+use quickwit_ingest::IngesterPool;
use tracing::{debug, info};
+use crate::utils::load_node_config;
+
+const CONFIGURATION_TEMPLATE: &str = "version: 0.6
+node_id: lambda-indexer
+metastore_uri: s3://${METASTORE_BUCKET}
+default_index_root_uri: s3://${INDEX_BUCKET}
+data_dir: /tmp
+";
+
+#[derive(Debug, Eq, PartialEq)]
+pub struct IngestArgs {
+ pub index_id: String,
+ pub input_path: PathBuf,
+ pub input_format: SourceInputFormat,
+ pub overwrite: bool,
+ pub vrl_script: Option,
+ pub clear_cache: bool,
+}
+
async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result {
let self_node = ClusterMember::new(
config.node_id.clone(),
@@ -70,49 +84,11 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result {
Ok(cluster)
}
-fn get_resolvers(
- storage_configs: &StorageConfigs,
- metastore_configs: &MetastoreConfigs,
-) -> (StorageResolver, MetastoreResolver) {
- // The CLI tests rely on the unconfigured singleton resolvers, so it's better to return them if
- // the storage and metastore configs are not set.
- if storage_configs.is_empty() && metastore_configs.is_empty() {
- return (
- StorageResolver::unconfigured(),
- MetastoreResolver::unconfigured(),
- );
- }
- let storage_resolver = StorageResolver::configured(storage_configs);
- let metastore_resolver =
- MetastoreResolver::configured(storage_resolver.clone(), metastore_configs);
- (storage_resolver, metastore_resolver)
-}
-
-async fn load_node_config(config_uri: &Uri) -> anyhow::Result {
- let config_content = load_file(&StorageResolver::unconfigured(), config_uri)
- .await
- .context("Failed to load node config.")?;
- let config_format = ConfigFormat::sniff_from_uri(config_uri)?;
- let config = NodeConfig::load(config_format, config_content.as_slice())
- .await
- .with_context(|| format!("Failed to parse node config `{config_uri}`."))?;
- info!(config_uri=%config_uri, config=?config, "Loaded node config.");
- Ok(config)
-}
-
-pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<()> {
- debug!(args=?args, "local-ingest-docs");
+pub async fn ingest(args: IngestArgs) -> anyhow::Result {
+ debug!(args=?args, "lambda-ingest");
+ let (config, storage_resolver, metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?;
- let config = load_node_config(&args.config_uri).await?;
- let (storage_resolver, metastore_resolver) =
- get_resolvers(&config.storage_configs, &config.metastore_configs);
- let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
-
- let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
- SourceParams::file(filepath)
- } else {
- SourceParams::stdin()
- };
+ let source_params = SourceParams::file(args.input_path);
let transform_config = args
.vrl_script
.map(|vrl_script| TransformConfig::new(vrl_script, None));
@@ -157,7 +133,9 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
+ IngesterPool::default(),
storage_resolver,
+ EventBroker::default(),
)
.await?;
let universe = Universe::new();
@@ -179,19 +157,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
- if args.input_path_opt.is_none() && io::stdin().is_terminal() {
- let eof_shortcut = match env::consts::OS {
- "windows" => "CTRL+Z",
- _ => "CTRL+D",
- };
- println!(
- "Please, enter JSON documents one line at a time.\nEnd your input using \
- {eof_shortcut}."
- );
- }
- let statistics =
- start_statistics_reporting_loop(indexing_pipeline_handle, args.input_path_opt.is_none())
- .await?;
+ let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, false).await?;
merge_pipeline_handle.quit().await;
// Shutdown the indexing server.
universe
@@ -199,25 +165,15 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.await?;
indexing_server_handle.join().await;
universe.quit().await;
- if statistics.num_published_splits > 0 {
- println!(
- "Now, you can query the index with the following command:\nquickwit index search \
- --index {} --config ./config/quickwit.yaml --query \"my query\"",
- args.index_id
- );
- }
if args.clear_cache {
- println!("Clearing local cache directory...");
+ info!("Clearing local cache directory...");
clear_cache_directory(&config.data_dir_path).await?;
- println!("{} Local cache directory cleared.", "✔");
+ info!("Local cache directory cleared.");
}
- match statistics.num_invalid_docs {
- 0 => {
- println!("{} Documents successfully indexed.", "✔");
- Ok(())
- }
- _ => bail!("Failed to ingest all the documents."),
+ if statistics.num_invalid_docs > 0 {
+ bail!("Failed to ingest {} documents", statistics.num_invalid_docs)
}
+ Ok(statistics)
}
diff --git a/quickwit/quickwit-lambda/src/lib.rs b/quickwit/quickwit-lambda/src/lib.rs
index 4bdf3bdfe4b..62c33d75df4 100644
--- a/quickwit/quickwit-lambda/src/lib.rs
+++ b/quickwit/quickwit-lambda/src/lib.rs
@@ -17,68 +17,17 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use std::path::PathBuf;
-
-use lambda_runtime::{Error, LambdaEvent};
use quickwit_cli::logger::setup_logging_and_tracing;
-use quickwit_cli::tool::{
- local_ingest_docs_cli, local_search_cli, LocalIngestDocsArgs, LocalSearchArgs,
-};
-use quickwit_common::uri::Uri;
use quickwit_serve::BuildInfo;
-use serde_json::{json, Value};
use tracing::Level;
+mod ingest;
+mod search;
+mod utils;
+
pub fn setup_lambda_tracer() -> anyhow::Result<()> {
setup_logging_and_tracing(Level::INFO, false, BuildInfo::get())
}
-pub async fn index_handler(event: LambdaEvent) -> Result {
- let source_uri = if let Some(source_uri) = event.payload["source_uri"].as_str() {
- source_uri
- } else {
- println!("Missing source_uri");
- return Err(anyhow::anyhow!("Missing source_uri").into());
- };
- let ingest_res = local_ingest_docs_cli(LocalIngestDocsArgs {
- clear_cache: true,
- config_uri: Uri::from_well_formed("file:///var/task/config.yaml"),
- index_id: String::from("hdfs-logs"),
- input_format: quickwit_config::SourceInputFormat::Json,
- overwrite: true,
- input_path_opt: Some(PathBuf::from(source_uri)),
- vrl_script: None,
- })
- .await;
- if let Err(e) = ingest_res {
- println!("{:?}", e);
- return Err(anyhow::anyhow!("Indexing failed").into());
- }
- Ok(json!({
- "message": format!("Hello from Quickwit {}!", BuildInfo::get().version)
- }))
-}
-
-pub async fn search_handler(_event: LambdaEvent) -> Result {
- let ingest_res = local_search_cli(LocalSearchArgs {
- config_uri: Uri::from_well_formed("file:///var/task/config.yaml"),
- index_id: String::from("hdfs-logs"),
- query: String::new(),
- aggregation: None,
- max_hits: 10,
- start_offset: 0,
- search_fields: None,
- snippet_fields: None,
- start_timestamp: None,
- end_timestamp: None,
- sort_by_field: None,
- })
- .await;
- if let Err(e) = ingest_res {
- println!("{:?}", e);
- return Err(anyhow::anyhow!("Query failed").into());
- }
- Ok(json!({
- "message": format!("Hello from Quickwit {}!", BuildInfo::get().version)
- }))
-}
+pub use ingest::{ingest, IngestArgs};
+pub use search::{search, SearchArgs};
diff --git a/quickwit/quickwit-lambda/src/search.rs b/quickwit/quickwit-lambda/src/search.rs
new file mode 100644
index 00000000000..02eb8e55176
--- /dev/null
+++ b/quickwit/quickwit-lambda/src/search.rs
@@ -0,0 +1,77 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use quickwit_proto::search::SearchResponse;
+use quickwit_search::{single_node_search, SearchResponseRest};
+use quickwit_serve::{
+ search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
+};
+use tracing::debug;
+
+use crate::utils::load_node_config;
+
+const CONFIGURATION_TEMPLATE: &str = "version: 0.6
+node_id: lambda-searcher
+metastore_uri: s3://${METASTORE_BUCKET}
+default_index_root_uri: s3://${INDEX_BUCKET}
+data_dir: /tmp
+";
+
+#[derive(Debug, Eq, PartialEq)]
+pub struct SearchArgs {
+ pub index_id: String,
+ pub query: String,
+ pub aggregation: Option,
+ pub max_hits: usize,
+ pub start_offset: usize,
+ pub search_fields: Option>,
+ pub snippet_fields: Option>,
+ pub start_timestamp: Option,
+ pub end_timestamp: Option,
+ pub sort_by_field: Option,
+}
+
+pub async fn search(args: SearchArgs) -> anyhow::Result {
+ debug!(args=?args, "lambda-search");
+ let (_, storage_resolver, metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?;
+ let aggs = args
+ .aggregation
+ .map(|agg_string| serde_json::from_str(&agg_string))
+ .transpose()?;
+ let sort_by: SortBy = args.sort_by_field.map(SortBy::from).unwrap_or_default();
+ let search_request_query_string = SearchRequestQueryString {
+ query: args.query,
+ start_offset: args.start_offset as u64,
+ max_hits: args.max_hits as u64,
+ search_fields: args.search_fields,
+ snippet_fields: args.snippet_fields,
+ start_timestamp: args.start_timestamp,
+ end_timestamp: args.end_timestamp,
+ aggs,
+ format: BodyFormat::Json,
+ sort_by,
+ };
+ let search_request =
+ search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
+ debug!(search_request=?search_request, "search-request");
+ let search_response: SearchResponse =
+ single_node_search(search_request, metastore, storage_resolver).await?;
+ let search_response_rest = SearchResponseRest::try_from(search_response)?;
+ Ok(search_response_rest)
+}
diff --git a/quickwit/quickwit-lambda/src/utils.rs b/quickwit/quickwit-lambda/src/utils.rs
new file mode 100644
index 00000000000..4f126536747
--- /dev/null
+++ b/quickwit/quickwit-lambda/src/utils.rs
@@ -0,0 +1,40 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::sync::Arc;
+
+use anyhow::Context;
+use quickwit_config::{ConfigFormat, NodeConfig};
+use quickwit_metastore::{Metastore, MetastoreResolver};
+use quickwit_storage::StorageResolver;
+use tracing::info;
+
+pub(crate) async fn load_node_config(
+ config_template: &str,
+) -> anyhow::Result<(NodeConfig, StorageResolver, Arc)> {
+ let config = NodeConfig::load(ConfigFormat::Yaml, config_template.as_bytes())
+ .await
+ .with_context(|| format!("Failed to parse node config `{config_template}`."))?;
+ info!(config=?config, "Loaded node config.");
+ let storage_resolver = StorageResolver::configured(&config.storage_configs);
+ let metastore_resolver =
+ MetastoreResolver::configured(storage_resolver.clone(), &config.metastore_configs);
+ let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
+ Ok((config, storage_resolver, metastore))
+}