diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index ca391b4354c07..77e6494dbdade 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -34,6 +34,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" +#include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/range.h" #include "arrow/util/tracing_internal.h" @@ -42,6 +43,7 @@ #include "parquet/arrow/writer.h" #include "parquet/encryption/crypto_factory.h" #include "parquet/encryption/encryption.h" +#include "parquet/encryption/internal_file_decryptor.h" #include "parquet/encryption/kms_client.h" #include "parquet/file_reader.h" #include "parquet/properties.h" @@ -1080,9 +1082,18 @@ Result> ParquetDatasetFactory::Make( std::vector>> paths_with_row_group_ids; std::unordered_map paths_to_index; + const bool metadata_only_file = + metadata_source.path() == "_metadata" && metadata->key_value_metadata(); for (int i = 0; i < metadata->num_row_groups(); i++) { - auto row_group = metadata->RowGroup(i); + std::unique_ptr row_group; + + if (metadata_only_file) { + PARQUET_ASSIGN_OR_THROW(auto aad, metadata->key_value_metadata()->Get( + "row_group_aad_" + std::to_string(i))); + metadata->set_file_decryptor_aad(aad); + } + row_group = metadata->RowGroup(i); ARROW_ASSIGN_OR_RAISE(auto path, FileFromRowGroup(filesystem.get(), base_path, *row_group, options.validate_column_chunk_paths)); diff --git a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc index 0287d593d12d3..0ff745b36fec6 100644 --- a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc @@ -32,11 +32,14 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/type.h" +#include "arrow/util/key_value_metadata.h" #include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" #include "parquet/encryption/crypto_factory.h" #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/kms_client.h" #include "parquet/encryption/test_in_memory_kms.h" +#include "parquet/file_writer.h" constexpr std::string_view kFooterKeyMasterKey = "0123456789012345"; constexpr std::string_view kFooterKeyMasterKeyId = "footer_key"; @@ -51,6 +54,62 @@ using arrow::internal::checked_pointer_cast; namespace arrow { namespace dataset { +class DatasetTestBase : public ::testing::Test { + public: + void SetUp() override { + // Creates a mock file system using the current time point. + EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make( + std::chrono::system_clock::now(), {})); + ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir))); + + // Init dataset and partitioning. + ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning()); + + auto file_format = std::make_shared(); + auto parquet_file_write_options = + checked_pointer_cast(file_format->DefaultWriteOptions()); + + // Write dataset. + auto dataset = std::make_shared(table_); + EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + + FileSystemDatasetWriteOptions write_options; + write_options.file_write_options = parquet_file_write_options; + write_options.filesystem = file_system_; + write_options.base_dir = kBaseDir; + write_options.partitioning = partitioning_; + write_options.basename_template = "part{i}.parquet"; + ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner))); + } + + void PrepareTableAndPartitioning() { + // Prepare table data. + auto table_schema = schema({field("a", int64()), field("c", int64()), + field("e", int64()), field("part", utf8())}); + table_ = TableFromJSON(table_schema, {R"([ + [ 0, 9, 1, "a" ], + [ 1, 8, 2, "a" ], + [ 2, 7, 1, "a" ], + [ 3, 6, 2, "a" ], + [ 4, 5, 1, "a" ], + [ 5, 4, 2, "a" ], + [ 6, 3, 1, "b" ], + [ 7, 2, 2, "b" ], + [ 8, 1, 1, "b" ], + [ 9, 0, 2, "b" ] + ])"}); + + // Use a Hive-style partitioning scheme. + partitioning_ = std::make_shared(schema({field("part", utf8())})); + } + + protected: + std::shared_ptr file_system_; + std::shared_ptr table_; + std::shared_ptr partitioning_; +}; + // Base class to test writing and reading encrypted dataset. class DatasetEncryptionTestBase : public ::testing::Test { public: @@ -240,6 +299,180 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) { ASSERT_EQ(checked_pointer_cast(table->column(2)->chunk(0))->GetView(0), 1); } +Result>> ReadMetadata( + const std::vector& paths, std::shared_ptr& file_system, + const parquet::ReaderProperties& reader_properties) { + if (paths.empty()) { + return Status::Invalid("No files to read metadata from"); + } + + std::vector> metadata; + + for (const auto& path : paths) { + PARQUET_ASSIGN_OR_THROW(auto input, file_system->OpenInputFile(path)); + auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties); + auto file_metadata = parquet_reader->metadata(); + file_metadata->set_file_path(path); + metadata.push_back(file_metadata); + } + return metadata; +} + +// Write encrypted _metadata file and read the dataset using the metadata. +TEST_F(DatasetEncryptionTest, ReadDatasetFromEncryptedMetadata) { + // Create decryption properties. + auto decryption_config = + std::make_shared(); + auto parquet_decryption_config = std::make_shared(); + parquet_decryption_config->crypto_factory = crypto_factory_; + parquet_decryption_config->kms_connection_config = kms_connection_config_; + parquet_decryption_config->decryption_config = std::move(decryption_config); + + // Set scan options. + auto parquet_scan_options = std::make_shared(); + parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config); + + auto file_format = std::make_shared(); + file_format->default_fragment_scan_options = std::move(parquet_scan_options); + + auto reader_properties = parquet::default_reader_properties(); + decryption_config = std::make_shared(); + reader_properties.file_decryption_properties( + crypto_factory_->GetFileDecryptionProperties(*kms_connection_config_, + *decryption_config)); + auto encryption_config = std::make_shared( + std::string(kFooterKeyName)); + encryption_config->column_keys = kColumnKeyMapping; + + auto parquet_encryption_config = std::make_shared(); + // Directly assign shared_ptr objects to ParquetEncryptionConfig members + parquet_encryption_config->crypto_factory = crypto_factory_; + parquet_encryption_config->kms_connection_config = kms_connection_config_; + parquet_encryption_config->encryption_config = encryption_config; + + auto parquet_file_write_options = + checked_pointer_cast(file_format->DefaultWriteOptions()); + parquet_file_write_options->parquet_encryption_config = + std::move(parquet_encryption_config); + + std::vector paths = {"part=a/part0.parquet", "part=c/part0.parquet", + "part=e/part0.parquet", "part=g/part0.parquet", + "part=i/part0.parquet"}; + + auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties( + *kms_connection_config_, *encryption_config); + + auto table_schema = + schema({field("a", int64()), field("c", int64()), field("e", int64())}); + std::shared_ptr schema; + auto writer_props = parquet::WriterProperties::Builder().build(); + + PARQUET_ASSIGN_OR_THROW(auto metadata_list, + ReadMetadata(paths, file_system_, reader_properties)); + PARQUET_ASSIGN_OR_THROW(auto metadata, parquet::FileMetaData::CoalesceMetadata( + metadata_list, writer_props)); + + // TODO: Make sure plaintext footer mode works + // encryption_config->plaintext_footer = true; + file_encryption_properties = crypto_factory_->GetFileEncryptionProperties( + *kms_connection_config_, *encryption_config); + + // Write metadata to _metadata file. + std::string metadata_path = "_metadata"; + ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path)); + WriteEncryptedMetadataFile(*metadata, stream, file_encryption_properties); + ARROW_EXPECT_OK(stream->Close()); + + // Set scan options. + decryption_config = std::make_shared(); + parquet_decryption_config = std::make_shared(); + parquet_decryption_config->crypto_factory = crypto_factory_; + parquet_decryption_config->kms_connection_config = kms_connection_config_; + parquet_decryption_config->decryption_config = std::move(decryption_config); + + parquet_scan_options = std::make_shared(); + parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config); + + file_format = std::make_shared(); + file_format->default_fragment_scan_options = std::move(parquet_scan_options); + + ParquetFactoryOptions parquet_factory_options; + parquet_factory_options.partitioning = partitioning_; + parquet_factory_options.partition_base_dir = kBaseDir; + + ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(metadata_path)); + auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties); + auto file_metadata = parquet_reader->metadata(); + + ASSERT_TRUE(file_metadata->Equals(*metadata)); + + // Create parquet dataset factory + ASSERT_OK_AND_ASSIGN(auto parquet_dataset_factory, + ParquetDatasetFactory::Make(metadata_path, file_system_, + file_format, parquet_factory_options)); + + // Create the dataset + ASSERT_OK_AND_ASSIGN(auto dataset, parquet_dataset_factory->Finish()); + + // Read dataset into table + ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable()); + + // Verify the data was read correctly + ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks()); + + // Validate the table + ASSERT_OK(combined_table->ValidateFull()); + AssertTablesEqual(*table_, *combined_table); +} + +// Write _metadata file and read the dataset using the metadata. +TEST_F(DatasetTestBase, ReadDatasetFromMetadata) { + auto reader_properties = parquet::default_reader_properties(); + + std::vector paths = {"part=a/part0.parquet", "part=b/part0.parquet"}; + std::vector> metadata; + + for (const auto& path : paths) { + ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path)); + auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties); + auto file_metadata = parquet_reader->metadata(); + // Make sure file_paths are stored in metadata + file_metadata->set_file_path(path); + metadata.push_back(file_metadata); + } + metadata[0]->AppendRowGroups(*metadata[1]); + + std::string metadata_path = "_metadata"; + ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path)); + WriteMetaDataFile(*metadata[0], stream.get()); + ARROW_EXPECT_OK(stream->Close()); + + auto file_format = std::make_shared(); + ParquetFactoryOptions factory_options; + factory_options.partitioning = partitioning_; + factory_options.partition_base_dir = kBaseDir; + ASSERT_OK_AND_ASSIGN(auto dataset_factory, + ParquetDatasetFactory::Make(metadata_path, file_system_, + file_format, factory_options)); + + // Create the dataset + ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish()); + + // Read dataset into table + ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable()); + + // Verify the data was read correctly + ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks()); + + // Validate the table + ASSERT_OK(combined_table->ValidateFull()); + AssertTablesEqual(*combined_table, *table_); +} + // GH-39444: This test covers the case where parquet dataset scanner crashes when // processing encrypted datasets over 2^15 rows in multi-threaded mode. class LargeRowEncryptionTest : public DatasetEncryptionTestBase { diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 463713df1b1aa..4a4efebf5d5b6 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -589,6 +589,14 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata, return Status::OK(); } +Status WriteEncryptedMetadataFile( + const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink, + std::shared_ptr file_encryption_properties) { + PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink, + file_encryption_properties)); + return Status::OK(); +} + Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool, std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size, std::shared_ptr properties, diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 4e1ddafd9a082..36bdeec02f4b2 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -155,6 +155,12 @@ PARQUET_EXPORT ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink); +/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream +PARQUET_EXPORT +::arrow::Status WriteEncryptedMetadataFile( + const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink, + std::shared_ptr file_encryption_properties); + /// \brief Write a Table to Parquet. /// /// This writes one table in a single shot. To write a Parquet file with diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index baa9e00da2351..c2b2908cfcc90 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -567,6 +567,46 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, } } +void WriteEncryptedMetadataFile( + const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink, + std::shared_ptr file_encryption_properties) { + auto file_encryptor = std::make_unique( + file_encryption_properties.get(), ::arrow::default_memory_pool()); + + if (file_encryption_properties->encrypted_footer()) { + PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4)); + + PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell()); + auto metadata_start = static_cast(position); + + auto writer_props = parquet::WriterProperties::Builder() + .encryption(file_encryption_properties) + ->build(); + auto builder = FileMetaDataBuilder::Make(metadata.schema(), writer_props); + + auto footer_metadata = builder->Finish(metadata.key_value_metadata()); + auto crypto_metadata = builder->GetCryptoMetaData(); + + WriteFileCryptoMetaData(*crypto_metadata, sink.get()); + + auto footer_encryptor = file_encryptor->GetFooterEncryptor(); + WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true); + + PARQUET_ASSIGN_OR_THROW(position, sink->Tell()); + auto footer_and_crypto_len = static_cast(position - metadata_start); + PARQUET_THROW_NOT_OK( + sink->Write(reinterpret_cast(&footer_and_crypto_len), 4)); + PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4)); + } else { + // Encrypted file with plaintext footer mode. + PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4)); + auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor(); + WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false); + } + + file_encryptor->WipeOutEncryptionKeys(); +} + void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, ArrowOutputStream* sink) { crypto_metadata.WriteTo(sink); diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index d5ea1d7c98a0e..60b8474df90c8 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -117,6 +117,11 @@ PARQUET_EXPORT void WriteMetaDataFile(const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink); +PARQUET_EXPORT +void WriteEncryptedMetadataFile( + const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink, + std::shared_ptr file_encryption_properties); + PARQUET_EXPORT void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, ArrowOutputStream* sink, @@ -125,9 +130,10 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, PARQUET_EXPORT void WriteEncryptedFileMetadata(const FileMetaData& file_metadata, - ::arrow::io::OutputStream* sink, + std::shared_ptr<::arrow::io::OutputStream> sink, const std::shared_ptr& encryptor = NULLPTR, bool encrypt_footer = false); + PARQUET_EXPORT void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, ::arrow::io::OutputStream* sink); diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 8f577be45b96d..41d0dd54a9b8d 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -894,6 +894,13 @@ class FileMetaData::FileMetaDataImpl { out->impl_->key_value_metadata_ = key_value_metadata_; out->impl_->file_decryptor_ = file_decryptor_; + if (key_value_metadata_ && key_value_metadata_->Contains("row_group_aad_0")) { + PARQUET_ASSIGN_OR_THROW( + auto aad, + key_value_metadata_->Get("row_group_aad_" + std::to_string(row_groups[0]))); + out->set_file_decryptor_aad(aad); + } + return out; } @@ -1018,6 +1025,10 @@ EncryptionAlgorithm FileMetaData::encryption_algorithm() const { return impl_->encryption_algorithm(); } +const std::string& FileMetaData::file_aad() const { + return impl_->file_decryptor()->file_aad(); +} + const std::string& FileMetaData::footer_signing_key_metadata() const { return impl_->footer_signing_key_metadata(); } @@ -1027,6 +1038,13 @@ void FileMetaData::set_file_decryptor( impl_->set_file_decryptor(std::move(file_decryptor)); } +void FileMetaData::set_file_decryptor_aad(const std::string& aad) { + auto fd = impl_->file_decryptor(); + auto file_decryptor = std::make_shared( + fd->properties(), aad, fd->algorithm(), fd->footer_key_metadata(), fd->pool()); + set_file_decryptor(file_decryptor); +} + const std::shared_ptr& FileMetaData::file_decryptor() const { return impl_->file_decryptor(); } @@ -1078,6 +1096,37 @@ void FileMetaData::WriteTo(::arrow::io::OutputStream* dst, return impl_->WriteTo(dst, encryptor); } +::arrow::Result> FileMetaData::CoalesceMetadata( + std::vector>& metadata_list, + std::shared_ptr& writer_props) { + if (metadata_list.empty()) { + return ::arrow::Status::Invalid("No metadata to coalesce"); + } + + std::vector values, keys; + + // Read metadata from all dataset files and store AADs and paths as key-value metadata. + for (size_t i = 0; i < metadata_list.size(); i++) { + const auto& file_metadata = metadata_list[i]; + keys.push_back("row_group_aad_" + std::to_string(i)); + values.push_back(file_metadata->file_aad()); + if (i > 0) { + metadata_list[0]->AppendRowGroups(*file_metadata); + } + } + + // Create a new FileMetadata object with the created AADs and paths as + // key_value_metadata. + auto fmd_builder = + parquet::FileMetaDataBuilder::Make(metadata_list[0]->schema(), writer_props); + const std::shared_ptr file_aad_metadata = + ::arrow::key_value_metadata(keys, values); + auto metadata = fmd_builder->Finish(file_aad_metadata); + metadata->AppendRowGroups(*metadata_list[0]); + + return metadata; +} + class FileCryptoMetaData::FileCryptoMetaDataImpl { public: FileCryptoMetaDataImpl() = default; diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index dc97d816daa74..86cf3a742fb61 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -341,6 +341,8 @@ class PARQUET_EXPORT FileMetaData { EncryptionAlgorithm encryption_algorithm() const; const std::string& footer_signing_key_metadata() const; + const std::string& file_aad() const; + /// \brief Verify signature of FileMetaData when file is encrypted but footer /// is not encrypted (plaintext footer). bool VerifySignature(const void* signature); @@ -390,6 +392,23 @@ class PARQUET_EXPORT FileMetaData { /// debug text (if true). std::string SerializeUnencrypted(bool scrub, bool debug) const; + /// \brief Coalesce metadata from multiple files into a single metadata file by + /// appending row groups. + /// + /// \param[in] metadata_list list of FileMetaData objects to coalesce. + /// \param[in] writer_props WriterProperties to use to create coalesced FileMetaData + /// object. + /// \return a new FileMetaData object containing all row groups from the input + /// FileMetaData objects. + static ::arrow::Result> CoalesceMetadata( + std::vector>& metadata_list, + std::shared_ptr& writer_props); + + /// \brief Set the AAD of decryptor of the file. + /// + /// \param[in] aad AAD to set on the decryptor. + void set_file_decryptor_aad(const std::string& aad); + private: friend FileMetaDataBuilder; friend class SerializedFile;