[GOBBLIN-2147] Added lookback time fetch in partitioned filesource #4044

Open
wants to merge 2 commits into
base: master

Contributor

@Blazer-007 Blazer-007 commented Sep 1, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • [✅] Here are some details about my PR, including screenshots (if applicable):
    • In partitioned-file-source-based copy, the copy.lookbackTime property was ignored even when it was set in the config. Files were processed starting from the low watermark (or its default value when none was passed), which can lead to processing far too many files when the granularity isn't configured properly.
    • With this change, the user can pass "copy.lookbackTime" or "date.partitioned.source.lookback.time".
    • If the user doesn't pass a value, the value is invalid, or an exception occurs while parsing the property value, the code falls back to the watermark values.
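
The fallback described above could be sketched roughly as follows. This is an illustration only, not the code in this PR: the class `LookbackPropertyResolver`, its method names, the order in which the two keys are checked, and the "now minus lookback" window start are all assumptions; only the two property names come from the description.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper (not from the PR): resolves the lookback setting from
// the two property names listed in the description.
final class LookbackPropertyResolver {
    static final String COPY_LOOKBACK_TIME_KEY = "copy.lookbackTime";
    static final String DATE_PARTITIONED_LOOKBACK_TIME_KEY = "date.partitioned.source.lookback.time";

    private LookbackPropertyResolver() {}

    /**
     * Returns the first non-blank lookback value, or empty to signal
     * "fall back to the watermark". Checking copy.lookbackTime first is an assumption.
     */
    static Optional<String> resolve(Properties props) {
        for (String key : new String[] {COPY_LOOKBACK_TIME_KEY, DATE_PARTITIONED_LOOKBACK_TIME_KEY}) {
            String value = props.getProperty(key);
            if (value != null && !value.trim().isEmpty()) {
                return Optional.of(value.trim());
            }
        }
        return Optional.empty();
    }

    /**
     * Picks the start of the scan window in epoch millis: now minus the lookback
     * when one was parsed successfully, otherwise the low watermark.
     * A null lookback means "missing or unparseable".
     */
    static long startMillis(Duration lookbackOrNull, long nowMillis, long lowWatermarkMillis) {
        return lookbackOrNull == null ? lowWatermarkMillis : nowMillis - lookbackOrNull.toMillis();
    }
}
```

With neither property set, `resolve` returns an empty `Optional` and `startMillis(null, now, lowWatermark)` returns the low watermark unchanged, which matches the fallback behaviour described in the last bullet.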

Tests

  • [✖️] My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • [✅] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@Blazer-007 Blazer-007 marked this pull request as ready for review September 1, 2024 14:39
@Blazer-007 Blazer-007 changed the title [GOBBLIN-] Added lookback time fetch in partitioned filesource [GOBBLIN-2147] Added lookback time fetch in partitioned filesource Sep 1, 2024
/**
* DEFAULT LOOKBACK TIME KEY property
*/
public static final String DEFAULT_COPY_LOOKBACK_TIME_KEY = "copy.lookbackTime";
Contributor

Check the datapacks MP: this config is used to set another config (configX), which is used in various places in Gobblin to find the lookback period. Please use that configX.

Contributor Author

The property is "gobblin.copy.recursive.lookback.time". Won't it be confusing to use another DatasetFinder's config in a different finder class?

Contributor

not 100% clear here...

is the issue that:

  • the same prop name is already in use elsewhere so we should avoid conflicting?
  • that a config w/ similar semantics already exists, and we might intentionally borrow it here for uniformity?
  • or something else...?

@codecov-commenter

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 41.14%. Comparing base (444f266) to head (1299752).
Report is 2 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4044      +/-   ##
============================================
+ Coverage     38.79%   41.14%   +2.35%     
- Complexity     1599     2204     +605     
============================================
  Files           388      480      +92     
  Lines         15998    20360    +4362     
  Branches       1585     2355     +770     
============================================
+ Hits           6207     8378    +2171     
- Misses         9293    11090    +1797     
- Partials        498      892     +394     

@@ -129,8 +134,15 @@ public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn)
new FileInfo(fileStatus.getPath().toString(), fileStatus.getLen(), date.getMillis(), partitionPath));
}
}

if (growthTracker.isAnotherMilestone(iteration++)) {
LOG.info("~{}~ collected files to process", filesToProcess.size());
Contributor

wouldn't this log the same number of files every time there's a milestone? I believe you intend to log how many files were processed thus far when reaching each milestone... correct?

ideally you might put some description in the first substitution (~{}~) to contextualize the processing, and add the size in addition.

also, collapse adjacent logging calls, rather than keeping separate. e.g.

LOG.info("~{}~ collected {} (of {}) files to process; most-recent source path: '{}'",
   description, iteration, files.size(), sourcePath);

@@ -114,6 +115,10 @@ public List<FileInfo> getFilesToProcess(long minWatermark, int maxFilesToReturn)
throw new IOException("Error initializing FileSystem", e);
}

GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
Long iteration = 0L;
LOG.info("Starting processing files from {} to {}", lowWaterMarkDate, currentDay);
Contributor

please add context about which partition of which dataset

}

LOG.info("Finished processing files");
Contributor

@phet phet Sep 25, 2024

same here. also, let's provide the count of how many completed - filesToProcess.size()

@@ -32,6 +39,7 @@
* objects.
*/
public class PartitionAwareFileRetrieverUtils {
private static final Logger LOG = LoggerFactory.getLogger(PartitionAwareFileRetrieverUtils.class);
Contributor

can we use the lombok annotation @Slf4j?

Comment on lines +67 to +68
* @param lookBackTime the lookback time string, which should include a numeric value followed by a time unit character.
* For example, "5d" for 5 days or "10h" for 10 hours.
Contributor

I like the flexibility here (often we've asked for two props--one for the number and the other for the time unit).

in any event, please link to the spec that documents the suffixes and what they mean (e.g. 'd', 'h', 'M', etc.)

* @return an {@link Optional} containing the {@link Duration} if the lookback time is valid, or
* an empty {@link Optional} if the lookback time is invalid or cannot be parsed.
*/
public static Optional<Duration> getLookbackTimeDuration(String lookBackTime) {
Contributor

although I am a big proponent of Optional, in this case it loses the specific errors, putting them in the log rather than "returning" them to the caller, where they might be presented in a meaningful end-user error message (rather than logs, which are for developers).

since string parsing like this regularly requires error handling, it seems reasonable and basically expected to throw an exception. you can avow in javadoc that the Duration return value is never null
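
As a rough illustration of the exception-throwing shape suggested here (not the PR's implementation), a parser could look like the sketch below. The class name `LookbackDurationParser` is hypothetical, `java.time.Duration` is used only to keep the example self-contained and may not be the `Duration` type the PR uses, and only the `d` and `h` suffixes from the javadoc examples are handled; any other suffix semantics would need to come from the spec the PR links to.

```java
import java.time.Duration;

// Hypothetical sketch: parses "<number><unit>" strings such as "5d" or "10h"
// and reports problems through exceptions instead of an empty Optional.
final class LookbackDurationParser {
    private LookbackDurationParser() {}

    /**
     * @param lookBackTime a non-negative whole number followed by 'd' (days) or 'h' (hours)
     * @return the parsed duration, never null
     * @throws IllegalArgumentException with a message describing what was wrong with the input
     */
    static Duration parse(String lookBackTime) {
        if (lookBackTime == null || lookBackTime.trim().length() < 2) {
            throw new IllegalArgumentException(
                "Lookback time must be a number followed by a unit, got: '" + lookBackTime + "'");
        }
        String trimmed = lookBackTime.trim();
        char unit = trimmed.charAt(trimmed.length() - 1);
        String number = trimmed.substring(0, trimmed.length() - 1);

        long amount;
        try {
            amount = Long.parseLong(number);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid numeric value '" + number + "' in lookback time '" + trimmed + "'", e);
        }
        if (amount < 0) {
            throw new IllegalArgumentException("Lookback time must not be negative: '" + trimmed + "'");
        }

        switch (unit) {
            case 'd':
                return Duration.ofDays(amount);
            case 'h':
                return Duration.ofHours(amount);
            default:
                throw new IllegalArgumentException(
                    "Unsupported time unit '" + unit + "' in lookback time '" + trimmed + "'");
        }
    }
}
```

A caller that still wants the watermark fallback can catch `IllegalArgumentException` at the call site, log or surface the message, and continue with the watermark, so the specific error is available to whoever needs it.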

@@ -370,4 +394,11 @@ private long getLowWaterMark(Iterable<WorkUnitState> previousStates, String lowW
protected PartitionAwareFileRetriever getRetriever() {
return retriever;
}

private String getLookBackTimeProp(SourceState state) {
Contributor

could be static
