Project 5 Milestone 1
- Implement the transaction concept that can support Isolation and Consistency using your lock manager(lock table).
- Your lock manager should provide:
- Conflict-serializable schedule for transactions
- Strict-2PL
- Deadlock detection (abort the transaction if detected)
- Record-level locking with Shared(S)/Exclusive(X) mode
The specification states that we do not have to consider transactions that contain insert or delete operations. Without inserts or deletes the set of records is fixed, so a schedule that is conflict serializable is also a serializable schedule.
Strict-2PL guarantees conflict serializability. To decide whether a lock request has to wait, the lock manager uses the following conflict check:
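bool conflict_exists(hash_table_entry_t* list, lock_t* lock) {
    // Scan only the locks queued ahead of `lock` in this list.
    lock_t* curr = list->head;
    while (curr != nullptr) {
        if (curr == lock) return false;
        // Conflict: different transaction, same record, at least one X lock.
        // The OR test assumes shared is encoded as 0 and exclusive as 1.
        if (curr->trx_id != lock->trx_id && curr->record_id == lock->record_id && (lock->lock_mode | curr->lock_mode) == 1) {
            return true;
        }
        curr = curr->next;
    }
    return false;
}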
When this function returns true, the lock conflicts with a lock that is ahead of it in the list. When it returns false, there is no conflict. On a conflict, the thread that is trying to acquire the lock sleeps and waits for the preceding thread to release its lock.
Waiting (sleeping) threads are woken up in the shrinking phase of the 2PL protocol, when locks are released. What happens next depends on the state of the lock list:
- S = acquire and continue
- S..S = acquire and continue
- X = acquire and finish
- SX = acquire if and only if they have the same trx_id, then finish
- S..SX = wait for other S locks to finish
Here, S denotes a shared lock and X an exclusive lock. The left-hand side represents the lock list at the moment we try to acquire the lock.
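The rules above boil down to an S/X compatibility check against the locks already in the list. The following is a minimal, self-contained sketch of that check, not the project's code: `HeldLock`, `modes_conflict` and `can_grant` are hypothetical names, and the encoding shared = 0, exclusive = 1 is an assumption (it is the encoding that the `(a | b) == 1` test in `conflict_exists()` relies on).

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Assumed encoding: the bitwise-OR test only works if shared is 0 and
// exclusive is 1.
enum LockMode : int { LOCK_MODE_SHARED = 0, LOCK_MODE_EXCLUSIVE = 1 };

// Hypothetical, simplified view of one lock already queued on a record.
struct HeldLock {
    uint64_t trx_id;
    LockMode mode;
};

// Two locks on the same record conflict when they belong to different
// transactions and at least one of them is exclusive.
bool modes_conflict(const HeldLock& held, uint64_t trx_id, LockMode mode) {
    return held.trx_id != trx_id && (held.mode | mode) == LOCK_MODE_EXCLUSIVE;
}

// Returns true if the request can be granted immediately, given the locks
// that are ahead of it in the record's lock list.
bool can_grant(const std::vector<HeldLock>& ahead, uint64_t trx_id, LockMode mode) {
    for (const HeldLock& held : ahead) {
        if (modes_conflict(held, trx_id, mode)) return false;
    }
    return true;
}
```

With this sketch, an S request behind `S..S` is granted, an X request behind `S..S` from other transactions must wait, and an X request behind a single S lock is granted only when both belong to the same transaction.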
As stated in the lecture, using Strict-2PL ensures that the schedule is conflict serializable. This is why our lock manager should provide Strict-2PL.
Strict-2PL is implemented in my project as follows:
lock_t* trx_acquire(uint64_t trx_id, lock_t* lock) {
    pthread_mutex_lock(&trx_table_latch);
    // Link the new lock into the transaction's lock list so that
    // commit/abort can release it later.
    auto it = trx_table.find(trx_id);
    if (it != trx_table.end()) {
        trx_entry_t* trx_entry = it->second;
        lock->trx_next = trx_entry->lock;
        trx_entry->lock = lock;
    }
    hash_table_entry_t* list = lock->sentinel;
    update_wait_for_graph(list, lock);
    // Returning nullptr tells the caller to abort this transaction.
    if (detect_deadlock(lock->trx_id)) {
        pthread_mutex_unlock(&trx_table_latch);
        return nullptr;
    }
    // Sleep until no conflicting lock is queued ahead of this one.
    while (conflict_exists(list, lock)) {
        pthread_cond_wait(&lock->lock_table_cond, &trx_table_latch);
    }
    pthread_mutex_unlock(&trx_table_latch);
    return lock;
}
This is part of the Expanding Phase, where new locks are acquired without releasing the existing locks.
In the Shrinking Phase, a transaction can choose to commit or to abort. In this project, abort is only called when a deadlock is detected.
int trx_commit(int trx_id) {
    pthread_mutex_lock(&trx_table_latch);
    auto it = trx_table.find(trx_id);
    if (it != trx_table.end()) {
        trx_entry_t* trx_entry = it->second;
        lock_t* lock = trx_entry->lock;
        while (lock != nullptr) {
            lock_t* tmp = lock->trx_next;
            lock_release(lock);
            lock = tmp;
        }
        trx_table.erase(trx_id);
    }
    pthread_mutex_unlock(&trx_table_latch);
    return trx_id;
}
This function releases all locks that the transaction holds.
Even with a conflict-serializable schedule, we still need to detect deadlocks, because we are not running a single-threaded system: two transactions can each wait for a lock that the other one holds. Deadlocks could instead be prevented with one of two algorithms, wait-die or wound-wait, but that is not what we are trying to implement here.
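For comparison only, the two prevention schemes mentioned above can be summarized as a decision made when a requester meets a conflicting lock holder. This is a small sketch under assumptions and is not part of this project: `Action`, `wait_die` and `wound_wait` are hypothetical names, and each transaction is assumed to carry a unique start timestamp where a smaller value means an older transaction.

```cpp
#include <cassert>
#include <cstdint>

// What should happen when a requester conflicts with a lock holder.
enum class Action { Wait, AbortRequester, AbortHolder };

// Wait-die: an older requester waits, a younger requester aborts ("dies").
Action wait_die(uint64_t requester_ts, uint64_t holder_ts) {
    assert(requester_ts != holder_ts);  // timestamps are assumed unique
    return requester_ts < holder_ts ? Action::Wait : Action::AbortRequester;
}

// Wound-wait: an older requester aborts ("wounds") the holder,
// a younger requester waits.
Action wound_wait(uint64_t requester_ts, uint64_t holder_ts) {
    assert(requester_ts != holder_ts);  // timestamps are assumed unique
    return requester_ts < holder_ts ? Action::AbortHolder : Action::Wait;
}
```

In both schemes waiting only ever goes in one direction of the age order, so no cycle of waiting transactions can form. Detection, as used in this project, instead lets transactions wait freely and aborts one only when a cycle actually appears.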
I maintained a wait-for graph to detect deadlocks. This is implemented as follows:
Note: the deadlock detection algorithm shown here does not work correctly because of the latch sequence. It should be rewritten with reference to the latch order described at the top of the Project 6 wiki.
void update_wait_for_graph(hash_table_entry_t* list, lock_t* lock) {
    // Walk backwards through the locks queued before `lock`.
    lock_t* cur = lock->prev;
    while (cur != nullptr) {
        // Skip locks on other records.
        if (cur->record_id != lock->record_id) {
            cur = cur->prev;
            continue;
        }
        if (cur->lock_mode == LOCK_MODE_EXCLUSIVE) {
            // Stop at the nearest earlier X lock on this record.
            if (cur->trx_id != lock->trx_id) {
                trx_table[lock->trx_id]->wait_for.insert(cur->trx_id);
            }
            break;
        } else if (lock->lock_mode == LOCK_MODE_EXCLUSIVE) {
            // An X request also waits for every earlier S lock.
            if (cur->trx_id != lock->trx_id) {
                trx_table[lock->trx_id]->wait_for.insert(cur->trx_id);
            }
        }
        cur = cur->prev;
    }
}
bool detect_deadlock(int trx_id) {
    // Iterative DFS over (parent, child) wait-for edges, starting at trx_id.
    std::set<int> visited;
    visited.insert(trx_id);
    std::stack<std::pair<int,int>> dfs_stack;
    dfs_stack.push({trx_id, trx_id});
    while (dfs_stack.size() > 0) {
        int par_trx_id = dfs_stack.top().first;
        int curr_trx_id = dfs_stack.top().second;
        dfs_stack.pop();
        trx_entry_t* par_trx = trx_table[par_trx_id];
        trx_entry_t* curr_trx = trx_table[curr_trx_id];
        if (par_trx == nullptr) continue;
        if (curr_trx == nullptr) {
            // The child transaction no longer exists: drop the stale edge.
            par_trx->wait_for.erase(curr_trx_id);
            continue;
        }
        // Reached the starting transaction again: there is a cycle.
        if (par_trx_id != curr_trx_id && trx_id == curr_trx_id) {
            return true;
        }
        if (par_trx_id != curr_trx_id && visited.find(curr_trx_id) != visited.end()) {
            continue;
        }
        visited.insert(curr_trx_id);
        for (auto it = curr_trx->wait_for.begin(); it != curr_trx->wait_for.end(); it++) {
            dfs_stack.push({curr_trx_id, *it});
        }
    }
    return false;
}
Because of the way the lock_release function works, we know that deadlocks are caused by X locks. Therefore, when we draw a wait-for-like structure, we can divide the lock table using X locks as pivots and draw the wait-for edges between X locks and S locks. For instance, if we have a lock table like this:
[Figure: example lock table]
We can draw the wait-for edges like so:
[Figure: wait-for edges drawn on the lock table]
And draw a wait-for-like structure like this:
[Figure: resulting wait-for-like structure]
We can identify cycles in this graph using DFS in O(V+E) time.
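As an illustration of the DFS-based cycle check, here is a self-contained sketch that is independent of the project's data structures. `WaitForGraph` and `reaches_itself` are hypothetical names. The graph maps a transaction id to the set of transaction ids it waits for, and the function asks whether the starting transaction can reach itself.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Hypothetical graph type: trx id -> ids of the transactions it waits for.
using WaitForGraph = std::map<int, std::set<int>>;

// Returns true if `start` can reach itself by following wait-for edges,
// i.e. if `start` is part of a deadlock cycle.
bool reaches_itself(const WaitForGraph& graph, int start) {
    std::set<int> visited;   // nodes that were already pushed once
    std::vector<int> stack;  // explicit DFS stack
    stack.push_back(start);
    while (!stack.empty()) {
        int cur = stack.back();
        stack.pop_back();
        auto it = graph.find(cur);
        if (it == graph.end()) continue;  // waits for nobody
        for (int next : it->second) {
            if (next == start) return true;  // edge back to the start
            if (visited.insert(next).second) stack.push_back(next);
        }
    }
    return false;
}
```

Each node is pushed at most once and each edge is examined at most once, which gives the O(V+E) bound on visits. The `std::map` and `std::set` lookups used here for brevity add a logarithmic factor on top of that.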
To implement the rollback feature, I added the following fields to the lock_t object:
char* original_value;
int original_size;
For milestone 2, however, these fields need to move to the trx_entry_t object. Milestone 2 implements implicit locking, so we will not be able to roll back if the original value is stored in the lock object.
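One possible shape for keeping the rollback data in the transaction entry is a per-transaction undo log. The sketch below is only an assumption about how this could look: `UndoRecord`, `TrxEntry`, `log_update` and `rollback` are hypothetical names, and the `restore` callback stands in for writing the old bytes back into the buffer page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical undo record: enough to restore one record's old bytes.
struct UndoRecord {
    int64_t table_id;
    uint64_t page_id;
    int record_id;
    std::string old_value;  // owns a copy of the bytes before the update
};

// Hypothetical, trimmed-down transaction entry.
struct TrxEntry {
    std::vector<UndoRecord> undo_log;
};

// Save the old bytes before they are overwritten by an update.
void log_update(TrxEntry& trx, int64_t table_id, uint64_t page_id,
                int record_id, const char* old_data, std::size_t size) {
    trx.undo_log.push_back({table_id, page_id, record_id,
                            std::string(old_data, size)});
}

// Undo newest-first so a record updated twice ends at its oldest value.
template <class RestoreFn>
void rollback(TrxEntry& trx, RestoreFn restore) {
    for (auto it = trx.undo_log.rbegin(); it != trx.undo_log.rend(); ++it) {
        restore(*it);
    }
    trx.undo_log.clear();
}
```

Because the log belongs to the transaction rather than to a lock object, it still works when a record is only implicitly locked and no lock object exists for it.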
The only case in which a transaction is aborted is when the detect_deadlock() function detects a deadlock. In this case, trx_acquire() returns nullptr, and db_find() or db_update() notices this and invokes trx_abort().
void trx_abort(uint64_t trx_id) {
    pthread_mutex_lock(&trx_table_latch);
    auto it = trx_table.find(trx_id);
    if (it != trx_table.end()) {
        trx_entry_t* trx_entry = it->second;
        lock_t* temp_lock = trx_entry->lock;
        while (temp_lock != nullptr) {
            if (temp_lock->original_value != nullptr) {
                control_block_t* ctrl_block = buf_read_page(temp_lock->sentinel->table_id, temp_lock->sentinel->page_id);
                slot_t slot = PageIO::BPT::LeafPage::get_nth_slot(ctrl_block->frame, temp_lock->record_id);
                ctrl_block->frame->set_data(temp_lock->original_value, slot.get_offset(), temp_lock->original_size);
                return_ctrl_block(&ctrl_block, 1);
            }
            lock_t* next = temp_lock->trx_next;
            lock_release(temp_lock);
            temp_lock = next;
        }
        trx_table.erase(trx_id);
    }
    pthread_mutex_unlock(&trx_table_latch);
}
The trx_abort() function is essentially equivalent to the trx_commit() function. The only difference is that it also performs the rollback procedure. Later, I will extract the common part of these two functions into a subroutine that handles the lock_release() and trx_table.erase() steps.
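A rough sketch of what such a shared subroutine could look like is given below. It is not the project's code: the types are cut-down stand-ins that only carry the fields the sketch needs, `lock_release()` is replaced by a stub, and `trx_release_all` is a hypothetical name. The caller is assumed to already hold `trx_table_latch`.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Stand-in types: only the fields this sketch needs (assumptions, not the
// project's real definitions).
struct lock_t {
    lock_t* trx_next = nullptr;
    bool released = false;
};
struct trx_entry_t {
    lock_t* lock = nullptr;
};

std::unordered_map<uint64_t, trx_entry_t*> trx_table;

// Stub for the real lock_release(), which would also wake up waiters.
void lock_release(lock_t* lock) { lock->released = true; }

// Hypothetical shared tail of trx_commit() and trx_abort().
// The caller is assumed to hold trx_table_latch.
// Returns the number of locks released, or -1 if trx_id is unknown.
int trx_release_all(uint64_t trx_id) {
    auto it = trx_table.find(trx_id);
    if (it == trx_table.end()) return -1;
    assert(it->second != nullptr);
    int released = 0;
    lock_t* lock = it->second->lock;
    while (lock != nullptr) {
        lock_t* next = lock->trx_next;  // read the link before releasing
        lock_release(lock);
        lock = next;
        ++released;
    }
    trx_table.erase(it);
    return released;
}
```

With a helper like this, the commit path would call it directly, while the abort path would first restore the original values and then call it.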
To allow multiple threads to access pages in the buffer simultaneously, and to ensure that no page in use is flushed, we use a pthread_mutex_t to protect the buffer and the pages. The current implementation does not support multiple threads inserting and removing records from the database. One of the main reasons is that structure modifications require acquiring two pages from the buffer. When multiple threads try to insert or remove, the buffer pool easily fills up with control blocks that cannot be returned, causing the program to fail.
The buffer latches and page latches are implemented as follows:
control_block_t* find_victim() {
    control_block_t* cur = victim;
    // Blocks until any thread still using the victim page returns it.
    pthread_mutex_lock(&cur->page_latch);
    return cur;
}
control_block_t* add_new_page(int64_t table_id, pagenum_t page_number) {
    control_block_t* cur = find_victim();
    // Write the evicted page back to disk if it was modified.
    if (cur->is_dirty) {
        file_write_page(cur->table_id, cur->pagenum, cur->frame);
    }
    if (cur->table_id >= 0)
        pagemap.erase(std::make_pair(cur->table_id, cur->pagenum));
    move_to_beg_of_list(cur);
    file_read_page(table_id, page_number, cur->frame);
    pagemap.emplace(std::make_pair(table_id, page_number), cur);
    cur->table_id = table_id;
    cur->pagenum = page_number;
    cur->is_dirty = 0;
    // buffer_manager_latch is presumably acquired by the caller
    // (buf_read_page) before this function runs.
    pthread_mutex_unlock(&buffer_manager_latch);
    return cur;
}
void return_ctrl_block(control_block_t** ctrl_block, int is_dirty) {
    if (ctrl_block == nullptr || (*ctrl_block) == nullptr) return;
    (*ctrl_block)->is_dirty |= is_dirty;
    control_block_t* tmp = *ctrl_block;
    // Clear the caller's pointer so the block cannot be returned twice.
    (*ctrl_block) = nullptr;
    pthread_mutex_unlock(&(tmp->page_latch));
}
When buf_read_page() is called, it internally calls add_new_page() if the page is not in the buffer. This function acquires the page latch of the victim page (waiting for any other thread to finish using that page) and reads the requested page from disk. This way, the caller acquires the page latch without having to deal with the buffer latch.
When returning the control block, the caller invokes return_ctrl_block() to unlock the page latch. The function receives a double pointer to the control block in order to prevent double-returning (similar to a double free) of the control block.
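The double-pointer idea can be shown in isolation. In the following sketch `ControlBlock`, `acquire` and `give_back` are hypothetical names, and a simple use counter stands in for the page latch. The point is only that clearing the caller's pointer makes a second return a harmless no-op.

```cpp
#include <cassert>

// Hypothetical control block; the counter stands in for the page latch.
struct ControlBlock {
    int in_use = 0;
};

void acquire(ControlBlock* block) { ++block->in_use; }

// Takes the address of the caller's pointer so that it can be cleared.
// Returns true if the block was actually returned by this call.
bool give_back(ControlBlock** handle) {
    if (handle == nullptr || *handle == nullptr) return false;
    ControlBlock* block = *handle;
    *handle = nullptr;  // the caller can no longer return it a second time
    assert(block->in_use > 0);
    --block->in_use;
    return true;
}
```

A second `give_back(&handle)` on the same variable sees a null pointer and does nothing. With a single-pointer interface the same mistake would unlock a latch that the caller no longer holds.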
- To improve the performance of reading data from the in-memory page structure, I reduced the number of copies that occur in the get_data() function.
template<class T>
T page_t::get_data(uint16_t offset) {
#if DEBUG_MODE
    offsetCheck<T>(offset);
#endif
    T* ret = (T*)(data + offset);
    return *ret;
}
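The cast above reads a `T` directly from an arbitrary byte offset, so it depends on the platform tolerating unaligned access. A portable alternative is to copy the bytes with `std::memcpy`, which compilers typically turn into a plain load for small fixed sizes. The sketch below is an assumption-laden illustration and not the project's `page_t`: `Page`, `get` and `set` are hypothetical names, and the 4096-byte page size is inferred from 10 MiB / 2560 pages.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>

constexpr std::size_t kPageSize = 4096;  // assumed: 10 MiB / 2560 pages

// Hypothetical page type holding raw bytes.
struct Page {
    unsigned char data[kPageSize] = {};

    // Read a T stored at any byte offset, aligned or not.
    template <class T>
    T get(std::size_t offset) const {
        static_assert(std::is_trivially_copyable<T>::value,
                      "T must be trivially copyable");
        assert(offset + sizeof(T) <= kPageSize);
        T value;
        std::memcpy(&value, data + offset, sizeof(T));
        return value;
    }

    // Write a T at any byte offset.
    template <class T>
    void set(std::size_t offset, const T& value) {
        static_assert(std::is_trivially_copyable<T>::value,
                      "T must be trivially copyable");
        assert(offset + sizeof(T) <= kPageSize);
        std::memcpy(data + offset, &value, sizeof(T));
    }
};
```

For example, `page.set<uint32_t>(3, 0xDEADBEEFu)` followed by `page.get<uint32_t>(3)` reads back the same value even though offset 3 is not 4-byte aligned.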
- Just to get the tests to pass, I changed file_open_table_file() to call sync() once after writing all 2560 pages. If sync() is called every time a page is written, the test fails because syncing is slow.
int64_t file_open_table_file(const char* pathname)
{
    int64_t fd = FileIO::open(pathname);
    if (FileIO::size(fd) == 0)
    {
        // default size = 10MiB = 2560 pages (including header)
        page_t header;
        for (pagenum_t pagenum = INITIAL_FREE_PAGES; pagenum > 0; pagenum--)
        {
            page_t free_page;
            PageIO::FreePage::set_next_free_pagenum(&free_page, pagenum - 1);
            FileIO::write(fd, &free_page, PAGE_SIZE, pagenum * PAGE_SIZE);
        }
        PageIO::HeaderPage::set_num_pages(&header, INITIAL_FREE_PAGES + 1);
        PageIO::HeaderPage::set_free_pagenum(&header, INITIAL_FREE_PAGES);
        FileIO::write(fd, &header, PAGE_SIZE, 0);
    }
    // Sync once after all pages are written instead of once per page.
    sync();
    return fd;
}
- For debugging purposes, I added #if DEBUG_MODE blocks in the places that should produce output during debugging but not in a release build.
After finishing milestone 1 and before starting milestone 2, I tidied up my code a little.
Using Valgrind, I searched for memory leaks and fixed them as well.
A testing server was provided, but I also created my own tests:
- SingleThread: a single thread runs find and update
- SingleThreadRandom: a single thread runs find and update in randomized order
- SLockOnlyTest: 10 threads acquire S locks
- XLockOnlyDisjointTest: 10 threads acquire X locks that do not conflict with each other
- XLockOnly: 10 threads acquire X locks that conflict but do not cause a deadlock
- MixedLock: 5 threads acquire S locks and 5 acquire X locks, without causing a deadlock
- deadlock: causes a deadlock and checks that the transaction aborts correctly