Skip to content

Commit

Permalink
utube: fix slow take on busy utubes
Browse files Browse the repository at this point in the history
If some of the utube for tasks at the top of the queue were busy
most of the time, `take` would slow down for every other task.
This problem is fixed by creating a new space `space_ready`. It
contains first task with `READY` status from each utube.

This solution shows great results for the stated problem, with the cost
of slowing the `put` method (it is ~3 times slower). Thus, this workaround is
disabled by default. To enable it, user should set the `v2 = true` as an
option while creating the tube. As example:
```lua
local test_queue = queue.create_tube('test_queue', 'utube',
        {temporary = true, v2 = true})
```

Part of #228
  • Loading branch information
DerekBum committed Apr 28, 2024
1 parent aa7c092 commit 8b14921
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 34 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added
- `v2` boolean option for creating a `utube` tube (#228). It enables the
workaround for slow takes while working with busy tubes.

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.
- Slow takes on busy `utube` tubes (#228). The workaround could be enabled by
passing the `v2 = true` option while creating the tube.

## [1.3.3] - 2023-09-13

Expand Down
101 changes: 101 additions & 0 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,35 @@ function tube.create_space(space_name, opts)
type = 'tree',
parts = {2, str_type(), 3, str_type(), 1, num_type()}
})
space.v2 = opts.v2
return space
end

-- start tube on space
function tube.new(space, on_task_change)
validate_space(space)

local space_ready_name = space.name .. "_utube_ready"
local space_ready = box.space[space_ready_name]
if space.v2 and not space_ready then
-- Create a space for first ready tasks from each utube.
space_ready = box.schema.create_space(space_ready_name, space_opts)
space_ready:create_index('task_id', {
type = 'tree',
parts = {1, num_type()}
})
space_ready:create_index('utube', {
type = 'tree',
parts = {2, str_type()}
})
end

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
space_ready = space_ready,
on_task_change = on_task_change,
v2 = space.v2 or false,
}, { __index = method })
return self
end
Expand All @@ -73,6 +91,20 @@ function method.normalize_task(self, task)
return task and task:transform(3, 1)
end

local function put_ready_task(self, id, utube)
local added = self.space_ready.index.utube:get{utube}
if added == nil then
self.space_ready:insert{id, utube}
end
end

local function put_next_ready(self, utube)
local next_task = self.space.index.utube:min{state.READY, utube}
if next_task ~= nil then
put_ready_task(self, next_task[1], utube)
end
end

-- put task in space
function method.put(self, data, opts)
local max
Expand All @@ -98,12 +130,62 @@ function method.put(self, data, opts)

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
if self.v2 then
put_ready_task(self, id, task[3])
end
self.on_task_change(task, 'put')
return task
end

local function delete_ready(self, id, utube)
self.space_ready:delete(id)
put_next_ready(self, utube)
end

local function take_ready(self)
for s, task_ready in self.space_ready:pairs({}, { iterator = 'GE' }) do
local id = task_ready[1]
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
local task

if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
task = self.space:get(id)
box.commit()
else
task = self.space:get(id)
end

if task[2] == state.READY then
local taken

if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
taken = self.space.index.utube:min{state.TAKEN, task[3]}
box.commit()
else
taken = self.space.index.utube:min{state.TAKEN, task[3]}
end

if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(id, { { '=', 2, state.TAKEN } })

delete_ready(self, id, task[3])

self.on_task_change(task, 'take')
return task
end
end
end
end

-- take task
function method.take(self)
if self.v2 then
return take_ready(self)
end

for s, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
Expand Down Expand Up @@ -146,6 +228,10 @@ function method.delete(self, id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
if self.v2 then
delete_ready(self, id, task[3])
end

task = task:transform(2, 1, state.DONE)

local neighbour = self.space.index.utube:min{state.READY, task[3]}
Expand All @@ -157,10 +243,22 @@ function method.delete(self, id)
return task
end

local function on_ready_status_change(self, utube)
local prev_task = self.space_ready.index.utube:get{utube}
if prev_task ~= nil then
delete_ready(self, prev_task[1], utube)
else
put_next_ready(self, utube)
end
end

-- release task
function method.release(self, id, opts)
local task = self.space:update(id, {{ '=', 2, state.READY }})
if task ~= nil then
if self.v2 then
on_ready_status_change(self, task[3])
end
self.on_task_change(task, 'release')
end
return task
Expand Down Expand Up @@ -193,6 +291,9 @@ function method.kick(self, count)
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.v2 then
on_ready_status_change(self, task[3])
end
self.on_task_change(task, 'kick')
end
return count
Expand Down
72 changes: 38 additions & 34 deletions t/030-utube.t
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,49 @@ test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'utube', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine })
local tubev2 = queue.create_tube('test_stat_v2', 'utube',
{ engine = engine, v2 = true })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'utube', 'tube.type')

