[GOBBLIN-2159] Adding support for partition level copy in Iceberg distcp #4058
Merged: phet merged 33 commits into apache:master from Blazer-007:iceberg_distcp_partition_copy_0 on Oct 23, 2024
02ae2fc
initial changes for iceberg distcp partition copy
Blazer-007 981357c
added datetime filter predicate with unit tests
Blazer-007 7cd9353
changing string.class to object.class
Blazer-007 82d10d3
updated replace partition to use serialized data files
Blazer-007 c43d3e1
some code cleanup
Blazer-007 0cf7638
added unit test
Blazer-007 63bb9aa
added replace partition unit test
Blazer-007 6e1cf6b
refactored and added more test
Blazer-007 065cde3
added javadoc
Blazer-007 a13220d
removed extra lines
Blazer-007 e1d812f
some minor changes
Blazer-007 4364044
added retry and tests for replace partitions commit step
Blazer-007 66d81a3
minor test changes
Blazer-007 24b4823
added metadata validator
Blazer-007 d8356e1
removed validator class for now
Blazer-007 4dcc88b
addressed comments and removed some classes for now
Blazer-007 46bd976
fixing checkstyle bugs and disabling newly added tests to find root c…
Blazer-007 e1e6f57
addressed pr comments and added few extra logs
Blazer-007 b6163ba
refactored classes
Blazer-007 6c73a25
removed extra import statements
Blazer-007 9c35733
enabled the tests
Blazer-007 cdc863a
fixed iceberg table tests
Blazer-007 1dbe929
some refactoring
Blazer-007 383ed91
refactored tests as per review comments
Blazer-007 942ad8d
throw tablenotfoundexception in place of nosuchtableexception
Blazer-007 6a4cf78
fixed throwing proper exception
Blazer-007 2adaa8b
removed unused imports
Blazer-007 c948854
replcaed runtime exception with ioexception
Blazer-007 a55ee61
added check to avoid printing same log line
Blazer-007 1afc37a
fixed import order
Blazer-007 bb35070
added catch for CheckedExceptionFunction.WrappedIOException wrapper
Blazer-007 eeb8d25
fixed compile issue
Blazer-007 675e8bb
removing unwanted logging
Blazer-007
164 changes: 164 additions & 0 deletions
.../java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.gobblin.data.management.copy.iceberg; | ||
|
||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
import org.apache.iceberg.DataFile; | ||
import org.apache.iceberg.catalog.TableIdentifier; | ||
import org.apache.iceberg.util.SerializationUtil; | ||
|
||
import com.github.rholder.retry.Attempt; | ||
import com.github.rholder.retry.RetryException; | ||
import com.github.rholder.retry.RetryListener; | ||
import com.github.rholder.retry.Retryer; | ||
import com.google.common.collect.ImmutableMap; | ||
import com.typesafe.config.Config; | ||
import com.typesafe.config.ConfigFactory; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import org.apache.gobblin.commit.CommitStep; | ||
import org.apache.gobblin.util.retry.RetryerFactory; | ||
|
||
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS; | ||
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES; | ||
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE; | ||
import static org.apache.gobblin.util.retry.RetryerFactory.RetryType; | ||
|
||
/** | ||
* Commit step for overwriting partitions in an Iceberg table. | ||
* <p> | ||
* This class implements the {@link CommitStep} interface and provides functionality to overwrite | ||
* partitions in the destination Iceberg table using serialized data files. | ||
* </p> | ||
*/ | ||
@Slf4j | ||
public class IcebergOverwritePartitionsStep implements CommitStep { | ||
private final String destTableIdStr; | ||
private final Properties properties; | ||
private final byte[] serializedDataFiles; | ||
private final String partitionColName; | ||
private final String partitionValue; | ||
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + | ||
".catalog.overwrite.partitions.retries"; | ||
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of( | ||
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L), | ||
RETRY_TIMES, 3, | ||
RETRY_TYPE, RetryType.FIXED_ATTEMPT.name())); | ||
|
||
/** | ||
* Constructs an {@code IcebergOverwritePartitionsStep} with the specified parameters. | ||
* | ||
* @param destTableIdStr the identifier of the destination table as a string | ||
* @param partitionColName the name of the partition column whose partition is overwritten | ||
* @param partitionValue the value of the partition to overwrite | ||
* @param serializedDataFiles the serialized {@code List<DataFile>} to be used for overwriting partitions | ||
* @param properties the properties containing configuration | ||
*/ | ||
public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) { | ||
this.destTableIdStr = destTableIdStr; | ||
this.partitionColName = partitionColName; | ||
this.partitionValue = partitionValue; | ||
this.serializedDataFiles = serializedDataFiles; | ||
this.properties = properties; | ||
} | ||
|
||
@Override | ||
public boolean isCompleted() { | ||
return false; | ||
} | ||
|
||
/** | ||
* Executes the partition overwrite in the destination Iceberg table, retrying on failure | ||
* with the same retry mechanism used by {@link IcebergRegisterStep#execute()}. | ||
* | ||
* @throws IOException if an I/O error occurs during execution | ||
*/ | ||
@Override | ||
public void execute() throws IOException { | ||
|
||
// Unlike IcebergRegisterStep::execute, which validates dest table metadata has not changed between copy entity | ||
// generation and the post-copy commit, we do no such validation here, so dest table writes may continue throughout | ||
// our copying. Any new data written in the meanwhile to THE SAME partitions we are about to overwrite will be | ||
// clobbered and replaced by the copy entities from our execution. | ||
IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr)); | ||
List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles); | ||
try { | ||
log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; numDataFiles: {}; path[0]: {}", | ||
this.destTableIdStr, | ||
this.partitionColName, | ||
this.partitionValue, | ||
dataFiles.size(), | ||
dataFiles.get(0).path() | ||
); | ||
Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer(); | ||
overwritePartitionsRetryer.call(() -> { | ||
destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue); | ||
return null; | ||
}); | ||
log.info("~{}~ Successful partition overwrite - partition: {}; value: {}", | ||
this.destTableIdStr, | ||
this.partitionColName, | ||
this.partitionValue | ||
); | ||
} catch (ExecutionException executionException) { | ||
String msg = String.format("~%s~ Failed to overwrite partitions", this.destTableIdStr); | ||
log.error(msg, executionException); | ||
throw new RuntimeException(msg, executionException.getCause()); | ||
} catch (RetryException retryException) { | ||
String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : ""; | ||
String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d] %s", | ||
this.destTableIdStr, | ||
retryException.getNumberOfFailedAttempts(), | ||
interruptedNote); | ||
Throwable informativeException = retryException.getLastFailedAttempt().hasException() | ||
? retryException.getLastFailedAttempt().getExceptionCause() | ||
: retryException; | ||
log.error(msg, informativeException); | ||
throw new RuntimeException(msg, informativeException); | ||
} | ||
} | ||
|
||
protected IcebergCatalog createDestinationCatalog() throws IOException { | ||
return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION); | ||
} | ||
|
||
private Retryer<Void> createOverwritePartitionsRetryer() { | ||
Config config = ConfigFactory.parseProperties(this.properties); | ||
Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX) | ||
? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX) | ||
: ConfigFactory.empty(); | ||
|
||
return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() { | ||
@Override | ||
public <V> void onRetry(Attempt<V> attempt) { | ||
if (attempt.hasException()) { | ||
String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]", | ||
destTableIdStr, | ||
attempt.getAttemptNumber(), | ||
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString()); | ||
log.warn(msg, attempt.getExceptionCause()); | ||
} | ||
} | ||
})); | ||
} | ||
} |
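The fallback retry settings in the diff above (3 attempts, a fixed 3-second interval, `FIXED_ATTEMPT`) can be illustrated with a plain-Java sketch. This is not Gobblin's `RetryerFactory` nor the guava-retrying `Retryer`; the class `FixedAttemptRetrySketch` and its method `callWithRetry` are hypothetical names introduced only for illustration, and throwing `IOException` on exhaustion is an assumption (the diff itself wraps the failure in a `RuntimeException`).

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a fixed-attempt retryer; NOT Gobblin's RetryerFactory. */
final class FixedAttemptRetrySketch {
  private FixedAttemptRetrySketch() {}

  /**
   * Calls {@code action} up to {@code maxAttempts} times, sleeping {@code intervalMs}
   * between failed attempts. When every attempt fails, throws an IOException whose
   * cause is the last failure (an assumption made for this sketch).
   */
  static <T> T callWithRetry(Callable<T> action, int maxAttempts, long intervalMs) throws IOException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        lastFailure = e;
        System.err.printf("attempt %d of %d failed: %s%n", attempt, maxAttempts, e);
        if (attempt < maxAttempts) {
          try {
            Thread.sleep(intervalMs);
          } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting to retry", ie);
          }
        }
      }
    }
    throw new IOException("failed after " + maxAttempts + " attempts", lastFailure);
  }

  public static void main(String[] args) throws IOException {
    int[] calls = {0};
    // Simulated commit that fails twice with a transient error, then succeeds.
    String result = callWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new IllegalStateException("transient failure " + calls[0]);
      }
      return "committed";
    }, 3, 10L);
    System.out.println(result + " after " + calls[0] + " calls");
  }
}
```

Surfacing the last failure as the cause mirrors the diff's `informativeException` handling, where the last failed attempt's exception is preferred over the `RetryException` wrapper when one is available.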
For good measure you could also make IcebergTable.TableNotFoundException a declared/checked exception here.
I'm tempted to re-situate the exception as IcebergCatalog.TableNotFoundException, but I don't want two classes w/ the same semantics - and renaming public interfaces is probably too late... so I'll make peace with the current name.
As discussed, not throwing here; instead, catching NoSuchTableException in BaseIcebergCatalog::openTable and throwing IcebergTable.TableNotFoundException from there.
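The resolution described in this thread, catching an unchecked "no such table" failure at the catalog boundary and rethrowing it as a checked exception, might look roughly like the sketch below. Everything in it is a hypothetical stand-in: the nested `NoSuchTableException` and `TableNotFoundException` classes only mimic the roles of the real Iceberg and Gobblin types, a `Map` plays the part of the catalog, and `loadRaw` and `openTable` are illustrative names rather than the actual `BaseIcebergCatalog` code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of translating an unchecked lookup failure into a checked one. */
final class TableNotFoundTranslationSketch {
  private TableNotFoundTranslationSketch() {}

  /** Stand-in for an unchecked catalog exception (role of Iceberg's NoSuchTableException). */
  static final class NoSuchTableException extends RuntimeException {
    NoSuchTableException(String message) {
      super(message);
    }
  }

  /** Stand-in for a checked exception (role of IcebergTable.TableNotFoundException). */
  static final class TableNotFoundException extends Exception {
    TableNotFoundException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Simulated low-level lookup that signals a missing table with an unchecked exception. */
  static String loadRaw(Map<String, String> catalog, String tableId) {
    String location = catalog.get(tableId);
    if (location == null) {
      throw new NoSuchTableException("Table does not exist: " + tableId);
    }
    return location;
  }

  /** Boundary method: callers see only the checked exception, with the original kept as cause. */
  static String openTable(Map<String, String> catalog, String tableId) throws TableNotFoundException {
    try {
      return loadRaw(catalog, tableId);
    } catch (NoSuchTableException e) {
      throw new TableNotFoundException("Table not found: " + tableId, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> catalog = new HashMap<>();
    catalog.put("db.tbl", "s3://bucket/db/tbl");
    try {
      System.out.println(openTable(catalog, "db.tbl"));
      openTable(catalog, "db.missing");
    } catch (TableNotFoundException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

Doing the translation in one place means callers such as a commit step do not need to know which unchecked exception the underlying catalog uses.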