Skip to content

Commit

Permalink
Python wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 30, 2024
1 parent d46fea4 commit 7c4b5bb
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 10 deletions.
6 changes: 3 additions & 3 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,15 @@ void FileMetaData::WriteTo(::arrow::io::OutputStream* dst,
}

::arrow::Result<std::shared_ptr<parquet::FileMetaData>> FileMetaData::CoalesceMetadata(
const std::vector<std::shared_ptr<parquet::FileMetaData>>& metadata_list,
const std::shared_ptr<parquet::WriterProperties>& writer_props) {
std::vector<std::shared_ptr<parquet::FileMetaData>>& metadata_list,
std::shared_ptr<parquet::WriterProperties>& writer_props) {
if (metadata_list.empty()) {
return ::arrow::Status::Invalid("No metadata to coalesce");
}

std::vector<std::string> values, keys;

const auto& metadata = metadata_list[0];
auto metadata = metadata_list[0];
// Read metadata from all dataset files and store AADs and paths as key-value metadata.
for (size_t i = 1; i < metadata_list.size(); i++) {
const auto& file_metadata = metadata_list[i];
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ class PARQUET_EXPORT FileMetaData {
/// \param[in] writer_props
/// \return
static ::arrow::Result<std::shared_ptr<FileMetaData>> CoalesceMetadata(
const std::vector<std::shared_ptr<FileMetaData>>& metadata_list,
const std::shared_ptr<WriterProperties>& writer_props);
std::vector<std::shared_ptr<FileMetaData>>& metadata_list,
std::shared_ptr<WriterProperties>& writer_props);

/// \brief Set the AAD of decryptor of the file.
///
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
inline EncryptionAlgorithm encryption_algorithm() const
inline const c_string& footer_signing_key_metadata() const

cdef CResult[shared_ptr[CFileMetaData]] CFileMetaData_CoalesceMetadata \
" parquet::FileMetaData::CoalesceMetadata"(const vector[shared_ptr[CFileMetaData]]& metadata_list,
const shared_ptr[WriterProperties]& properties)

cdef shared_ptr[CFileMetaData] CFileMetaData_Make \
" parquet::FileMetaData::Make"(const void* serialized_metadata,
uint32_t* metadata_len)
Expand Down
19 changes: 19 additions & 0 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,25 @@ cdef class FileMetaData(_Weakrefable):
c_metadata = other.sp_metadata
self._metadata.AppendRowGroups(deref(c_metadata))

@classmethod
def coalesce_metadata(cls, metadata_list):
"""
"""
cdef:
FileMetaData metadata = FileMetaData.__new__(FileMetaData)
vector[shared_ptr[CFileMetaData]] c_metadata_list
shared_ptr[WriterProperties] c_properties = _create_writer_properties()
shared_ptr[CFileMetaData] c_metadata

for metadata in metadata_list:
c_metadata_list.push_back((<FileMetaData> metadata).sp_metadata)

c_metadata = GetResultValue(
CFileMetaData_CoalesceMetadata(c_metadata_list, c_properties))
metadata.init(c_metadata)
return metadata

def write_metadata_file(self, where, encryption_properties=None):
"""
Write the metadata to a metadata-only Parquet file.
Expand Down
12 changes: 7 additions & 5 deletions python/pyarrow/parquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import warnings

import pyarrow as pa
# from pyarrow._parquet cimport _create_writer_properties

try:
import pyarrow._parquet as _parquet
Expand Down Expand Up @@ -2053,7 +2054,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
The metadata attribute will be the parquet metadata of the file.
This metadata will have the file path attribute set and can be used
to build a _metadata file. The metadata attribute will be None if
to build a _metadata file. The metadata attribute will be None if
the format is not parquet.
Example visitor which simple collects the filenames created::
Expand Down Expand Up @@ -2149,7 +2150,10 @@ def file_visitor(written_file):

if metadata_collector is not None:
def file_visitor(written_file):
metadata_collector.append(written_file.metadata)
metadata = written_file.metadata
# TODO: is set_file_path needed?
metadata.set_file_path(written_file.path)
metadata_collector.append(metadata)

# map format arguments
parquet_format = ds.ParquetFileFormat()
Expand Down Expand Up @@ -2251,12 +2255,10 @@ def write_metadata(schema, where, metadata_collector=None, filesystem=None,
writer.close()

if metadata_collector is not None:
metadata = read_metadata(where, filesystem=filesystem, **read_metadata_kwargs)
if hasattr(where, "seek"):
where.seek(cursor_position) # file-like, set cursor back.

for m in metadata_collector:
metadata.append_row_groups(m)
metadata = FileMetaData.coalesce_metadata(metadata_collector)
if filesystem is not None:
with filesystem.open_output_stream(where) as f:
metadata.write_metadata_file(f, **write_metadata_kwargs)
Expand Down

0 comments on commit 7c4b5bb

Please sign in to comment.