Skip to content

Commit

Permalink
test(object-cache): add unit tests for object cache manifest and mani…
Browse files Browse the repository at this point in the history
…fest list retrieval
  • Loading branch information
sdd committed Aug 17, 2024
1 parent 53f21e7 commit 263c929
Showing 1 changed file with 229 additions and 0 deletions.
229 changes: 229 additions & 0 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,232 @@ impl ObjectCache {
Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
}
}

#[cfg(test)]
mod tests {
use std::fs;

use tempfile::TempDir;
use tera::{Context, Tera};
use uuid::Uuid;

use super::*;
use crate::io::{FileIO, OutputFile};
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
};
use crate::table::Table;
use crate::TableIdent;

struct TableTestFixture {
table_location: String,
table: Table,
}

impl TableTestFixture {
fn new() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
let table_metadata1_location = table_location.join("metadata/v1.json");

let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
context.insert("manifest_list_2_location", &manifest_list2_location);
context.insert("table_metadata_1_location", &table_metadata1_location);

let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};

let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.build()
.unwrap();

Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}

fn next_manifest_file(&self) -> OutputFile {
self.table
.file_io()
.new_output(format!(
"{}/metadata/manifest_{}.avro",
self.table_location,
Uuid::new_v4()
))
.unwrap()
}

async fn setup_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();

// Write data files
let data_file_manifest = ManifestWriter::new(
self.next_manifest_file(),
current_snapshot.snapshot_id(),
vec![],
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema((*current_schema).clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build()],
))
.await
.unwrap();

// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
}
}

/// With caching disabled, the manifest list and its single manifest must
/// still be loadable through the `ObjectCache` API.
#[tokio::test]
async fn test_get_manifest_list_and_manifest_from_disabled_cache() {
    let mut fixture = TableTestFixture::new();
    fixture.setup_manifest_files().await;

    let file_io = fixture.table.file_io().clone();
    let object_cache = ObjectCache::with_disabled_cache(file_io);

    let snapshot = fixture.table.metadata().current_snapshot().unwrap();
    let metadata = fixture.table.metadata_ref();

    let result_manifest_list = object_cache
        .get_manifest_list(snapshot, &metadata)
        .await
        .unwrap();

    // The fixture registers exactly one manifest.
    assert_eq!(result_manifest_list.entries().len(), 1);

    let manifest_file = result_manifest_list.entries().first().unwrap();
    let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

    // Compare only the file name; the directory part is a random temp path.
    let first_entry = result_manifest.entries().first().unwrap();
    let file_name = first_entry.file_path().split("/").last().unwrap();
    assert_eq!(file_name, "1.parquet");
}

/// With the default (cache-enabled) `ObjectCache`, the manifest list and
/// manifest must load correctly both on the first request (not yet cached)
/// and on a repeated request (expected to be served from the cache).
#[tokio::test]
async fn test_get_manifest_list_and_manifest_from_default_cache() {
    let mut fixture = TableTestFixture::new();
    fixture.setup_manifest_files().await;

    // This test must use a cache-enabled instance. Building it with
    // `with_disabled_cache` would only repeat the disabled-cache test and
    // never exercise a cache hit.
    // NOTE(review): assumes `ObjectCache::new` is the cache-enabled
    // constructor — confirm against the `impl ObjectCache` above.
    let object_cache = ObjectCache::new(fixture.table.file_io().clone());

    // not in cache
    let result_manifest_list = object_cache
        .get_manifest_list(
            fixture.table.metadata().current_snapshot().unwrap(),
            &fixture.table.metadata_ref(),
        )
        .await
        .unwrap();

    assert_eq!(result_manifest_list.entries().len(), 1);

    // retrieve cached version
    let result_manifest_list = object_cache
        .get_manifest_list(
            fixture.table.metadata().current_snapshot().unwrap(),
            &fixture.table.metadata_ref(),
        )
        .await
        .unwrap();

    assert_eq!(result_manifest_list.entries().len(), 1);

    let manifest_file = result_manifest_list.entries().first().unwrap();

    // not in cache
    let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

    // Compare only the file name; the directory part is a random temp path.
    assert_eq!(
        result_manifest
            .entries()
            .first()
            .unwrap()
            .file_path()
            .split("/")
            .last()
            .unwrap(),
        "1.parquet"
    );

    // retrieve cached version
    let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

    assert_eq!(
        result_manifest
            .entries()
            .first()
            .unwrap()
            .file_path()
            .split("/")
            .last()
            .unwrap(),
        "1.parquet"
    );
}
}

0 comments on commit 263c929

Please sign in to comment.