Skip to content

Commit

Permalink
[Enhancement]use awaitility.await() in schema testcases #37817 (#38869)
Browse files Browse the repository at this point in the history
## Proposed changes

Issue Number: close #xxx
#37817
use awaitility.await() in schema testcases(part2)

Co-authored-by: Xinyi Zou <[email protected]>
  • Loading branch information
2 people authored and dataroaring committed Oct 5, 2024
1 parent 81d390f commit 9d7bfef
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_dup_keys_schema_change") {
def tableName = "schema_change_dup_keys_regression_test"
Expand Down Expand Up @@ -98,20 +100,15 @@ suite ("test_dup_keys_schema_change") {
sql """
ALTER TABLE ${tableName} DROP COLUMN sex
"""
int max_try_time = 3000
while (max_try_time--){
int max_try_time = 300
Awaitility.await().atMost(max_try_time, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).await().until(() -> {
String result = getJobState(tableName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
sleep(100)
if (max_try_time < 1){
assertEquals(1,2)
}
return true;
}
}
Thread.sleep(1000)
return false;
});

qt_sc """ select * from ${tableName} where user_id = 3 order by new_column """


Expand Down Expand Up @@ -154,18 +151,16 @@ suite ("test_dup_keys_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}

qt_sc """ select count(*) from ${tableName} """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_dup_mv_schema_change") {
def tableName = "schema_change_dup_mv_regression_test"
Expand All @@ -25,18 +27,13 @@ suite ("test_dup_mv_schema_change") {
}

def waitForJob = (tbName, timeout) -> {
while (timeout--){
Awaitility.await().atMost(timeout, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).await().until(() -> {
String result = getJobState(tbName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
sleep(100)
if (timeout < 1){
assertEquals(1,2)
}
return true;
}
}
return false;
});
}
try {
String backend_id;
Expand Down Expand Up @@ -164,18 +161,16 @@ suite ("test_dup_mv_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}

qt_sc """ select count(*) from ${tableName} """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_dup_rollup_schema_change") {
def getMVJobState = { tableName ->
Expand All @@ -27,18 +29,13 @@ suite ("test_dup_rollup_schema_change") {
return jobStateResult[0][9]
}
def waitForMVJob = (tbName, timeout) -> {
while (timeout--){
Awaitility.await().atMost(timeout, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).await().until(() -> {
String result = getMVJobState(tbName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
sleep(100)
if (timeout < 1){
assertEquals(1,2)
}
return true;
}
}
return false;
});
}

def tableName = "schema_change_dup_rollup_regression_test"
Expand Down Expand Up @@ -182,18 +179,16 @@ suite ("test_dup_rollup_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}

qt_sc """ select count(*) from ${tableName} """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_dup_vals_schema_change") {
def tableName = "schema_change_dup_vals_regression_test"
Expand Down Expand Up @@ -137,18 +139,16 @@ suite ("test_dup_vals_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}
qt_sc """ select count(*) from ${tableName} """

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_uniq_keys_schema_change") {
def tableName = "schema_change_uniq_keys_regression_test"
Expand Down Expand Up @@ -132,18 +134,16 @@ suite ("test_uniq_keys_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}
qt_sc """ select count(*) from ${tableName} """

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_uniq_mv_schema_change") {
def tableName = "schema_change_uniq_mv_regression_test"
Expand All @@ -24,18 +26,14 @@ suite ("test_uniq_mv_schema_change") {
return jobStateResult[0][8]
}
def waitForJob = (tbName, timeout) -> {
while (timeout--){
Awaitility.await().atMost(timeout, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).await().until(() -> {
String result = getMVJobState(tbName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
sleep(100)
if (timeout < 1){
assertEquals(1,2)
}
}
}
return true;
}
return false;
});
// when timeout awaitlity will raise a exception.
}

try {
Expand Down Expand Up @@ -179,18 +177,16 @@ suite ("test_uniq_mv_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}
qt_sc """ select count(*) from ${tableName} """

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite ("test_uniq_rollup_schema_change") {
def tableName = "schema_change_uniq_rollup_regression_test"
Expand All @@ -28,18 +30,14 @@ suite ("test_uniq_rollup_schema_change") {
return jobStateResult[0][9]
}
def waitForMVJob = (tbName, timeout) -> {
while (timeout--){
Awaitility.await().atMost(timeout, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).await().until(() -> {
String result = getMVJobState(tbName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
sleep(100)
if (timeout < 1){
assertEquals(1,2)
}
}
}
return true;
}
return false;
});
// when timeout awaitlity will raise a exception.
}

try {
Expand Down Expand Up @@ -79,7 +77,7 @@ suite ("test_uniq_rollup_schema_change") {
//add rollup
def rollupName = "rollup_cost"
sql "ALTER TABLE ${tableName} ADD ROLLUP ${rollupName}(`user_id`,`date`,`city`,`sex`, `age`,`cost`);"
waitForMVJob(tableName, 3000)
waitForMVJob(tableName, 300)

sql """ INSERT INTO ${tableName} VALUES
(2, '2017-10-01', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2020-01-02', 1, 31, 21)
Expand Down Expand Up @@ -133,19 +131,14 @@ suite ("test_uniq_rollup_schema_change") {
ALTER TABLE ${tableName} DROP COLUMN cost
"""

max_try_time = 3000
while (max_try_time--){
max_try_time = 300
Awaitility.await().atMost(max_try_time, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).await().until(() -> {
String result = getJobState(tableName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
sleep(100)
if (max_try_time < 1){
assertEquals(1,2)
}
return true;
}
}
return false;
});

qt_sc """ select * from ${tableName} where user_id = 3 """

Expand Down Expand Up @@ -187,18 +180,16 @@ suite ("test_uniq_rollup_schema_change") {

// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
Awaitility.await().untilAsserted(() -> {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
return compactionStatus.run_status;
});
}
qt_sc """ select count(*) from ${tableName} """

Expand Down
Loading

0 comments on commit 9d7bfef

Please sign in to comment.