diff --git a/plugins/Jobs.cpp b/plugins/Jobs.cpp index 2f4106bbb..439a55904 100644 --- a/plugins/Jobs.cpp +++ b/plugins/Jobs.cpp @@ -898,11 +898,12 @@ void BedrockJobsCommand::process(SQLite& db) { // interrupted in a non-fatal way. // // Parameters: - // - jobID - ID of the job to requeue - // - delay - Number of seconds to wait before retrying - // - nextRun - datetime of next scheduled run - // - name - An arbitrary string identifier (case insensitive) - // - data - Data to associate with this job + // - jobID - ID of the job to requeue + // - delay - Number of seconds to wait before retrying + // - nextRun - datetime of next scheduled run + // - name - An arbitrary string identifier (case insensitive) + // - data - Data to associate with this job + // - jobPriority - The new priority to set for this job // // - FinishJob( jobID, [data] ) // @@ -1005,11 +1006,22 @@ void BedrockJobsCommand::process(SQLite& db) { return; } - // If this is RetryJob and we want to update the name, let's do that + // If this is RetryJob and we want to update the name and/or priority, let's do that const string& name = request["name"]; - if (!name.empty() && SIEquals(requestVerb, "RetryJob")) { - if (!db.writeIdempotent("UPDATE jobs SET name=" + SQ(name) + " WHERE jobID=" + SQ(jobID) + ";")) { - STHROW("502 Failed to update job name"); + if (SIEquals(requestVerb, "RetryJob")) { + list updates; + if (!name.empty()) { + updates.push_back("name=" + SQ(name) + " "); + } + if (request.isSet("jobPriority")) { + _validatePriority(request.calc64("jobPriority")); + updates.push_back("priority=" + SQ(request["jobPriority"]) + " "); + } + if (!updates.empty()) { + bool success = db.writeIdempotent("UPDATE jobs SET " + SComposeList(updates, ", ") + " WHERE jobID=" + SQ(jobID) + ";"); + if (!success) { + STHROW("502 Failed to update job name/priority"); + } } } diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d1924e627..db8b215dd 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2259,6 +2259,11 @@ void SQLiteNode::handleBeginTransaction(Peer* peer, const SData& message) { commandIt->second->transaction = message; } + // calculate and log replication timers + if (leaderSentTimestamp > followerDequeueTimestamp) { + SWARN("Leader replication timestamp is " << (leaderSentTimestamp - followerDequeueTimestamp) << " usecs newer than our timestamp. Possible clock synchronization issue."); + leaderSentTimestamp = followerDequeueTimestamp; + } uint64_t transitTimeUS = followerDequeueTimestamp - leaderSentTimestamp; uint64_t applyTimeUS = STimeNow() - followerDequeueTimestamp; float transitTimeMS = (float)transitTimeUS / 1000.0; diff --git a/test/tests/jobs/RetryJobTest.cpp b/test/tests/jobs/RetryJobTest.cpp index 56645b63c..d1d4a4c8c 100644 --- a/test/tests/jobs/RetryJobTest.cpp +++ b/test/tests/jobs/RetryJobTest.cpp @@ -16,7 +16,9 @@ struct RetryJobTest : tpunit::TestFixture { TEST(RetryJobTest::hasRepeat), TEST(RetryJobTest::inRunqueuedState), TEST(RetryJobTest::simplyRetryWithNextRun), + TEST(RetryJobTest::changeNameAndPriority), TEST(RetryJobTest::changeName), + TEST(RetryJobTest::changePriority), TEST(RetryJobTest::hasRepeatWithNextRun), TEST(RetryJobTest::hasRepeatWithDelay), TEST(RetryJobTest::hasDelayAndNextRun), @@ -388,11 +390,42 @@ struct RetryJobTest : tpunit::TestFixture { ASSERT_EQUAL(result[0][0], nextRun); } - // Update the name + // Update the name and priority + void changeNameAndPriority() { + // Create the job + SData command("CreateJob"); + command["name"] = "job"; + command["jobPriority"] = "500"; + STable response = tester->executeWaitVerifyContentTable(command); + string jobID = response["jobID"]; + + // Get the job + command.clear(); + command.methodLine = "GetJob"; + command["name"] = "job"; + tester->executeWaitVerifyContent(command); + + // Retry it passing name and priority + command.clear(); + command.methodLine = "RetryJob"; + command["jobID"] = jobID; + command["name"] = "newName"; + command["jobPriority"] = "1000"; + command["nextRun"] = getTimeInFuture(10); + tester->executeWaitVerifyContent(command); + + // Confirm the data updated + SQResult result; + tester->readDB("SELECT name, priority FROM jobs WHERE jobID = " + jobID + ";", result); + ASSERT_EQUAL(result[0][0], "newName"); + ASSERT_EQUAL(result[0][1], "1000"); + } + void changeName() { // Create the job SData command("CreateJob"); command["name"] = "job"; + command["jobPriority"] = "500"; STable response = tester->executeWaitVerifyContentTable(command); string jobID = response["jobID"]; @@ -402,7 +435,7 @@ struct RetryJobTest : tpunit::TestFixture { command["name"] = "job"; tester->executeWaitVerifyContent(command); - // Retry it + // Retry it passing only name command.clear(); command.methodLine = "RetryJob"; command["jobID"] = jobID; @@ -412,8 +445,38 @@ struct RetryJobTest : tpunit::TestFixture { // Confirm the data updated SQResult result; - tester->readDB("SELECT name FROM jobs WHERE jobID = " + jobID + ";", result); + tester->readDB("SELECT name, priority FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_EQUAL(result[0][0], "newName"); + ASSERT_EQUAL(result[0][1], "500"); + } + + void changePriority() { + // Create the job + SData command("CreateJob"); + command["name"] = "job"; + command["jobPriority"] = "500"; + STable response = tester->executeWaitVerifyContentTable(command); + string jobID = response["jobID"]; + + // Get the job + command.clear(); + command.methodLine = "GetJob"; + command["name"] = "job"; + tester->executeWaitVerifyContent(command); + + // Retry it passing only priority + command.clear(); + command.methodLine = "RetryJob"; + command["jobID"] = jobID; + command["jobPriority"] = "1000"; + command["nextRun"] = getTimeInFuture(10); + tester->executeWaitVerifyContent(command); + + // Confirm the data updated + SQResult result; + tester->readDB("SELECT name, priority FROM jobs WHERE jobID = " + jobID + ";", result); + ASSERT_EQUAL(result[0][0], "job"); + ASSERT_EQUAL(result[0][1], "1000"); } // Repeat should take precedence over nextRun