Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read from Storage in file source #3869

Merged
merged 9 commits into from
Oct 5, 2023
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@ pub async fn run_index_checklist(
if let Some(source_config) = source_config_opt {
checks.push((
source_config.source_id.as_str(),
check_source_connectivity(source_config).await,
check_source_connectivity(storage_resolver, source_config).await,
));
} else {
for source_config in index_metadata.sources.values() {
checks.push((
source_config.source_id.as_str(),
check_source_connectivity(source_config).await,
check_source_connectivity(storage_resolver, source_config).await,
));
}
}
Expand Down
15 changes: 11 additions & 4 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError};
use serde_json::{json, Number, Value};
use tokio::time::{sleep, Duration};

use crate::helpers::{create_test_env, PACKAGE_BIN_NAME};
use crate::helpers::{create_test_env, upload_test_file, PACKAGE_BIN_NAME};

async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> {
let args = CreateIndexArgs {
Expand Down Expand Up @@ -893,9 +893,16 @@ async fn test_all_with_s3_localstack_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env)
.await
.unwrap();
let s3_path = upload_test_file(
test_env.storage_resolver.clone(),
test_env.resource_files["logs"].clone(),
"quickwit-integration-tests",
"sources/",
&append_random_suffix("test-all--cli-s3-localstack"),
)
.await;

local_ingest_docs(&s3_path, &test_env).await.unwrap();

// Cli search
let args = SearchIndexArgs {
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,25 @@ pub async fn create_test_env(
storage,
})
}

/// Uploads a local test file to the storage backing an `s3://` location.
///
/// The destination directory is built by joining `s3://`, `bucket` and `prefix` as path
/// components. The bytes read from `local_src_path` are stored under `filename` in the
/// storage resolved for that directory, and the directory path with `filename` appended
/// is returned.
///
/// # Panics
///
/// Panics if the local file cannot be read, if the storage cannot be resolved, or if the
/// upload fails.
///
/// TODO: this should be part of the test env setup
pub async fn upload_test_file(
    storage_resolver: StorageResolver,
    local_src_path: PathBuf,
    bucket: &str,
    prefix: &str,
    filename: &str,
) -> PathBuf {
    let payload = tokio::fs::read(local_src_path).await.unwrap();

    // Assemble the destination directory one component at a time.
    let mut s3_location = PathBuf::new();
    for segment in [r"s3://", bucket, prefix] {
        s3_location.push(segment);
    }

    let dir_uri = Uri::from_well_formed(s3_location.to_string_lossy());
    let storage = storage_resolver.resolve(&dir_uri).await.unwrap();

    let destination = PathBuf::from(filename);
    storage.put(&destination, Box::new(payload)).await.unwrap();

    // The returned path points at the uploaded object, not at its directory.
    s3_location.push(filename);
    s3_location
}
11 changes: 7 additions & 4 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,16 @@ pub struct FileSourceParams {
pub filepath: Option<PathBuf>, //< If None read from stdin.
}

