-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(clustering/sync): avoiding long delay caused by race condition #13896
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,11 @@ local events = require("kong.runloop.events") | |
local insert_entity_for_txn = declarative.insert_entity_for_txn | ||
local delete_entity_for_txn = declarative.delete_entity_for_txn | ||
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY | ||
local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY | ||
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY | ||
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS | ||
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } | ||
local MAX_RETRY = 5 | ||
|
||
|
||
local ipairs = ipairs | ||
|
@@ -146,6 +148,7 @@ end | |
|
||
|
||
function _M:init_dp(manager) | ||
local kong_shm = ngx.shared.kong | ||
-- DP | ||
-- Method: kong.sync.v2.notify_new_version | ||
-- Params: new_versions: list of namespaces and their new versions, like: | ||
|
@@ -164,6 +167,8 @@ function _M:init_dp(manager) | |
|
||
local lmdb_ver = tonumber(declarative.get_current_hash()) or 0 | ||
if lmdb_ver < version then | ||
-- set lastest version to shm | ||
kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version) | ||
return self:sync_once() | ||
end | ||
|
||
|
@@ -363,45 +368,69 @@ local function sync_handler(premature) | |
return | ||
end | ||
|
||
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() | ||
-- `do_sync()` is run twice in a row to report back new version number | ||
-- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta | ||
-- as well as status reporting) | ||
for _ = 1, 2 do | ||
local ok, err = do_sync() | ||
if not ok then | ||
return nil, err | ||
end | ||
end -- for | ||
|
||
return true | ||
end) | ||
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure, could we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We deign the sync.v2 to work without privileged worker (and worker no.0) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if not res and err ~= "timeout" then | ||
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) | ||
end | ||
end | ||
|
||
|
||
local function start_sync_timer(timer_func, delay) | ||
local hdl, err = timer_func(delay, sync_handler) | ||
local sync_once_impl | ||
|
||
if not hdl then | ||
|
||
local function start_sync_once_timer(retry_count) | ||
local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0) | ||
if not ok then | ||
return nil, err | ||
end | ||
|
||
return true | ||
end | ||
|
||
|
||
function sync_once_impl(premature, retry_count) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use a simple loop? like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. Recreating the timer prevents long-live timer from causing resource leak |
||
if premature then | ||
return | ||
end | ||
|
||
sync_handler() | ||
chronolaw marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) | ||
local current_version = tonumber(declarative.get_current_hash()) or 0 | ||
|
||
-- retry if the version is not updated | ||
if not latest_notified_version or current_version < latest_notified_version then | ||
retry_count = retry_count or 0 | ||
if retry_count > MAX_RETRY then | ||
ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count) | ||
return | ||
end | ||
|
||
return start_sync_once_timer(retry_count + 1) | ||
end | ||
end | ||
|
||
|
||
function _M:sync_once(delay) | ||
--- XXX TODO: check rpc connection is ready | ||
local hdl, err = ngx.timer.at(delay or 0, sync_once_impl, 0) | ||
|
||
if not hdl then | ||
return nil, err | ||
end | ||
|
||
return start_sync_timer(ngx.timer.at, delay or 0) | ||
return true | ||
end | ||
|
||
|
||
function _M:sync_every(delay) | ||
return start_sync_timer(ngx.timer.every, delay) | ||
local hdl, err = ngx.timer.every(delay, sync_handler) | ||
|
||
if not hdl then | ||
return nil, err | ||
end | ||
|
||
return true | ||
end | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only run sync in one worker, is it necessary to store it in shared memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not true. @dndx Could you confirm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We run incremental sync inside all workers, it's just that only one worker can sync at the same time.