Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering/rpc): tunings and fixes for rubustness #13771

Merged
merged 26 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
04b7f9c
pagesize must not be less than 2
chronolaw Oct 17, 2024
6f8301f
change lmdb key format
chronolaw Oct 21, 2024
401ab4a
Revert "change lmdb key format"
chronolaw Oct 22, 2024
c4dd964
use item.ws_id directly
chronolaw Oct 22, 2024
e849bac
set global key in lmdb
chronolaw Oct 22, 2024
c3c6b74
fix(db): use request_aware_table correctly
chronolaw Oct 22, 2024
e01e704
fix(dbless): get default workspace correctly
chronolaw Oct 23, 2024
a8d4479
check latest_version and d.ws_id
chronolaw Oct 24, 2024
8c1b950
fix dp do_sync()
chronolaw Oct 24, 2024
e227a34
fix declarative export sync
chronolaw Oct 24, 2024
ff56567
typo fix
chronolaw Oct 24, 2024
9386ffd
check latest_version == ngx_null in CP
chronolaw Oct 25, 2024
d061361
style fix
chronolaw Oct 25, 2024
15624ec
fix(clustring): inc sync add ws (#13790)
chronolaw Oct 25, 2024
f3ed16b
fix(clustering): delta version is null (#13789)
chronolaw Oct 25, 2024
3467d0c
rpc.lua style clean
chronolaw Oct 27, 2024
7294a20
sync comments clean
chronolaw Oct 27, 2024
cacf764
refactor(clustering): change sync data structure (#13794)
chronolaw Oct 28, 2024
fc26c42
comments clean in rpc.lua
chronolaw Oct 28, 2024
60ef6c3
fix(incremental sync): added comment for 2 times do_sync (#13801)
chobits Oct 29, 2024
c657588
Refactor/rename delta row (#13807)
chronolaw Oct 30, 2024
8d34725
comments clean
chronolaw Oct 30, 2024
d6aa49a
check delta.entity nil
chronolaw Oct 31, 2024
8774284
fix(incremental sync): fix cache_key handling for select_by_cache_key…
chobits Oct 31, 2024
63c17e2
refactor(dbless): clean logic of select_by_field (#13817)
chronolaw Oct 31, 2024
b37bdd1
fix(incremental sync): fixed ws_id processing in _set_entity_for_txn …
chobits Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,19 @@ function _M:notify_all_nodes()
end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

local deltas = {
{
type = name,
id = row.id,
pk = pk,
ws_id = ws_id,
row = is_delete and ngx_null or row,
entity = is_delete and ngx_null or entity,
},
}

Expand All @@ -99,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete)

self:notify_all_nodes()

return row -- for other hooks
return entity -- for other hooks
end


Expand Down Expand Up @@ -131,21 +137,21 @@ function _M:register_dao_hooks()
end
end

local function post_hook_writer_func(row, name, options, ws_id)
local function post_hook_writer_func(entity, name, options, ws_id)
if not is_db_export(name) then
return row
return entity
end

return self:entity_delta_writer(row, name, options, ws_id)
return self:entity_delta_writer(entity, name, options, ws_id)
end

local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries)
if not is_db_export(name) then
return row
return entity
end

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(row, name, options, ws_id, true)
-- set lmdb value to ngx_null then return entity
return self:entity_delta_writer(entity, name, options, ws_id, true)
end

local dao_hooks = {
Expand Down
98 changes: 60 additions & 38 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ local txn = require("resty.lmdb.transaction")
local declarative = require("kong.db.declarative")
local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")


local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


local pairs = pairs
local ipairs = ipairs
local fmt = string.format
local ngx_null = ngx.null
Expand All @@ -39,16 +40,16 @@ end


function _M:init_cp(manager)
-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- { { namespace = "default", version = 1000, }, }
local purge_delay = manager.conf.cluster_data_plane_purge_delay

local function gen_delta_result(res, wipe)
return { default = { deltas = res, wipe = wipe, }, }
end

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- example: { default = { version = 1000, }, }
manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)")

Expand All @@ -57,19 +58,13 @@ function _M:init_cp(manager)
rpc_peers = kong.rpc:get_peers()
end

local default_namespace
for namespace, v in pairs(current_versions) do
if namespace == "default" then
default_namespace = v
break
end
end
local default_namespace = current_versions.default

if not default_namespace then
return nil, "default namespace does not exist inside params"
end

-- { { namespace = "default", version = 1000, }, }
-- { default = { version = 1000, }, }
local default_namespace_version = default_namespace.version

-- XXX TODO: follow update_sync_status() in control_plane.lua
Expand Down Expand Up @@ -117,7 +112,7 @@ function _M:init_cp(manager)
return nil, err
end

if #res == 0 then
if isempty(res) then
ngx_log(ngx_DEBUG,
"[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace_version,
Expand All @@ -126,7 +121,7 @@ function _M:init_cp(manager)
end

-- some deltas are returned, are they contiguous?
if res[1].version == default_namespace.version + 1 then
if res[1].version == default_namespace_version + 1 then
-- doesn't wipe dp lmdb, incremental sync
return gen_delta_result(res, false)
end
Expand Down Expand Up @@ -155,7 +150,7 @@ function _M:init_dp(manager)
-- DP
-- Method: kong.sync.v2.notify_new_version
-- Params: new_versions: list of namespaces and their new versions, like:
-- { { new_version = 1000, }, }, possible field: namespace = "default"
-- { default = { new_version = 1000, }, }
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
-- TODO: currently only default is supported, and anything else is ignored
local default_new_version = new_versions.default
Expand Down Expand Up @@ -199,24 +194,33 @@ local function do_sync()
return true
end

local ns_delta

for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end
-- ns_deltas should look like:
-- { default = { deltas = { ... }, wipe = true, }, }

local ns_delta = ns_deltas.default
if not ns_delta then
return nil, "default namespace does not exist inside params"
end

if #ns_delta.deltas == 0 then
local deltas = ns_delta.deltas

if isempty(deltas) then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
end

-- we should find the correct default workspace
-- and replace the old one with it
local default_ws_changed
for _, delta in ipairs(deltas) do
if delta.type == "workspaces" and delta.entity.name == "default" then
kong.default_workspace = delta.entity.id
default_ws_changed = true
break
end
end
assert(type(kong.default_workspace) == "string")

local t = txn.begin(512)

local wipe = ns_delta.wipe
Expand All @@ -227,18 +231,25 @@ local function do_sync()
local db = kong.db

local version = 0
local opts = {}
local crud_events = {}
local crud_events_n = 0

for _, delta in ipairs(ns_delta.deltas) do
-- delta should look like:
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_type = delta.type
local delta_row = delta.row
local delta_entity = delta.entity
local ev

if delta_row ~= ngx_null then
-- delta must have ws_id to generate the correct lmdb key
-- set the correct workspace for item
opts.workspace = assert(delta.ws_id)

if delta_entity ~= nil and delta_entity ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end
Expand All @@ -247,29 +258,29 @@ local function do_sync()

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }
ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
local old_entity, err = kong.db[delta_type]:select(delta.pk) -- composite key
if err then
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
Expand All @@ -281,13 +292,22 @@ local function do_sync()
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
-- delta.version should not be nil or ngx.null
assert(type(delta.version) == "number")

if delta.version ~= version then
version = delta.version
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))

-- store the correct default workspace uuid
if default_ws_changed then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
end

local ok, err = t:commit()
if not ok then
return nil, err
Expand All @@ -299,7 +319,7 @@ local function do_sync()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end
Expand All @@ -314,7 +334,9 @@ local function sync_handler(premature)
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
-- `do_sync()` is run twice in a row to report back new version number
-- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta
-- as well as status reporting)
for _ = 1, 2 do
local ok, err = do_sync()
if not ok then
Expand Down
22 changes: 14 additions & 8 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local buffer = require("string.buffer")

local string_format = string.format
local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand Down Expand Up @@ -66,23 +67,23 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, id, ws_id, row) VALUES %s;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "id" = "d78eb00f-8702-4d6a-bfd9-e005f904ae3e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "route", "id" = "0a5bac5c-b795-4981-95d2-919ba3390b7e", "ws_id" = "73478cf6-964f-412d-b1c4-8ac88d9e85e9", row = "JSON", }
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
for _, d in ipairs(deltas) do
buf:putf("(new_version, %s, %s, %s, %s)",
self.connector:escape_literal(d.type),
self.connector:escape_literal(d.id),
self.connector:escape_literal(d.ws_id),
self.connector:escape_literal(cjson_encode(d.row)))
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.entity)))
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())
Expand All @@ -92,14 +93,19 @@ end


