Skip to content

Commit

Permalink
[test](insert-overwrite) Add insert overwrite auto detect concurrency…
Browse files Browse the repository at this point in the history
… cases (apache#32935)
  • Loading branch information
zclllyybb committed Apr 11, 2024
1 parent b391857 commit a0f7707
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable {
private Map<Long, List<Long>> taskGroups = Maps.newConcurrentMap();
// for one task group, there may be different requests about changing a partition to new.
// but we only change one time and save the relations in partitionPairs. they're protected by taskLocks
@SerializedName(value = "taskLocks")
private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
// <groupId, <oldPartId, newPartId>>
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap();

public InsertOverwriteManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3459,7 +3459,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t

@Override
public TReplacePartitionResult replacePartition(TReplacePartitionRequest request) throws TException {
LOG.info("Receive create partition request: {}", request);
LOG.info("Receive replace partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
List<Long> partitionIds = request.getPartitionIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ LIST
SHANGHAI
XXX

-- !sql --
7654321
BEIJING
LIST
SHANGHAI
XXX

-- !sql --
Beijing

-- !sql --
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

-- !sql --
2008-01-01
2008-02-02
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql3 --
100

-- !sql4 --
100

-- !sql5 --
0

Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
// under the License.

suite("test_iot_auto_detect") {
// only nereids now
sql """set enable_nereids_planner = true"""
sql """set enable_fallback_to_original_planner = false"""
sql """set enable_nereids_dml = true"""

// range
sql " drop table if exists range1; "
sql """
create table range1(
Expand All @@ -46,6 +48,7 @@ suite("test_iot_auto_detect") {
sql " insert overwrite table range1 partition(*) values (-100), (-100), (333), (444), (555); "
qt_sql " select * from range1 order by k0; "

// list
sql " drop table if exists list1; "
sql """
create table list1(
Expand All @@ -72,6 +75,50 @@ suite("test_iot_auto_detect") {
qt_sql " select * from list1 order by k0; "
sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("SHANGHAI"), ("XXX"), ("LIST"), ("7654321"); """
qt_sql " select * from list1 order by k0; "

// with label - transactions
sql """ insert overwrite table list1 partition(*) with label `txn1` values ("BEIJING"), ("7654321"); """
sql """ insert overwrite table list1 partition(*) with label `txn2` values ("SHANGHAI"), ("LIST"); """
sql """ insert overwrite table list1 partition(*) with label `txn3` values ("XXX"); """

def max_try_milli_secs = 10000
while(max_try_milli_secs) {
def result = sql " show load where label like 'txn_' "
if(result[0][2] == "FINISHED" && result[1][2] == "FINISHED" && result[2][2] == "FINISHED" ) {
break
} else {
sleep(1000) // wait 1 second every time
max_try_milli_secs -= 1000
if(max_try_milli_secs <= 0) {
log.info("result: ${result[0][2]}, ${result[1][2]}, ${result[2][2]}")
fail()
}
}
}

qt_sql " select * from list1 order by k0; "

// long partition value
sql " drop table if exists list_long; "
sql """
create table list_long(
k0 varchar null
)
partition by list (k0)
(
PARTITION p1 values in (("Beijing"), ("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")),
PARTITION p2 values in (("nonono"))
)
DISTRIBUTED BY HASH(`k0`) BUCKETS 1
properties("replication_num" = "1");
"""
sql """ insert into list_long values ("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); """
sql """ insert overwrite table list_long partition(*) values ("Beijing"); """
qt_sql " select * from list_long order by k0; "
sql """ insert overwrite table list_long partition(*) values ("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); """
qt_sql " select * from list_long order by k0; "

// miss partitions
try {
sql """ insert overwrite table list1 partition(*) values ("BEIJING"), ("invalid"); """
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
create table test_concurrent_write(
k0 int null
)
partition by range (k0)
(
PARTITION p10 values less than (10),
PARTITION p20 values less than (20),
PARTITION p30 values less than (30),
PARTITION p40 values less than (40),
PARTITION p50 values less than (50),
PARTITION p60 values less than (60),
PARTITION p70 values less than (70),
PARTITION p80 values less than (80),
PARTITION p90 values less than (90),
PARTITION p100 values less than (100),
PARTITION p110 values less than (110),
PARTITION p120 values less than (120),
PARTITION p130 values less than (130),
PARTITION p140 values less than (140),
PARTITION p150 values less than (150),
PARTITION p160 values less than (160),
PARTITION p170 values less than (170),
PARTITION p180 values less than (180),
PARTITION p190 values less than (190),
PARTITION p200 values less than (200),
PARTITION p210 values less than (210),
PARTITION p220 values less than (220),
PARTITION p230 values less than (230),
PARTITION p240 values less than (240),
PARTITION p250 values less than (250),
PARTITION p260 values less than (260),
PARTITION p270 values less than (270),
PARTITION p280 values less than (280),
PARTITION p290 values less than (290),
PARTITION p300 values less than (300),
PARTITION p310 values less than (310),
PARTITION p320 values less than (320),
PARTITION p330 values less than (330),
PARTITION p340 values less than (340),
PARTITION p350 values less than (350),
PARTITION p360 values less than (360),
PARTITION p370 values less than (370),
PARTITION p380 values less than (380),
PARTITION p390 values less than (390),
PARTITION p400 values less than (400),
PARTITION p410 values less than (410),
PARTITION p420 values less than (420),
PARTITION p430 values less than (430),
PARTITION p440 values less than (440),
PARTITION p450 values less than (450),
PARTITION p460 values less than (460),
PARTITION p470 values less than (470),
PARTITION p480 values less than (480),
PARTITION p490 values less than (490),
PARTITION p500 values less than (500),
PARTITION p510 values less than (510),
PARTITION p520 values less than (520),
PARTITION p530 values less than (530),
PARTITION p540 values less than (540),
PARTITION p550 values less than (550),
PARTITION p560 values less than (560),
PARTITION p570 values less than (570),
PARTITION p580 values less than (580),
PARTITION p590 values less than (590),
PARTITION p600 values less than (600),
PARTITION p610 values less than (610),
PARTITION p620 values less than (620),
PARTITION p630 values less than (630),
PARTITION p640 values less than (640),
PARTITION p650 values less than (650),
PARTITION p660 values less than (660),
PARTITION p670 values less than (670),
PARTITION p680 values less than (680),
PARTITION p690 values less than (690),
PARTITION p700 values less than (700),
PARTITION p710 values less than (710),
PARTITION p720 values less than (720),
PARTITION p730 values less than (730),
PARTITION p740 values less than (740),
PARTITION p750 values less than (750),
PARTITION p760 values less than (760),
PARTITION p770 values less than (770),
PARTITION p780 values less than (780),
PARTITION p790 values less than (790),
PARTITION p800 values less than (800),
PARTITION p810 values less than (810),
PARTITION p820 values less than (820),
PARTITION p830 values less than (830),
PARTITION p840 values less than (840),
PARTITION p850 values less than (850),
PARTITION p860 values less than (860),
PARTITION p870 values less than (870),
PARTITION p880 values less than (880),
PARTITION p890 values less than (890),
PARTITION p900 values less than (900),
PARTITION p910 values less than (910),
PARTITION p920 values less than (920),
PARTITION p930 values less than (930),
PARTITION p940 values less than (940),
PARTITION p950 values less than (950),
PARTITION p960 values less than (960),
PARTITION p970 values less than (970),
PARTITION p980 values less than (980),
PARTITION p990 values less than (990),
PARTITION p1000 values less than (1000)
)
DISTRIBUTED BY HASH(`k0`) BUCKETS 1
properties("replication_num" = "1");
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_iot_auto_detect_concurrent") {
// only nereids now
sql """set enable_nereids_planner = true"""
sql """set enable_fallback_to_original_planner = false"""
sql """set enable_nereids_dml = true"""

def db_name = "test_iot_auto_detect_concurrent"
def table_name = "test_concurrent_write"

sql " create database if not exists test_iot_auto_detect_concurrent; "
sql " use test_iot_auto_detect_concurrent; "
sql " drop table if exists test_concurrent_write; "
sql new File("""${context.file.parent}/ddl/test_iot_auto_detect_concurrent.sql""").text

def success_status = true
def load_data = { range, offset, expect_success ->
try {
sql " use test_iot_auto_detect_concurrent; "
sql """set enable_nereids_planner = true"""
sql """set enable_fallback_to_original_planner = false"""
sql """set enable_nereids_dml = true"""
sql """ insert overwrite table test_concurrent_write partition(*)
select number*10+${offset} from numbers("number" = "${range}");
"""
} catch (Exception e) {
if (expect_success) {
success_status = false
log.info("fails one")
}
log.info("successfully catch the failed insert")
return
}
if (!expect_success) {
success_status = false
}
}

def dropping = true
def drop_partition = {
sql " use test_iot_auto_detect_concurrent; "
while (dropping) {
try {
sql """ alter table test_concurrent_write
drop partition p10, drop partition p20, drop partition p30, drop partition p40, drop partition p50,
drop partition p60, drop partition p70, drop partition p80, drop partition p90, drop partition p100;
"""
} catch (Exception e) {}
}
}

def result


/// same data and partitions
success_status = true
sql """ insert into test_concurrent_write select * from numbers("number" = "1000"); """
def thread1 = Thread.start { load_data(100, 0, false) }
def thread2 = Thread.start { load_data(100, 0, false) }
def thread3 = Thread.start { load_data(100, 0, false) }
def thread4 = Thread.start { load_data(100, 0, false) }
def thread5 = Thread.start { load_data(100, 0, false) }
thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()
// suppose result: success zero or one
if (success_status) { // success zero
result = sql " select count(k0) from test_concurrent_write; "
assertEquals(result[0][0], 1000)
result = sql " select count(distinct k0) from test_concurrent_write; "
assertEquals(result[0][0], 1000)
} else { // success one
result = sql " select count(k0) from test_concurrent_write; "
assertEquals(result[0][0], 100)
result = sql " select count(distinct k0) from test_concurrent_write; "
assertEquals(result[0][0], 100)
}


/// not same data/partitions
success_status = true
sql """ insert overwrite table test_concurrent_write select * from numbers("number" = "1000"); """
def thread6 = Thread.start { load_data(50, 0, true) } // 0, 10 ... 490
def thread7 = Thread.start { load_data(50, 500, true) } // 500, 10 ... 990
thread6.join()
thread7.join()
// suppose result: Success to overwrite with a multiple of ten values
assertTrue(success_status)
qt_sql3 " select count(k0) from test_concurrent_write; "
qt_sql4 " select count(distinct k0) from test_concurrent_write; "


/// with drop partition concurrently
success_status = true
sql """ truncate table test_concurrent_write; """
def thread10 = Thread.start { drop_partition() }
def thread8 = Thread.start { load_data(100, 0, false) }
def thread9 = Thread.start { load_data(100, 0, false) }
thread8.join()
thread9.join()
dropping = false // stop dropping
thread10.join()
// no success insert occur
assertTrue(success_status) // we concerned about this. no
qt_sql5 " select count(k0) from test_concurrent_write; "
}

0 comments on commit a0f7707

Please sign in to comment.