Skip to content

Commit

Permalink
Merge branch 'main' of github.com:samoii/quickwit into fix-s3-sse
Browse files Browse the repository at this point in the history
  • Loading branch information
samoii committed Sep 20, 2024
2 parents 5147505 + 634a1bc commit ad68a93
Show file tree
Hide file tree
Showing 71 changed files with 1,604 additions and 675 deletions.
2 changes: 1 addition & 1 deletion .github/actions/cargo-build-macos-binary/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ runs:
BINARY_VERSION=${{ inputs.version }} ARCHIVE_NAME=${{ env.ASSET_FULL_NAME }}
shell: bash
- name: Save binary archive for three days
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4.4.0
with:
name: ${{ env.ASSET_FULL_NAME }}.tar.gz
path: ./${{ env.ASSET_FULL_NAME }}.tar.gz
Expand Down
2 changes: 1 addition & 1 deletion .github/actions/cross-build-binary/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ runs:
BINARY_VERSION=${{ inputs.version }} ARCHIVE_NAME=${{ env.ASSET_FULL_NAME }}
shell: bash
- name: Save binary archive for three days
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4.4.0
with:
name: ${{ env.ASSET_FULL_NAME }}.tar.gz
path: ./${{ env.ASSET_FULL_NAME }}.tar.gz
Expand Down
9 changes: 6 additions & 3 deletions .github/workflows/publish_docker_images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ jobs:
include:
- os: ubuntu-latest
platform: linux/amd64
platform_suffix: amd64
- os: gh-ubuntu-arm64
platform: linux/arm64
platform_suffix: arm64
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
Expand Down Expand Up @@ -79,9 +81,9 @@ jobs:
touch "/tmp/digests/${digest#sha256:}"
- name: Upload digest
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4.4.0
with:
name: digest
name: digest-${{ matrix.platform_suffix }}
path: /tmp/digests/*
if-no-files-found: error
retention-days: 1
Expand All @@ -93,8 +95,9 @@ jobs:
- name: Download digests
uses: actions/[email protected]
with:
name: digest
pattern: digest-*
path: /tmp/digests
merge-multiple: true

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/source-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ Required fields for the SQS `notifications` parameter items:
- `message_type`: format of the message payload, either
- `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
- `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
- `deduplication_window_duration_secs`: maximum duration for which ingested file checkpoints are kept (default 3600)
- `deduplication_window_max_messages`: maximum number of ingested file checkpoints kept (default 100k)
- `deduplication_cleanup_interval_secs`: frequency at which outdated file checkpoints are cleaned up

*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*

Expand All @@ -82,6 +85,7 @@ EOF
- the notification message could not be parsed (e.g. it is not a valid S3 notification)
- the file was not found
- the file is corrupted (e.g. unexpected compression)
- AWS S3 notifications and AWS SQS provide "at least once" delivery guarantees. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_*` or increase `deduplication_cleanup_interval_secs` to reduce the load on the metastore.

:::

Expand Down
20 changes: 11 additions & 9 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }

tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9f81d59", default-features = false, features = [
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "55b0b52", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
Expand Down
51 changes: 37 additions & 14 deletions quickwit/quickwit-cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::pin::pin;
use std::str::FromStr;

use clap::{arg, ArgAction, ArgMatches, Command};
use colored::Colorize;
use futures::future::select;
use itertools::Itertools;
use quickwit_common::runtimes::RuntimesConfig;
Expand All @@ -33,6 +35,7 @@ use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, Teleme
use tokio::signal;
use tracing::{debug, info};

use crate::checklist::{BLUE_COLOR, RED_COLOR};
use crate::{config_cli_arg, get_resolvers, load_node_config, start_actor_runtimes};

pub fn build_run_command() -> Command {
Expand All @@ -53,6 +56,38 @@ pub struct RunCliCommand {
pub services: Option<HashSet<QuickwitService>>,
}

/// Waits for the first Ctrl+C (SIGINT) and announces that a graceful shutdown has started.
///
/// Before returning, it spawns a background task that waits for a second Ctrl+C. When that
/// second signal arrives, the task prints a forced-shutdown notice and exits the process with
/// status code 1.
async fn listen_interrupt() {
    /// Resolves on the next Ctrl+C, then emits a carriage return to hide the `^C` echoed by
    /// the terminal.
    async fn wait_for_ctrl_c() {
        let registration = signal::ctrl_c().await;
        registration.expect("registering a signal handler for SIGINT should not fail");
        print!("\r");
    }

    wait_for_ctrl_c().await;
    let notice_marker = "❢".color(BLUE_COLOR);
    println!(
        "{} Graceful shutdown initiated. Waiting for ingested data to be indexed. This may take a \
         few minutes. Press Ctrl+C again to force shutdown.",
        notice_marker
    );

    // The join handle is dropped on purpose: this task keeps running after the function
    // returns, so that a second Ctrl+C can still abort the process.
    tokio::spawn(async {
        wait_for_ctrl_c().await;
        let failure_marker = "✘".color(RED_COLOR);
        println!(
            "{} Quickwit was forcefully shut down. Some data might not have been indexed.",
            failure_marker
        );
        std::process::exit(1);
    });
}

/// Completes once a SIGTERM has been delivered to the process, then logs the event.
///
/// # Panics
///
/// Panics if the SIGTERM handler cannot be registered.
async fn listen_sigterm() {
    let sigterm_kind = signal::unix::SignalKind::terminate();
    let mut sigterm_stream = signal::unix::signal(sigterm_kind)
        .expect("registering a signal handler for SIGTERM should not fail");
    // The value yielded by `recv` is deliberately ignored: we only wait for it to resolve.
    sigterm_stream.recv().await;
    info!("SIGTERM received");
}

impl RunCliCommand {
pub fn parse_cli_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
let config_uri = matches
Expand Down Expand Up @@ -95,20 +130,7 @@ impl RunCliCommand {
let runtimes_config = RuntimesConfig::default();
start_actor_runtimes(runtimes_config, &node_config.enabled_services)?;
let shutdown_signal = Box::pin(async {
select(
Box::pin(async {
signal::ctrl_c()
.await
.expect("registering a signal handler for SIGINT should not fail");
}),
Box::pin(async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("registering a signal handler for SIGTERM should not fail")
.recv()
.await;
}),
)
.await;
select(pin!(listen_interrupt()), pin!(listen_sigterm())).await;
});
let serve_result = serve_quickwit(
node_config,
Expand All @@ -129,6 +151,7 @@ impl RunCliCommand {
telemetry_handle.terminate_telemetry().await;
}
serve_result?;
info!("quickwit successfully terminated");
Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod rate;
mod rate_estimator;
mod rate_limit;
mod retry;
mod timeout;
mod transport;

use std::error;
Expand All @@ -55,6 +56,7 @@ pub use rate::{ConstantRate, Rate};
pub use rate_estimator::{RateEstimator, SmaRateEstimator};
pub use rate_limit::{RateLimit, RateLimitLayer};
pub use retry::{RetryLayer, RetryPolicy};
pub use timeout::{Timeout, TimeoutExceeded, TimeoutLayer};
pub use transport::{make_channel, warmup_channel, BalanceChannel};

pub type BoxError = Box<dyn error::Error + Send + Sync + 'static>;
Expand Down
Loading

0 comments on commit ad68a93

Please sign in to comment.