Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport -> release/3.9.x] fix(wasm): execute filter plugins in a consistent order #13967

Open
wants to merge 1 commit into
base: release/3.9.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions kong/runloop/wasm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ local ipairs = ipairs
local type = type
local assert = assert
local insert = table.insert
local sort = table.sort
local cjson_encode = cjson.encode
local cjson_decode = cjson.decode
local fmt = string.format
Expand Down Expand Up @@ -116,6 +117,21 @@ local STATUS_ENABLED = "wasm support is enabled"
local ENABLED = false
local STATUS = STATUS_DISABLED

local function filter_plugin_compare(a, b)
if a.name ~= b.name then
return a.name < b.name
end

if a.updated_at and b.updated_at and a.updated_at ~= b.updated_at then
return a.updated_at < b.updated_at
end

if a.created_at and b.created_at and a.created_at ~= b.created_at then
return a.created_at < b.created_at
end

return a.id < b.id
end

local hash_chain
do
Expand Down Expand Up @@ -485,28 +501,36 @@ local function rebuild_state(db, version, old_state)

local plugin_pagesize = db.plugins.pagination.max_page_size

local filter_plugins = {}

for plugin, err in db.plugins:each(plugin_pagesize, GLOBAL_QUERY_OPTS) do
if err then
return nil, "failed iterating plugins: " .. tostring(err)
end

if _M.filters_by_name[plugin.name] and plugin.enabled then
local chain = get_or_insert_chain(chains, {
id = uuid.uuid(),
enabled = true,
route = plugin.route,
service = plugin.service,
filters = {},
})

insert(chain.filters, {
name = plugin.name,
enabled = true,
config = serialize_configuration(plugin.config),
})
insert(filter_plugins, plugin)
end
end

sort(filter_plugins, filter_plugin_compare)

for _, plugin in ipairs(filter_plugins) do
local chain = get_or_insert_chain(chains, {
id = uuid.uuid(),
enabled = true,
route = plugin.route,
service = plugin.service,
filters = {},
})

insert(chain.filters, {
name = plugin.name,
enabled = true,
config = serialize_configuration(plugin.config),
})
end

local routes = db.routes
local select_route = routes.select

Expand Down
202 changes: 198 additions & 4 deletions spec/02-integration/20-wasm/12-filters-as-plugins_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,23 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()


lazy_setup(function()
assert(helpers.file.copy(FILTER_PATH .. "/tests.wasm",
FILTER_PATH .. "/tests-01.wasm"))
assert(helpers.file.copy(FILTER_PATH .. "/tests.wasm",
FILTER_PATH .. "/tests-02.wasm"))

require("kong.runloop.wasm").enable({
{ name = "response_transformer",
path = FILTER_PATH .. "/response_transformer.wasm",
},
{
name = "tests-01",
path = FILTER_PATH .. "/tests-01.wasm",
},
{
name = "tests-02",
path = FILTER_PATH .. "/tests-02.wasm",
},
})

bp, db = helpers.get_db_utils(strategy, {
Expand All @@ -130,14 +143,14 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
"plugins",
})

helpers.start_kong({
assert(helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
nginx_main_worker_processes = "2",
wasm = true,
wasm_filters = "response_transformer",
wasm_filters = "response_transformer,tests-01,tests-02",
plugins = "response-transformer",
})
}))

admin = helpers.admin_client()
proxy = helpers.proxy_client()
Expand All @@ -154,6 +167,8 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
end

helpers.stop_kong()
helpers.file.delete(FILTER_PATH .. "/tests-01.wasm")
helpers.file.delete(FILTER_PATH .. "/tests-02.wasm")
end)

