Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up changes based on #3768 #3773

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;


/**
* A class encapsulating the count of records in the consecutive intervals.
*/
@Getter
public class Histogram {
private long totalRecordCount;
private final List<HistogramGroup> groups;
private final List<Group> groups;

Histogram() {
public Histogram() {
totalRecordCount = 0;
groups = new ArrayList<>();
}

void add(HistogramGroup group) {
void add(Group group) {
groups.add(group);
totalRecordCount += group.getCount();
}
Expand All @@ -41,12 +45,28 @@ void add(Histogram histogram) {
totalRecordCount += histogram.totalRecordCount;
}

HistogramGroup get(int idx) {
Group get(int idx) {
return this.groups.get(idx);
}

@Override
public String toString() {
return groups.toString();
}

/**
* A class to encapsulate the key and the corresponding frequency/count, in the context of a
* histogram. It represents one data point in the histogram.
*/
@Getter
@AllArgsConstructor
static class Group {
private final String key;
private final int count;

@Override
public String toString() {
return key + ":" + count;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* mapping of number of records to be fetched by time intervals.
*/
@Slf4j
public class SalesforceHistogramService {
public class RecordModTimeHistogramService {
private static final int MIN_SPLIT_TIME_MILLIS = 1000;
private static final String ZERO_TIME_SUFFIX = "-00:00:00";
private static final Gson GSON = new Gson();
Expand All @@ -74,7 +74,7 @@ public class SalesforceHistogramService {
protected SalesforceConnector salesforceConnector;
private final SfConfig sfConfig;

SalesforceHistogramService(SfConfig sfConfig, SalesforceConnector connector) {
RecordModTimeHistogramService(SfConfig sfConfig, SalesforceConnector connector) {
this.sfConfig = sfConfig;
salesforceConnector = connector;
}
Expand All @@ -97,9 +97,9 @@ Histogram getHistogram(String entity, String watermarkColumn, SourceState state,

// exchange the first histogram group key with the global low watermark to ensure that the low watermark is captured
// in the range of generated partitions
HistogramGroup firstGroup = histogram.get(0);
Histogram.Group firstGroup = histogram.get(0);
Date lwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate(lwmDate.getTime(), SalesforceSource.SECONDS_FORMAT),
histogram.getGroups().set(0, new Histogram.Group(Utils.epochToDate(lwmDate.getTime(), SalesforceSource.SECONDS_FORMAT),
firstGroup.getCount()));

// refine the histogram
Expand Down Expand Up @@ -178,8 +178,8 @@ private Histogram getRefinedHistogram(SalesforceConnector connector, String enti

log.info("Refining histogram with bucket size limit {}.", bucketSizeLimit);

HistogramGroup currentGroup;
HistogramGroup nextGroup;
Histogram.Group currentGroup;
Histogram.Group nextGroup;
final TableCountProbingContext probingContext =
new TableCountProbingContext(connector, entity, watermarkColumn, bucketSizeLimit, probeLimit);

Expand All @@ -188,9 +188,9 @@ private Histogram getRefinedHistogram(SalesforceConnector connector, String enti
}

// make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group
List<HistogramGroup> list = new ArrayList(histogram.getGroups());
List<Histogram.Group> list = new ArrayList(histogram.getGroups());
Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT);
list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), 0));
list.add(new Histogram.Group(Utils.epochToDate(hwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), 0));

for (int i = 0; i < list.size() - 1; i++) {
currentGroup = list.get(i);
Expand Down Expand Up @@ -261,7 +261,7 @@ private Histogram parseDayBucketingHistogram(JsonArray records) {
String time = element.get("time").getAsString() + ZERO_TIME_SUFFIX;
int count = element.get("cnt").getAsInt();

histogram.add(new HistogramGroup(time, count));
histogram.add(new Histogram.Group(time, count));
}

return histogram;
Expand All @@ -278,7 +278,7 @@ private void getHistogramRecursively(TableCountProbingContext probingContext, Hi
if (count <= probingContext.bucketSizeLimit
|| probingContext.probeCount > probingContext.probeLimit
|| (midpointEpoch - startEpoch < MIN_SPLIT_TIME_MILLIS)) {
histogram.add(new HistogramGroup(Utils.epochToDate(startEpoch, SalesforceSource.SECONDS_FORMAT), count));
histogram.add(new Histogram.Group(Utils.epochToDate(startEpoch, SalesforceSource.SECONDS_FORMAT), count));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,20 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
public static final String USE_ALL_OBJECTS = "use.all.objects";
public static final boolean DEFAULT_USE_ALL_OBJECTS = false;

@VisibleForTesting
static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
public static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing";
public static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000;

@VisibleForTesting
static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
@VisibleForTesting
static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
public static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning";
public static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit";
private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = DEFAULT_MIN_TARGET_PARTITION_SIZE * 4;

static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss";

private boolean isEarlyStopped = false;
protected SalesforceConnector salesforceConnector = null;

private SalesforceHistogramService salesforceHistogramService;
private RecordModTimeHistogramService histogramService;

public SalesforceSource() {
this.lineageInfo = Optional.absent();
Expand All @@ -103,9 +100,9 @@ public SalesforceSource() {
}

@VisibleForTesting
SalesforceSource(SalesforceHistogramService salesforceHistogramService) {
SalesforceSource(RecordModTimeHistogramService histogramService) {
this.lineageInfo = Optional.absent();
this.salesforceHistogramService = salesforceHistogramService;
this.histogramService = histogramService;
}

@Override
Expand Down Expand Up @@ -133,11 +130,11 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity
}
@Override
protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
SalesforceConnector connector = getConnector(state);

SfConfig sfConfig = new SfConfig(state.getProperties());
if (salesforceHistogramService == null) {
salesforceHistogramService = new SalesforceHistogramService(sfConfig, connector);
if (histogramService == null) {
salesforceConnector = getConnector(state);
histogramService = new RecordModTimeHistogramService(sfConfig, getConnector(state));
Comment on lines +136 to +137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment explaining why another getConnector(state) call is made here, rather than reusing the connector.

Edit: reading on, I see it doesn't create a new connector — it just looks that way.

}

List<WorkUnit> workUnits;
Expand Down Expand Up @@ -294,7 +291,7 @@ List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st

Partition partition = partitioner.getGlobalPartition(previousWatermark);
Histogram histogram =
salesforceHistogramService.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition);
histogramService.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition);

// we should look if the count is too big, cut off early if count exceeds the limit, or bucket size is too large

Expand All @@ -303,7 +300,7 @@ List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st
// TODO: we should consider move this logic into getRefinedHistogram so that we can early terminate the search
if (isEarlyStopEnabled(state)) {
histogramAdjust = new Histogram();
for (HistogramGroup group : histogram.getGroups()) {
for (Histogram.Group group : histogram.getGroups()) {
histogramAdjust.add(group);
long earlyStopRecordLimit = state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT);
if (histogramAdjust.getTotalRecordCount() > earlyStopRecordLimit) {
Expand All @@ -316,7 +313,7 @@ List<WorkUnit> generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st

long expectedHighWatermark = partition.getHighWatermark();
if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) {
HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size());
Histogram.Group lastPlusOne = histogram.get(histogramAdjust.getGroups().size());
long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]",
state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
Expand Down Expand Up @@ -354,13 +351,13 @@ String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSi
log.info("maxPartitions: " + maxPartitions);
log.info("interval: " + interval);

List<HistogramGroup> groups = histogram.getGroups();
List<Histogram.Group> groups = histogram.getGroups();
List<String> partitionPoints = new ArrayList<>();
DescriptiveStatistics statistics = new DescriptiveStatistics();

int count = 0;
HistogramGroup group;
Iterator<HistogramGroup> it = groups.iterator();
Histogram.Group group;
Iterator<Histogram.Group> it = groups.iterator();

while (it.hasNext()) {
group = it.next();
Expand Down Expand Up @@ -427,18 +424,18 @@ protected Set<SourceEntity> getSourceEntities(State state) {
return super.getSourceEntities(state);
}

SalesforceConnector connector = getConnector(state);
salesforceConnector = getConnector(state);
try {
if (!connector.connect()) {
if (!salesforceConnector.connect()) {
throw new RuntimeException("Failed to connect.");
}
} catch (RestApiConnectionException e) {
throw new RuntimeException("Failed to connect.", e);
}

List<Command> commands = RestApiConnector.constructGetCommand(connector.getFullUri("/sobjects"));
List<Command> commands = RestApiConnector.constructGetCommand(salesforceConnector.getFullUri("/sobjects"));
try {
CommandOutput<?, ?> response = connector.getResponse(commands);
CommandOutput<?, ?> response = salesforceConnector.getResponse(commands);
Iterator<String> itr = (Iterator<String>) response.getResults().values().iterator();
if (itr.hasNext()) {
String next = itr.next();
Expand All @@ -462,9 +459,9 @@ private static Set<SourceEntity> getSourceEntities(String response) {
}

protected SalesforceConnector getConnector(State state) {
if (this.salesforceConnector == null) {
this.salesforceConnector = new SalesforceConnector(state);
if (salesforceConnector == null) {
salesforceConnector = new SalesforceConnector(state);
Comment on lines +462 to +463
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not thread-safe — should it be synchronized? Also, if the connector ever needs to be explicitly closed, there's a potential resource leak here.

}
return this.salesforceConnector;
return salesforceConnector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void testSourceLineageInfo() {
@Test
void testGenerateSpecifiedPartitionFromSinglePointHistogram() {
Histogram histogram = new Histogram();
histogram.add(new HistogramGroup("2014-02-13-00:00:00", 10));
histogram.add(new Histogram.Group("2014-02-13-00:00:00", 10));
SalesforceSource source = new SalesforceSource();

long expectedHighWatermark = 20170407152123L;
Expand Down Expand Up @@ -114,13 +114,13 @@ void testGenerateWorkUnitsHelperForSinglePartitionAndEarlyStop(long earlyStopRec
state.setProp(SalesforceSource.EARLY_STOP_TOTAL_RECORDS_LIMIT, earlyStopRecordCount);
long previousWtm = 20140213000000L;

SalesforceHistogramService salesforceHistogramService = mock(SalesforceHistogramService.class);
RecordModTimeHistogramService histogramService = mock(RecordModTimeHistogramService.class);
String deltaFieldKey = state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
Partition partition = new Partitioner(state).getGlobalPartition(previousWtm);
when(salesforceHistogramService.getHistogram(sourceEntity.getSourceEntityName(), deltaFieldKey, state, partition))
when(histogramService.getHistogram(sourceEntity.getSourceEntityName(), deltaFieldKey, state, partition))
.thenReturn(getHistogram());

List<WorkUnit> actualWorkUnits = new SalesforceSource(salesforceHistogramService).generateWorkUnitsHelper(sourceEntity, state, previousWtm);
List<WorkUnit> actualWorkUnits = new SalesforceSource(histogramService).generateWorkUnitsHelper(sourceEntity, state, previousWtm);
Assert.assertEquals(actualWorkUnits.size(), 1);
double actualHighWtm = (double) new Gson().fromJson(actualWorkUnits.get(0).getExpectedHighWatermark(), HashMap.class).get("value");
Assert.assertEquals(actualHighWtm, Double.parseDouble(String.valueOf(expectedHighWtm)));
Expand All @@ -139,7 +139,7 @@ private Histogram getHistogram() {
Histogram histogram = new Histogram();
for (String group: HISTOGRAM.split(", ")) {
String[] groupInfo = group.split("::");
histogram.add(new HistogramGroup(groupInfo[0], Integer.parseInt(groupInfo[1])));
histogram.add(new Histogram.Group(groupInfo[0], Integer.parseInt(groupInfo[1])));
}
return histogram;
}
Expand Down