From 5c2d4ca8d2e56f4ea564c870b9d2dcc4e071caa3 Mon Sep 17 00:00:00 2001 From: xumin Date: Mon, 9 Dec 2024 16:24:34 +0800 Subject: [PATCH 1/4] feat(rpc): handle callback exception --- kong/clustering/rpc/socket.lua | 35 +++++++++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 045ca8c7557..465b80c5536 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -59,22 +59,41 @@ function _M:_get_next_id() end +function _M:_report_error(payload_id, typ, err) + local res, rpc_err = self.outgoing:push( + new_error(payload_id, typ, err) + ) + + if not res then + ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", rpc_err) + end +end + + function _M._dispatch(premature, self, cb, payload) if premature then return end - local res, err = cb(self.node_id, unpack(payload.params)) + local ok, res, except, err, trace + ok = xpcall( + function() res, err = cb(self.node_id, unpack(payload.params)) end, + function(err) + except = err + trace = debug.traceback(nil, 2) + end + ) + + if not ok then + ngx_log(ngx_WARN, "[rpc] exception during callback: ", except, "\n", trace) + + return self:_report_error(payload.id, jsonrpc.INTERNAL_ERROR, except) + end + if not res then ngx_log(ngx_WARN, "[rpc] RPC callback failed: ", err) - res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR, - err)) - if not res then - ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err) - end - - return + return self:_report_error(payload.id, jsonrpc.SERVER_ERROR, err) end -- success From e22128cd1b9c49a340c019266e7c7b0d1a08810b Mon Sep 17 00:00:00 2001 From: xumin Date: Mon, 9 Dec 2024 15:26:11 +0800 Subject: [PATCH 2/4] feat(sync): full sync pagination KAG-5892 --- kong-3.10.0-0.rockspec | 5 + kong/db/resumable_chunker/chain.lua | 41 +++ kong/db/resumable_chunker/dao.lua | 25 ++ kong/db/resumable_chunker/init.lua | 54 ++++ kong/db/resumable_chunker/utils.lua | 20 ++ .../01-db/12-resumable_chunker_spec.lua | 184 +++++++++++++ .../03-db/23-resumable_chunker_spec.lua | 251 ++++++++++++++++++ 7 files changed, 580 insertions(+) create mode 100644 kong/db/resumable_chunker/chain.lua create mode 100644 kong/db/resumable_chunker/dao.lua create mode 100644 kong/db/resumable_chunker/init.lua create mode 100644 kong/db/resumable_chunker/utils.lua create mode 100644 spec/01-unit/01-db/12-resumable_chunker_spec.lua create mode 100644 spec/02-integration/03-db/23-resumable_chunker_spec.lua diff --git a/kong-3.10.0-0.rockspec b/kong-3.10.0-0.rockspec index 2ba7c9e54f2..7dc1f94ef49 100644 --- a/kong-3.10.0-0.rockspec +++ b/kong-3.10.0-0.rockspec @@ -305,6 +305,11 @@ build = { ["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua", ["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua", + ["kong.db.resumable_chunker"] = "kong/db/resumable_chunker/init.lua", + ["kong.db.resumable_chunker.chain"] = "kong/db/resumable_chunker/chain.lua", + ["kong.db.resumable_chunker.dao"] = "kong/db/resumable_chunker/dao.lua", + ["kong.db.resumable_chunker.utils"] = "kong/db/resumable_chunker/utils.lua", + ["kong.db.migrations.state"] = "kong/db/migrations/state.lua", ["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua", ["kong.db.migrations.core"] = "kong/db/migrations/core/init.lua", diff --git a/kong/db/resumable_chunker/chain.lua b/kong/db/resumable_chunker/chain.lua new file mode 100644 index 00000000000..d7861c708fd --- /dev/null +++ 
b/kong/db/resumable_chunker/chain.lua @@ -0,0 +1,41 @@ +local EMPTY = require("kong.tools.table").EMPTY +local inplace_merge = require("kong.db.resumable_chunker.utils").inplace_merge + +local _M = {} +local _MT = { __index = _M } + +local BEGIN = { 1, nil } + +function _M.from_chain(list, options) + options = options or EMPTY + list.options = options + return setmetatable(list, _MT) +end + +function _M:next(size, offset) + size = size or self.options.size + offset = offset or BEGIN + local ind, inner_ind = offset[1], offset[2] + + if not self[ind] then + return EMPTY + end + + local rows, len = nil, 0 + repeat + local next_row, err + next_row, err, inner_ind = self[ind]:next(size - len, inner_ind) + if not next_row then + return nil, err, { ind, inner_ind } + end + rows, len = inplace_merge(rows, next_row) + + if not inner_ind then -- end of the current chain. continue with the next one + ind = ind + 1 + end + until len >= size or not self[ind] + + return rows or EMPTY, nil, self[ind] and { ind, inner_ind } or nil +end + +return _M diff --git a/kong/db/resumable_chunker/dao.lua b/kong/db/resumable_chunker/dao.lua new file mode 100644 index 00000000000..cb2bf4111e4 --- /dev/null +++ b/kong/db/resumable_chunker/dao.lua @@ -0,0 +1,25 @@ +local _M = {} + +function _M.from_dao(dao, options) + return setmetatable({ + dao = dao, + name = dao.schema.name, + options = options, + }, { + __index = _M, + }) +end + +function _M:next(size, offset) + local rows, err, err_t, offset = self.dao:page(size, offset, self.options) + + if rows then + for _, row in ipairs(rows) do + row.__type = self.name + end + end + + return rows, err, offset +end + +return _M diff --git a/kong/db/resumable_chunker/init.lua b/kong/db/resumable_chunker/init.lua new file mode 100644 index 00000000000..dafaa056865 --- /dev/null +++ b/kong/db/resumable_chunker/init.lua @@ -0,0 +1,54 @@ +local schema_topological_sort = require("kong.db.schema.topological_sort") +local from_chain = require("kong.db.resumable_chunker.chain").from_chain +local from_dao = require("kong.db.resumable_chunker.dao").from_dao +local inplace_merge = require("kong.db.resumable_chunker.utils").inplace_merge +local EMPTY = require("kong.tools.table").EMPTY + + +local _M = {} +local _MT = { __index = _M } + +-- TODO: handling disabled entities +-- it may require a change to the dao or even the strategy (by filtering the rows when querying) +function _M.from_db(db, options) + options = options or EMPTY + local schemas, n = {}, 0 + + local skip_ws = options.skip_ws + + for a, dao in pairs(db.daos) do + local schema = dao.schema + if schema.db_export ~= false and not (skip_ws and schema.name == "workspaces") then + n = n + 1 + schemas[n] = schema + end + end + + local sorted_schemas, err = schema_topological_sort(schemas) + if not sorted_schemas then + return nil, err + end + + local sorted_daos = {} + for i, schema in ipairs(sorted_schemas) do + sorted_daos[i] = db.daos[schema.name] + end + + return _M.from_daos(sorted_daos, options) +end + +function _M.from_daos(sorted_daos, options) + options = options or EMPTY + + local chains, n = {}, 0 + for _, dao in ipairs(sorted_daos) do + local chain = from_dao(dao, options) + n = n + 1 + chains[n] = chain + end + + return from_chain(chains, options) +end + + +return _M diff --git a/kong/db/resumable_chunker/utils.lua b/kong/db/resumable_chunker/utils.lua new file mode 100644 index 00000000000..c81478640f6 --- /dev/null +++ b/kong/db/resumable_chunker/utils.lua @@ -0,0 +1,20 @@ +-- to avoid unnecessary table creation 
+local function inplace_merge(lst, lst2) + if lst == nil then + return lst2, #lst2 + end + + + local n = #lst + local m = #lst2 + for i = 1, m do + n = n + 1 + lst[n] = lst2[i] + end + + return lst, n +end + +return { + inplace_merge = inplace_merge, +} \ No newline at end of file diff --git a/spec/01-unit/01-db/12-resumable_chunker_spec.lua b/spec/01-unit/01-db/12-resumable_chunker_spec.lua new file mode 100644 index 00000000000..ebb3925d930 --- /dev/null +++ b/spec/01-unit/01-db/12-resumable_chunker_spec.lua @@ -0,0 +1,184 @@ +local resumable_chunker = require("kong.db.resumable_chunker") + +local function insert_dao(db, daos, name) + local dao = {} + table.insert(daos, dao) + db[name] = dao + dao.schema = { name = name } + return dao +end + +local function mock_field(db, daos, name, tbl) + local dao = insert_dao(db, daos, name) + + local rows = {} + for _, row in ipairs(tbl) do + table.insert(rows, { field = row }) + end + + function dao.page(self, size, offset) + offset = offset or 1 + local ret = {} + for i = 1, size do + local row = rows[offset] + if not row then + return ret, nil, nil, nil + end + ret[i] = row + offset = offset + 1 + end + + return ret, nil, nil, rows[offset] and offset or nil + end +end + +local function mock_error_field(db, daos, name) + local dao = insert_dao(db, daos, name) + + function dao.page(self, size, offset) + return nil, "error: " .. name + end +end + +local function process_row(rows) + for i, row in ipairs(rows) do + rows[i] = row.field + end + return rows +end + +describe("resumable_chunker.from_daos", function() + it("handling empty table", function () + local db, daos = {}, {} + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(1) + assert.same({}, rows) + assert.is_nil(err) + assert.is_nil(offset) + + mock_field(db, daos, "field", {}) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(1) + assert.same({}, rows) + assert.is_nil(err) + assert.is_nil(offset) + end) + + it("handling exact size", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field", { 1, 2, 3, 4, 5 }) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(5) + assert.are.same({ 1, 2, 3, 4, 5 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + end) + + it("handling less than size", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field", { 1, 2, 3, 4, 5 }) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(6) + assert.are.same({ 1, 2, 3, 4, 5 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + end) + + it("handling more than size", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field", { 1, 2, 3, 4, 5 }) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(4) + assert.are.same({ 1, 2, 3, 4 }, process_row(rows)) + assert.is_nil(err) + assert.truthy(offset) + local rows, err, offset = chunker:next(4, offset) + assert.are.same({ 5 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + end) + + it("handling multiple table", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field1", { 1, 2, 3, 4, 5 }) + mock_field(db, daos, "field2", { 6, 7, 8, 9, 10 }) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(6) + assert.are.same({ 1, 2, 3, 4, 5, 6 }, process_row(rows)) 
+ assert.is_nil(err) + assert.truthy(offset) + local rows, err, offset = chunker:next(6, offset) + assert.are.same({ 7, 8, 9, 10 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + end) + + it("handling exhausted table", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field1", { 1, 2, 3, 4, 5 }) + mock_field(db, daos, "field2", { 6, 7, 8, 9, 10 }) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(11) + assert.are.same({ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + end) + + it("handling error", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field", { 1, 2, 3, 4, 5 }) + mock_error_field(db, daos, "error") + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(4) + assert.are.same({ 1, 2, 3, 4 }, process_row(rows)) + assert.is_nil(err) + assert.truthy(offset) + local rows, err, offset = chunker:next(4, offset) + assert.is_nil(rows) + assert.are.same("error: error", err) + end) + + it("resumable", function () + local strategy = {} + local db, daos = {}, {} + mock_field(db, daos, "field1", { 1, 2, 3, 4, 5 }) + mock_field(db, daos, "field2", { 6, 7, 8, 9, 10 }) + local chunker = resumable_chunker.from_daos(daos) + local rows, err, offset = chunker:next(6) + assert.are.same({ 1, 2, 3, 4, 5, 6 }, process_row(rows)) + assert.is_nil(err) + assert.truthy(offset) + local offset1 = offset + local rows, err, offset = chunker:next(6, offset) + assert.are.same({ 7, 8, 9, 10 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + local rows, err, offset = chunker:next(5) + assert.are.same({ 1, 2, 3, 4, 5 }, process_row(rows)) + assert.is_nil(err) + assert.truthy(offset) + local offset2 = offset + local rows, err, offset = chunker:next(6, offset) + assert.are.same({ 6, 7, 8, 9, 10 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + local rows, err, offset = chunker:next(3, offset1) + assert.are.same({ 7, 8, 9 }, process_row(rows)) + assert.is_nil(err) + assert.truthy(offset) + local rows, err, offset = chunker:next(3, offset2) + assert.are.same({ 6, 7, 8 }, process_row(rows)) + assert.is_nil(err) + assert.truthy(offset) + local rows, err, offset = chunker:next(3, offset) + assert.are.same({ 9, 10 }, process_row(rows)) + assert.is_nil(err) + assert.is_nil(offset) + end) +end) diff --git a/spec/02-integration/03-db/23-resumable_chunker_spec.lua b/spec/02-integration/03-db/23-resumable_chunker_spec.lua new file mode 100644 index 00000000000..625112fa60d --- /dev/null +++ b/spec/02-integration/03-db/23-resumable_chunker_spec.lua @@ -0,0 +1,251 @@ +local helpers = require("spec.helpers") +local resumable_chunker = require("kong.db.resumable_chunker") + +local fmod = math.fmod + +for _, strategy in helpers.each_strategy() do + describe("kong.db.resumable_chunker #" .. 
strategy, function() + local db, bp + + -- Note by default the page size is 100, we should keep this number + -- less than 100/(tags_per_entity) + -- otherwise the 'limits maximum queries in single request' tests + -- for Cassandra might fail + local test_entity_count = 10 + + local total_entities + local validate_result, count, count_rows + local random_modification, revert_modification + local rebuild_db + + local typs = { "service", "route" } + + lazy_setup(function() + function rebuild_db() + bp, db = helpers.get_db_utils(strategy) + + local services = {} + for i = 1, test_entity_count do + local service = { + host = "example-" .. i .. ".test", + name = "service" .. i, + tags = { "team_ a", "level "..fmod(i, 5), "service"..i } + } + local row, err, err_t = bp.services:insert(service) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(service.tags, row.tags) + services[i] = row + end + + local routes = {} + for i = 1, test_entity_count do + local route = { + name = "route" .. i, + protocols = { "http" }, + methods = { "GET" }, + paths = { "/route" .. i }, + service = services[i], + tags = { "team_ a", "level "..fmod(i, 5), "route"..i } + } + local row, err, err_t = bp.routes:insert(route) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(route.tags, row.tags) + routes[i] = row + end + + for i = 1, test_entity_count do + local plugin = { + instance_name = "route_plugin" .. i, + name = "key-auth", + route = routes[i], + tags = { "team_ a", "level "..fmod(i, 5), "route_plugin"..i } + } + local row, err, err_t = bp.plugins:insert(plugin) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(plugin.tags, row.tags) + end + + for i = 1, test_entity_count do + local plugin = { + instance_name = "service_plugin" .. i, + name = "key-auth", + service = services[i], + tags = { "team_ a", "level "..fmod(i, 5), "service_plugin"..i } + } + local row, err, err_t = bp.plugins:insert(plugin) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(plugin.tags, row.tags) + + end + + for i = 1, 1 do + local plugin = { + name = "key-auth", + tags = { "team_ a", "level "..fmod(i, 5), "global_plugin"..i } + } + local row, err, err_t = bp.plugins:insert(plugin) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(plugin.tags, row.tags) + end + + local consumers = {} + for i = 1, test_entity_count do + local consumer = { + username = "consumer" .. i, + custom_id = "custom_id" .. i, + tags = { "team_ a", "level "..fmod(i, 5), "consumer"..i } + } + local row, err, err_t = bp.consumers:insert(consumer) + assert.is_nil(err) + assert.is_nil(err_t) + assert.same(consumer.tags, row.tags) + consumers[i] = row + end + end + + rebuild_db() + + function validate_result(counter, tolerate) + if not tolerate then + if not total_entities then + total_entities = counter.n + else + assert.same(total_entities, counter.n) + end + end + + for _, typ in ipairs({ "service", "route", "route_plugin", "service_plugin" }) do + counter[typ] = counter[typ] or {} + if not tolerate then + assert.same(test_entity_count, counter[typ].n, typ) + end + for i = 1, test_entity_count do + counter[typ][i] = counter[typ][i] or 0 + if not tolerate then + assert.same(1, counter[typ][i], typ .. i) + else + assert.truthy(counter[typ][i] >= 1, typ .. 
i) + end + end + end + assert.same(1, counter.global_plugin.n, "global_plugin") + assert.same(1, counter.global_plugin[1], "global_plugin1") + end + + function count(counter, row) + local tag = row.tags and row.tags[3] + if tag then + local typ, n = tag:match("^(%D+)%s*(%d+)$") + n = tonumber(n) + counter[typ] = counter[typ] or {} + counter[typ][n] = (counter[typ][n] or 0) + 1 + counter[typ].n = (counter[typ].n or 0) + 1 + counter.n = (counter.n or 0) + 1 + end + end + + function count_rows(counter, rows) + for _, row in ipairs(rows) do + count(counter, row) + end + end + + local function record(typ, mod_record, row) + table.insert(mod_record, row) + end + + function random_modification(mod_record) + local typ = typs[math.random(#typs)] + local rows = assert(db[typ .. "s"]:page(100)) + if #rows == 0 then + return + end + + local row = rows[math.random(#rows)] + + local n = row.tags[3]:match("%d+") + + record(typ, mod_record, row) + -- also add cascading entities + local plugin = db.plugins:select_by_instance_name(typ .. "_plugin" .. n) + record("plugin", mod_record, plugin) + if typ == "service" then + local route = db.routes:select_by_name("route" .. n) + local routes_plugin = db.plugins:select_by_instance_name("route_plugin" .. n) + record("route", mod_record, route) + record("plugin", mod_record, routes_plugin) + end + + db[typ .. "s"]:delete(row) + end + + function revert_modification(counter, mod_record) + count_rows(counter, mod_record) + end + end) + + for _, page_size in ipairs({ 1, 2, 7, 10, 13, 60, 125 }) do + it("works for page size: " .. page_size, function() + local counter = {} + local chunker = resumable_chunker.from_db(kong.db) + + local rows, err, offset + repeat + rows, err, offset = chunker:next(page_size, offset) + assert.is_nil(err) + + if offset then + assert.same(page_size, #rows) + end + + count_rows(counter, rows) + until not offset + + validate_result(counter) + end) + end + + describe("lock free modification", function() + before_each(rebuild_db) + -- The test is slow because it requires rebuilding the database every time + -- so we cut down the number of iterations + for _, page_size in ipairs({ 1, 10, 13, 125 }) do + it("for page size: #" .. 
page_size, function () + local counter = {} + local mod_record = {} + local chunker = resumable_chunker.from_db(kong.db) + + local rows, err, offset + local n = 0 + repeat + rows, err, offset = chunker:next(page_size, offset) + assert.is_nil(err) + + if offset then + assert.same(page_size, #rows) + end + + n = n + page_size + + while n > test_entity_count do + for _ = 1, math.random(0, test_entity_count) do + random_modification(mod_record) + end + n = n - test_entity_count + end + + count_rows(counter, rows) + until not offset + + revert_modification(counter, mod_record) + validate_result(counter, true) + end) + end + end) + end) +end From 367b4654f6cca7d15a7e72f82cd67ba9700a412e Mon Sep 17 00:00:00 2001 From: xumin Date: Mon, 9 Dec 2024 18:15:14 +0800 Subject: [PATCH 3/4] feat(sync): implement --- kong/clustering/services/sync/rpc.lua | 295 +++++++++++++++++++++----- kong/constants.lua | 1 + 2 files changed, 245 insertions(+), 51 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 4100afbb967..62429c8d7e8 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -8,13 +8,19 @@ local constants = require("kong.constants") local concurrency = require("kong.concurrency") local isempty = require("table.isempty") local events = require("kong.runloop.events") +local lrucache = require("resty.lrucache") +local resumable_chunker = require("kong.db.resumable_chunker") +local clustering_utils = require("kong.clustering.utils") +local EMPTY = require("kong.tools.table").EMPTY local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY +local CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY = constants.CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } local MAX_RETRY = 5 @@ -27,16 +33,23 @@ local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO +local ngx_NOTICE = ngx.NOTICE local ngx_DEBUG = ngx.DEBUG +local json_encode = clustering_utils.json_encode +local json_decode = clustering_utils.json_decode + -- number of versions behind before a full sync is forced local DEFAULT_FULL_SYNC_THRESHOLD = 512 -function _M.new(strategy) +function _M.new(strategy, opts) + opts = opts or EMPTY + local self = { strategy = strategy, + page_size = opts.page_size, } return setmetatable(self, _MT) @@ -48,17 +61,129 @@ local function inc_sync_result(res) end -local function full_sync_result() +local function paged_full_sync_payload(page, next_token) + return { + default = { + full_sync = true, + deltas = page, + next = next_token and assert(json_encode(next_token)), + }, + } +end + + +local function lagacy_full_sync() local deltas, err = declarative.export_config_sync() if not deltas then return nil, err end - -- wipe dp lmdb, full sync return { default = { deltas = deltas, wipe = true, }, } end +local function page_to_deltas(page) + local deltas = {} + for i, entity in ipairs(page) do + local typ = entity.__type + entity.__type = nil + local delta = { + type = 
typ, + entity = entity, + version = 0, -- pin to the 0 to let DP report itself as not ready + ws_id = kong.default_workspace, + } + + deltas[i] = delta + end + + return deltas +end + + +local function full_sync(self, workspace) + local pageable = workspace.pageable + local next_token = workspace.next + + if not pageable then + if next_token then + -- how do I emit a client error? + return nil, "next_token is set for none pageable DP" + end + + return lagacy_full_sync() + end + + local offset, begin_version, end_version + if next_token then + local err + next_token, err = json_decode(next_token) + if not next_token then + return nil, "invalid next_token" + end + + offset, begin_version, end_version = + next_token.offset, next_token.begin_version, next_token.end_version + else + begin_version = self.strategy:get_latest_version() + end + + -- DP finished syncing DB entities. Now trying to catch up with the fix-up deltas + if not offset then + if not end_version then + return nil, "invalid next_token" + end + + local res, err = self.strategy:get_delta(end_version) + if not res then + return nil, err + end + + -- history is lost. Unable to make a consistent full sync + if not isempty(res) and res[1].version ~= default_namespace_version + 1 then + return nil, "history lost, unable to make a consistent full sync" + end + + return paged_full_sync_payload(res, nil) -- nil next_token marks the end + end + + local pager = self.pager + if not pager then + pager = resumable_chunker.from_db(manager.db, { + size = self.page_size, + }) + self.pager = pager + end + + local page, err, new_offset = pager:fetch(nil, offset) + if not page then + return nil, err + end + + local deltas = page_to_deltas(page) + + if not new_offset then + end_version = self.strategy:get_latest_version() + + -- no changes during the full sync session. No need for fix-up deltas + if end_version == begin_version then + return paged_full_sync_payload(deltas, nil) + end + + -- let DP initiate another call to get fix-up deltas + return paged_full_sync_payload(deltas, { + end_version = end_version, + }) + end + + -- more DB pages to fetch + return paged_full_sync_payload(deltas, { + offset = new_offset, + begin_version = begin_version, + }) +end + + function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay @@ -109,7 +234,8 @@ function _M:init_cp(manager) -- is the node empty? If so, just do a full sync to bring it up to date faster if default_namespace_version == 0 or - latest_version - default_namespace_version > FULL_SYNC_THRESHOLD + (latest_version - default_namespace_version > FULL_SYNC_THRESHOLD) or + default_namespace.next -- a full-sync session is in progress then -- we need to full sync because holes are found @@ -118,7 +244,7 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - return full_sync_result() + return full_sync(self, default_namespace) end -- do we need an incremental sync? @@ -148,7 +274,7 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - return full_sync_result() + return full_sync(self, default_namespace) end) end @@ -210,32 +336,84 @@ local function is_rpc_ready() end +local function purge(t) + t:db_drop(false) + -- we are at a unready state + -- consider the config empty + t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH) + kong.core_cache:purge() + kong.cache:purge() +end + + +local function paginated_error_handle() + -- a failed full sync. 
+ local t = txn.begin(512) + purge(t) + local ok, err = t:commit() + if not ok then + error("failed to reset DB when handling error: " .. err) + end + + kong_shm:set(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY, 0) + + -- retry immediately + return _M:sync_once(0) +end + + local function do_sync() if not is_rpc_ready() then return nil, "rpc is not ready" end + local next_token = kong_shm:get(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY) + + local version + if next_token then + version = 0 + else + version = tonumber(declarative.get_current_hash()) or 0 + end + local msg = { default = - { version = - tonumber(declarative.get_current_hash()) or 0, + { version = version, + next = next_token, + pageable = true, }, } - local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) - if not ns_deltas then + local result, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) + if not result then ngx_log(ngx_ERR, "sync get_delta error: ", err) + + if next_token then + return paginated_error_handle() + end + return true end - -- ns_deltas should look like: - -- { default = { deltas = { ... }, wipe = true, }, } + -- result should look like: + -- { default = { deltas = { ... }, wipe = true, full_sync_done = false, next_token = ...}, } - local ns_delta = ns_deltas.default - if not ns_delta then + local payload = result.default + if not payload then return nil, "default namespace does not exist inside params" end - local deltas = ns_delta.deltas + local full_sync, first_page, last_page + if payload.full_sync then + full_sync = true + first_page = not next_token and payload.next + last_page = not payload.next + + elseif payload.wipe then + -- lagacy full sync + full_sync, first_page, last_page = true, true, true + end + + local deltas = payload.deltas if isempty(deltas) then -- no delta to sync @@ -258,9 +436,11 @@ local function do_sync() local t = txn.begin(512) - local wipe = ns_delta.wipe - if wipe then - t:db_drop(false) + -- a full sync begins + if first_page then + -- reset the lmdb + purge(t) + next_token = payload.next end local db = kong.db @@ -291,8 +471,8 @@ local function do_sync() return nil, err end - -- If we will wipe lmdb, we don't need to delete it from lmdb. - if old_entity and not wipe then + -- If we are purging, we don't need to delete it. + if old_entity and not full_sync then local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err @@ -309,8 +489,8 @@ local function do_sync() ", version: ", delta_version, ", type: ", delta_type) - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not full_sync then ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, } end @@ -321,8 +501,8 @@ local function do_sync() return nil, err end - -- If we will wipe lmdb, we don't need to delete it from lmdb. 
- if old_entity and not wipe then + -- If we are purging, we don't need to delete it. + if old_entity and not full_sync then local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err @@ -334,14 +514,14 @@ local function do_sync() ", version: ", delta_version, ", type: ", delta_type) - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not full_sync then ev = { delta_type, "delete", old_entity, } end end -- if delta_entity ~= nil and delta_entity ~= ngx_null - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not full_sync then crud_events_n = crud_events_n + 1 crud_events[crud_events_n] = ev end @@ -354,9 +534,12 @@ local function do_sync() end end -- for _, delta - -- store current sync version - t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) - + -- only update the sync version when not in a full sync, or when the full sync has finished + if (not full_sync) or last_page then + -- store current sync version + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + end + -- store the correct default workspace uuid if default_ws_changed then t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace) @@ -367,37 +550,47 @@ local function do_sync() return nil, err end - if wipe then - kong.core_cache:purge() - kong.cache:purge() + if full_sync then + -- the full sync is done + if last_page then + kong_shm:set(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY, nil) + + -- Trigger other workers' callbacks like reconfigure_handler. + -- + -- Full sync could rebuild route, plugins and balancer route, so their + -- hashes are nil. + -- Until this point, the dataplane is not ready to serve requests or to + -- do delta syncs. + local reconfigure_data = { kong.default_workspace, nil, nil, nil, } + return events.declarative_reconfigure_notify(reconfigure_data) - -- Trigger other workers' callbacks like reconfigure_handler. - -- - -- Full sync could rebuild route, plugins and balancer route, so their - -- hashes are nil. 
- local reconfigure_data = { kong.default_workspace, nil, nil, nil, } - local ok, err = events.declarative_reconfigure_notify(reconfigure_data) - if not ok then - return nil, err - end + else + kong_shm:set(CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY, payload.next) - else - for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.entity, old_entity - db[event[1]]:post_crud_event(event[2], event[3], event[4]) + -- get next page imeediately without releasing the mutex + -- no need to yield or wait for other workers as the DP is unable to proxy and nothing else + -- can be done until the full sync is done + return do_sync() end end + -- emit the CRUD events + -- if in_full_sync, no events should be added into the queue + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.entity, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) + end + return true end -local function sync_handler(premature) +local function sync_handler(premature, try_counter, dp_status) if premature then return end - local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync) + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() do_sync(dp_status) end) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end @@ -446,12 +639,12 @@ end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_once_impl, 0) + return ngx.timer.at(delay or 0, sync_once_impl, 0, self) end function _M:sync_every(delay) - return ngx.timer.every(delay, sync_handler) + return ngx.timer.every(delay, sync_handler, nil, self) end diff --git a/kong/constants.lua b/kong/constants.lua index 4b04b01ce3f..ef02e4948d2 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -235,6 +235,7 @@ local constants = { GENERIC = "generic or unknown error", }, CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = "clustering_data_planes:latest_version", + CLUSTERING_DATA_PLANES_PAGED_FULL_SYNC_NEXT_TOKEN_KEY = "clustering_data_planes:paged_full_sync_next_token", CLEAR_HEALTH_STATUS_DELAY = 300, -- 300 seconds From 570e566902fa4b4142d153a5f6775a2fda614b19 Mon Sep 17 00:00:00 2001 From: xumin Date: Mon, 16 Dec 2024 17:43:02 +0800 Subject: [PATCH 4/4] tests(sync): full sync --- spec/01-unit/19-hybrid/04-rpc_spec.lua | 31 +++++ spec/helpers.lua | 20 +++ spec/helpers/rpc_mock/client.lua | 36 ++++++ spec/helpers/rpc_mock/default.lua | 13 ++ spec/helpers/rpc_mock/server.lua | 164 +++++++++++++++++++++++++ spec/helpers/rpc_proxy/handler.lua | 54 ++++++++ spec/helpers/rpc_proxy/schema.lua | 13 ++ 7 files changed, 331 insertions(+) create mode 100644 spec/01-unit/19-hybrid/04-rpc_spec.lua create mode 100644 spec/helpers/rpc_mock/client.lua create mode 100644 spec/helpers/rpc_mock/default.lua create mode 100644 spec/helpers/rpc_mock/server.lua create mode 100644 spec/helpers/rpc_proxy/handler.lua create mode 100644 spec/helpers/rpc_proxy/schema.lua diff --git a/spec/01-unit/19-hybrid/04-rpc_spec.lua b/spec/01-unit/19-hybrid/04-rpc_spec.lua new file mode 100644 index 00000000000..060b353afeb --- /dev/null +++ b/spec/01-unit/19-hybrid/04-rpc_spec.lua @@ -0,0 +1,31 @@ +-- by importing helpers, we initialize the kong PDK module +local helpers = require "spec.helpers" +local server = require("spec.helpers.rpc_mock.server") +local client = require("spec.helpers.rpc_mock.client") + +describe("rpc v2", function() + describe("full sync pagination", function() + describe("server side", function() + local server_mock + local 
port + lazy_setup(function() + server_mock = server.new() + server_mock:start() + port = server_mock.listen + end) + lazy_teardown(function() + server_mock:stop() + end) + end) + + describe("client side", function() + local client_mock + lazy_setup(function() + client_mock = client.new() + end) + lazy_teardown(function() + client_mock:stop() + end) + end) + end) +end) diff --git a/spec/helpers.lua b/spec/helpers.lua index 22b67c4434d..c41854e8bf2 100644 --- a/spec/helpers.lua +++ b/spec/helpers.lua @@ -28,6 +28,26 @@ local server = reload_module("spec.internal.server") local client = reload_module("spec.internal.client") local wait = reload_module("spec.internal.wait") +-- redo the patches to apply the kong global patches +local _timerng + +_timerng = require("resty.timerng").new({ + min_threads = 16, + max_threads = 32, +}) + +_timerng:start() + +_G.timerng = _timerng + +_G.ngx.timer.at = function (delay, callback, ...) + return _timerng:at(delay, callback, ...) +end + +_G.ngx.timer.every = function (interval, callback, ...) + return _timerng:every(interval, callback, ...) +end + ---------------- -- Variables/constants diff --git a/spec/helpers/rpc_mock/client.lua b/spec/helpers/rpc_mock/client.lua new file mode 100644 index 00000000000..42ce5a4270e --- /dev/null +++ b/spec/helpers/rpc_mock/client.lua @@ -0,0 +1,36 @@ +-- by importing helpers, we ensure the kong PDK module is initialized +local helpers = require "spec.helpers" +local rpc_mgr = require("kong.clustering.rpc.manager") +local default_cert_meta = require("spec.helpers.rpc_mock.default").default_cert_meta + +local _M = {} +local _MT = { __index = _M, } + +local default_dp_conf = { + role = "data_plane", + cluster_control_plane = "127.0.0.1", +} + +setmetatable(default_dp_conf, default_cert_meta) +local default_meta = { __index = default_dp_conf, } + +local function do_nothing() end + +local function client_stop(rpc_mgr) + -- a hacky way to stop rpc_mgr from reconnecting + rpc_mgr.try_connect = do_nothing + + -- this will stop all connections + for _, socket in pairs(rpc_mgr.sockets) do + socket:stop() + end +end + +function _M.new(opts) + opts = opts or {} + setmetatable(opts, default_meta) + local ret = rpc_mgr.new(opts, opts.name or "dp") + ret.stop = client_stop + return ret +end + +return _M
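Reviewer note: a minimal sketch of how the rpc_mock client above is expected to be driven from a spec. The CP address and the registered method name are illustrative; `callbacks:register` and `:call` are the rpc manager facilities already used elsewhere in this series, while calling `try_connect()` directly is an assumption inferred from the field that `client_stop()` stubs out.

  -- usage sketch (assumptions: CP address, method names)
  local client = require("spec.helpers.rpc_mock.client")

  local c = client.new({
    cluster_control_plane = "127.0.0.1:8005",  -- illustrative CP address
  })

  -- expose a capability before connecting, the way a real DP would
  c.callbacks:register("kong.test.echo", function(node_id, payload)
    return payload
  end)

  c:try_connect()  -- assumed entry point; client_stop() above replaces this same field

  -- call an RPC method served by the CP ("control_plane" is the CP's node_id)
  local res, err = c:call("control_plane", "kong.sync.v2.get_delta",
                          { default = { version = 0, pageable = true } })
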
diff --git a/spec/helpers/rpc_mock/default.lua b/spec/helpers/rpc_mock/default.lua new file mode 100644 index 00000000000..568322e3ff7 --- /dev/null +++ b/spec/helpers/rpc_mock/default.lua @@ -0,0 +1,13 @@ +local default_cert = { + cluster_mtls = "shared", + cluster_ca_cert = "path/kong_clustering_ca.crt", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", +} + +local default_cert_meta = { __index = default_cert, } + +return { + default_cert = default_cert, + default_cert_meta = default_cert_meta, +} diff --git a/spec/helpers/rpc_mock/server.lua b/spec/helpers/rpc_mock/server.lua new file mode 100644 index 00000000000..9b42a64de15 --- /dev/null +++ b/spec/helpers/rpc_mock/server.lua @@ -0,0 +1,164 @@ +local helpers = require("spec.helpers") +local client = require("spec.helpers.rpc_mock.client") +local default_cert = require("spec.helpers.rpc_mock.default").default_cert + +local _M = {} +local _MT = { __index = _M, } + +function _M.new(opts) + opts = opts or {} + opts.prefix = opts.prefix or "servroot_rpc_mock" + opts.role = "control_plane" + opts.plugins = "rpc-proxy" + opts.listen = opts.listen or helpers.get_available_port() + opts.cluster_listen = opts.cluster_listen or "0.0.0.0:" .. opts.listen + opts.mocks = opts.mocks or {} + opts.prehooks = opts.prehooks or {} + opts.posthooks = opts.posthooks or {} + if opts.interception == nil then + opts.interception = true + end + + for k, v in pairs(default_cert) do + if opts[k] == nil then + opts[k] = v + end + end + + return setmetatable(opts, _MT) +end + +function _M.start(self) + local bp, db = helpers.get_db_utils(strategy, nil, { "rpc-proxy" }) + + local plugin = db.plugins:insert({ + name = "rpc-proxy", + config = {}, + }) + + assert(helpers.start_kong(self)) + + self.proxy_client = client.new({ + cluster_control_plane = "127.0.0.1:" .. self.listen, + }) + + if self.interception then + self:enable_inception() + end + + self.proxy_client.callbacks:register("kong.rpc.proxy", function(proxy_id, proxy_payload) + local method, node_id, payload = proxy_payload.method, proxy_payload.node_id, proxy_payload.payload + local cb = self.mocks[method] + if cb then + local res, err = cb(node_id, payload, proxy_id, self) + return { + mock = true, + result = res, + error = err, + } + end + + local prehook = self.prehooks[method] or self.prehooks["*"] + local posthook = self.posthooks[method] or self.posthooks["*"] + local result = { + prehook = prehook and true, + posthook = posthook and true, + } + + if prehook then + local res, err = prehook(node_id, payload, proxy_id, self, method) + if not res then + return nil, err + end + + result.args = res + end + + return result + end) + + self.proxy_client.callbacks:register("kong.rpc.proxy.posthook", function(proxy_id, proxy_payload) + local method, node_id, payload = proxy_payload.method, proxy_payload.node_id, proxy_payload.payload + local cb = self.posthooks[method] or self.posthooks["*"] + if not cb then + return nil, "no callback registered for method: " .. method + end + + return cb(node_id, payload, proxy_id, self, method) + end) + + self = setmetatable(self, _MT) + + if next(self.mocks) or next(self.prehooks) or next(self.posthooks) then + self:register_proxy() + end +end + +function _M:register_proxy() + local hooked, seen = {}, {} + for _, hooks in ipairs({ self.mocks, self.prehooks, self.posthooks }) do + for api_name in pairs(hooks) do + if not seen[api_name] then + seen[api_name] = true + table.insert(hooked, api_name) + end + end + end + + return self.proxy_client:call("kong.rpc.proxy.register", { + proxy_apis = hooked, + }) +end + +function _M:mock_api(api_name, cb) + self.mocks[api_name] = cb +end + +function _M:prehook_api(api_name, cb) + self.prehooks[api_name] = cb +end + +function _M:posthook_api(api_name, cb) + self.posthooks[api_name] = cb +end + +local function get_records(server) + local records = server.records + if not records then + records = {} + server.records = records + end + return records +end + +-- TODO: add req ID for correlation +local function default_inception_prehook(node_id, payload, proxy_id, server, method) + local records = get_records(server) + records[#records + 1] = { + request = true, + node_id = node_id, + payload = payload, + proxy_id = proxy_id, + method = method, + } + return payload +end + +local function default_inception_posthook(node_id, payload, proxy_id, server, method) + local records = get_records(server) + records[#records + 1] = { + response = true, + node_id = node_id, + payload = payload, + proxy_id = proxy_id, + method = method, + } + return payload +end + +function _M:enable_inception() + self:prehook_api("*", default_inception_prehook) + self:posthook_api("*", default_inception_posthook) +end + +function _M:stop() + helpers.stop_kong(self.prefix) + self.proxy_client:stop() +end + +return _M
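Reviewer note: a minimal sketch of driving the server mock above from a spec. The mocked method name and its response are illustrative; `new`, `mock_api`, `start`, `stop` and the `listen`/`cluster_listen` fields are the helper's own API.

  local server = require("spec.helpers.rpc_mock.server")

  local cp = server.new({ interception = false })

  -- answer this RPC from the test instead of the real CP callback
  -- (method name and response are illustrative)
  cp:mock_api("kong.test.hello", function(node_id, payload)
    return { greeting = "hello, " .. node_id }
  end)

  cp:start()   -- boots a control plane with the rpc-proxy plugin enabled
  -- ... start a data plane pointing at cp.cluster_listen and assert on it ...
  cp:stop()
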
diff --git a/spec/helpers/rpc_proxy/handler.lua b/spec/helpers/rpc_proxy/handler.lua new file mode 100644 index 00000000000..a61eaf9761e --- /dev/null +++ b/spec/helpers/rpc_proxy/handler.lua @@ -0,0 +1,54 @@ +local rpc_mgr = require("kong.clustering.rpc.manager") +local kong_meta = require("kong.meta") + +local _M = { + PRIORITY = 1000, + VERSION = kong_meta.version, +} + +local original_callbacks = {} + +function _M.init_worker() + kong.rpc.callbacks:register("kong.rpc.proxy.register", function(node_id, register_payload) + local proxy_apis = register_payload.proxy_apis + + for _, proxy_api in ipairs(proxy_apis) do + kong.log.info("Hook registering RPC proxy API: ", proxy_api) + local original = kong.rpc.callbacks[proxy_api] + if original and not original_callbacks[proxy_api] then + original_callbacks[proxy_api] = original + end + kong.rpc.callbacks[proxy_api] = nil + kong.rpc.callbacks:register(proxy_api, function(client_id, payload) + local res, err = kong.rpc:call(node_id, "kong.rpc.proxy", { method = proxy_api, node_id = client_id, payload = payload }) + if not res then + return nil, "Failed to proxy(" .. node_id .. "): " .. err + end + + if res.error then + return nil, res.error + end + + if res.prehook or res.posthook then + if res.prehook then + payload = res.args + end + + local origin_res, origin_err = original(client_id, payload) + + if res.posthook then + res, err = kong.rpc:call(node_id, "kong.rpc.proxy.posthook", { method = proxy_api, node_id = client_id, payload = {result = origin_res, error = origin_err} }) + if not res then + return nil, "Failed to call post hook(" .. node_id .. "): " .. err + end + + return res.result, res.error + end + + return origin_res, origin_err + + elseif res.mock then + return res.result, res.error + end + + return nil, "invalid response from proxy" + end) + end + end) +end + +return _M
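Reviewer note: the rpc-proxy handler above detours every API listed in kong.rpc.proxy.register through the mock server before (and, when a posthook is set, after) the original CP callback runs. A sketch of the hook contract, using kong.sync.v2.get_delta as an illustrative target; the (node_id, payload, proxy_id, server, method) signature and the { result, error } posthook payload mirror the default interception hooks in server.lua.

  local server = require("spec.helpers.rpc_mock.server")
  local cp = server.new()

  -- rewrite the DP's request before the real callback runs
  cp:prehook_api("kong.sync.v2.get_delta", function(node_id, payload, proxy_id, server_mock, method)
    payload.default.version = 0   -- illustrative tweak: pretend the DP is empty
    return payload                -- the returned table becomes the callback's params
  end)

  -- observe (or rewrite) the CP's reply; payload here is { result = ..., error = ... }
  cp:posthook_api("kong.sync.v2.get_delta", function(node_id, payload, proxy_id, server_mock, method)
    assert(payload.result ~= nil or payload.error ~= nil)
    return payload
  end)
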
diff --git a/spec/helpers/rpc_proxy/schema.lua b/spec/helpers/rpc_proxy/schema.lua new file mode 100644 index 00000000000..eee01bfe7af --- /dev/null +++ b/spec/helpers/rpc_proxy/schema.lua @@ -0,0 +1,13 @@ +local typedefs = require("kong.db.schema.typedefs") + +return { + name = "rpc-proxy", + fields = { + { protocols = typedefs.protocols }, + { + config = { + type = "record", + fields = { + }, + }, + }, + }, +}
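Reviewer note on the paged full sync protocol from PATCH 3: the rpc_spec above is still a placeholder, so the expected message shapes are summarised here as a sketch. Field names come from kong/clustering/services/sync/rpc.lua; the concrete values are illustrative.

  -- 1. A pageable DP calls kong.sync.v2.get_delta, reporting version 0 and
  --    echoing the opaque token while a paged full sync is in progress.
  local request = {
    default = { version = 0, pageable = true, next = nil },  -- next = nil on the first call
  }

  -- 2. The CP answers each page with full_sync = true and a `next` token while
  --    more DB pages (or fix-up deltas) remain; the final page has next = nil.
  local page_response = {
    default = {
      full_sync = true,
      deltas = { --[[ entities from resumable_chunker, pinned to version 0 ]] },
      next = '{"offset":[2,"..."],"begin_version":42}',  -- illustrative JSON token
    },
  }

  -- 3. The DP purges its LMDB on the first page, applies every page without
  --    emitting CRUD events, and only writes DECLARATIVE_HASH_KEY and fires
  --    declarative_reconfigure_notify once the last page has been applied.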