diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 6fa140846acf39..4ee245af359bcf 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -222,7 +222,7 @@ class MemTable { // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit. std::shared_ptr _insert_mem_tracker; std::shared_ptr _flush_mem_tracker; - // Only the rows will be inserted into SkipList can allocate memory from _arena. + // Only the rows will be inserted into block can allocate memory from _arena. // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually // reduce the number of segment files that are generated by current load std::unique_ptr _arena; diff --git a/be/src/olap/skiplist.h b/be/src/olap/skiplist.h deleted file mode 100644 index d8e3335e502204..00000000000000 --- a/be/src/olap/skiplist.h +++ /dev/null @@ -1,462 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#pragma once - -// Thread safety -// ------------- -// -// Writes require external synchronization, most likely a mutex. -// Reads require a guarantee that the SkipList will not be destroyed -// while the read is in progress. Apart from that, reads progress -// without any internal locking or synchronization. -// -// Invariants: -// -// (1) Allocated nodes are never deleted until the SkipList is -// destroyed. This is trivially guaranteed by the code since we -// never delete any skip list nodes. -// -// (2) The contents of a Node except for the next/prev pointers are -// immutable after the Node has been linked into the SkipList. -// Only Insert() modifies the list, and it is careful to initialize -// a node and use release-stores to publish the nodes in one or -// more lists. -// -// ... prev vs. next pointer ordering ... - -#include - -#include - -#include "common/logging.h" -#include "util/random.h" -#include "vec/common/arena.h" - -namespace doris { - -template -class SkipList { -private: - struct Node; - enum { kMaxHeight = 12 }; - -public: - typedef Key key_type; - // One Hint object is to show position info of one row. - // It is used in the following scenarios: - // // 1. check for existence - // bool is_exist = skiplist->Find(key, &hint); - // // 2. Do something separately based on the value of is_exist - // if (is_exist) { - // do_something1 (); - // } else { - // do_something2 (); - // skiplist->InsertWithHint(key, is_exist, hint); - // } - // - // Note: The user should guarantee that there must not be any other insertion - // between calling Find() and InsertWithHint(). - struct Hint { - Node* curr = nullptr; - Node* prev[kMaxHeight]; - }; - - // Create a new SkipList object that will use "cmp" for comparing keys, - // and will allocate memory using "*arena". - // NOTE: Objects allocated in the arena must remain allocated for - // the lifetime of the skiplist object. - explicit SkipList(Comparator* cmp, vectorized::Arena* arena, bool can_dup); - - // Insert key into the list. - void Insert(const Key& key, bool* overwritten); - // Use hint to insert a key. the hint is from previous Find() - void InsertWithHint(const Key& key, bool is_exist, Hint* hint); - - // Returns true if an entry that compares equal to key is in the list. - bool Contains(const Key& key) const; - // Like Contains(), but it will return the position info as a hint. We can use this - // position info to insert directly using InsertWithHint(). - bool Find(const Key& key, Hint* hint) const; - - // Iteration over the contents of a skip list - class Iterator { - public: - // Initialize an iterator over the specified list. - // The returned iterator is not valid. - explicit Iterator(const SkipList* list); - - // Returns true if the iterator is positioned at a valid node. - bool Valid() const; - - // Returns the key at the current position. - // REQUIRES: Valid() - const Key& key() const; - - // Advances to the next position. - // REQUIRES: Valid() - void Next(); - - // Advances to the previous position. - // REQUIRES: Valid() - void Prev(); - - // Advance to the first entry with a key >= target - void Seek(const Key& target); - - // Position at the first entry in list. - // Final state of iterator is Valid() if list is not empty. - void SeekToFirst(); - - // Position at the last entry in list. - // Final state of iterator is Valid() if list is not empty. - void SeekToLast(); - - private: - const SkipList* list_ = nullptr; - Node* node_ = nullptr; - // Intentionally copyable - }; - -private: - // Immutable after construction - Comparator* const compare_; - // When value is true, means indicates that duplicate values are allowed. - bool _can_dup; - vectorized::Arena* const _arena; // Arena used for allocations of nodes - - Node* const head_; - - // Modified only by Insert(). Read racily by readers, but stale - // values are ok. - std::atomic max_height_; // Height of the entire list - - int GetMaxHeight() const { return max_height_.load(std::memory_order_relaxed); } - - // Read/written only by Insert(). - Random rnd_; - - Node* NewNode(const Key& key, int height); - int RandomHeight(); - bool Equal(const Key& a, const Key& b) const { return ((*compare_)(a, b) == 0); } - - // Return true if key is greater than the data stored in "n" - bool KeyIsAfterNode(const Key& key, Node* n) const; - - // Return the earliest node that comes at or after key. - // Return nullptr if there is no such node. - // - // If prev is non-nullptr, fills prev[level] with pointer to previous - // node at "level" for every level in [0..max_height_-1]. - Node* FindGreaterOrEqual(const Key& key, Node** prev) const; - - // Return the latest node with a key < key. - // Return head_ if there is no such node. - Node* FindLessThan(const Key& key) const; - - // Return the last node in the list. - // Return head_ if list is empty. - Node* FindLast() const; - - // No copying allowed - SkipList(const SkipList&); - void operator=(const SkipList&); -}; - -// Implementation details follow -template -struct SkipList::Node { - explicit Node(const Key& k) : key(k) {} - - Key const key; - - // Accessors/mutators for links. Wrapped in methods so we can - // add the appropriate barriers as necessary. - Node* Next(int n) { - DCHECK(n >= 0); - // Use an 'acquire load' so that we observe a fully initialized - // version of the returned Node. - return (next_[n].load(std::memory_order_acquire)); - } - void SetNext(int n, Node* x) { - DCHECK(n >= 0); - // Use a 'release store' so that anybody who reads through this - // pointer observes a fully initialized version of the inserted node. - next_[n].store(x, std::memory_order_release); - } - - // No-barrier variants that can be safely used in a few locations. - Node* NoBarrier_Next(int n) { - DCHECK(n >= 0); - return next_[n].load(std::memory_order_relaxed); - } - void NoBarrier_SetNext(int n, Node* x) { - DCHECK(n >= 0); - next_[n].store(x, std::memory_order_relaxed); - } - -private: - // Array of length equal to the node height. next_[0] is lowest level link. - std::atomic next_[1]; -}; - -template -typename SkipList::Node* SkipList::NewNode(const Key& key, - int height) { - char* mem = _arena->alloc(sizeof(Node) + sizeof(std::atomic) * (height - 1)); - return new (mem) Node(key); -} - -template -SkipList::Iterator::Iterator(const SkipList* list) { - list_ = list; - node_ = nullptr; -} - -template -bool SkipList::Iterator::Valid() const { - return node_ != nullptr; -} - -template -const Key& SkipList::Iterator::key() const { - DCHECK(Valid()); - return node_->key; -} - -template -void SkipList::Iterator::Next() { - DCHECK(Valid()); - node_ = node_->Next(0); -} - -template -void SkipList::Iterator::Prev() { - // Instead of using explicit "prev" links, we just search for the - // last node that falls before key. - DCHECK(Valid()); - node_ = list_->FindLessThan(node_->key); - if (node_ == list_->head_) { - node_ = nullptr; - } -} - -template -void SkipList::Iterator::Seek(const Key& target) { - node_ = list_->FindGreaterOrEqual(target, nullptr); -} - -template -void SkipList::Iterator::SeekToFirst() { - node_ = list_->head_->Next(0); -} - -template -void SkipList::Iterator::SeekToLast() { - node_ = list_->FindLast(); - if (node_ == list_->head_) { - node_ = nullptr; - } -} - -template -int SkipList::RandomHeight() { - // Increase height with probability 1 in kBranching - static const unsigned int kBranching = 4; - int height = 1; - while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) { - height++; - } - DCHECK(height > 0); - DCHECK(height <= kMaxHeight); - return height; -} - -template -bool SkipList::KeyIsAfterNode(const Key& key, Node* n) const { - // nullptr n is considered infinite - return (n != nullptr) && ((*compare_)(n->key, key) < 0); -} - -template -typename SkipList::Node* SkipList::FindGreaterOrEqual( - const Key& key, Node** prev) const { - Node* x = head_; - int level = GetMaxHeight() - 1; - while (true) { - Node* next = x->Next(level); - if (KeyIsAfterNode(key, next)) { - // Keep searching in this list - x = next; - } else { - if (prev != nullptr) prev[level] = x; - if (level == 0) { - return next; - } else { - // Switch to next list - level--; - } - } - } -} - -template -typename SkipList::Node* SkipList::FindLessThan( - const Key& key) const { - Node* x = head_; - int level = GetMaxHeight() - 1; - while (true) { - DCHECK(x == head_ || (*compare_)(x->key, key) < 0); - Node* next = x->Next(level); - if (next == nullptr || (*compare_)(next->key, key) >= 0) { - if (level == 0) { - return x; - } else { - // Switch to next list - level--; - } - } else { - x = next; - } - } -} - -template -typename SkipList::Node* SkipList::FindLast() const { - Node* x = head_; - int level = GetMaxHeight() - 1; - while (true) { - Node* next = x->Next(level); - if (next == nullptr) { - if (level == 0) { - return x; - } else { - // Switch to next list - level--; - } - } else { - x = next; - } - } -} - -template -SkipList::SkipList(Comparator* cmp, vectorized::Arena* arena, bool can_dup) - : compare_(cmp), - _can_dup(can_dup), - _arena(arena), - head_(NewNode(0 /* any key will do */, kMaxHeight)), - max_height_(1), - rnd_(0xdeadbeef) { - for (int i = 0; i < kMaxHeight; i++) { - head_->SetNext(i, nullptr); - } -} - -template -void SkipList::Insert(const Key& key, bool* overwritten) { - // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual() - // here since Insert() is externally synchronized. - Node* prev[kMaxHeight]; - Node* x = FindGreaterOrEqual(key, prev); - -#ifndef BE_TEST - // The key already exists and duplicate keys are not allowed, so we need to aggregate them - if (!_can_dup && x != nullptr && Equal(key, x->key)) { - *overwritten = true; - return; - } -#endif - - *overwritten = false; - // Our data structure does not allow duplicate insertion - int height = RandomHeight(); - if (height > GetMaxHeight()) { - for (int i = GetMaxHeight(); i < height; i++) { - prev[i] = head_; - } - //fprintf(stderr, "Change height from %d to %d\n", max_height_, height); - - // It is ok to mutate max_height_ without any synchronization - // with concurrent readers. A concurrent reader that observes - // the new value of max_height_ will see either the old value of - // new level pointers from head_ (nullptr), or a new value set in - // the loop below. In the former case the reader will - // immediately drop to the next level since nullptr sorts after all - // keys. In the latter case the reader will use the new node. - max_height_.store(height, std::memory_order_relaxed); - } - - x = NewNode(key, height); - for (int i = 0; i < height; i++) { - // NoBarrier_SetNext() suffices since we will add a barrier when - // we publish a pointer to "x" in prev[i]. - x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); - prev[i]->SetNext(i, x); - } -} - -// NOTE: Already be checked, the row is exist. -template -void SkipList::InsertWithHint(const Key& key, bool is_exist, Hint* hint) { - Node* x = hint->curr; - DCHECK(!is_exist || x) << "curr pointer must not be null if row exists"; - -#ifndef BE_TEST - // The key already exists and duplicate keys are not allowed, so we need to aggregate them - if (!_can_dup && is_exist) { - return; - } -#endif - - Node** prev = hint->prev; - // Our data structure does not allow duplicate insertion - int height = RandomHeight(); - if (height > GetMaxHeight()) { - for (int i = GetMaxHeight(); i < height; i++) { - prev[i] = head_; - } - //fprintf(stderr, "Change height from %d to %d\n", max_height_, height); - - // It is ok to mutate max_height_ without any synchronization - // with concurrent readers. A concurrent reader that observes - // the new value of max_height_ will see either the old value of - // new level pointers from head_ (nullptr), or a new value set in - // the loop below. In the former case the reader will - // immediately drop to the next level since nullptr sorts after all - // keys. In the latter case the reader will use the new node. - max_height_.store(height, std::memory_order_relaxed); - } - - x = NewNode(key, height); - for (int i = 0; i < height; i++) { - // NoBarrier_SetNext() suffices since we will add a barrier when - // we publish a pointer to "x" in prev[i]. - x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i)); - prev[i]->SetNext(i, x); - } -} - -template -bool SkipList::Contains(const Key& key) const { - Node* x = FindGreaterOrEqual(key, nullptr); - if (x != nullptr && Equal(key, x->key)) { - return true; - } else { - return false; - } -} - -template -bool SkipList::Find(const Key& key, Hint* hint) const { - Node* x = FindGreaterOrEqual(key, hint->prev); - hint->curr = x; - if (x != nullptr && Equal(key, x->key)) { - return true; - } else { - return false; - } -} - -} // namespace doris diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index b21a38c3bf8ed4..8981a7e621c463 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -313,6 +313,10 @@ void TaskScheduler::_do_work(size_t index) { auto status = Status::OK(); try { + // This will enable exception handling logic in allocator.h when memory allocate + // failed or sysem memory is not sufficient. + doris::enable_thread_catch_bad_alloc++; + Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; //TODO: use a better enclose to abstracting these if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) { TUniqueId query_id = task->query_context()->query_id(); diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 60e7c57a6c12fa..8d07b0ec81afe9 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -158,6 +158,8 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr ctx, RdKafka::Message* msg; bool res = _queue.blocking_get(&msg); if (res) { + // conf has to be deleted finally + Defer delete_msg {[msg]() { delete msg; }}; VLOG_NOTICE << "get kafka message" << ", partition: " << msg->partition() << ", offset: " << msg->offset() << ", len: " << msg->len(); @@ -181,7 +183,6 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr ctx, } } } - delete msg; } else { // queue is empty and shutdown eos = true; diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp deleted file mode 100644 index 55c1b28bc53c12..00000000000000 --- a/be/test/olap/skiplist_test.cpp +++ /dev/null @@ -1,423 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/skiplist.h" - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "gtest/gtest_pred_impl.h" -#include "testutil/test_util.h" -#include "util/hash_util.hpp" -#include "util/random.h" -#include "util/work_thread_pool.hpp" -#include "vec/common/arena.h" - -namespace doris { - -typedef uint64_t Key; -const int random_seed = 301; - -struct TestComparator { - int operator()(const Key& a, const Key& b) const { - if (a < b) { - return -1; - } else if (a > b) { - return +1; - } else { - return 0; - } - } -}; - -class SkipTest : public testing::Test {}; - -TEST_F(SkipTest, Empty) { - std::unique_ptr arena(new vectorized::Arena()); - - TestComparator* cmp = new TestComparator(); - SkipList list(cmp, arena.get(), false); - EXPECT_TRUE(!list.Contains(10)); - - SkipList::Iterator iter(&list); - EXPECT_TRUE(!iter.Valid()); - iter.SeekToFirst(); - EXPECT_TRUE(!iter.Valid()); - iter.Seek(100); - EXPECT_TRUE(!iter.Valid()); - iter.SeekToLast(); - EXPECT_TRUE(!iter.Valid()); - delete cmp; -} - -TEST_F(SkipTest, InsertAndLookup) { - std::unique_ptr arena(new vectorized::Arena()); - - const int N = 2000; - const int R = 5000; - Random rnd(1000); - std::set keys; - TestComparator* cmp = new TestComparator(); - SkipList list(cmp, arena.get(), false); - for (int i = 0; i < N; i++) { - Key key = rnd.Next() % R; - if (keys.insert(key).second) { - bool overwritten = false; - list.Insert(key, &overwritten); - } - } - - for (int i = 0; i < R; i++) { - if (list.Contains(i)) { - EXPECT_EQ(keys.count(i), 1); - } else { - EXPECT_EQ(keys.count(i), 0); - } - } - - // Simple iterator tests - { - SkipList::Iterator iter(&list); - EXPECT_TRUE(!iter.Valid()); - - iter.Seek(0); - EXPECT_TRUE(iter.Valid()); - EXPECT_EQ(*(keys.begin()), iter.key()); - - iter.SeekToFirst(); - EXPECT_TRUE(iter.Valid()); - EXPECT_EQ(*(keys.begin()), iter.key()); - - iter.SeekToLast(); - EXPECT_TRUE(iter.Valid()); - EXPECT_EQ(*(keys.rbegin()), iter.key()); - } - - // Forward iteration test - for (int i = 0; i < R; i++) { - SkipList::Iterator iter(&list); - iter.Seek(i); - - // Compare against model iterator - std::set::iterator model_iter = keys.lower_bound(i); - for (int j = 0; j < 3; j++) { - if (model_iter == keys.end()) { - EXPECT_TRUE(!iter.Valid()); - break; - } else { - EXPECT_TRUE(iter.Valid()); - EXPECT_EQ(*model_iter, iter.key()); - ++model_iter; - iter.Next(); - } - } - } - - // Backward iteration test - { - SkipList::Iterator iter(&list); - iter.SeekToLast(); - - // Compare against model iterator - for (std::set::reverse_iterator model_iter = keys.rbegin(); model_iter != keys.rend(); - ++model_iter) { - EXPECT_TRUE(iter.Valid()); - EXPECT_EQ(*model_iter, iter.key()); - iter.Prev(); - } - EXPECT_TRUE(!iter.Valid()); - } - delete cmp; -} - -// Only non-DUP model will use Find() and InsertWithHint(). -TEST_F(SkipTest, InsertWithHintNoneDupModel) { - std::unique_ptr arena(new vectorized::Arena()); - - const int N = 2000; - const int R = 5000; - Random rnd(1000); - std::set keys; - TestComparator* cmp = new TestComparator(); - SkipList list(cmp, arena.get(), false); - SkipList::Hint hint; - for (int i = 0; i < N; i++) { - Key key = rnd.Next() % R; - bool is_exist = list.Find(key, &hint); - if (keys.insert(key).second) { - EXPECT_FALSE(is_exist); - list.InsertWithHint(key, is_exist, &hint); - } else { - EXPECT_TRUE(is_exist); - } - } - - for (int i = 0; i < R; i++) { - if (list.Contains(i)) { - EXPECT_EQ(keys.count(i), 1); - } else { - EXPECT_EQ(keys.count(i), 0); - } - } - delete cmp; -} - -// We want to make sure that with a single writer and multiple -// concurrent readers (with no synchronization other than when a -// reader's iterator is created), the reader always observes all the -// data that was present in the skip list when the iterator was -// constructor. Because insertions are happening concurrently, we may -// also observe new values that were inserted since the iterator was -// constructed, but we should never miss any values that were present -// at iterator construction time. -// -// We generate multi-part keys: -// -// where: -// key is in range [0..K-1] -// gen is a generation number for key -// hash is hash(key,gen) -// -// The insertion code picks a random key, sets gen to be 1 + the last -// generation number inserted for that key, and sets hash to Hash(key,gen). -// -// At the beginning of a read, we snapshot the last inserted -// generation number for each key. We then iterate, including random -// calls to Next() and Seek(). For every key we encounter, we -// check that it is either expected given the initial snapshot or has -// been concurrently added since the iterator started. -class ConcurrentTest { -private: - static const uint32_t K = 4; - - static uint64_t key(Key key) { return (key >> 40); } - static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } - static uint64_t hash(Key key) { return key & 0xff; } - - static uint64_t hash_numbers(uint64_t k, uint64_t g) { - uint64_t data[2] = {k, g}; - return HashUtil::hash(reinterpret_cast(data), sizeof(data), 0); - } - - static Key make_key(uint64_t k, uint64_t g) { - EXPECT_EQ(sizeof(Key), sizeof(uint64_t)); - EXPECT_LE(k, K); // We sometimes pass K to seek to the end of the skiplist - EXPECT_LE(g, 0xffffffffu); - return ((k << 40) | (g << 8) | (hash_numbers(k, g) & 0xff)); - } - - static bool is_valid_key(Key k) { return hash(k) == (hash_numbers(key(k), gen(k)) & 0xff); } - - static Key random_target(Random* rnd) { - switch (rnd->Next() % 10) { - case 0: - // Seek to beginning - return make_key(0, 0); - case 1: - // Seek to end - return make_key(K, 0); - default: - // Seek to middle - return make_key(rnd->Next() % K, 0); - } - } - - // Per-key generation - struct State { - std::atomic generation[K]; - void set(int k, int v) { generation[k].store(v, std::memory_order_release); } - int get(int k) { return generation[k].load(std::memory_order_acquire); } - - State() { - for (int k = 0; k < K; k++) { - set(k, 0); - } - } - }; - - // Current state of the test - State _current; - - std::unique_ptr _arena; - std::shared_ptr _comparator; - // SkipList is not protected by _mu. We just use a single writer - // thread to modify it. - SkipList _list; - -public: - ConcurrentTest() - : _arena(new vectorized::Arena()), - _comparator(new TestComparator()), - _list(_comparator.get(), _arena.get(), false) {} - - // REQUIRES: External synchronization - void write_step(Random* rnd) { - const uint32_t k = rnd->Next() % K; - const int g = _current.get(k) + 1; - const Key new_key = make_key(k, g); - bool overwritten = false; - _list.Insert(new_key, &overwritten); - _current.set(k, g); - } - - void read_step(Random* rnd) { - // Remember the initial committed state of the skiplist. - State initial_state; - for (int k = 0; k < K; k++) { - initial_state.set(k, _current.get(k)); - } - - Key pos = random_target(rnd); - SkipList::Iterator iter(&_list); - iter.Seek(pos); - while (true) { - Key current; - if (!iter.Valid()) { - current = make_key(K, 0); - } else { - current = iter.key(); - EXPECT_TRUE(is_valid_key(current)) << current; - } - EXPECT_LE(pos, current) << "should not go backwards"; - - // Verify that everything in [pos,current) was not present in - // initial_state. - while (pos < current) { - EXPECT_LT(key(pos), K) << pos; - - // Note that generation 0 is never inserted, so it is ok if - // <*,0,*> is missing. - EXPECT_TRUE((gen(pos) == 0) || - (gen(pos) > static_cast(initial_state.get(key(pos))))) - << "key: " << key(pos) << "; gen: " << gen(pos) - << "; initgen: " << initial_state.get(key(pos)); - - // Advance to next key in the valid key space - if (key(pos) < key(current)) { - pos = make_key(key(pos) + 1, 0); - } else { - pos = make_key(key(pos), gen(pos) + 1); - } - } - - if (!iter.Valid()) { - break; - } - - if (rnd->Next() % 2) { - iter.Next(); - pos = make_key(key(pos), gen(pos) + 1); - } else { - Key new_target = random_target(rnd); - if (new_target > pos) { - pos = new_target; - iter.Seek(new_target); - } - } - } - } -}; -const uint32_t ConcurrentTest::K; - -// Simple test that does single-threaded testing of the ConcurrentTest -// scaffolding. -TEST_F(SkipTest, ConcurrentWithoutThreads) { - ConcurrentTest test; - Random rnd(random_seed); - for (int i = 0; i < 10000; i++) { - test.read_step(&rnd); - test.write_step(&rnd); - } -} - -class TestState { -public: - ConcurrentTest _t; - int _seed; - std::atomic _quit_flag; - - enum ReaderState { STARTING, RUNNING, DONE }; - - explicit TestState(int s) : _seed(s), _quit_flag(false), _state(STARTING) {} - - void wait(ReaderState s) { - std::unique_lock l(_mu); - while (_state != s) { - _cv_state.wait(l); - } - } - - void change(ReaderState s) { - std::lock_guard l(_mu); - _state = s; - _cv_state.notify_one(); - } - -private: - std::mutex _mu; - ReaderState _state; - std::condition_variable _cv_state; -}; - -static void concurrent_reader(void* arg) { - TestState* state = reinterpret_cast(arg); - Random rnd(state->_seed); - state->change(TestState::RUNNING); - while (!state->_quit_flag.load(std::memory_order_acquire)) { - state->_t.read_step(&rnd); - } - state->change(TestState::DONE); -} - -static void run_concurrent(int run) { - const int seed = random_seed + (run * 100); - Random rnd(seed); - const int N = LOOP_LESS_OR_MORE(10, 1000); - const int kSize = 1000; - PriorityThreadPool thread_pool(10, 100, "ut"); - for (int i = 0; i < N; i++) { - if ((i % 100) == 0) { - fprintf(stderr, "Run %d of %d\n", i, N); - } - TestState state(seed + 1); - thread_pool.offer(std::bind(concurrent_reader, &state)); - state.wait(TestState::RUNNING); - for (int i = 0; i < kSize; i++) { - state._t.write_step(&rnd); - } - state._quit_flag.store(true, std::memory_order_release); // Any non-nullptr arg will do - state.wait(TestState::DONE); - } -} - -TEST_F(SkipTest, Concurrent) { - for (int i = 1; i < LOOP_LESS_OR_MORE(2, 6); ++i) { - run_concurrent(i); - } -} - -} // namespace doris