Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various Lambda fixes #5016

Merged
merged 3 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ package:
then
pushd ../../quickwit/
rustc --version
# TODO: remove --disable-optimizations when upgrading to a release containing
# https://github.com/cargo-lambda/cargo-lambda/issues/649 (> 1.2.1)
cargo lambda build \
-p quickwit-lambda \
--disable-optimizations \
--release \
--output-format zip \
--target x86_64-unknown-linux-gnu
Expand Down
8 changes: 7 additions & 1 deletion distribution/lambda/cdk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ def get_logs(
last_event_id = ""
last_event_found = True
start_time = time.time()
while time.time() - start_time < timeout:
describe_resp = client.describe_log_groups(logGroupNamePrefix=log_group_name)
group_names = [group["logGroupName"] for group in describe_resp["logGroups"]]
if log_group_name in group_names:
break
print(f"log group not found, retrying...")
time.sleep(3)
while time.time() - start_time < timeout:
for page in paginator.paginate(
logGroupName=log_group_name,
Expand All @@ -268,7 +275,6 @@ def get_logs(
last_event_id = event["eventId"]
yield event["message"]
if event["message"].startswith("REPORT"):
lower_time_bound = int(event["timestamp"])
last_event_id = "REPORT"
break
if last_event_id == "REPORT":
Expand Down
60 changes: 41 additions & 19 deletions quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_janitor::{start_janitor_service, JanitorService};
use quickwit_metastore::{
CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata,
IndexMetadataResponseExt,
};
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{
CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService,
AddSourceRequest, CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService,
MetastoreServiceClient, ResetSourceCheckpointRequest,
};
use quickwit_proto::types::PipelineUid;
Expand All @@ -60,7 +61,7 @@ use crate::indexer::environment::{
DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI, MAX_CHECKPOINTS,
};

const LAMBDA_SOURCE_ID: &str = "_ingest-lambda-source";
const LAMBDA_SOURCE_ID: &str = "ingest-lambda-source";

/// The indexing service needs to update its cluster chitchat state so that the control plane is
/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
Expand Down Expand Up @@ -154,29 +155,47 @@ pub(super) async fn configure_source(
}

/// Check if the index exists, creating or overwriting it if necessary
///
/// If the index exists but without the Lambda source ([`LAMBDA_SOURCE_ID`]),
/// the source is added.
pub(super) async fn init_index_if_necessary(
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
default_index_root_uri: &Uri,
overwrite: bool,
source_config: &SourceConfig,
) -> anyhow::Result<IndexMetadata> {
let metadata_result = metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await;
let metadata = match metadata_result {
Ok(_) if overwrite => {
info!(
index_id = *INDEX_ID,
"Overwrite enabled, clearing existing index",
);
let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&INDEX_ID).await?;
metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await?
.deserialize_index_metadata()?
Ok(metadata_resp) => {
let current_metadata = metadata_resp.deserialize_index_metadata()?;
let mut metadata_changed = false;
if overwrite {
info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index");
let mut index_service =
IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&INDEX_ID).await?;
metadata_changed = true;
}
if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) {
let add_source_request = AddSourceRequest::try_from_source_config(
current_metadata.index_uid.clone(),
source_config,
)?;
metastore.add_source(add_source_request).await?;
metadata_changed = true;
}
if metadata_changed {
metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await?
.deserialize_index_metadata()?
} else {
current_metadata
}
}
Ok(metadata_resp) => metadata_resp.deserialize_index_metadata()?,
Err(MetastoreError::NotFound(_)) => {
info!(
index_id = *INDEX_ID,
Expand All @@ -191,10 +210,13 @@ pub(super) async fn init_index_if_necessary(
index_config.index_id,
);
}
let create_resp = metastore
.create_index(CreateIndexRequest::try_from_index_config(&index_config)?)
.await?;
info!("index created");
let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
&index_config,
std::slice::from_ref(source_config),
)?;
let create_resp = metastore.create_index(create_index_request).await?;

info!(index_uid = %create_resp.index_uid(), "index created");
create_resp.deserialize_index_metadata()?
}
Err(e) => bail!(e),
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
let (config, storage_resolver, mut metastore) =
load_node_config(CONFIGURATION_TEMPLATE).await?;

let source_config =
configure_source(args.input_path, args.input_format, args.vrl_script).await?;

let index_metadata = init_index_if_necessary(
&mut metastore,
&storage_resolver,
&config.default_index_root_uri,
args.overwrite,
&source_config,
)
.await?;

let source_config =
configure_source(args.input_path, args.input_format, args.vrl_script).await?;

let mut services = vec![QuickwitService::Indexer];
if !*DISABLE_JANITOR {
services.push(QuickwitService::Janitor);
Expand Down
Loading