diff --git a/Cargo.toml b/Cargo.toml index 8d04f6799..b3cea6b08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,12 +39,12 @@ rust-version = "1.77.1" anyhow = "1.0.72" apache-avro = "0.17" array-init = "2" -arrow-arith = { version = "52" } -arrow-array = { version = "52" } -arrow-ord = { version = "52" } -arrow-schema = { version = "52" } -arrow-select = { version = "52" } -arrow-string = { version = "52" } +arrow-arith = { version = "53" } +arrow-array = { version = "53" } +arrow-ord = { version = "53" } +arrow-schema = { version = "53" } +arrow-select = { version = "53" } +arrow-string = { version = "53" } async-stream = "0.3.5" async-trait = "0.1" async-std = "1.12" @@ -71,7 +71,7 @@ murmur3 = "0.5.2" once_cell = "1" opendal = "0.49" ordered-float = "4" -parquet = "52" +parquet = "53" paste = "1" pilota = "0.11.2" pretty_assertions = "1.4" diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 2ff43e0f0..20d352df8 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -655,43 +655,57 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result { + ($limit_type:tt) => { paste::paste! { /// Gets the $limit_type value from a parquet Statistics struct, as a Datum pub(crate) fn []( primitive_type: &PrimitiveType, stats: &Statistics ) -> Result> { - Ok(Some(match (primitive_type, stats) { - (PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.$limit_type()), - (PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.$limit_type()), - (PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.$limit_type()), - (PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.$limit_type()), - (PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.$limit_type())?, + Ok(match (primitive_type, stats) { + (PrimitiveType::Boolean, Statistics::Boolean(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::bool(*val)), + (PrimitiveType::Int, Statistics::Int32(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::int(*val)), + (PrimitiveType::Date, Statistics::Int32(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::date(*val)), + (PrimitiveType::Long, Statistics::Int64(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::long(*val)), + (PrimitiveType::Time, Statistics::Int64(stats)) => { + let Some(val) = stats.[<$limit_type _opt>]() else { + return Ok(None); + }; + + Some(Datum::time_micros(*val)?) + } (PrimitiveType::Timestamp, Statistics::Int64(stats)) => { - Datum::timestamp_micros(*stats.$limit_type()) + stats.[<$limit_type _opt>]().map(|val|Datum::timestamp_micros(*val)) } (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => { - Datum::timestamptz_micros(*stats.$limit_type()) + stats.[<$limit_type _opt>]().map(|val|Datum::timestamptz_micros(*val)) } (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => { - Datum::timestamp_nanos(*stats.$limit_type()) + stats.[<$limit_type _opt>]().map(|val|Datum::timestamp_nanos(*val)) } (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => { - Datum::timestamptz_nanos(*stats.$limit_type()) + stats.[<$limit_type _opt>]().map(|val|Datum::timestamptz_nanos(*val)) } - (PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.$limit_type()), - (PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.$limit_type()), + (PrimitiveType::Float, Statistics::Float(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::float(*val)), + (PrimitiveType::Double, Statistics::Double(stats)) => stats.[<$limit_type _opt>]().map(|val|Datum::double(*val)), (PrimitiveType::String, Statistics::ByteArray(stats)) => { - Datum::string(stats.$limit_type().as_utf8()?) + let Some(val) = stats.[<$limit_type _opt>]() else { + return Ok(None); + }; + + Some(Datum::string(val.as_utf8()?)) } (PrimitiveType::Decimal { precision: _, scale: _, }, Statistics::ByteArray(stats)) => { - Datum::new( + let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { + return Ok(None); + }; + + Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_le_bytes(stats.[<$limit_type _bytes>]().try_into()?)), - ) + PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)), + )) } ( PrimitiveType::Decimal { @@ -699,10 +713,12 @@ macro_rules! get_parquet_stat_as_datum { scale: _, }, Statistics::Int32(stats)) => { - Datum::new( - primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())), - ) + stats.[<$limit_type _opt>]().map(|val| { + Datum::new( + primitive_type.clone(), + PrimitiveLiteral::Int128(i128::from(*val)), + ) + }) } ( @@ -712,40 +728,46 @@ macro_rules! get_parquet_stat_as_datum { }, Statistics::Int64(stats), ) => { - Datum::new( - primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())), - ) + stats.[<$limit_type _opt>]().map(|val| { + Datum::new( + primitive_type.clone(), + PrimitiveLiteral::Int128(i128::from(*val)), + ) + }) } (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => { - let raw = stats.[<$limit_type _bytes>](); - if raw.len() != 16 { + let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { + return Ok(None); + }; + if bytes.len() != 16 { return Err(Error::new( ErrorKind::Unexpected, "Invalid length of uuid bytes.", )); } - Datum::uuid(Uuid::from_bytes( - raw[..16].try_into().unwrap(), - )) + Some(Datum::uuid(Uuid::from_bytes( + bytes[..16].try_into().unwrap(), + ))) } (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => { - let raw = stat.[<$limit_type _bytes>](); - if raw.len() != *len as usize { + let Some(bytes) = stat.[<$limit_type _bytes_opt>]() else { + return Ok(None); + }; + if bytes.len() != *len as usize { return Err(Error::new( ErrorKind::Unexpected, "Invalid length of fixed bytes.", )); } - Datum::fixed(raw.to_vec()) + Some(Datum::fixed(bytes.to_vec())) } (PrimitiveType::Binary, Statistics::ByteArray(stat)) => { - Datum::binary(stat.[<$limit_type _bytes>]().to_vec()) + return Ok(stat.[<$limit_type _bytes_opt>]().map(|bytes|Datum::binary(bytes.to_vec()))) } _ => { return Ok(None); } - })) + }) } } } diff --git a/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs index 4bf53d6ee..1886b6062 100644 --- a/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs @@ -81,8 +81,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> { } fn null_count(&self, field_id: i32) -> Option { - self.stats_for_field_id(field_id) - .map(|stats| stats.null_count()) + self.stats_for_field_id(field_id)?.null_count_opt() } fn value_count(&self) -> u64 { @@ -141,10 +140,6 @@ impl<'a> RowGroupMetricsEvaluator<'a> { return Ok(None); }; - if !stats.has_min_max_set() { - return Ok(None); - } - get_parquet_stat_min_as_datum(&primitive_type, stats) } @@ -153,10 +148,6 @@ impl<'a> RowGroupMetricsEvaluator<'a> { return Ok(None); }; - if !stats.has_min_max_set() { - return Ok(None); - } - get_parquet_stat_max_as_datum(&primitive_type, stats) } @@ -593,7 +584,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, None, None, 1, false)), + Some(Statistics::float(None, None, None, Some(1), false)), 1, None, )?; @@ -620,7 +611,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, None, None, 1, false)), + Some(Statistics::float(None, None, None, Some(1), false)), 1, None, )?; @@ -647,7 +638,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, None, None, 0, false)), + Some(Statistics::float(None, None, None, Some(0), false)), 1, None, )?; @@ -674,7 +665,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, None, None, 0, false)), + Some(Statistics::float(None, None, None, Some(0), false)), 1, None, )?; @@ -701,7 +692,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, None, None, 1, false)), + Some(Statistics::float(None, None, None, Some(1), false)), 1, None, )?; @@ -728,7 +719,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(2.0), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(2.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -755,7 +752,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, Some(2.0), None, 0, false)), + Some(Statistics::float(None, Some(2.0), None, Some(0), false)), 1, None, )?; @@ -782,7 +779,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(0.9), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(0.9), + None, + Some(0), + false, + )), 1, None, )?; @@ -809,7 +812,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(2.0), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(2.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -836,7 +845,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(None, None, None, 1, false)), + Some(Statistics::float(None, None, None, Some(1), false)), 1, None, )?; @@ -863,7 +872,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(f32::NAN), Some(2.0), None, 0, false)), + Some(Statistics::float( + Some(f32::NAN), + Some(2.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -890,7 +905,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(1.5), Some(2.0), None, 0, false)), + Some(Statistics::float( + Some(1.5), + Some(2.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -917,7 +938,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(f32::NAN), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(f32::NAN), + None, + Some(0), + false, + )), 1, None, )?; @@ -944,7 +971,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(0.5), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(0.5), + None, + Some(0), + false, + )), 1, None, )?; @@ -971,7 +1004,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(2.0), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(2.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -998,7 +1037,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(1.0), Some(1.0), None, 0, false)), + Some(Statistics::float( + Some(1.0), + Some(1.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -1027,7 +1072,7 @@ mod tests { 1, None, 1, - Some(Statistics::byte_array(None, None, None, 1, false)), + Some(Statistics::byte_array(None, None, None, Some(1), false)), )?; let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; @@ -1054,7 +1099,7 @@ mod tests { 1, None, 1, - Some(Statistics::byte_array(None, None, None, 0, false)), + Some(Statistics::byte_array(None, None, None, Some(0), false)), )?; let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; @@ -1086,7 +1131,7 @@ mod tests { Some(ByteArray::from(vec![255u8])), Some(ByteArray::from(vec![32u8])), None, - 0, + Some(0), false, )), )?; @@ -1120,7 +1165,7 @@ mod tests { Some(ByteArray::from("ice".as_bytes())), Some(ByteArray::from(vec![255u8])), None, - 0, + Some(0), false, )), )?; @@ -1150,7 +1195,7 @@ mod tests { None, 1, // Max val of 0xFF is not valid utf8 - Some(Statistics::byte_array(None, None, None, 1, false)), + Some(Statistics::byte_array(None, None, None, Some(1), false)), )?; let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; @@ -1182,7 +1227,7 @@ mod tests { Some(ByteArray::from("id".as_bytes())), Some(ByteArray::from("ie".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1216,7 +1261,7 @@ mod tests { Some(ByteArray::from("h".as_bytes())), Some(ByteArray::from("ib".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1250,7 +1295,7 @@ mod tests { Some(ByteArray::from("h".as_bytes())), Some(ByteArray::from("j".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1279,7 +1324,7 @@ mod tests { 1, None, 1, - Some(Statistics::byte_array(None, None, None, 1, false)), + Some(Statistics::byte_array(None, None, None, Some(1), false)), )?; let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; @@ -1311,7 +1356,7 @@ mod tests { Some(ByteArray::from(vec![255u8])), Some(ByteArray::from(vec![32u8])), None, - 0, + Some(0), false, )), )?; @@ -1345,7 +1390,7 @@ mod tests { Some(ByteArray::from("iceberg".as_bytes())), Some(ByteArray::from(vec![255u8])), None, - 0, + Some(0), false, )), )?; @@ -1379,7 +1424,7 @@ mod tests { None, Some(ByteArray::from("iceberg".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1413,7 +1458,7 @@ mod tests { Some(ByteArray::from("ice".as_bytes())), Some(ByteArray::from("iceberg".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1447,7 +1492,7 @@ mod tests { Some(ByteArray::from("iceberg".as_bytes())), None, None, - 0, + Some(0), false, )), )?; @@ -1481,7 +1526,7 @@ mod tests { Some(ByteArray::from("iceberg".as_bytes())), Some(ByteArray::from("icy".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1515,7 +1560,7 @@ mod tests { Some(ByteArray::from("iceberg".as_bytes())), Some(ByteArray::from("iceberg".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1548,7 +1593,7 @@ mod tests { Some(ByteArray::from("iceberg".as_bytes())), Some(ByteArray::from("iceberg".as_bytes())), None, - 1, + Some(1), false, )), )?; @@ -1577,7 +1622,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(11.0), Some(12.0), None, 0, false)), + Some(Statistics::float( + Some(11.0), + Some(12.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -1606,7 +1657,7 @@ mod tests { 1, None, 1, - Some(Statistics::byte_array(None, None, None, 0, false)), + Some(Statistics::byte_array(None, None, None, Some(0), false)), )?; let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; @@ -1633,7 +1684,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(f32::NAN), Some(1.0), None, 0, false)), + Some(Statistics::float( + Some(f32::NAN), + Some(1.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -1660,7 +1717,7 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(4.0), None, None, 0, false)), + Some(Statistics::float(Some(4.0), None, None, Some(0), false)), 1, None, )?; @@ -1687,7 +1744,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(f32::NAN), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(f32::NAN), + None, + Some(0), + false, + )), 1, None, )?; @@ -1714,7 +1777,13 @@ mod tests { let row_group_metadata = create_row_group_metadata( 1, 1, - Some(Statistics::float(Some(0.0), Some(1.0), None, 0, false)), + Some(Statistics::float( + Some(0.0), + Some(1.0), + None, + Some(0), + false, + )), 1, None, )?; @@ -1748,7 +1817,7 @@ mod tests { Some(ByteArray::from("iceberg".as_bytes())), Some(ByteArray::from("iceberg".as_bytes())), None, - 0, + Some(0), false, )), )?; @@ -1862,7 +1931,7 @@ mod tests { .set_num_rows(num_rows) .set_column_metadata(vec![ col_1_meta.build()?, - // .set_statistics(Statistics::float(None, None, None, 1, false)) + // .set_statistics(Statistics::float(None, None, None, Some(1), false)) col_2_meta.build()?, ]) .build();