Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added not_check_session feature #24

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ M.upgrade(space, {
ttr = true|number, -- requires `runat` field
-- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time
-- if number, then with default ttl, otherwise only if set during take

not_check_session = true, -- requires 'ttr'
-- if true, not check session on ack/bury/release and not release task on disconnect
},
-- Set tubes for which statistics collector will be enabled
tube_stats = { 'tube-1', 'tube-2' },
Expand Down
144 changes: 144 additions & 0 deletions test/network_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,147 @@ function g.test_untake()

tt:close()
end

function g.test_not_untake_with_not_check_session()
local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]]
queue:format({
{ name = 'id', type = 'unsigned' },
{ name = 'status', type = 'string' },
{ name = 'runat', type = 'number' },
{ name = 'payload', type = 'any' },
})
local F = { id = 1, status = 2, runat = 3, payload = 4 }

queue:create_index('primary', { parts = {'id'} })
queue:create_index('status', { parts = {'status', 'id'} })
queue:create_index('runat', { parts = {'runat', 'id'} })

xqueue.upgrade(queue, {
features = {
id = 'time64',
delayed = true,
retval = 'table',
not_check_session = true,
ttr = 60,
},
fields = {
status = 'status',
runat = 'runat',
},
})

local tt = netbox.connect('127.0.0.1:3301', { wait_connected = true })
t.assert(tt:ping(), "connected to self")

local task = queue:put({ payload = { time = clock.time() } })
t.assert(task, ":put() has been inserted task")

local awaiter_fin = fiber.channel()

fiber.create(function()
local ret, is_processed = queue:wait(task, 3)
t.assert_equals(ret.id, task.id, "Task has been awaited")
t.assert_equals(is_processed, true, "Task has been processed by the consumer")
t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured")
end)

local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 })
t.assert(taken, ":take() returned task via the network")
t.assert_equals(task.id, taken.id, "retutned the same task")
t.assert(box.space.queue.xq.taken, nil, "taken table is empty")
t.assert(box.space.queue.xq.bysid, nil, "bysid table is empty")

-- not untake on close connection
tt:close()
fiber.sleep(1)

t.assert_equals(queue:get({ task.id }).status, 'T', "task still in T status")

tt = netbox.connect('127.0.0.1:3301', { wait_connected = true })

local processed_at = clock.time()
local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1})
t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task")

local awaiter_res = awaiter_fin:get()
t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task")
t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed")

tt:close()
end

function g.test_untake_with_not_check_session_by_ttr()
local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]]
queue:format({
{ name = 'id', type = 'unsigned' },
{ name = 'status', type = 'string' },
{ name = 'runat', type = 'number' },
{ name = 'payload', type = 'any' },
})
local F = { id = 1, status = 2, runat = 3, payload = 4 }

queue:create_index('primary', { parts = {'id'} })
queue:create_index('status', { parts = {'status', 'id'} })
queue:create_index('runat', { parts = {'runat', 'id'} })

xqueue.upgrade(queue, {
features = {
id = 'time64',
delayed = true,
retval = 'table',
not_check_session = true,
ttr = 5,
},
fields = {
status = 'status',
runat = 'runat',
},
})

local tt = netbox.connect('127.0.0.1:3301', { wait_connected = true })
t.assert(tt:ping(), "connected to self")

local task = queue:put({ payload = { time = clock.time() } })
t.assert(task, ":put() has been inserted task")

local awaiter_fin = fiber.channel()

local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 })
t.assert(taken, ":take() returned task via the network")
t.assert_equals(task.id, taken.id, "retutned the same task")
t.assert(queue.xq.taken, nil, "taken table is empty")
t.assert(queue.xq.bysid, nil, "bysid table is empty")

fiber.create(function()
local ret, is_processed = queue:wait(task, 7)
t.assert_equals(ret.id, task.id, "Task has been awaited")
t.assert_equals(is_processed, true, "Task has been processed by the consumer")
t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured")
end)

-- not untake on close connection
tt:close()
t.assert_equals(queue:get({ task.id }).status, 'T', "task still in T status")

-- untake by ttr
fiber.sleep(5)
t.assert_equals(queue:get({ task.id }).status, 'R', "task was returned to R status")

tt = netbox.connect('127.0.0.1:3301', { wait_connected = true })

taken = tt:call('box.space.queue:take', {1}, { timeout = 1 })
t.assert(taken, ":take() returned task via the network (2nd)")
t.assert_equals(task.id, taken.id, "retutned the same task (2nd)")
t.assert(queue.xq.taken, nil, "taken table is empty")
t.assert(queue.xq.bysid, nil, "bysid table is empty")

local processed_at = clock.time()
local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1})
t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task")

local awaiter_res = awaiter_fin:get()
t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task")
t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed")

tt:close()
end
54 changes: 53 additions & 1 deletion test/replicaset_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,29 @@ function g.test_start(test)
retval = 'table',
} --[[@as xqueue.upgrade.options]]

local queue_not_check_session = {
name = 'queue_not_check_session',
format = {
{ name = 'id', type = 'string' },
{ name = 'status', type = 'string' },
{ name = 'runat', type = 'number' },
{ name = 'payload', type = 'any' },
},
features = {
id = 'uuid',
keep = false,
delayed = true,
not_check_session = true,
ttr = 60,
},
fields = {
id = 'id',
status = 'status',
runat = 'runat',
},
retval = 'table',
} --[[@as xqueue.upgrade.options]]

local had_error
local await = {}
for _, srv in pairs(replica_set.servers) do
Expand All @@ -200,6 +223,12 @@ function g.test_start(test)
had_error = err
log.error("%s: %s", srv.alias, err)
end

