From 23160b66caa3216b0d3b99876b5acbb7a4ef07da Mon Sep 17 00:00:00 2001 From: Chrono Date: Wed, 4 Sep 2024 17:01:43 +0800 Subject: [PATCH] refactor(lib): add common utils (#68) * refactor(lib): add common utils * get_worker_name * get_worker_id * lint fix * apply suggestions * fix mistake --- lualib/resty/events/broker.lua | 23 +++++++++-------- lualib/resty/events/init.lua | 14 +++++++---- lualib/resty/events/protocol.lua | 23 ++++++----------- lualib/resty/events/utils.lua | 43 ++++++++++++++++++++++++++++++++ lualib/resty/events/worker.lua | 24 ++++++++++-------- 5 files changed, 87 insertions(+), 40 deletions(-) create mode 100644 lualib/resty/events/utils.lua diff --git a/lualib/resty/events/broker.lua b/lualib/resty/events/broker.lua index b5d54e0..2f918a7 100644 --- a/lualib/resty/events/broker.lua +++ b/lualib/resty/events/broker.lua @@ -1,29 +1,38 @@ local codec = require "resty.events.codec" local lrucache = require "resty.lrucache" local queue = require "resty.events.queue" +local utils = require "resty.events.utils" local server = require("resty.events.protocol").server -local is_timeout = server.is_timeout -local is_closed = server.is_closed + + +local is_timeout = utils.is_timeout +local is_closed = utils.is_closed +local get_worker_id = utils.get_worker_id +local get_worker_name = utils.get_worker_name + local setmetatable = setmetatable local random = math.random + local ngx = ngx -- luacheck: ignore local log = ngx.log local exit = ngx.exit local exiting = ngx.worker.exiting -local ngx_worker_id = ngx.worker.id local worker_count = ngx.worker.count local ERR = ngx.ERR local DEBUG = ngx.DEBUG local NOTICE = ngx.NOTICE + local spawn = ngx.thread.spawn local kill = ngx.thread.kill local wait = ngx.thread.wait + local decode = codec.decode + local MAX_UNIQUE_EVENTS = 1024 local WEAK_KEYS_MT = { __mode = "k", } @@ -38,12 +47,6 @@ do end -local function get_worker_name(worker_id) - return worker_id == -1 and - "privileged agent" or "worker #" .. worker_id -end - - local function terminating(self, worker_connection) return not self._clients[worker_connection] or exiting() end @@ -215,7 +218,7 @@ function _M:init() end function _M:run() - local broker_id = ngx_worker_id() + local broker_id = get_worker_id() if broker_id ~= self._opts.broker_id then log(ERR, "broker got connection from worker on non-broker worker #", broker_id) return exit(444) diff --git a/lualib/resty/events/init.lua b/lualib/resty/events/init.lua index 78f5352..6b60db8 100644 --- a/lualib/resty/events/init.lua +++ b/lualib/resty/events/init.lua @@ -1,22 +1,26 @@ +local utils = require "resty.events.utils" local events_broker = require "resty.events.broker" local events_worker = require "resty.events.worker" - local disable_listening = require "resty.events.disable_listening" -local ngx = ngx -local ngx_worker_id = ngx.worker.id + +local get_worker_id = utils.get_worker_id + local type = type local setmetatable = setmetatable local str_sub = string.sub -local worker_count = ngx.worker.count() + +local worker_count = utils.get_worker_count() + local _M = { _VERSION = "0.3.0", } local _MT = { __index = _M, } + local function check_options(opts) assert(type(opts) == "table", "Expected a table, got " .. type(opts)) @@ -107,7 +111,7 @@ end function _M:init_worker() local opts = self.opts - local worker_id = ngx_worker_id() or -1 + local worker_id = get_worker_id() local is_broker = opts.broker_id == worker_id or opts.testing == true diff --git a/lualib/resty/events/protocol.lua b/lualib/resty/events/protocol.lua index 392cdbe..d22e5b3 100644 --- a/lualib/resty/events/protocol.lua +++ b/lualib/resty/events/protocol.lua @@ -1,13 +1,15 @@ local frame = require "resty.events.frame" local codec = require "resty.events.codec" +local utils = require "resty.events.utils" + local _recv_frame = frame.recv local _send_frame = frame.send local encode = codec.encode local decode = codec.decode -local ngx = ngx -local worker_id = ngx.worker.id + +local ngx = ngx -- luacheck: ignore local worker_pid = ngx.worker.pid local tcp = ngx.socket.tcp local req_sock = ngx.req.socket @@ -15,12 +17,15 @@ local ngx_header = ngx.header local send_headers = ngx.send_headers local flush = ngx.flush local subsystem = ngx.config.subsystem +local get_worker_id = utils.get_worker_id + local type = type local str_sub = string.sub local str_find = string.find local setmetatable = setmetatable + -- for high traffic pressure local DEFAULT_TIMEOUT = 5000 -- 5000ms local WORKER_INFO = { @@ -28,14 +33,6 @@ local WORKER_INFO = { pid = 0, } -local function is_timeout(err) - return err and str_sub(err, -7) == "timeout" -end - -local function is_closed(err) - return err and (str_sub(err, -6) == "closed" or - str_sub(err, -11) == "broken pipe") -end local function recv_frame(self) local sock = self.sock @@ -56,8 +53,6 @@ local function send_frame(self, payload) end local _Server = { - is_closed = is_closed, - is_timeout = is_timeout, recv_frame = recv_frame, send_frame = send_frame, } @@ -109,8 +104,6 @@ function _Server.new() end local _Client = { - is_closed = is_closed, - is_timeout = is_timeout, recv_frame = recv_frame, send_frame = send_frame, } @@ -171,7 +164,7 @@ function _Client:connect(addr) end end -- subsystem == "http" - WORKER_INFO.id = worker_id() or -1 + WORKER_INFO.id = get_worker_id() WORKER_INFO.pid = worker_pid() local _, err = _send_frame(sock, encode(WORKER_INFO)) diff --git a/lualib/resty/events/utils.lua b/lualib/resty/events/utils.lua new file mode 100644 index 0000000..947e9ee --- /dev/null +++ b/lualib/resty/events/utils.lua @@ -0,0 +1,43 @@ +local str_sub = string.sub + + +local ngx = ngx -- luacheck: ignore +local ngx_worker_id = ngx.worker.id +local ngx_worker_count = ngx.worker.count + + +local function is_timeout(err) + return err and str_sub(err, -7) == "timeout" +end + + +local function is_closed(err) + return err and (str_sub(err, -6) == "closed" or + str_sub(err, -11) == "broken pipe") +end + + +local function get_worker_id() + return ngx_worker_id() or -1 -- -1 represents priviledged worker +end + + +local function get_worker_name(worker_id) + return worker_id == -1 and -- -1 represents priviledged worker + "privileged agent" or "worker #" .. worker_id +end + + +local function get_worker_count() + return ngx_worker_count() +end + + +return { + is_timeout = is_timeout, + is_closed = is_closed, + + get_worker_id = get_worker_id, + get_worker_name = get_worker_name, + get_worker_count = get_worker_count, +} diff --git a/lualib/resty/events/worker.lua b/lualib/resty/events/worker.lua index fcf7d36..402d113 100644 --- a/lualib/resty/events/worker.lua +++ b/lualib/resty/events/worker.lua @@ -2,38 +2,48 @@ local cjson = require "cjson.safe" local codec = require "resty.events.codec" local queue = require "resty.events.queue" local callback = require "resty.events.callback" +local utils = require "resty.events.utils" + local frame_validate = require("resty.events.frame").validate local client = require("resty.events.protocol").client -local is_timeout = client.is_timeout +local is_timeout = utils.is_timeout +local get_worker_id = utils.get_worker_id +local get_worker_name = utils.get_worker_name + local type = type local assert = assert local setmetatable = setmetatable local random = math.random + local ngx = ngx -- luacheck: ignore local log = ngx.log local sleep = ngx.sleep local exiting = ngx.worker.exiting -local ngx_worker_id = ngx.worker.id local ERR = ngx.ERR local DEBUG = ngx.DEBUG local NOTICE = ngx.NOTICE + local spawn = ngx.thread.spawn local kill = ngx.thread.kill local wait = ngx.thread.wait + local timer_at = ngx.timer.at + local encode = codec.encode local decode = codec.decode local cjson_encode = cjson.encode + local EVENTS_COUNT_LIMIT = 100 local EVENTS_SLEEP_TIME = 0.05 + local EMPTY_T = {} local EVENT_T = { @@ -56,12 +66,6 @@ local _M = {} local _MT = { __index = _M, } -local function get_worker_name(worker_id) - return worker_id == -1 and - "privileged agent" or "worker #" .. worker_id -end - - -- gen a random number [0.01, 0.05] -- it means that delay will be 10ms~50ms local function random_delay() @@ -340,7 +344,7 @@ end function _M:init() assert(self._opts) - self._worker_id = ngx_worker_id() or -1 + self._worker_id = get_worker_id() start_timers(self) @@ -354,7 +358,7 @@ local function post_event(self, source, event, data, spec) EVENT_T.source = source EVENT_T.event = event EVENT_T.data = data - EVENT_T.wid = self._worker_id or ngx_worker_id() or -1 + EVENT_T.wid = self._worker_id or get_worker_id() -- encode event info str, err = encode(EVENT_T)