Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace FileSourceParams path with URI #5104

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ mod tests {
source_id: "foo-source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file("path/to/file"),
source_params: SourceParams::file_from_str("path/to/file").unwrap(),
transform_config: None,
input_format: SourceInputFormat::Json,
}];
Expand All @@ -753,9 +753,10 @@ mod tests {
source_type: "file".to_string(),
enabled: "true".to_string(),
}];
let expected_uri = Uri::from_str("path/to/file").unwrap();
let expected_params = vec![ParamsRow {
key: "filepath".to_string(),
value: JsonValue::String("path/to/file".to_string()),
value: JsonValue::String(expected_uri.to_string()),
}];
let expected_checkpoint = vec![
CheckpointRow {
Expand Down
10 changes: 4 additions & 6 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ pub fn build_tool_command() -> Command {
pub struct LocalIngestDocsArgs {
pub config_uri: Uri,
pub index_id: IndexId,
pub input_path_opt: Option<PathBuf>,
pub input_path_opt: Option<Uri>,
pub input_format: SourceInputFormat,
pub overwrite: bool,
pub vrl_script: Option<String>,
Expand Down Expand Up @@ -251,9 +251,7 @@ impl ToolCliCommand {
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let input_path_opt = if let Some(input_path) = matches.remove_one::<String>("input-path") {
Uri::from_str(&input_path)?
.filepath()
.map(|path| path.to_path_buf())
Some(Uri::from_str(&input_path)?)
} else {
None
};
Expand Down Expand Up @@ -410,8 +408,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
get_resolvers(&config.storage_configs, &config.metastore_configs);
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;

let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
SourceParams::file(filepath)
let source_params = if let Some(uri) = args.input_path_opt.as_ref() {
SourceParams::file_from_uri(uri.clone())
} else {
SourceParams::stdin()
};
Expand Down
61 changes: 29 additions & 32 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::path::Path;

use anyhow::Result;
use clap::error::ErrorKind;
use helpers::{TestEnv, TestStorageType};
use helpers::{uri_from_path, TestEnv, TestStorageType};
use quickwit_cli::checklist::ChecklistError;
use quickwit_cli::cli::build_cli;
use quickwit_cli::index::{
Expand All @@ -37,6 +37,7 @@ use quickwit_cli::tool::{
};
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt,
Expand All @@ -61,11 +62,11 @@ async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> {
create_index_cli(args).await
}

async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Result<()> {
async fn local_ingest_docs(uri: Uri, test_env: &TestEnv) -> anyhow::Result<()> {
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config.clone(),
index_id: test_env.index_id.clone(),
input_path_opt: Some(input_path.to_path_buf()),
input_path_opt: Some(uri),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
Expand All @@ -74,6 +75,10 @@ async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Res
local_ingest_docs_cli(args).await
}

async fn local_ingest_log_docs(test_env: &TestEnv) -> anyhow::Result<()> {
local_ingest_docs(test_env.resource_files.log_docs.clone(), test_env).await
}

#[test]
fn test_cmd_help() {
let cmd = build_cli();
Expand Down Expand Up @@ -252,14 +257,17 @@ async fn test_ingest_docs_cli() {

// Ensure cache directory is empty.
let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path);

assert!(cache_directory_path.read_dir().unwrap().next().is_none());

let does_not_exit_uri = uri_from_path(&test_env.data_dir_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let does_not_exit_uri = uri_from_path(&test_env.data_dir_path)
let does_not_exist_uri = uri_from_path(&test_env.data_dir_path)

.join("file-does-not-exist.json")
.unwrap();

// Ingest a non-existing file should fail.
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config,
index_id: test_env.index_id,
input_path_opt: Some(test_env.data_dir_path.join("file-does-not-exist.json")),
input_path_opt: Some(does_not_exit_uri),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
Expand Down Expand Up @@ -332,9 +340,7 @@ async fn test_cmd_search_aggregation() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let aggregation: Value = json!(
{
Expand Down Expand Up @@ -432,9 +438,7 @@ async fn test_cmd_search_with_snippets() -> Result<()> {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

// search with snippets
let args = SearchIndexArgs {
Expand Down Expand Up @@ -487,9 +491,7 @@ async fn test_search_index_cli() {
sort_by_score: false,
};

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let args = create_search_args("level:info");

Expand Down Expand Up @@ -600,9 +602,7 @@ async fn test_delete_index_cli_dry_run() {
.unwrap();
assert!(metastore.index_exists(&index_id).await.unwrap());

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

// On non-empty index
let args = create_delete_args(true);
Expand All @@ -626,9 +626,7 @@ async fn test_delete_index_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let args = DeleteIndexArgs {
client_args: test_env.default_client_args(),
Expand All @@ -652,9 +650,7 @@ async fn test_garbage_collect_cli_no_grace() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;
local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let metastore = MetastoreResolver::unconfigured()
.resolve(&test_env.metastore_uri)
Expand Down Expand Up @@ -762,9 +758,7 @@ async fn test_garbage_collect_index_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;
local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let refresh_metastore = |metastore| async {
// In this test we rely on the file backed metastore and
Expand Down Expand Up @@ -914,9 +908,7 @@ async fn test_all_local_index() {
.unwrap();
assert!(metadata_file_exists);

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let query_response = reqwest::get(format!(
"http://127.0.0.1:{}/api/v1/{}/search?query=level:info",
Expand Down Expand Up @@ -970,16 +962,21 @@ async fn test_all_with_s3_localstack_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

let s3_path = upload_test_file(
let s3_uri = upload_test_file(
test_env.storage_resolver.clone(),
test_env.resource_files.log_docs.clone(),
test_env
.resource_files
.log_docs
.filepath()
.unwrap()
.to_path_buf(),
"quickwit-integration-tests",
"sources/",
&append_random_suffix("test-all--cli-s3-localstack"),
)
.await;

local_ingest_docs(&s3_path, &test_env).await.unwrap();
local_ingest_docs(s3_uri, &test_env).await.unwrap();

// Cli search
let args = SearchIndexArgs {
Expand Down
32 changes: 15 additions & 17 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Borrow;
use std::fs;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;

Expand Down Expand Up @@ -114,8 +113,8 @@ pub struct TestResourceFiles {
pub index_config: Uri,
pub index_config_without_uri: Uri,
pub index_config_with_retention: Uri,
pub log_docs: PathBuf,
pub wikipedia_docs: PathBuf,
pub log_docs: Uri,
pub wikipedia_docs: Uri,
}

/// A struct to hold few info about the test environment.
Expand Down Expand Up @@ -192,8 +191,8 @@ pub enum TestStorageType {
LocalFileSystem,
}

fn uri_from_path(path: PathBuf) -> Uri {
Uri::from_str(&format!("file://{}", path.display())).unwrap()
pub fn uri_from_path(path: &Path) -> Uri {
Uri::from_str(path.to_str().unwrap()).unwrap()
}

/// Creates all necessary artifacts in a test environment.
Expand Down Expand Up @@ -265,12 +264,12 @@ pub async fn create_test_env(
.context("failed to parse cluster endpoint")?;

let resource_files = TestResourceFiles {
config: uri_from_path(node_config_path),
index_config: uri_from_path(index_config_path),
index_config_without_uri: uri_from_path(index_config_without_uri_path),
index_config_with_retention: uri_from_path(index_config_with_retention_path),
log_docs: log_docs_path,
wikipedia_docs: wikipedia_docs_path,
config: uri_from_path(&node_config_path),
index_config: uri_from_path(&index_config_path),
index_config_without_uri: uri_from_path(&index_config_without_uri_path),
index_config_with_retention: uri_from_path(&index_config_with_retention_path),
log_docs: uri_from_path(&log_docs_path),
wikipedia_docs: uri_from_path(&wikipedia_docs_path),
};

Ok(TestEnv {
Expand All @@ -297,15 +296,14 @@ pub async fn upload_test_file(
bucket: &str,
prefix: &str,
filename: &str,
) -> PathBuf {
) -> Uri {
let test_data = tokio::fs::read(local_src_path).await.unwrap();
let mut src_location: PathBuf = [r"s3://", bucket, prefix].iter().collect();
let storage_uri = Uri::from_str(src_location.to_string_lossy().borrow()).unwrap();
let src_location = format!("s3://{}/{}", bucket, prefix);
let storage_uri = Uri::from_str(&src_location).unwrap();
let storage = storage_resolver.resolve(&storage_uri).await.unwrap();
storage
.put(&PathBuf::from(filename), Box::new(test_data))
.await
.unwrap();
src_location.push(filename);
src_location
storage_uri.join(filename).unwrap()
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn empty_dir<P: AsRef<Path>>(path: P) -> anyhow::Result<()> {
Ok(())
}

/// Helper function to get the cache path.
/// Helper function to get the indexer split cache path.
pub fn get_cache_directory_path(data_dir_path: &Path) -> PathBuf {
data_dir_path.join("indexer-split-cache").join("splits")
}
Expand Down
Loading
Loading