-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2147] Added lookback time fetch in partitioned filesource #4044
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ | |
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; | ||
import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper; | ||
import org.apache.gobblin.util.DatePartitionType; | ||
import org.apache.gobblin.util.measurement.GrowthMilestoneTracker; | ||
|
||
import static org.apache.gobblin.source.PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN; | ||
|
||
|
@@ -54,9 +55,9 @@ | |
* | ||
* For example, if {@link ConfigurationKeys#SOURCE_FILEBASED_DATA_DIRECTORY} is set to /my/data/, then the class assumes | ||
* folders following the pattern /my/data/daily/[year]/[month]/[day] are present. It will iterate through all the data | ||
* under these folders starting from the date specified by {@link #DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} until | ||
* either {@link #DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB} files have been processed, or until there is no more data | ||
* to process. For example, if {@link #DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} is set to 2015/01/01, then the job | ||
* under these folders starting from the date specified by {@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} until | ||
* either {@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MAX_FILES_PER_JOB} files have been processed, or until there is no more data | ||
* to process. For example, if {@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE} is set to 2015/01/01, then the job | ||
* will read from the folder /my/data/daily/2015/01/01/, /my/data/daily/2015/01/02/, /my/data/2015/01/03/ etc. | ||
* | ||
*/ | ||
|
@@ -114,6 +115,10 @@ public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn) | |
throw new IOException("Error initializing FileSystem", e); | ||
} | ||
|
||
GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker(); | ||
Long iteration = 0L; | ||
LOG.info("Starting processing files from {} to {}", lowWaterMarkDate, currentDay); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add context about which partition of which dataset |
||
|
||
for (DateTime date = lowWaterMarkDate; !date.isAfter(currentDay) && filesToProcess.size() < maxFilesToReturn; | ||
date = date.withFieldAdded(incrementalUnit, 1)) { | ||
|
||
|
@@ -129,8 +134,15 @@ public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn) | |
new FileInfo(fileStatus.getPath().toString(), fileStatus.getLen(), date.getMillis(), partitionPath)); | ||
} | ||
} | ||
|
||
if (growthTracker.isAnotherMilestone(iteration++)) { | ||
LOG.info("~{}~ collected files to process", filesToProcess.size()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wouldn't this log the same number of files every time there's a milestone? I believe you intend to log how many files were processed thus far when reaching each milestone... correct? ideally you might put some description in the first substitution (e.g. which dataset/partition is being processed). also, collapse adjacent logging calls into a single call with multiple substitutions, rather than keeping them separate.
|
||
LOG.info("Last Source Path processed : ~{}~", sourcePath); | ||
} | ||
} | ||
|
||
LOG.info("Finished processing files"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here. also, let's provide the count of how many files were processed in total upon completion.
||
|
||
return filesToProcess; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,14 @@ | |
*/ | ||
package org.apache.gobblin.source; | ||
|
||
import java.util.Optional; | ||
|
||
import org.joda.time.DateTimeFieldType; | ||
import org.joda.time.Duration; | ||
import org.joda.time.chrono.ISOChronology; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.apache.gobblin.configuration.State; | ||
import org.apache.gobblin.util.DatePartitionType; | ||
|
@@ -32,6 +39,7 @@ | |
* objects. | ||
*/ | ||
public class PartitionAwareFileRetrieverUtils { | ||
private static final Logger LOG = LoggerFactory.getLogger(PartitionAwareFileRetrieverUtils.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use the lombok annotation `@Slf4j` instead of declaring the logger by hand?
||
/** | ||
* Retrieve the lead time duration from the LEAD_TIME and LEAD_TIME granularity config settings. | ||
*/ | ||
|
@@ -52,4 +60,29 @@ public static Duration getLeadTimeDurationFromConfig(State state) { | |
|
||
return new Duration(leadTime * leadTimeGranularity.getUnitMilliseconds()); | ||
} | ||
|
||
/** | ||
* Calculates the lookback time duration based on the provided lookback time string. | ||
* | ||
* @param lookBackTime the lookback time string, which should include a numeric value followed by a time unit character. | ||
* For example, "5d" for 5 days or "10h" for 10 hours. | ||
Comment on lines
+67
to
+68
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like the flexibility here (often we've asked for two props--one for the number and the other for the time unit). in any event, please link to the spec that documents the suffixes and what they mean (e.g. 'd', 'h', 'M', etc.) |
||
* @return an {@link Optional} containing the {@link Duration} if the lookback time is valid, or | ||
* an empty {@link Optional} if the lookback time is invalid or cannot be parsed. | ||
*/ | ||
public static Optional<Duration> getLookbackTimeDuration(String lookBackTime) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. although I am a big proponent of `Optional`, since string parsing like this regularly requires error handling, it seems reasonable and basically expected to throw an exception. you can avow in javadoc that the method throws on invalid input, rather than returning an empty `Optional`.
||
try { | ||
DateTimeFieldType lookBackTimeGranularity = DatePartitionType.getLowestIntervalUnit(lookBackTime); | ||
if (lookBackTimeGranularity != null) { | ||
long lookBackTimeGranularityInMillis = | ||
lookBackTimeGranularity.getDurationType().getField(ISOChronology.getInstance()).getUnitMillis(); | ||
long lookBack = Long.parseLong(lookBackTime.substring(0, lookBackTime.length() - 1)); | ||
return Optional.of(new Duration(lookBack * lookBackTimeGranularityInMillis)); | ||
} | ||
LOG.warn("There is no valid time granularity for lookback time: {}", lookBackTime); | ||
return Optional.empty(); | ||
} catch(NumberFormatException ex) { | ||
LOG.warn("Exception Caught while parsing lookback time", ex); | ||
return Optional.empty(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,14 +17,28 @@ | |
|
||
package org.apache.gobblin.source; | ||
|
||
import com.google.common.base.Throwables; | ||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.joda.time.DateTime; | ||
import org.joda.time.Duration; | ||
import org.joda.time.format.DateTimeFormat; | ||
import org.joda.time.format.DateTimeFormatter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import com.google.common.base.Throwables; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import org.apache.gobblin.configuration.ConfigurationKeys; | ||
import org.apache.gobblin.configuration.SourceState; | ||
import org.apache.gobblin.configuration.State; | ||
|
@@ -43,13 +57,6 @@ | |
import org.apache.gobblin.source.workunit.WorkUnit; | ||
import org.apache.gobblin.util.DatePartitionType; | ||
import org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.joda.time.Duration; | ||
import org.joda.time.format.DateTimeFormat; | ||
import org.joda.time.format.DateTimeFormatter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
/** | ||
|
@@ -119,6 +126,9 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS | |
private static final String DATE_PARTITIONED_SOURCE_MAX_WORKUNITS_PER_JOB = | ||
DATE_PARTITIONED_SOURCE_PREFIX + ".max.workunits.per.job"; | ||
|
||
private static final String DATE_PARTITIONED_SOURCE_LOOKBACK_TIME = | ||
DATE_PARTITIONED_SOURCE_PREFIX + ".lookback.time"; | ||
|
||
// Default configuration parameter values | ||
|
||
/** | ||
|
@@ -143,6 +153,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS | |
private SourceState sourceState; | ||
private FileSystem fs; | ||
private long lowWaterMark; | ||
private String lookBackTime; | ||
private int maxFilesPerJob; | ||
private int maxWorkUnitsPerJob; | ||
private int fileCount; | ||
|
@@ -171,6 +182,8 @@ protected void init(SourceState state) { | |
|
||
this.sourceState = state; | ||
|
||
this.lookBackTime = getLookBackTimeProp(state); | ||
|
||
this.lowWaterMark = | ||
getLowWaterMark(state.getPreviousWorkUnitStates(), state.getProp(DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE, | ||
String.valueOf(DEFAULT_DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE))); | ||
|
@@ -347,11 +360,22 @@ private void addSchemaFile(PartitionAwareFileRetriever.FileInfo dataFile, WorkUn | |
} | ||
|
||
/** | ||
* Gets the LWM for this job runs. The new LWM is the HWM of the previous run + 1 unit (day,hour,minute..etc). | ||
* Gets the LWM for this job runs. | ||
* If the lookback property {@link PartitionedFileSourceBase#DATE_PARTITIONED_SOURCE_LOOKBACK_TIME)} or | ||
* {@link ConfigurationKeys#DEFAULT_COPY_LOOKBACK_TIME_KEY} is provided | ||
* then the LWM is set to the current time minus the lookback time. | ||
* Otherwise, The new LWM is the HWM of the previous run + 1 unit (day,hour,minute.etc.). | ||
* If there was no previous execution then it is set to the given lowWaterMark + 1 unit. | ||
*/ | ||
private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowWaterMark) { | ||
|
||
if (StringUtils.isNotEmpty(this.lookBackTime)) { | ||
Optional<Duration> lookBackDuration = PartitionAwareFileRetrieverUtils.getLookbackTimeDuration(this.lookBackTime); | ||
if (lookBackDuration.isPresent()) { | ||
return new DateTime().minus(lookBackDuration.get()).getMillis(); | ||
} | ||
} | ||
|
||
long lowWaterMarkValue = retriever.getWatermarkFromString(lowWaterMark); | ||
|
||
// Find the max HWM from the previous states, this is the new current LWM | ||
|
@@ -370,4 +394,11 @@ private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowW | |
protected PartitionAwareFileRetriever getRetriever() { | ||
return retriever; | ||
} | ||
|
||
private String getLookBackTimeProp(SourceState state) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be `static`, since it does not use any instance state |
||
if (state.contains(DATE_PARTITIONED_SOURCE_LOOKBACK_TIME)) { | ||
return state.getProp(DATE_PARTITIONED_SOURCE_LOOKBACK_TIME); | ||
} | ||
return state.getProp(ConfigurationKeys.DEFAULT_COPY_LOOKBACK_TIME_KEY, ""); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check the datapacks MP, this config is used to set another config (configX), which is used at various places in gobblin to find the lookback period. Please use that configX
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property is "gobblin.copy.recursive.lookback.time" — wouldn't it be confusing to reuse a config intended for one DatasetFinder in a different finder class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not 100% clear here...
is the issue that: