From 73fde83239dfe5c8fa3e7ea3221bd65fae83a32d Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 4 Dec 2023 10:38:53 +0100 Subject: [PATCH] store split_fields in split (#4190) * store split_fields in split * add header with version to split fields * refactor * add test * refactor PutPayload --- quickwit/Cargo.lock | 1 + quickwit/Cargo.toml | 1 + quickwit/quickwit-common/src/shared_consts.rs | 3 + .../src/bundle_directory.rs | 7 + quickwit/quickwit-indexing/Cargo.toml | 1 + .../src/actors/merge_split_downloader.rs | 2 +- .../quickwit-indexing/src/actors/packager.rs | 9 +- .../quickwit-indexing/src/actors/uploader.rs | 7 + quickwit/quickwit-indexing/src/models/mod.rs | 2 + .../src/models/packaged_split.rs | 1 + .../src/models/split_fields.rs | 268 ++++++++++++++++++ .../src/split_store/indexing_split_store.rs | 12 +- .../src/split_store/local_split_store.rs | 9 +- .../quickwit-storage/src/bundle_storage.rs | 4 +- quickwit/quickwit-storage/src/split.rs | 80 ++++-- 15 files changed, 367 insertions(+), 40 deletions(-) create mode 100644 quickwit/quickwit-indexing/src/models/split_fields.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 543b7358ae8..35f17f1d077 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5581,6 +5581,7 @@ dependencies = [ "ulid", "utoipa", "vrl", + "zstd 0.13.0", ] [[package]] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 1d07bc766bb..43e1a13effc 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -223,6 +223,7 @@ vrl = { version = "0.8.1", default-features = false, features = [ warp = "0.3" whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" } wiremock = "0.5" +zstd = "0.13.0" aws-config = "0.55.0" aws-credential-types = { version = "0.55.0", features = [ diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs index d472ce0f49f..8b602ebc0c3 100644 --- a/quickwit/quickwit-common/src/shared_consts.rs +++ 
b/quickwit/quickwit-common/src/shared_consts.rs @@ -38,3 +38,6 @@ pub const SCROLL_BATCH_LEN: usize = 1_000; /// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader. pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; + +/// File name for the encoded list of fields in the split +pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; diff --git a/quickwit/quickwit-directories/src/bundle_directory.rs b/quickwit/quickwit-directories/src/bundle_directory.rs index 1fee24ac4de..1b2b4f3e592 100644 --- a/quickwit/quickwit-directories/src/bundle_directory.rs +++ b/quickwit/quickwit-directories/src/bundle_directory.rs @@ -164,6 +164,7 @@ mod tests { use std::fs::File; use std::io::Write; + use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME; use quickwit_storage::{PutPayload, SplitPayloadBuilder}; use super::*; @@ -182,6 +183,7 @@ mod tests { let split_streamer = SplitPayloadBuilder::get_split_payload( &[test_filepath1.clone(), test_filepath2.clone()], + &[], &[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, ], @@ -213,6 +215,7 @@ mod tests { let split_streamer = SplitPayloadBuilder::get_split_payload( &[test_filepath1.clone(), test_filepath2.clone()], + &[], &[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, ], @@ -251,6 +254,7 @@ mod tests { let split_streamer = SplitPayloadBuilder::get_split_payload( &[test_filepath1.clone(), test_filepath2.clone()], + &[5, 5, 5], &[1, 2, 3], )?; @@ -258,6 +262,9 @@ mod tests { let bundle_dir = BundleDirectory::open_split(FileSlice::from(data.to_vec()))?; + let field_data = bundle_dir.atomic_read(Path::new(SPLIT_FIELDS_FILE_NAME))?; + assert_eq!(&*field_data, &[5, 5, 5]); + let f1_data = bundle_dir.atomic_read(Path::new("f1"))?; assert_eq!(&*f1_data, &[123u8, 76u8]); diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index 8b07a3ad92b..4e37c860683 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ 
b/quickwit/quickwit-indexing/Cargo.toml @@ -50,6 +50,7 @@ tracing = { workspace = true } ulid = { workspace = true } utoipa = { workspace = true } vrl = { workspace = true, optional = true } +zstd = { workspace = true } quickwit-actors = { workspace = true } quickwit-aws = { workspace = true } diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 88c44722849..b4365e42448 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -160,7 +160,7 @@ mod tests { let split_store = { let mut storage_builder = RamStorageBuilder::default(); for split in &splits_to_merge { - let buffer = SplitPayloadBuilder::get_split_payload(&[], &[1, 2, 3])? + let buffer = SplitPayloadBuilder::get_split_payload(&[], &[], &[1, 2, 3])? .read_all() .await?; storage_builder = storage_builder.put(&split_file(split.split_id()), &buffer); diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 658a498b91e..e0cc80cf519 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -46,7 +46,8 @@ const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite") use crate::actors::Uploader; use crate::models::{ - EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, PackagedSplitBatch, + serialize_split_fields, EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, + PackagedSplitBatch, }; /// The role of the packager is to get an index writer and @@ -186,7 +187,6 @@ impl Handler for Packager { } } -/// returns true iff merge is required to reach a state where fn list_split_files( segment_metas: &[SegmentMeta], scratch_directory: &TempDirectory, @@ -287,6 +287,9 @@ fn create_packaged_split( .reader_builder() .reload_policy(ReloadPolicy::Manual) .try_into()?; + + let 
fields_metadata = split.index.fields_metadata()?; + let mut tags = BTreeSet::default(); for named_field in tag_fields { let inverted_indexes = index_reader @@ -312,8 +315,10 @@ fn create_packaged_split( let mut hotcache_bytes = Vec::new(); build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?; ctx.record_progress(); + let serialized_split_fields = serialize_split_fields(&fields_metadata); let packaged_split = PackagedSplit { + serialized_split_fields, split_attrs: split.split_attrs, split_scratch_directory: split.split_scratch_directory, tags, diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 8892c220f90..a556bf2b3b5 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -318,6 +318,7 @@ impl Handler for Uploader { let split_streamer = SplitPayloadBuilder::get_split_payload( &packaged_split.split_files, + &packaged_split.serialized_split_fields, &packaged_split.hotcache_bytes, )?; let split_metadata = create_split_metadata( @@ -465,6 +466,7 @@ async fn upload_split( ) -> anyhow::Result<()> { let split_streamer = SplitPayloadBuilder::get_split_payload( &packaged_split.split_files, + &packaged_split.serialized_split_fields, &packaged_split.hotcache_bytes, )?; @@ -561,6 +563,7 @@ mod tests { delete_opstamp: 10, num_merge_ops: 0, }, + serialized_split_fields: Vec::new(), split_scratch_directory, tags: Default::default(), hotcache_bytes: Vec::new(), @@ -672,6 +675,7 @@ mod tests { delete_opstamp: 0, num_merge_ops: 0, }, + serialized_split_fields: Vec::new(), split_scratch_directory: split_scratch_directory_1, tags: Default::default(), split_files: Vec::new(), @@ -695,6 +699,7 @@ mod tests { delete_opstamp: 0, num_merge_ops: 0, }, + serialized_split_fields: Vec::new(), split_scratch_directory: split_scratch_directory_2, tags: Default::default(), split_files: Vec::new(), @@ -812,6 +817,7 @@ mod tests { 
delete_opstamp: 10, num_merge_ops: 0, }, + serialized_split_fields: Vec::new(), split_scratch_directory, tags: Default::default(), hotcache_bytes: Vec::new(), @@ -990,6 +996,7 @@ mod tests { delete_opstamp: 10, num_merge_ops: 0, }, + serialized_split_fields: Vec::new(), split_scratch_directory, tags: Default::default(), hotcache_bytes: Vec::new(), diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs index 807a540da2c..439b8171ee4 100644 --- a/quickwit/quickwit-indexing/src/models/mod.rs +++ b/quickwit/quickwit-indexing/src/models/mod.rs @@ -32,6 +32,7 @@ mod publisher_message; mod raw_doc_batch; mod shard_positions; mod split_attrs; +mod split_fields; pub use indexed_split::{ CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder, @@ -53,6 +54,7 @@ pub use raw_doc_batch::RawDocBatch; pub(crate) use shard_positions::LocalShardPositionsUpdate; pub use shard_positions::ShardPositionsService; pub use split_attrs::{create_split_metadata, SplitAttrs}; +pub use split_fields::{read_split_fields, serialize_split_fields, FieldConfig}; #[derive(Debug)] pub struct NewPublishToken(pub PublishToken); diff --git a/quickwit/quickwit-indexing/src/models/packaged_split.rs b/quickwit/quickwit-indexing/src/models/packaged_split.rs index 8f7225f89c4..f96f3ac0b89 100644 --- a/quickwit/quickwit-indexing/src/models/packaged_split.rs +++ b/quickwit/quickwit-indexing/src/models/packaged_split.rs @@ -31,6 +31,7 @@ use crate::merge_policy::MergeOperation; use crate::models::{PublishLock, SplitAttrs}; pub struct PackagedSplit { + pub serialized_split_fields: Vec, pub split_attrs: SplitAttrs, pub split_scratch_directory: TempDirectory, pub tags: BTreeSet, diff --git a/quickwit/quickwit-indexing/src/models/split_fields.rs b/quickwit/quickwit-indexing/src/models/split_fields.rs new file mode 100644 index 00000000000..0e6749494c2 --- /dev/null +++ b/quickwit/quickwit-indexing/src/models/split_fields.rs @@ -0,0 +1,268 
@@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::io::{self, ErrorKind, Read}; + +use tantivy::schema::Type; +use tantivy::FieldMetadata; + +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] +pub struct FieldConfig { + pub typ: Type, + pub indexed: bool, + pub stored: bool, + pub fast: bool, +} + +impl FieldConfig { + fn serialize(&self) -> [u8; 2] { + let typ = self.typ.to_code(); + let flags = (self.indexed as u8) << 2 | (self.stored as u8) << 1 | (self.fast as u8); + [typ, flags] + } + fn deserialize_from(data: [u8; 2]) -> io::Result { + let typ = Type::from_code(data[0]).ok_or_else(|| { + io::Error::new( + ErrorKind::InvalidData, + format!("could not deserialize type {}", data[0]), + ) + })?; + + let data = data[1]; + let indexed = (data & 0b100) != 0; + let stored = (data & 0b010) != 0; + let fast = (data & 0b001) != 0; + + Ok(FieldConfig { + typ, + indexed, + stored, + fast, + }) + } +} + +/// Serializes the Split fields. +/// +/// `fields_metadata` has to be sorted. +pub fn serialize_split_fields(fields_metadata: &[FieldMetadata]) -> Vec { + // ensure that fields_metadata is strictly sorted. 
+ debug_assert!(fields_metadata.windows(2).all(|w| w[0] < w[1])); + let mut payload = Vec::new(); + // Write Num Fields + let length = fields_metadata.len() as u32; + payload.extend_from_slice(&length.to_le_bytes()); + + for field_metadata in fields_metadata { + write_field(field_metadata, &mut payload); + } + let compression_level = 3; + let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level) + .expect("zstd encoding failed"); + let mut out = Vec::new(); + // Write Header -- Format Version + let format_version = 1u8; + out.push(format_version); + // Write Payload + out.extend_from_slice(&payload_compressed); + out +} + +fn write_field(field_metadata: &FieldMetadata, out: &mut Vec) { + let field_config = FieldConfig { + typ: field_metadata.typ, + indexed: field_metadata.indexed, + stored: field_metadata.stored, + fast: field_metadata.fast, + }; + + // Write Config 2 bytes + out.extend_from_slice(&field_config.serialize()); + let str_length = field_metadata.field_name.len() as u16; + // Write String length 2 bytes + out.extend_from_slice(&str_length.to_le_bytes()); + out.extend_from_slice(field_metadata.field_name.as_bytes()); +} + +/// Reads a fixed number of bytes into an array and returns the array. 
+fn read_exact_array(reader: &mut R) -> io::Result<[u8; N]> { + let mut buffer = [0u8; N]; + reader.read_exact(&mut buffer)?; + Ok(buffer) +} + +/// Reads the Split fields from a zstd compressed stream of bytes +pub fn read_split_fields( + mut reader: R, +) -> io::Result>> { + let format_version = read_exact_array::<_, 1>(&mut reader)?[0]; + assert_eq!(format_version, 1); + let reader = zstd::Decoder::new(reader)?; + read_split_fields_from_zstd(reader) +} + +fn read_field(reader: &mut R) -> io::Result { + // Read FieldConfig (2 bytes) + let config_bytes = read_exact_array::<_, 2>(reader)?; + let field_config = FieldConfig::deserialize_from(config_bytes)?; // Assuming this returns a Result + + // Read field name length and the field name + let name_len = u16::from_le_bytes(read_exact_array::<_, 2>(reader)?) as usize; + + let mut data = Vec::new(); + data.resize(name_len, 0); + reader.read_exact(&mut data)?; + + let field_name = String::from_utf8(data).map_err(|err| { + io::Error::new( + ErrorKind::InvalidData, + format!( + "Encountered invalid utf8 when deserializing field name: {}", + err + ), + ) + })?; + Ok(FieldMetadata { + field_name, + typ: field_config.typ, + indexed: field_config.indexed, + stored: field_config.stored, + fast: field_config.fast, + }) +} + +/// Reads the Split fields from a stream of bytes +fn read_split_fields_from_zstd( + mut reader: R, +) -> io::Result>> { + let mut num_fields = u32::from_le_bytes(read_exact_array::<_, 4>(&mut reader)?); + + Ok(std::iter::from_fn(move || { + if num_fields == 0 { + return None; + } + num_fields -= 1; + + Some(read_field(&mut reader)) + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn field_config_deser_test() { + let field_config = FieldConfig { + typ: Type::Str, + indexed: true, + stored: false, + fast: true, + }; + let serialized = field_config.serialize(); + let deserialized = FieldConfig::deserialize_from(serialized).unwrap(); + assert_eq!(field_config, deserialized); + } + #[test] + 
fn write_read_field_test() { + for typ in Type::iter_values() { + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ, + indexed: true, + stored: true, + fast: true, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + } + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: false, + stored: true, + fast: true, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: false, + stored: false, + fast: true, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: true, + stored: false, + fast: false, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + } + #[test] + fn write_split_fields_test() { + let fields_metadata = vec![ + FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: true, + stored: true, + fast: true, + }, + FieldMetadata { + field_name: "test2".to_string(), + typ: Type::Str, + indexed: true, + stored: false, + fast: false, + }, + FieldMetadata { + field_name: "test3".to_string(), + typ: Type::U64, + indexed: true, + stored: false, + fast: true, + }, + ]; + + let out = serialize_split_fields(&fields_metadata); + + let deserialized: Vec = read_split_fields(&mut &out[..]) + .unwrap() + .map(|el| el.unwrap()) + .collect(); + + assert_eq!(fields_metadata, deserialized); + } +} 
diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index 6f5e2a231fe..3262d0d611b 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -358,7 +358,11 @@ mod tests { .store_split( &split_metadata1, &split_path, - Box::new(SplitPayloadBuilder::get_split_payload(&[], &[5, 5, 5])?), + Box::new(SplitPayloadBuilder::get_split_payload( + &[], + &[], + &[5, 5, 5], + )?), ) .await?; assert!(!split_path.try_exists()?); @@ -383,7 +387,11 @@ mod tests { .store_split( &split_metadata2, &split_path, - Box::new(SplitPayloadBuilder::get_split_payload(&[], &[5, 5, 5])?), + Box::new(SplitPayloadBuilder::get_split_payload( + &[], + &[], + &[5, 5, 5], + )?), ) .await?; assert!(!split_path.try_exists()?); diff --git a/quickwit/quickwit-indexing/src/split_store/local_split_store.rs b/quickwit/quickwit-indexing/src/split_store/local_split_store.rs index c8911ad8ece..f1271ff80df 100644 --- a/quickwit/quickwit-indexing/src/split_store/local_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/local_split_store.rs @@ -551,9 +551,12 @@ mod tests { file1.write_all(b"ab").unwrap(); let mut file2 = File::create(&test_filepath2).unwrap(); file2.write_all(b"def").unwrap(); - let split_streamer = - SplitPayloadBuilder::get_split_payload(&[test_filepath1, test_filepath2], b"hotcache") - .unwrap(); + let split_streamer = SplitPayloadBuilder::get_split_payload( + &[test_filepath1, test_filepath2], + &[], + b"hotcache", + ) + .unwrap(); let data = split_streamer.read_all().await.unwrap(); let bundle_dir = BundleDirectory::open_split(FileSlice::from(data.to_vec())).unwrap(); let f1_data = bundle_dir.atomic_read(Path::new("f1")).unwrap(); diff --git a/quickwit/quickwit-storage/src/bundle_storage.rs b/quickwit/quickwit-storage/src/bundle_storage.rs index 3e25bc1ff96..13dfbd9a32d 100644 --- 
a/quickwit/quickwit-storage/src/bundle_storage.rs +++ b/quickwit/quickwit-storage/src/bundle_storage.rs @@ -344,6 +344,7 @@ mod tests { let buffer = SplitPayloadBuilder::get_split_payload( &[test_filepath1.clone(), test_filepath2.clone()], + &[], &[5, 5, 5], )? .read_all() @@ -385,6 +386,7 @@ mod tests { let buffer = SplitPayloadBuilder::get_split_payload( &[test_filepath1.clone(), test_filepath2.clone()], + &[], &[1, 3, 3, 7], )? .read_all() @@ -422,7 +424,7 @@ mod tests { #[tokio::test] async fn bundlestorage_test_empty() -> anyhow::Result<()> { - let buffer = SplitPayloadBuilder::get_split_payload(&[], &[])? + let buffer = SplitPayloadBuilder::get_split_payload(&[], &[], &[])? .read_all() .await?; diff --git a/quickwit/quickwit-storage/src/split.rs b/quickwit/quickwit-storage/src/split.rs index f1250181c39..a0b6048e65a 100644 --- a/quickwit/quickwit-storage/src/split.rs +++ b/quickwit/quickwit-storage/src/split.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . use std::collections::HashMap; -use std::fmt::Debug; use std::io::{self, SeekFrom}; use std::ops::Range; use std::path::{Path, PathBuf}; @@ -27,6 +26,7 @@ use async_trait::async_trait; use aws_smithy_http::byte_stream::ByteStream; use futures::{stream, StreamExt}; use hyper::body::Body; +use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio_util::io::ReaderStream; @@ -106,9 +106,11 @@ impl PutPayload for FilePayload { } /// SplitPayloadBuilder is used to create a `SplitPayload`. -#[derive(Debug, Default)] +#[derive(Default)] pub struct SplitPayloadBuilder { - metadata: BundleStorageFileOffsets, + /// File name, payload, and range of the payload in the bundle file + /// Range could be computed on the fly, and is just kept here for convenience. + payloads: Vec<(String, Box, Range)>, current_offset: usize, } @@ -116,42 +118,61 @@ impl SplitPayloadBuilder { /// Creates a new SplitPayloadBuilder for given files and hotcache. 
pub fn get_split_payload( split_files: &[PathBuf], + serialized_split_fields: &[u8], hotcache: &[u8], ) -> anyhow::Result { let mut split_payload_builder = SplitPayloadBuilder::default(); for file in split_files { split_payload_builder.add_file(file)?; } + split_payload_builder.add_payload( + SPLIT_FIELDS_FILE_NAME.to_string(), + Box::new(serialized_split_fields.to_vec()), + ); let offsets = split_payload_builder.finalize(hotcache)?; Ok(offsets) } + /// Adds the payload to the bundle file. + pub fn add_payload(&mut self, file_name: String, payload: Box) { + let range = self.current_offset as u64..self.current_offset as u64 + payload.len(); + self.current_offset += payload.len() as usize; + self.payloads.push((file_name, payload, range)); + } + /// Adds the file to the bundle file. - /// - /// The hotcache needs to be the last file that is added, in order to be able to read - /// the hotcache and the metadata in one continuous read. pub fn add_file(&mut self, path: &Path) -> io::Result<()> { let file = std::fs::metadata(path)?; - let file_range = self.current_offset as u64..self.current_offset as u64 + file.len(); - self.current_offset += file.len() as usize; - self.metadata.files.insert(path.to_owned(), file_range); + let file_name = path + .file_name() + .and_then(std::ffi::OsStr::to_str) + .map(ToOwned::to_owned) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid file name in path {:?}", path), + ) + })?; + + let file_payload = FilePayload { + path: path.to_owned(), + len: file.len(), + }; + + self.add_payload(file_name, Box::new(file_payload)); + Ok(()) } /// Writes the bundle file offsets metadata at the end of the bundle file, /// and returns the byte-range of this metadata information. pub fn finalize(self, hotcache: &[u8]) -> anyhow::Result { + // Add the fields metadata to the bundle metadata. // Build the footer. 
- let mut footer_bytes = Vec::new(); - // Fix paths to be relative let metadata_with_fixed_paths = self - .metadata - .files + .payloads .iter() - .map(|(path, range)| { - let file_name = path.file_name().ok_or_else(|| { - anyhow::anyhow!("could not extract file_name from path {path:?}") - })?; + .map(|(file_name, _, range)| { let file_name = PathBuf::from(file_name); Ok((file_name, range.start..range.end)) }) @@ -163,23 +184,19 @@ impl SplitPayloadBuilder { let metadata_json = BundleStorageFileOffsetsVersions::serialize(&bundle_storage_file_offsets); + // The hotcache needs to be the next to the metadata in order to be able to read both + // in one continuous read. + let mut footer_bytes = Vec::new(); footer_bytes.extend(&metadata_json); footer_bytes.extend((metadata_json.len() as u32).to_le_bytes()); footer_bytes.extend(hotcache); footer_bytes.extend((hotcache.len() as u32).to_le_bytes()); - let mut payloads: Vec> = Vec::new(); - - let mut sorted_files = self.metadata.files.iter().collect::>(); - sorted_files.sort_by_key(|(_file, range)| range.start); - - for (path, byte_range) in sorted_files { - let file_payload = FilePayload { - path: path.to_owned(), - len: byte_range.end - byte_range.start, - }; - payloads.push(Box::new(file_payload)); - } + let mut payloads: Vec> = self + .payloads + .into_iter() + .map(|(_, payload, _)| payload) + .collect(); payloads.push(Box::new(footer_bytes.to_vec())); @@ -250,9 +267,9 @@ mod tests { file2.write_all(b"world")?; let split_payload = - SplitPayloadBuilder::get_split_payload(&[test_filepath1, test_filepath2], b"abc")?; + SplitPayloadBuilder::get_split_payload(&[test_filepath1, test_filepath2], &[], b"abc")?; - assert_eq!(split_payload.len(), 91); + assert_eq!(split_payload.len(), 128); Ok(()) } @@ -383,6 +400,7 @@ mod tests { let split_streamer = SplitPayloadBuilder::get_split_payload( &[test_filepath1.clone(), test_filepath2.clone()], + &[], &[1, 2, 3], )?;