diff --git a/README.md b/README.md index 1c029ea..08252de 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,9 @@ M.upgrade(space, { ttr = true|number, -- requires `runat` field -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time -- if number, then with default ttl, otherwise only if set during take + + not_check_session = true, -- requires 'ttr' + -- if true, not check session on ack/bury/release and not release task on disconnect }, -- Set tubes for which statistics collector will be enabled tube_stats = { 'tube-1', 'tube-2' }, diff --git a/test/network_test.lua b/test/network_test.lua index 19d7506..5f731f8 100644 --- a/test/network_test.lua +++ b/test/network_test.lua @@ -87,3 +87,147 @@ function g.test_untake() tt:close() end + +function g.test_not_untake_with_not_check_session() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + local F = { id = 1, status = 2, runat = 3, payload = 4 } + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + features = { + id = 'time64', + delayed = true, + retval = 'table', + not_check_session = true, + ttr = 60, + }, + fields = { + status = 'status', + runat = 'runat', + }, + }) + + local tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + t.assert(tt:ping(), "connected to self") + + local task = queue:put({ payload = { time = clock.time() } }) + t.assert(task, ":put() has been inserted task") + + local awaiter_fin = fiber.channel() + + fiber.create(function() + local ret, is_processed = queue:wait(task, 3) + t.assert_equals(ret.id, task.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network") + t.assert_equals(task.id, taken.id, "retutned the same task") + t.assert(box.space.queue.xq.taken, nil, "taken table is empty") + t.assert(box.space.queue.xq.bysid, nil, "bysid table is empty") + + -- not untake on close connection + tt:close() + fiber.sleep(1) + + t.assert_equals(queue:get({ task.id }).status, 'T', "task still in T status") + + tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + + local processed_at = clock.time() + local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1}) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + + tt:close() +end + +function g.test_untake_with_not_check_session_by_ttr() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + local F = { id = 1, status = 2, runat = 3, payload = 4 } + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + features = { + id = 'time64', + delayed = true, + retval = 'table', + not_check_session = true, + ttr = 5, + }, + fields = { + status = 'status', + runat = 'runat', + }, + }) + + local tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + t.assert(tt:ping(), "connected to self") + + local task = queue:put({ payload = { time = clock.time() } }) + t.assert(task, ":put() has been inserted task") + + local awaiter_fin = fiber.channel() + + local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network") + t.assert_equals(task.id, taken.id, "retutned the same task") + t.assert(queue.xq.taken, nil, "taken table is empty") + t.assert(queue.xq.bysid, nil, "bysid table is empty") + + fiber.create(function() + local ret, is_processed = queue:wait(task, 7) + t.assert_equals(ret.id, task.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + -- not untake on close connection + tt:close() + t.assert_equals(queue:get({ task.id }).status, 'T', "task still in T status") + + -- untake by ttr + fiber.sleep(5) + t.assert_equals(queue:get({ task.id }).status, 'R', "task was returned to R status") + + tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + + taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network (2nd)") + t.assert_equals(task.id, taken.id, "retutned the same task (2nd)") + t.assert(queue.xq.taken, nil, "taken table is empty") + t.assert(queue.xq.bysid, nil, "bysid table is empty") + + local processed_at = clock.time() + local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1}) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + + tt:close() +end diff --git a/test/replicaset_test.lua b/test/replicaset_test.lua index 90fd564..cedf8ab 100644 --- a/test/replicaset_test.lua +++ b/test/replicaset_test.lua @@ -189,6 +189,29 @@ function g.test_start(test) retval = 'table', } --[[@as xqueue.upgrade.options]] + local queue_not_check_session = { + name = 'queue_not_check_session', + format = { + { name = 'id', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }, + features = { + id = 'uuid', + keep = false, + delayed = true, + not_check_session = true, + ttr = 60, + }, + fields = { + id = 'id', + status = 'status', + runat = 'runat', + }, + retval = 'table', + } --[[@as xqueue.upgrade.options]] + local had_error local await = {} for _, srv in pairs(replica_set.servers) do @@ -200,6 +223,12 @@ function g.test_start(test) had_error = err log.error("%s: %s", srv.alias, err) end + + local ok, err = pcall(srv.exec, srv, setup_queue, {queue_not_check_session}, { timeout = 20 }) + if not ok then + had_error = err + log.error("%s: %s", srv.alias, err) + end end) table.insert(await, fib) end @@ -214,6 +243,13 @@ function g.test_start(test) local task = rw:call('box.space.simpleq:put', {{payload = {cookie = cookie}}, { delay = 5 } }) t.assert(task, "task was returned") + local task_not_check_session = rw:call('box.space.queue_not_check_session:put', {{payload = {}} }) + t.assert(task_not_check_session, "task was returned") + + local taken = rw:call('box.space.queue_not_check_session:take', { 1 }, { timeout = 1 }) + t.assert(taken, ":take() returned task from master") + t.assert_equals(task_not_check_session.id, taken.id, "returned the same task") + local SWITCH_TIMEOUT = 10 if params.name == 'raft' then @@ -274,8 +310,24 @@ function g.test_start(test) t.assert_equals(trw.alias, rw.alias, "after rw-switch luatest succesfully derived new leader") end + local awaiter_fin = fiber.channel() + + fiber.create(function() + local ret, is_processed + ret, is_processed = rw:call('box.space.queue_not_check_session:wait', { task_not_check_session, 1 }, { timeout = 1 }) + t.assert_equals(ret.id, task_not_check_session.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + local acked = rw:call('box.space.queue_not_check_session:ack', { taken }, { timeout = 1 }) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + local task = rw:call('box.space.simpleq:take', { 5 }, { timeout = 6 }) t.assert(task, "delayed task has been succesfully taken from new leader") t.assert_equals(task.payload.cookie, cookie, "task.cookie is valid") - end diff --git a/xqueue.lua b/xqueue.lua index c3c803e..fc28b18 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -124,6 +124,9 @@ Interface: -- if number, then with default ttl, otherwise only if set during put/release ttr = true|number, -- requires `runat` field -- if number, then with default ttl, otherwise only if set + + not_check_session = true, -- if true, not check session on ack/bury/release and not release task on disconnect + -- requires 'ttr' }, }) @@ -245,6 +248,7 @@ local methods = {} ---@field ttr boolean|number ---@field ttl_default number? ---@field ttr_default number? +---@field not_check_session boolean ---@class xq:table ---@field NEVER integer (Default: 0) @@ -304,6 +308,7 @@ local methods = {} ---Requires runat field and index. ---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttl_default. ---Requires runat field and index. +---@field not_check_session? boolean should xqueue not check session. Requires enabled ttr feature. ---@class xqueue.upgrade.options ---@field format? boxSpaceFormat @@ -745,6 +750,28 @@ function M.upgrade(space,opts,depth) features.ttr = false end + if opts.features.not_check_session then + -- feature not_check_session require ttr because in this case + -- task isn't released on disconnect so to prevent the task from being frozen in the queue we need ttr + if not features.ttr then + error(string.format("feature not_check_session requires enabled ttr" ),2+depth) + end + + features.not_check_session = true + + setmetatable(self.bysid, { + __serialize = 'map', + __newindex = function(_, _, _) end, + __index = function(_, _) return {} end, + }) + setmetatable(self.taken, { + __serialize = 'map', + __newindex = function(_, _, _) end, + }) + else + features.not_check_session = false + end + if fields.tube then features.tube = true end @@ -865,7 +892,7 @@ function M.upgrade(space,opts,depth) if not r then log.error("Worker for {%s} has error: %s", key, e) else - if xq.taken[ key ] then + if xq.taken[ key ] or self.features.not_check_session then space:ack(task) end end @@ -1000,6 +1027,10 @@ function M.upgrade(space,opts,depth) if not t then error(string.format( "Task {%s} was not found", key ),2) end + -- if not need to check session that return task + if self.features.not_check_session then + return t + end if not self.taken[key] then error(string.format( "Task %s not taken by any", key ),2) end @@ -1017,9 +1048,9 @@ function M.upgrade(space,opts,depth) if self.bysid[ sid ] then self.bysid[ sid ][ key ] = nil else - log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid) + log.error("Task {%s} marked as taken by sid=%s but bysid is null", key, sid) end - else + elseif not self.features.not_check_session then log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() ) end @@ -1154,11 +1185,17 @@ function M.upgrade(space,opts,depth) local t = space:get(realkey) if t then if t[ self.fields.status ] == 'T' then - self:wakeup(space:update({ realkey }, { - { '=',self.fields.status,'R' }, - self.have_runat and { '=', self.fields.runat, self.NEVER } or nil - })) - log.info("Rst: T->R {%s}", realkey ) + -- if not check session we doesn't release task + -- it can be ack/bury/release from another connection + if self.features.not_check_session then + log.info("Rst: task %s taken by session %s not released", realkey, sid) + else + self:wakeup(space:update({ realkey }, { + { '=',self.fields.status,'R' }, + self.have_runat and { '=', self.fields.runat, self.NEVER } or nil + })) + log.info("Rst: T->R {%s}", realkey ) + end else log.error( "Rst: %s->? {%s}: wrong status", t[self.fields.status], realkey ) end