PARQUET-781. Direct support for PathOutputCommitter #1361

Draft · wants to merge 1 commit into master
@@ -66,6 +66,8 @@ public class ParquetProperties {

public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

public static final boolean DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED = false;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

private static final int MIN_SLAB_SIZE = 64;
@@ -20,6 +20,7 @@

import static org.apache.parquet.column.ParquetProperties.DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED;
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;

@@ -98,7 +99,14 @@
* </pre>
* <p>
* if none of those is set the data is uncompressed.
*
* <p>
* This class also generates the committer required to manifest the work in the
* destination directory if and when the job is committed.
* Historically this has always been an instance of {@link ParquetOutputCommitter}.
* If {@link #PAGE_PATH_OUTPUT_COMMITTER_ENABLED} is true, the superclass is used
* to create the committer, which on Hadoop 3.1 and later involves the
* {@code PathOutputCommitterFactory} mechanism to dynamically choose a committer
* for the target filesystem. Such committers do not generate summary files.
* @param <T> the type of the materialized records
*/
public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
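For context, a minimal driver-side sketch (not part of this patch) of how the new option could be enabled. The "parquet.path.outputcommitter.enabled" property name is taken from the diff below; the s3a factory binding and committer name are assumptions based on the Hadoop 3.1+ PathOutputCommitterFactory mechanism and only apply when writing to S3A.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PathCommitterJobSetup {
  public static Job createJob() throws Exception {
    Configuration conf = new Configuration();
    // Ask ParquetOutputFormat to delegate committer creation to FileOutputFormat.
    conf.setBoolean("parquet.path.outputcommitter.enabled", true);
    // Assumed binding: route s3a:// output to the committer factory shipped with
    // hadoop-aws 3.1+; other filesystems fall back to the classic FileOutputCommitter.
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
    // Choose a concrete S3A committer ("directory", "partitioned" or "magic").
    conf.set("fs.s3a.committer.name", "directory");
    return Job.getInstance(conf, "parquet-path-committer-example");
  }
}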
@@ -158,6 +166,13 @@ public static enum JobSummaryLevel {
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";

/**
* Use the output committer created by the superclass, rather than a {@link ParquetOutputCommitter}.
* This delivers correctness and scalability on cloud storage, but will not write summary files.
* Value: {@value}.
*/
public static final String PAGE_PATH_OUTPUT_COMMITTER_ENABLED = "parquet.path.outputcommitter.enabled";

public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY);
@@ -390,7 +405,7 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
}

private WriteSupport<T> writeSupport;
private ParquetOutputCommitter committer;
private OutputCommitter committer;

/**
* constructor used when this OutputFormat in wrapped in another one (In Pig for example)
@@ -555,7 +570,22 @@ public WriteSupport<T> getWriteSupport(Configuration configuration) {
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new ParquetOutputCommitter(output, context);
final Configuration conf = context.getConfiguration();
if (conf.getBoolean(PAGE_PATH_OUTPUT_COMMITTER_ENABLED, DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED)) {
// Hand off creation of the committer to the superclass.
// On Hadoop 3.1+ this uses a factory mechanism to dynamically
// bind to a filesystem-specific committer or an explicit override,
// or to fall back to the classic FileOutputCommitter.
committer = super.getOutputCommitter(context);
LOG.debug("Writing to {} with output committer {}", output, committer);

if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE) {
// warn if summary file generation has been requested, as they won't be created.
LOG.warn("Committer {} does not support summary files", committer);
}
} else {
committer = new ParquetOutputCommitter(output, context);
}
}
return committer;
}
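Since the new branch in getOutputCommitter() only warns when a summary level other than NONE is configured, a job that opts into the path output committer may also want to disable summary files explicitly. A hedged sketch using the constants visible in this diff (that JOB_SUMMARY_LEVEL is publicly accessible on ParquetOutputFormat is an assumption):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class DisableSummaryFiles {
  // Opt into the superclass-provided committer and request no summary metadata,
  // so the "does not support summary files" warning never fires.
  public static void configure(Configuration conf) {
    conf.setBoolean(ParquetOutputFormat.PAGE_PATH_OUTPUT_COMMITTER_ENABLED, true);
    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL,
        ParquetOutputFormat.JobSummaryLevel.NONE.name());
  }
}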