before_each(function()
Expand Down Expand Up @@ -229,7 +244,6 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()

local expected = 4
assert.equals(expected, #json.data)
helpers.intercept(json.data)
local found = 0

for _, plugin in ipairs(json.data) do
Expand Down Expand Up @@ -339,6 +353,186 @@ describe("#wasm filters as plugins (#" .. strategy .. ")", function()
assert.equals(fc_value, assert.response(res).has.header(FILTER_CHAIN_HEADER))
end)
end)

describe("order of execution", function()
it("filter plugins execute at the end of any existing filter chain", function()
local lua_plugin = {
name = "response-transformer",
route = { id = route.id },
config = {
add = {
headers = {
"X-Added-By-Lua-Plugin:1",
"X-Replace-Me:lua",
"X-Append-Me:lua",
"X-Remove-Me:lua",
},
}
}
}

local plugin = {
name = "response_transformer",
route = { id = route.id },
config = cjson.encode({
add = {
headers = {
"X-Added-First:plugin",
"X-Added-By-Filter-Plugin:1",
"X-Not-Removed-By-Filter-Chain:plugin",
},
},
append = {
headers = {
"X-Append-Me:plugin",
},
},
replace = {
headers = {
"X-Replace-Me:plugin",
"X-Replaced-By-Filter-Plugin:plugin",
},
},
remove = {
headers = {
"X-Remove-Me",
"X-Removed-By-Filter-Plugin",
},
},
}),
}

local res, header, assert_no_header
do
function header(name)
return assert.response(res).has.header(name)
end

function assert_no_header(name)
return assert.response(res).has.no.header(name)
end
end

create_plugin(plugin)
create_plugin(lua_plugin)

helpers.wait_for_all_config_update()
res = proxy:get("/status/200")
assert.response(res).has.status(200)

-- sanity
assert.equals("1", header("X-Added-By-Filter-Plugin"))
assert.equals("1", header("X-Added-By-Lua-Plugin"))
assert_no_header("X-Remove-Me")

assert.equals("plugin", header("X-Added-First"))

-- added by Lua plugin, filter plugin appends
assert.same({ "lua", "plugin" }, header("X-Append-Me"))

-- replaced last by filter plugin
assert.same("plugin", header("X-Replace-Me"))

-- not replaced, because it was not added
assert_no_header("X-Replaced-By-Filter-Plugin")

local filter_chain = {
route = { id = route.id },
filters = {
{
name = "response_transformer",
config = cjson.encode({
add = {
headers = {
"X-Added-First:filter-chain",
"X-Added-By-Filter-Chain:1",
"X-Removed-By-Filter-Plugin:filter-chain",
"X-Replaced-By-Filter-Plugin:filter-chain",
},
},
append = {
headers = {
"X-Append-Me:filter-chain",
},
},
replace = {
headers = {
"X-Replace-Me:filter-chain",
"X-Replaced-By-Filter-Chain:filter-chain",
},
},
remove = {
headers = {
"X-Not-Removed-By-Filter-Chain",
},
},
}),
}
}
}

create_filter_chain(filter_chain)
helpers.wait_for_all_config_update()
res = proxy:get("/status/200")
assert.response(res).has.status(200)

-- sanity
assert.equals("1", header("X-Added-By-Filter-Plugin"))
assert.equals("1", header("X-Added-By-Lua-Plugin"))
assert.equals("1", header("X-Added-By-Filter-Chain"))
assert_no_header("X-Remove-Me")

-- added first by the filter chain
assert.equals("filter-chain", header("X-Added-First"))

-- added by Lua, appended to by filter chain and filter plugin
assert.same({ "lua", "filter-chain", "plugin" }, header("X-Append-Me"))
-- added after the filter chain tried to remove it
assert.same("plugin", header("X-Not-Removed-By-Filter-Chain"))

-- replaced last by filter plugin
assert.same("plugin", header("X-Replace-Me"))

assert_no_header("X-Removed-By-Filter-Plugin")
assert.same("plugin", header("X-Replaced-By-Filter-Plugin"))
end)

it("filter plugins execute in a consistent order", function()
-- should always run first because `tests-01` < `tests-02`
local plugin_1 = {
name = "tests-01",
config = "name=first",
route = { id = route.id },
}

local plugin_2 = {
name = "tests-02",
config = "name=last",
route = { id = route.id },
}

for _, order_added in ipairs({
{ plugin_1, plugin_2 },
{ plugin_2, plugin_1 },
}) do
bp.plugins:truncate()

create_plugin(order_added[1])
create_plugin(order_added[2])

helpers.wait_for_all_config_update()
local res = proxy:get("/status/200", {
headers = {
["X-PW-Phase"] = "request_headers",
["X-PW-Test"] = "dump_config",
}
})

local body = assert.res_status(200, res)
assert.equals("name=first", body)
end
end)
end)
end)

end -- each strategy
4 changes: 4 additions & 0 deletions spec/fixtures/proxy_wasm_filters/tests/src/test_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ impl TestHttp {
return self.send_http_dispatch(config);
}
"update_metrics" => self.update_metrics(),
"dump_config" => {
let res = self.config.as_ref().map(|config| config.to_string());
self.send_plain_response(StatusCode::OK, res.as_deref());
}
_ => (),
}
}
Expand Down
10 changes: 10 additions & 0 deletions spec/fixtures/proxy_wasm_filters/tests/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ impl FromStr for TestConfig {
}
}

impl std::fmt::Display for TestConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut values: Vec<String> = self.map.iter().map(|(k, v)| format!("{k}={v}")).collect();

values.sort();

write!(f, "{}", values.join(" "))
}
}

#[derive(Debug, Eq, PartialEq, enum_utils::FromStr)]
#[enumeration(rename_all = "snake_case")]
pub enum TestPhase {
Expand Down
Loading