local ok, err = pcall(srv.exec, srv, setup_queue, {queue_not_check_session}, { timeout = 20 })
if not ok then
had_error = err
log.error("%s: %s", srv.alias, err)
end
end)
table.insert(await, fib)
end
Expand All @@ -214,6 +243,13 @@ function g.test_start(test)
local task = rw:call('box.space.simpleq:put', {{payload = {cookie = cookie}}, { delay = 5 } })
t.assert(task, "task was returned")

local task_not_check_session = rw:call('box.space.queue_not_check_session:put', {{payload = {}} })
t.assert(task_not_check_session, "task was returned")

local taken = rw:call('box.space.queue_not_check_session:take', { 1 }, { timeout = 1 })
t.assert(taken, ":take() returned task from master")
t.assert_equals(task_not_check_session.id, taken.id, "returned the same task")

local SWITCH_TIMEOUT = 10

if params.name == 'raft' then
Expand Down Expand Up @@ -274,8 +310,24 @@ function g.test_start(test)
t.assert_equals(trw.alias, rw.alias, "after rw-switch luatest succesfully derived new leader")
end

local awaiter_fin = fiber.channel()

fiber.create(function()
local ret, is_processed
ret, is_processed = rw:call('box.space.queue_not_check_session:wait', { task_not_check_session, 1 }, { timeout = 1 })
t.assert_equals(ret.id, task_not_check_session.id, "Task has been awaited")
t.assert_equals(is_processed, true, "Task has been processed by the consumer")
t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured")
end)

local acked = rw:call('box.space.queue_not_check_session:ack', { taken }, { timeout = 1 })
t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task")

local awaiter_res = awaiter_fin:get()
t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task")
t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed")

local task = rw:call('box.space.simpleq:take', { 5 }, { timeout = 6 })
t.assert(task, "delayed task has been succesfully taken from new leader")
t.assert_equals(task.payload.cookie, cookie, "task.cookie is valid")

end
53 changes: 45 additions & 8 deletions xqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ Interface:
-- if number, then with default ttl, otherwise only if set during put/release
ttr = true|number, -- requires `runat` field
-- if number, then with default ttl, otherwise only if set

not_check_session = true, -- if true, not check session on ack/bury/release and not release task on disconnect
-- requires 'ttr'
},
})

Expand Down Expand Up @@ -245,6 +248,7 @@ local methods = {}
---@field ttr boolean|number
---@field ttl_default number?
---@field ttr_default number?
---@field not_check_session boolean

---@class xq:table
---@field NEVER integer (Default: 0)
Expand Down Expand Up @@ -304,6 +308,7 @@ local methods = {}
---Requires runat field and index.
---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttl_default.
---Requires runat field and index.
---@field not_check_session? boolean should xqueue not check session. Requires enabled ttr feature.

---@class xqueue.upgrade.options
---@field format? boxSpaceFormat
Expand Down Expand Up @@ -745,6 +750,28 @@ function M.upgrade(space,opts,depth)
features.ttr = false
end

if opts.features.not_check_session then
-- feature not_check_session require ttr because in this case
-- task isn't released on disconnect so to prevent the task from being frozen in the queue we need ttr
if not features.ttr then
error(string.format("feature not_check_session requires enabled ttr" ),2+depth)
end

features.not_check_session = true

setmetatable(self.bysid, {
__serialize = 'map',
__newindex = function(_, _, _) end,
__index = function(_, _) return {} end,
})
setmetatable(self.taken, {
__serialize = 'map',
__newindex = function(_, _, _) end,
})
else
features.not_check_session = false
end

if fields.tube then
features.tube = true
end
Expand Down Expand Up @@ -865,7 +892,7 @@ function M.upgrade(space,opts,depth)
if not r then
log.error("Worker for {%s} has error: %s", key, e)
else
if xq.taken[ key ] then
if xq.taken[ key ] or self.features.not_check_session then
space:ack(task)
end
end
Expand Down Expand Up @@ -1000,6 +1027,10 @@ function M.upgrade(space,opts,depth)
if not t then
error(string.format( "Task {%s} was not found", key ),2)
end
-- if not need to check session that return task
if self.features.not_check_session then
return t
end
if not self.taken[key] then
error(string.format( "Task %s not taken by any", key ),2)
end
Expand All @@ -1017,9 +1048,9 @@ function M.upgrade(space,opts,depth)
if self.bysid[ sid ] then
self.bysid[ sid ][ key ] = nil
else
log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid)
log.error("Task {%s} marked as taken by sid=%s but bysid is null", key, sid)
end
else
elseif not self.features.not_check_session then
log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() )
end

Expand Down Expand Up @@ -1154,11 +1185,17 @@ function M.upgrade(space,opts,depth)
local t = space:get(realkey)
if t then
if t[ self.fields.status ] == 'T' then
self:wakeup(space:update({ realkey }, {
{ '=',self.fields.status,'R' },
self.have_runat and { '=', self.fields.runat, self.NEVER } or nil
}))
log.info("Rst: T->R {%s}", realkey )
-- if not check session we doesn't release task
-- it can be ack/bury/release from another connection
if self.features.not_check_session then
log.info("Rst: task %s taken by session %s not released", realkey, sid)
else
self:wakeup(space:update({ realkey }, {
{ '=',self.fields.status,'R' },
self.have_runat and { '=', self.fields.runat, self.NEVER } or nil
}))
log.info("Rst: T->R {%s}", realkey )
end
else
log.error( "Rst: %s->? {%s}: wrong status", t[self.fields.status], realkey )
end
Expand Down