Skip to content

Commit

Permalink
[GOBBLIN-1926] Fix Reminder Event Epsilon Comparison (#3797)
Browse files Browse the repository at this point in the history
* Fix Reminder Event Epsilon Comparison

* Add TODO comment

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Oct 10, 2023
1 parent e4cae2e commit 9b254b6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected interface CheckedFunction<T, R> {
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CONVERT_TZ(?, '+00:00', @@session.time_zone)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
Expand Down Expand Up @@ -269,7 +269,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
log.info("Multi-active scheduler about to handle trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original", eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent);
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent, eventTimeMillis);

// TODO: change all the `CASE N: ...` statements back to debug statements after uncovering issue
try {
Expand Down Expand Up @@ -299,11 +299,18 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
return new NoLongerLeasingStatus();
}
if (eventTimeMillis > dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Severe constraint "
+ "violation encountered: a reminder event newer than db event was found when db laundering should "
+ "ensure monotonically increasing laundered event times.", flowAction,
isReminderEvent ? "reminder" : "original", eventTimeMillis, dbEventTimestamp.getTime());
}
if (eventTimeMillis == dbEventTimestamp.getTime()) {
// TODO: change this to a debug after fixing issue
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
+ "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
eventTimeMillis, dbEventTimestamp);
}
}

log.info("Multi-active arbiter replacing local trigger event timestamp [{}, is: {}, triggerEventTimestamp: {}] "
Expand Down Expand Up @@ -359,10 +366,13 @@ else if (leaseValidityStatus == 2) {
* Checks leaseArbiterTable for an existing entry for this flow action and event time
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction,
boolean isReminderEvent) throws IOException {
boolean isReminderEvent, long eventTimeMillis) throws IOException {
return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
if (isReminderEvent) {
getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis), UTC_CAL.get());
}
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
Expand Down Expand Up @@ -511,7 +521,7 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: ",
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
flowAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
private static final int EPSILON = 10000;
private static final int MORE_THAN_EPSILON = (int) (EPSILON * 1.1);
private static final int LINGER = 50000;
private static final String USER = "testUser";
private static final String PASSWORD = "testPassword";
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
// valid
// Allow enough time to pass for this trigger to be considered distinct, but not enough time so the lease expires
Thread.sleep(EPSILON * 3/2);
Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
Expand Down Expand Up @@ -147,7 +148,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {

// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be acquired
Thread.sleep(EPSILON * 3/2);
Thread.sleep(MORE_THAN_EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
Expand Down Expand Up @@ -286,17 +287,21 @@ public void testReminderEventAcquireLeaseOnInvalidLease() throws IOException, In

/*
Tests calling `tryAcquireLease` for a reminder event whose lease has completed in the database and should return
`NoLongerLeasing` status
`NoLongerLeasing` status.
Note: that we wait for enough time to pass that the event would have been considered distinct for a non-reminder case
to ensure that the comparison made for reminder events is against the preserved event time not current time in db
*/
@Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException {
public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, InterruptedException {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);

// Sleep enough time for the event to have been considered distinct
Thread.sleep(MORE_THAN_EPSILON);
// Now have a reminder event check-in on the completed lease
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Expand Down

0 comments on commit 9b254b6

Please sign in to comment.