Skip to content

Commit

Permalink
utube: implement ready space for vinyl engine
Browse files Browse the repository at this point in the history
In case of a transaction conflict for 'vinyl' we need to retry an
entire transaction.

Part of #230
  • Loading branch information
DerekBum committed Jun 24, 2024
1 parent df24f17 commit 645fec6
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 107 deletions.
215 changes: 139 additions & 76 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ function tube.new(space, on_task_change, opts)

local space_ready_buffer_name = space.name .. "_ready_buffer"
local space_ready_buffer = box.space[space_ready_buffer_name]
-- Feature implemented only for memtx engine for now.
-- https://github.com/tarantool/queue/issues/230.
if opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER and opts.engine == 'vinyl' then
error(string.format('"%s" storage mode cannot be used with vinyl engine',
tube.STORAGE_MODE_READY_BUFFER))
end

local ready_space_mode = (opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER)
if ready_space_mode then
Expand Down Expand Up @@ -167,6 +161,10 @@ local function commit()
box.commit()
end

local function rollback()
box.rollback()
end

local function empty()
end

Expand All @@ -179,14 +177,31 @@ local function begin_if_not_in_txn()

if not box.is_in_txn() then
box.begin(transaction_opts)
return commit
return commit, rollback
else
return empty
return empty, empty
end
end

-- Try commiting operations until success. This is required for 'vinyl' engine.
-- In case of a transaction conflict for 'vinyl' we need to retry an entire
-- transaction.
local function try_commit_several_times(func, ...)
local ok = false
local ret
while not ok do
local commit_func, rollback_func = begin_if_not_in_txn()
ok, ret = pcall(func, commit_func, ...)
if ok then
return ret
end
rollback_func()
require('fiber').yield()
end
end

-- put task in space
function method.put(self, data, opts)
local function put(self, data, opts, commit_func)
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
Expand All @@ -197,8 +212,6 @@ function method.put(self, data, opts)
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
local commit_func = begin_if_not_in_txn()

local max = self.space.index.task_id:max()

local id = max and max[1] + 1 or 0
Expand All @@ -213,11 +226,18 @@ function method.put(self, data, opts)
return task
end

-- put task in space
function method.put(self, data, opts)
local commit_body = function(commit_func)
return put(self, data, opts, commit_func)
end

return try_commit_several_times(commit_body)
end

-- Take the first task form the ready_buffer.
local function take_ready(self)
local function take_ready(self, commit_func)
while true do
local commit_func = begin_if_not_in_txn()

local task_ready = self.space_ready_buffer.index.task_id:min()
if task_ready == nil then
commit_func()
Expand Down Expand Up @@ -247,45 +267,57 @@ local function take_ready(self)
end
end

local function take(self)
for s, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
break
end
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
local commit_func = begin_if_not_in_txn()
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
local take_complete = false
local function take_step(self, task, commit_func)
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
local take_complete = false

if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
take_complete = true
end
if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
take_complete = true
end

commit_func()
if take_complete then
self.on_task_change(task, 'take')
return task
end
commit_func()
if take_complete then
self.on_task_change(task, 'take')
return task
end
end

-- take task
function method.take(self)
if self.ready_space_mode then
return take_ready(self)
local commit_body = function(commit_func)
return take_ready(self, commit_func)
end

return try_commit_several_times(commit_body)
end

for _, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
break
end

local commit_body = function(commit_func)
return take_step(self, task, commit_func)
end

local ret = try_commit_several_times(commit_body)
if ret ~= nil then
return ret
end
end
return take(self)
end

-- touch task
Expand All @@ -300,9 +332,7 @@ local function delete_ready(self, id, utube)
end

-- delete task
function method.delete(self, id)
local commit_func = begin_if_not_in_txn()

local function delete(self, id, commit_func)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
Expand Down Expand Up @@ -331,10 +361,17 @@ function method.delete(self, id)
return task
end

-- release task
function method.release(self, id, opts)
local commit_func = begin_if_not_in_txn()
-- delete task
function method.delete(self, id)
local commit_body = function(commit_func)
return delete(self, id, commit_func)
end

return try_commit_several_times(commit_body)
end

-- release task
local function release(self, id, opts, commit_func)
local task = self.space:update(id, {{ '=', 2, state.READY }})
if task ~= nil then
if self.ready_space_mode then
Expand All @@ -357,10 +394,17 @@ function method.release(self, id, opts)
return task
end

