Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Jun 3, 2024
1 parent 63b6a61 commit e0c9418
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 118 deletions.
111 changes: 38 additions & 73 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,91 +243,56 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
}

TEST_F(DatasetEncryptionTest, EncryptedFileMetaDataWrite) {
std::string_view kFooterKeyMasterKey = "0123456789012345";
std::string_view kFooterKeyMasterKeyId = "footer_key";
std::string_view kFooterKeyName = "footer_key";
std::string_view kColumnMasterKey = "1234567890123450";
std::string_view kColumnMasterKeyId = "col_key";

auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
auto table = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "c" ],
[ 3, 6, 2, "c" ],
[ 4, 5, 1, "e" ],
[ 5, 4, 2, "e" ],
[ 6, 3, 1, "g" ],
[ 7, 2, 2, "g" ],
[ 8, 1, 1, "i" ],
[ 9, 0, 2, "i" ]
])"});

// Prepare encryption properties.
std::unordered_map<std::string, std::string> key_map;
key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);

auto crypto_factory = std::make_shared<parquet::encryption::CryptoFactory>();
auto kms_client_factory =
std::make_shared<::parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
/*wrap_locally=*/true, key_map);
crypto_factory->RegisterKmsClientFactory(std::move(kms_client_factory));
auto kms_connection_config =
std::make_shared<parquet::encryption::KmsConnectionConfig>();

auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;

auto file_encryption_properties = crypto_factory->GetFileEncryptionProperties(
*kms_connection_config, *encryption_config);
auto writer_properties = ::parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();

// Create the ReaderProperties object using the FileDecryptionProperties object
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = decryption_config;

auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);
auto file_decryption_properties = crypto_factory_->GetFileDecryptionProperties(
*kms_connection_config_, *decryption_config.get());

auto file_decryption_properties = crypto_factory->GetFileDecryptionProperties(
*kms_connection_config, *decryption_config);
auto reader_properties = parquet::default_reader_properties();
reader_properties.file_decryption_properties(file_decryption_properties);

PARQUET_ASSIGN_OR_THROW(
auto stream, ::arrow::io::BufferOutputStream::Create(1024, default_memory_pool()));
ASSERT_OK_NO_THROW(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
stream, 10, writer_properties));
auto writer_properties = ::parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();

EXPECT_OK_AND_ASSIGN(auto buffer, stream->Finish())
EXPECT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream("/foo.parquet"));
ASSERT_OK_NO_THROW(::parquet::arrow::WriteTable(*table_, ::arrow::default_memory_pool(),
stream, 3, writer_properties));
ARROW_EXPECT_OK(stream->Close());

// Read entire file as a single Arrow table
std::unique_ptr<::parquet::arrow::FileReader> reader;
::parquet::arrow::FileReaderBuilder reader_builder;
ASSERT_OK(reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer),
reader_properties));
ASSERT_OK(reader_builder.Build(&reader));
auto metadata = reader->parquet_reader()->metadata();

PARQUET_ASSIGN_OR_THROW(
stream, ::arrow::io::BufferOutputStream::Create(1024, default_memory_pool()));
file_encryption_properties = crypto_factory->GetFileEncryptionProperties(
*kms_connection_config, *encryption_config);
::parquet::WriteEncryptedMetadataFile(*metadata.get(), stream,
file_encryption_properties);
EXPECT_OK_AND_ASSIGN(buffer, stream->Finish());

ASSERT_OK(reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer),
reader_properties));
ASSERT_OK(reader_builder.Build(&reader));

ASSERT_TRUE(
metadata->schema()->Equals(*reader->parquet_reader()->metadata()->schema()));
ASSERT_EQ(metadata->num_columns(), reader->parquet_reader()->metadata()->num_columns());
ASSERT_EQ(reader->parquet_reader()->metadata()->num_rows(), 0);
ASSERT_EQ(reader->parquet_reader()->metadata()->num_row_groups(), 0);
std::shared_ptr<Table> out;
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("/foo.parquet"));
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::Open(input, reader_properties);

auto metadata = parquet_reader->metadata();

file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

ASSERT_OK_AND_ASSIGN(stream, file_system_->OpenOutputStream("/_metadata"));
WriteEncryptedMetadataFile(*metadata.get(), stream.get(), file_encryption_properties);
ARROW_EXPECT_OK(stream->Close());

ASSERT_OK_AND_ASSIGN(auto input2, file_system_->OpenInputFile("/_metadata"));
parquet_reader = parquet::ParquetFileReader::Open(input2, reader_properties);
auto metadata2 = parquet_reader->metadata();