test:test('Utube statistics', function(test)
test:plan(13)
tube2:put('stat_0')
tube2:put('stat_1')
tube2:put('stat_2')
tube2:put('stat_3')
tube2:put('stat_4')
tube2:delete(4)
tube2:take(.001)
tube2:release(0)
tube2:take(.001)
tube2:ack(0)
tube2:bury(1)
tube2:bury(2)
tube2:kick(1)
tube2:take(.001)

local stats = queue.statistics('test_stat')

-- check tasks statistics
test:is(stats.tasks.taken, 1, 'tasks.taken')
test:is(stats.tasks.buried, 1, 'tasks.buried')
test:is(stats.tasks.ready, 1, 'tasks.ready')
test:is(stats.tasks.done, 2, 'tasks.done')
test:is(stats.tasks.delayed, 0, 'tasks.delayed')
test:is(stats.tasks.total, 3, 'tasks.total')

-- check function call statistics
test:is(stats.calls.delete, 1, 'calls.delete')
test:is(stats.calls.ack, 1, 'calls.ack')
test:is(stats.calls.take, 3, 'calls.take')
test:is(stats.calls.kick, 1, 'calls.kick')
test:is(stats.calls.bury, 2, 'calls.bury')
test:is(stats.calls.put, 5, 'calls.put')
test:is(stats.calls.release, 1, 'calls.release')
test:plan(26)
for _, tube_stat in ipairs({tube2, tubev2}) do
tube_stat:put('stat_0')
tube_stat:put('stat_1')
tube_stat:put('stat_2')
tube_stat:put('stat_3')
tube_stat:put('stat_4')
tube_stat:delete(4)
tube_stat:take(.001)
tube_stat:release(0)
tube_stat:take(.001)
tube_stat:ack(0)
tube_stat:bury(1)
tube_stat:bury(2)
tube_stat:kick(1)
tube_stat:take(.001)

local stats = queue.statistics('test_stat')

-- check tasks statistics
test:is(stats.tasks.taken, 1, 'tasks.taken')
test:is(stats.tasks.buried, 1, 'tasks.buried')
test:is(stats.tasks.ready, 1, 'tasks.ready')
test:is(stats.tasks.done, 2, 'tasks.done')
test:is(stats.tasks.delayed, 0, 'tasks.delayed')
test:is(stats.tasks.total, 3, 'tasks.total')

-- check function call statistics
test:is(stats.calls.delete, 1, 'calls.delete')
test:is(stats.calls.ack, 1, 'calls.ack')
test:is(stats.calls.take, 3, 'calls.take')
test:is(stats.calls.kick, 1, 'calls.kick')
test:is(stats.calls.bury, 2, 'calls.bury')
test:is(stats.calls.put, 5, 'calls.put')
test:is(stats.calls.release, 1, 'calls.release')
end
end)


Expand Down
94 changes: 94 additions & 0 deletions t/benchmark/busy_utubes.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env tarantool

local clock = require('clock')
local os = require('os')
local fiber = require('fiber')
local queue = require('queue')

-- Set the number of consumers.
local consumers_count = 10
-- Set the number of tasks processed by one consumer per iteration.
local batch_size = 150000

local barrier = fiber.cond()
local wait_count = 0

box.cfg()

local test_queue = queue.create_tube('test_queue', 'utube',
{temporary = true, v2 = true})

local function prepare_tasks()
local test_data = 'test data'

for i = 1, consumers_count do
for _ = 1, batch_size do
test_queue:put(test_data, {utube = tostring(i)})
end
end
end

local function prepare_consumers()
local consumers = {}

-- Make half the utubes busy.
for _ = 1, consumers_count / 2 do
test_queue:take()
end

for i = 1, consumers_count / 2 do
consumers[i] = fiber.create(function()
wait_count = wait_count + 1
-- Wait for all consumers to start.
barrier:wait()

-- Ack the tasks.
for _ = 1, batch_size do
local task = test_queue:take()
test_queue:ack(task[1])
end

wait_count = wait_count + 1
end)
end

return consumers
end

local function multi_consumer_bench()
--- Wait for all consumer fibers.
local wait_all = function()
while (wait_count ~= consumers_count / 2) do
fiber.yield()
end
wait_count = 0
end

fiber.set_max_slice(100)

prepare_tasks()

-- Wait for all consumers to start.
local consumers = prepare_consumers()
wait_all()

-- Start timing of task confirmation.
local start_ack_time = clock.proc64()
barrier:broadcast()
-- Wait for all tasks to be acked.
wait_all()
-- Complete the timing of task confirmation.
local complete_time = clock.proc64()

-- Print the result in milliseconds.
print(string.format("Time it takes to confirm the tasks: %i",
tonumber((complete_time - start_ack_time) / 10^6)))
end

-- Start benchmark.
multi_consumer_bench()

-- Cleanup.
test_queue:drop()

os.exit(0)
Loading

0 comments on commit 8b14921

Please sign in to comment.