// Deserializing a filepath string into an absolute filepath.
/// Deserializes the filepath as a URI first to validate the input.
///
/// TODO: we might want to replace `PathBuf` with `Uri` directly in
/// `FileSourceParams`
fn absolute_filepath_from_str<'de, D>(deserializer: D) -> Result<Option<PathBuf>, D::Error>
where D: Deserializer<'de> {
let filepath_opt: Option<String> = Deserialize::deserialize(deserializer)?;
if let Some(filepath) = filepath_opt {
let uri = Uri::from_str(&filepath).map_err(D::Error::custom)?;
Ok(uri.filepath().map(|path| path.to_path_buf()))
Ok(Some(PathBuf::from(uri.as_str())))
} else {
Ok(None)
}
Expand Down Expand Up @@ -800,8 +803,8 @@ mod tests {
let uri = Uri::from_str("source-path.json").unwrap();
assert_eq!(
file_params.filepath.unwrap().as_path(),
uri.filepath().unwrap()
)
Path::new(uri.as_str())
);
}
}

Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl IndexService {
validate_identifier("Source ID", &source_id).map_err(|_| {
IndexServiceError::InvalidIdentifier(format!("invalid source ID: `{source_id}`"))
})?;
check_source_connectivity(&source_config)
check_source_connectivity(&self.storage_resolver, &source_config)
.await
.map_err(IndexServiceError::InvalidConfig)?;
self.metastore
Expand Down Expand Up @@ -368,7 +368,7 @@ mod tests {
#[tokio::test]
async fn test_create_index() {
let metastore = metastore_for_test();
let storage_resolver = StorageResolver::ram_for_test();
let storage_resolver = StorageResolver::ram_and_file_for_test();
let index_service = IndexService::new(metastore.clone(), storage_resolver);
let index_id = "test-index";
let index_uri = "ram://indexes/test-index";
Expand Down Expand Up @@ -404,7 +404,7 @@ mod tests {
#[tokio::test]
async fn test_delete_index() {
let metastore = metastore_for_test();
let storage_resolver = StorageResolver::ram_for_test();
let storage_resolver = StorageResolver::ram_and_file_for_test();
let storage = storage_resolver
.resolve(&Uri::for_test("ram://indexes/test-index"))
.await
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::Storage;
use quickwit_storage::{Storage, StorageResolver};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -431,6 +431,7 @@ impl IndexingPipeline {
metastore: self.params.metastore.clone(),
ingester_pool: self.params.ingester_pool.clone(),
queues_dir_path: self.params.queues_dir_path.clone(),
storage_resolver: self.params.source_storage_resolver.clone(),
}),
source_checkpoint,
))
Expand Down Expand Up @@ -567,6 +568,7 @@ pub struct IndexingPipelineParams {

// Source-related parameters
pub source_config: SourceConfig,
pub source_storage_resolver: StorageResolver,
pub ingester_pool: IngesterPool,
pub queues_dir_path: PathBuf,
pub event_broker: EventBroker,
Expand Down Expand Up @@ -678,6 +680,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -780,6 +783,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -858,6 +862,7 @@ mod tests {
pipeline_id,
doc_mapper,
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -982,6 +987,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(broken_mapper),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ impl IndexingService {
source_config,
ingester_pool: self.ingester_pool.clone(),
queues_dir_path: self.queue_dir_path.clone(),
source_storage_resolver: self.storage_resolver.clone(),

event_broker: self.event_broker.clone(),
};
Expand Down
66 changes: 43 additions & 23 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;
use std::io::SeekFrom;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::uri::Uri;
use quickwit_config::FileSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use serde::Serialize;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeekExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tracing::info;

use crate::actors::DocProcessor;
Expand All @@ -51,7 +52,7 @@ pub struct FileSource {
source_id: String,
params: FileSourceParams,
counters: FileSourceCounters,
reader: BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>,
reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
}

impl fmt::Debug for FileSource {
Expand Down Expand Up @@ -137,28 +138,34 @@ impl TypedSourceFactory for FileSourceFactory {
checkpoint: SourceCheckpoint,
) -> anyhow::Result<FileSource> {
let mut offset = 0;
let reader: Box<dyn AsyncRead + Send + Sync + Unpin> =
if let Some(filepath) = &params.filepath {
let mut file = File::open(&filepath).await.with_context(|| {
format!("failed to open source file `{}`", filepath.display())
})?;
let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
if let Some(Position::Offset(offset_str)) =
checkpoint.position_for_partition(&partition_id).cloned()
{
offset = offset_str.parse::<u64>()?;
file.seek(SeekFrom::Start(offset)).await?;
}
Box::new(file)
} else {
// We cannot use the checkpoint.
Box::new(tokio::io::stdin())
};
let reader: Box<dyn AsyncRead + Send + Unpin> = if let Some(filepath) = &params.filepath {
let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
if let Some(Position::Offset(offset_str)) =
checkpoint.position_for_partition(&partition_id).cloned()
{
offset = offset_str.parse::<usize>()?;
}
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
storage
.get_slice_stream(
file_name,
Range {
start: offset,
end: file_size,
},
)
.await?
} else {
// We cannot use the checkpoint.
Box::new(tokio::io::stdin())
};
let file_source = FileSource {
source_id: ctx.source_id().to_string(),
counters: FileSourceCounters {
previous_offset: offset,
current_offset: offset,
previous_offset: offset as u64,
current_offset: offset as u64,
num_lines_processed: 0,
},
reader: BufReader::new(reader),
Expand All @@ -168,6 +175,19 @@ impl TypedSourceFactory for FileSourceFactory {
}
}

/// Splits `filepath` into the URI of its parent directory and its final path component.
///
/// # Errors
///
/// Returns an error if `filepath` has no parent, if the parent is not valid UTF-8, if the
/// parent cannot be parsed as a [`Uri`], or if `filepath` has no file name component.
pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
    let parent_dir = filepath
        .parent()
        .context("Parent directory could not be resolved")?;
    let parent_str = parent_dir
        .to_str()
        .context("Path cannot be turned to string")?;
    let dir_uri = parent_str.parse::<Uri>()?;

    let base_name = filepath
        .file_name()
        .context("Path does not appear to be a file")?;
    Ok((dir_uri, Path::new(base_name)))
}

#[cfg(test)]
mod tests {
use std::io::Write;
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ mod tests {
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse};
use quickwit_proto::ingest::{DocBatchV2, Shard};
use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse};
use quickwit_storage::StorageResolver;
use tokio::sync::watch;

use super::*;
Expand Down Expand Up @@ -531,6 +532,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down Expand Up @@ -608,6 +610,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down Expand Up @@ -760,6 +763,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down
Loading