-- bury task
function method.bury(self, id)
local commit_func = begin_if_not_in_txn()
-- release task
function method.release(self, id, opts)
local commit_body = function(commit_func)
return release(self, id, opts, commit_func)
end

return try_commit_several_times(commit_body)
end

-- bury task
local function bury(self, id, commit_func)
local current_task = self.space:get{id}
local task = self.space:update(id, {{ '=', 2, state.BURIED }})
if task ~= nil then
Expand Down Expand Up @@ -390,35 +434,54 @@ function method.bury(self, id)
return task
end

-- unbury several tasks
function method.kick(self, count)
for i = 1, count do
local commit_func = begin_if_not_in_txn()
-- bury task
function method.bury(self, id)
local commit_body = function(commit_func)
return bury(self, id, commit_func)
end

local task = self.space.index.status:min{ state.BURIED }
if task == nil then
return i - 1
end
if task[2] ~= state.BURIED then
return i - 1
end
return try_commit_several_times(commit_body)
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.ready_space_mode then
local prev_task = self.space_ready_buffer.index.utube:get{task[3]}
if prev_task ~= nil then
if prev_task[1] > task[1] then
self.space_ready_buffer:delete(prev_task[1])
self.space_ready_buffer:insert({task[1], task[2]})
end
else
put_ready(self, task[3])
-- unbury several tasks
local function kick_step(self, id, commit_func)
local task = self.space.index.status:min{ state.BURIED }
if task == nil then
return id - 1
end
if task[2] ~= state.BURIED then
return id - 1
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.ready_space_mode then
local prev_task = self.space_ready_buffer.index.utube:get{task[3]}
if prev_task ~= nil then
if prev_task[1] > task[1] then
self.space_ready_buffer:delete(prev_task[1])
self.space_ready_buffer:insert({task[1], task[2]})
end
else
put_ready(self, task[3])
end
end

commit_func()
commit_func()

self.on_task_change(task, 'kick')
end

self.on_task_change(task, 'kick')
-- unbury several tasks
function method.kick(self, count)
for i = 1, count do
local commit_body = function(commit_func)
return kick_step(self, i, commit_func)
end

local ret = try_commit_several_times(commit_body)
if ret ~= nil then
return ret
end
end
return count
end
Expand Down
40 changes: 9 additions & 31 deletions t/030-utube.t
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@ test:ok(queue, 'queue is loaded')
local tube = queue.create_tube('test', 'utube', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine })
local tube_ready, tube2_ready
if engine ~= 'vinyl' then
tube_ready = queue.create_tube('test_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
tube2_ready = queue.create_tube('test_stat_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
end
tube_ready = queue.create_tube('test_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
tube2_ready = queue.create_tube('test_stat_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'utube', 'tube.type')

test:test('Utube statistics', function(test)
if engine ~= 'vinyl' then
test:plan(13 * 2)
else
test:plan(13)
end
test:plan(13 * 2)
for _, tube_stat in ipairs({tube2, tube2_ready}) do
if tube_stat == nil then
break
Expand Down Expand Up @@ -78,11 +72,7 @@ end)


test:test('Easy put/take/ack', function(test)
if engine ~= 'vinyl' then
test:plan(12 * 2)
else
test:plan(12)
end
test:plan(12 * 2)

for _, test_tube in ipairs({tube, tube_ready}) do
if test_tube == nil then
Expand Down Expand Up @@ -110,11 +100,7 @@ test:test('Easy put/take/ack', function(test)
end)

test:test('ack in utube', function(test)
if engine ~= 'vinyl' then
test:plan(8 * 2)
else
test:plan(8)
end
test:plan(8 * 2)

for _, test_tube in ipairs({tube, tube_ready}) do
if test_tube == nil then
Expand Down Expand Up @@ -145,11 +131,7 @@ test:test('ack in utube', function(test)
end
end)
test:test('bury in utube', function(test)
if engine ~= 'vinyl' then
test:plan(8 * 2)
else
test:plan(8)
end
test:plan(8 * 2)

for _, test_tube in ipairs({tube, tube_ready}) do
if test_tube == nil then
Expand Down Expand Up @@ -180,11 +162,7 @@ test:test('bury in utube', function(test)
end
end)
test:test('instant bury', function(test)
if engine ~= 'vinyl' then
test:plan(1 * 2)
else
test:plan(1)
end
test:plan(1 * 2)
tube:put(1, {ttr=60})
local taken = tube:take(.1)
test:is(tube:bury(taken[1])[2], '!', 'task is buried')
Expand Down

0 comments on commit 645fec6

Please sign in to comment.