function _M:get_latest_version()
local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
local sql = "SELECT MAX(version) FROM clustering_sync_version"

local res, err = self.connector:query(sql)
if not res then
return nil, err
end

return res[1] and res[1].max_version
local ver = res[1] and res[1].max
if ver == ngx_null then
return 0
end

return ver
end


Expand Down
2 changes: 1 addition & 1 deletion kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ function DAO:cache_key(key, arg2, arg3, arg4, arg5, ws_id)
error("key must be a string or an entity table", 2)
end

if key.ws_id ~= nil and key.ws_id ~= null then
if key.ws_id ~= nil and key.ws_id ~= null and schema.workspaceable then
ws_id = key.ws_id
end

Expand Down
7 changes: 6 additions & 1 deletion kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp
return nil, err
end

-- it will be ngx.null when the table clustering_sync_version is empty
sync_version = assert(ok[1].max)
if sync_version == null then
sync_version = 0
end
end

emitter:emit_toplevel({
Expand Down Expand Up @@ -359,7 +363,8 @@ local sync_emitter = {

emit_entity = function(self, entity_name, entity_data)
self.out_n = self.out_n + 1
self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version, }
self.out[self.out_n] = { type = entity_name , entity = entity_data, version = self.sync_version,
ws_id = kong.default_workspace, }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check it later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here ws_id is a fallback, if row(entity) has the field ws_id, do_sync() will use it but not delta.ws_id.

end,

done = function(self)
Expand Down
Loading
Loading