Commit

first commit
rok committed May 24, 2024
1 parent 19044ee commit b5d7464
Showing 7 changed files with 130 additions and 6 deletions.
6 changes: 6 additions & 0 deletions cpp/src/parquet/arrow/writer.h
@@ -157,6 +157,12 @@ PARQUET_EXPORT
 ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
                                   ::arrow::io::OutputStream* sink);
 
+/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
+PARQUET_EXPORT
+::arrow::Status WriteEncryptedMetadataFile(
+    const FileMetaData& file_metadata, ArrowOutputStream* sink,
+    FileEncryptionProperties* file_encryption_properties);
+
 /// \brief Write a Table to Parquet.
 ///
 /// This writes one table in a single shot. To write a Parquet file with
11 changes: 11 additions & 0 deletions cpp/src/parquet/file_writer.cc
@@ -567,6 +567,17 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
   }
 }
 
+void WriteEncryptedMetadataFile(
+    const FileMetaData& metadata, ArrowOutputStream* sink,
+    FileEncryptionProperties* file_encryption_properties,
+    const std::shared_ptr<WriterProperties> writer_properties) {
+  auto encryptor = std::make_shared<InternalFileEncryptor>(
+      file_encryption_properties, ::arrow::default_memory_pool());
+
+  return WriteEncryptedFileMetadata(metadata, sink, encryptor->GetFooterEncryptor(),
+                                    file_encryption_properties->encrypted_footer());
+}
+
 void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
                              ArrowOutputStream* sink) {
   crypto_metadata.WriteTo(sink);
5 changes: 5 additions & 0 deletions cpp/src/parquet/file_writer.h
@@ -117,6 +117,11 @@ PARQUET_EXPORT
 void WriteMetaDataFile(const FileMetaData& file_metadata,
                        ::arrow::io::OutputStream* sink);
 
+PARQUET_EXPORT
+void WriteEncryptedMetadataFile(const FileMetaData& file_metadata,
+                                ArrowOutputStream* sink,
+                                FileEncryptionProperties* file_encryption_properties);
+
 PARQUET_EXPORT
 void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
                                 ArrowOutputStream* sink,
5 changes: 5 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -561,6 +561,11 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
         const CFileMetaData& file_metadata,
         const COutputStream* sink)
 
+    CStatus WriteEncryptedMetadataFile(
+        const CFileMetaData& file_metadata,
+        COutputStream* sink,
+        CFileEncryptionProperties* encryption_properties)
+
 cdef class FileEncryptionProperties:
     """File-level encryption properties for the low-level API"""
     cdef:
15 changes: 12 additions & 3 deletions python/pyarrow/_parquet.pyx
@@ -1045,7 +1045,7 @@ cdef class FileMetaData(_Weakrefable):
         c_metadata = other.sp_metadata
         self._metadata.AppendRowGroups(deref(c_metadata))
 
-    def write_metadata_file(self, where):
+    def write_metadata_file(self, where, encryption_properties=None):
         """
         Write the metadata to a metadata-only Parquet file.
 
@@ -1054,10 +1054,13 @@
         where : path or file-like object
             Where to write the metadata. Should be a writable path on
             the local filesystem, or a writable file-like object.
+        encryption_properties : FileEncryptionProperties, default None
+            Optional encryption properties to use when encrypting metadata.
         """
         cdef:
             shared_ptr[COutputStream] sink
             c_string c_where
+            shared_ptr[CFileEncryptionProperties] properties
 
         try:
             where = _stringify_path(where)
@@ -1068,9 +1071,15 @@
         with nogil:
             sink = GetResultValue(FileOutputStream.Open(c_where))
 
+        if encryption_properties is not None:
+            properties = (<FileEncryptionProperties> encryption_properties).unwrap()
+
         with nogil:
-            check_status(
-                WriteMetaDataFile(deref(self._metadata), sink.get()))
+            if encryption_properties is not None:
+                check_status(
+                    WriteEncryptedMetadataFile(deref(self._metadata), sink.get(), properties.get()))
+            else:
+                check_status(WriteMetaDataFile(deref(self._metadata), sink.get()))
 
 
 cdef class ParquetSchema(_Weakrefable):
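
For orientation, the new keyword on FileMetaData.write_metadata_file can be exercised end to end roughly as follows. This is a minimal sketch, not part of the commit: ToyKmsClient, the "TEST" key id, and the file paths are illustrative, and the KMS stub simply mirrors the one in the test at the bottom of this commit.

import base64

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.parquet.encryption as pe


class ToyKmsClient(pe.KmsClient):
    # Same base64 stand-in as the test below; never use outside tests.
    def wrap_key(self, key_bytes, master_key_identifier):
        return base64.b64encode(key_bytes)

    def unwrap_key(self, wrapped_key, master_key_identifier):
        return base64.b64decode(wrapped_key)


crypto_factory = pe.CryptoFactory(lambda *a, **k: ToyKmsClient())
encryption_properties = crypto_factory.file_encryption_properties(
    pe.KmsConnectionConfig(),
    pe.EncryptionConfiguration(
        footer_key="TEST",
        column_keys={"TEST": ["col1"]},
        double_wrapping=False,
        plaintext_footer=False,
    ),
)

# Write a plain data file, then persist its metadata as a sidecar whose
# footer is encrypted via the new code path in this commit.
pq.write_table(pa.table({"col1": [1, 2, 3]}), "part-0.parquet")
metadata = pq.read_metadata("part-0.parquet")
metadata.write_metadata_file("_metadata",
                             encryption_properties=encryption_properties)
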
7 changes: 4 additions & 3 deletions python/pyarrow/parquet/core.py
@@ -2147,7 +2147,7 @@ def file_visitor(written_file):


 def write_metadata(schema, where, metadata_collector=None, filesystem=None,
-                   **kwargs):
+                   encryption_properties=None, **kwargs):
     """
     Write metadata-only Parquet file from schema. This can be used with
     `write_to_dataset` to generate `_common_metadata` and `_metadata` sidecar
@@ -2162,6 +2162,7 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
     filesystem : FileSystem, default None
         If nothing passed, will be inferred from `where` if path-like, else
         `where` is already a file-like object so no filesystem is needed.
+    encryption_properties : FileEncryptionProperties, default None
     **kwargs : dict,
         Additional kwargs for ParquetWriter class. See docstring for
         `ParquetWriter` for more information.
@@ -2213,9 +2214,9 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
         metadata.append_row_groups(m)
     if filesystem is not None:
         with filesystem.open_output_stream(where) as f:
-            metadata.write_metadata_file(f)
+            metadata.write_metadata_file(f, encryption_properties)
     else:
-        metadata.write_metadata_file(where)
+        metadata.write_metadata_file(where, encryption_properties)
 
 
 def read_metadata(where, memory_map=False, decryption_properties=None,
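
On the read side no new API is required: read_metadata, whose signature is visible in the context above, already accepts decryption_properties. A sketch of reading the encrypted sidecar back, reusing crypto_factory and metadata from the sketch after the _parquet.pyx hunk:

decryption_properties = crypto_factory.file_decryption_properties(
    pe.KmsConnectionConfig(), pe.DecryptionConfiguration())

# Succeeds only because the toy KMS stub can unwrap the footer key again.
restored = pq.read_metadata("_metadata",
                            decryption_properties=decryption_properties)
assert restored.num_columns == metadata.num_columns
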
87 changes: 87 additions & 0 deletions python/pyarrow/tests/test_dataset_encryption.py
@@ -227,3 +227,90 @@ def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes:
     dataset = ds.dataset(path, format=file_format, filesystem=mockfs)
     new_table = dataset.to_table()
     assert table == new_table
+
+
+def test_dataset_metadata_encryption_decryption(tempdir):
+    directory = tempdir / "data_dir"
+    directory.mkdir()
+    metadata_path = directory / "_metadata"
+
+    table = pa.table(
+        {
+            "col1": [1, 2, 3],
+            "col2": [1, 2, 3],
+            "year": [2020, 2020, 2021]
+        }
+    )
+
+    class KmsClient(pe.KmsClient):
+        def unwrap_key(self, wrapped_key, master_key_identifier):
+            return base64.b64decode(wrapped_key)
+
+        def wrap_key(self, key_bytes, master_key_identifier):
+            return base64.b64encode(key_bytes)
+
+    crypto_factory = pe.CryptoFactory(lambda *a, **k: KmsClient())
+    encryption_config = pe.EncryptionConfiguration(
+        footer_key="TEST",
+        column_keys={
+            "TEST": ["col2"]
+        },
+        double_wrapping=False,
+        plaintext_footer=False,
+    )
+    kms_connection_config = pe.KmsConnectionConfig()
+    parquet_encryption_cfg = ds.ParquetEncryptionConfig(
+        crypto_factory, kms_connection_config, encryption_config
+    )
+    encryption_properties = crypto_factory.file_encryption_properties(
+        kms_connection_config, encryption_config)
+
+    metadata_collector = []
+
+    pq.write_to_dataset(
+        table,
+        directory,
+        partitioning=ds.partitioning(
+            schema=pa.schema([
+                pa.field("year", pa.int16())
+            ]),
+            flavor="hive"
+        ),
+        encryption_config=parquet_encryption_cfg,
+        metadata_collector=metadata_collector
+    )
+
+    pq.write_metadata(
+        pa.schema(
+            field
+            for field in table.schema
+            if field.name != "year"
+        ),
+        metadata_path,
+        metadata_collector,
+        encryption_properties=encryption_properties
+    )
+
+    decryption_config = pe.DecryptionConfiguration(cache_lifetime=300)
+    kms_connection_config = pe.KmsConnectionConfig()
+
+    decryption_properties = crypto_factory.file_decryption_properties(
+        kms_connection_config, decryption_config)
+    pq_scan_opts = ds.ParquetFragmentScanOptions(
+        decryption_properties=decryption_properties
+    )
+    pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts)
+
+    dataset = ds.parquet_dataset(
+        metadata_path,
+        format=pformat,
+        partitioning=ds.partitioning(
+            schema=pa.schema([
+                pa.field("year", pa.int16())
+            ]),
+            flavor="hive"
+        )
+    )
+
+    new_table = dataset.to_table()
+    assert table == new_table
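
One follow-up assertion the test could reasonably add (a sketch only; the exact exception type raised for a missing decryptor is not pinned down here): the encrypted _metadata must be unreadable without decryption properties.

import pytest

# Assumed behavior: reading an encrypted footer without keys raises.
with pytest.raises(Exception):
    pq.read_metadata(metadata_path)
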