ASSERT_EQ(metadata2->num_row_groups(), 4);
ASSERT_EQ(metadata2->num_rows(), 10);
ASSERT_EQ(metadata2->num_columns(), 4);
ASSERT_TRUE(metadata->Equals(*metadata2.get()));
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
}

Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
const FileMetaData& file_metadata, ArrowOutputStream* sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink,
file_encryption_properties));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
PARQUET_EXPORT
::arrow::Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

/// \brief Write a Table to Parquet.
Expand Down
74 changes: 37 additions & 37 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -568,44 +568,44 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
}

// TODO: remove
// void WriteEncryptedMetadataFile2(
// const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
// std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
// auto file_encryptor = std::make_unique<InternalFileEncryptor>(
// file_encryption_properties.get(), ::arrow::default_memory_pool());
//
// if (file_encryption_properties->encrypted_footer()) {
// PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
// } else {
// // Encrypted file with plaintext footer mode.
// PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
// }
//
// if (file_encryption_properties->encrypted_footer()) {
// PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
// uint64_t metadata_start = static_cast<uint64_t>(position);
//
// auto writer_props = parquet::WriterProperties::Builder()
// .encryption(file_encryption_properties)
// ->build();
// auto crypto_metadata =
// FileMetaDataBuilder::Make(metadata.schema(), writer_props)->GetCryptoMetaData();
// WriteFileCryptoMetaData(*crypto_metadata, sink.get());
//
// auto footer_encryptor = file_encryptor->GetFooterEncryptor();
// WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true);
// PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
// uint32_t footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
// PARQUET_THROW_NOT_OK(
// sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
// PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
// } else { // Encrypted file with plaintext footer
// auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
// WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false);
// }
//}

void WriteEncryptedMetadataFile(
const FileMetaData& metadata, ::arrow::io::OutputStream* sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto file_encryptor = std::make_unique<InternalFileEncryptor>(
file_encryption_properties.get(), ::arrow::default_memory_pool());

if (file_encryption_properties->encrypted_footer()) {
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else {
// Encrypted file with plaintext footer mode.
PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
}

if (file_encryption_properties->encrypted_footer()) {
PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
uint64_t metadata_start = static_cast<uint64_t>(position);

auto writer_props = parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();
auto crypto_metadata =
FileMetaDataBuilder::Make(metadata.schema(), writer_props)->GetCryptoMetaData();
WriteFileCryptoMetaData(*crypto_metadata, sink);

auto footer_encryptor = file_encryptor->GetFooterEncryptor();
WriteEncryptedFileMetadata(metadata, sink, footer_encryptor, true);
PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
uint32_t footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
PARQUET_THROW_NOT_OK(
sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else { // Encrypted file with plaintext footer
auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(metadata, sink, footer_signing_encryptor, false);
}
}

void WriteEncryptedMetadataFile2(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto schema = ::arrow::internal::checked_pointer_cast<GroupNode>(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void WriteMetaDataFile(const FileMetaData& file_metadata,

PARQUET_EXPORT
void WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
const FileMetaData& file_metadata, ::arrow::io::OutputStream* sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

PARQUET_EXPORT
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:

CStatus WriteEncryptedMetadataFile(
const CFileMetaData& file_metadata,
shared_ptr[COutputStream]& sink,
const COutputStream* sink,
shared_ptr[CFileEncryptionProperties]& encryption_properties)

cdef class FileEncryptionProperties:
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ cdef class FileMetaData(_Weakrefable):
with nogil:
if encryption_properties is not None:
check_status(
WriteEncryptedMetadataFile(deref(self._metadata), sink, properties))
WriteEncryptedMetadataFile(deref(self._metadata), sink.get(), properties))
else:
check_status(WriteMetaDataFile(deref(self._metadata), sink.get()))

Expand Down
6 changes: 3 additions & 3 deletions python/pyarrow/tests/test_dataset_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def test_dataset_metadata_encryption_decryption(tempdir):
write_options = pformat.make_write_options(encryption_config=parquet_encryption_cfg)

path = str(tempdir / "sample_dataset")
metadata_file = str(tempdir / "_metadata")
metadata_file = str(tempdir / "sample_dataset" / "_metadata")
mockfs = fs._MockFileSystem()
mockfs.create_dir(path)

Expand Down Expand Up @@ -328,6 +328,6 @@ def test_dataset_metadata_encryption_decryption(tempdir):
metadata_file, decryption_properties=decryption_properties, filesystem=mockfs)

assert metadata.num_columns == 2
assert metadata.num_rows == 0
assert metadata.num_row_groups == 0
assert metadata.num_rows == 6
assert metadata.num_row_groups == 4
assert metadata.schema.to_arrow_schema() == metadata_schema

0 comments on commit e0c9418

Please sign in to comment.