feat(sync): full sync pagination
StarlightIbuki committed Nov 28, 2024
1 parent e457f6c commit 27d3e9a
Showing 2 changed files with 177 additions and 44 deletions.
212 changes: 172 additions & 40 deletions kong/clustering/services/sync/rpc.lua
@@ -8,14 +8,21 @@ local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")
local events = require("kong.runloop.events")
local lrucache = require("resty.lrucache")


local EMPTY = require("kong.tools.table").EMPTY
local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local encode_base64 = ngx.encode_base64
local decode_base64 = ngx.decode_base64
local cjson_encode = require("cjson.safe").encode
local cjson_decode = require("cjson.safe").decode



local assert = assert
@@ -25,16 +25,57 @@ local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_NOTICE = ngx.NOTICE
local ngx_DEBUG = ngx.DEBUG


-- number of versions behind before a full sync is forced
local FULL_SYNC_THRESHOLD = 512

-- fallback number of deltas per full sync page when opts.page_size is not
-- given (an assumed default, so pagination arithmetic never sees nil)
local FULL_SYNC_PAGE_SIZE = 512


function _M.new(strategy)
local function decode_pagination_status(pagination_status)
if not pagination_status then
return nil, nil
end

-- base64 encoded json
local decoded = decode_base64(pagination_status)
if not decoded then
return nil, "failed to base64 decode pagination status"
end

local status, err = cjson_decode(decoded)
if not status then
return nil, "failed to cjson decode pagination status: " .. err
end

return status.version, status.page_size, status.next_page
end



local function encode_pagination_status(version, page_size, next_page)
local data = {
version = version,
page_size = page_size,
next_page = next_page,
}

local json, err = cjson_encode(data)
if not json then
return nil, "failed to encode pagination:" .. err
end

return encode_base64(json)
end


function _M.new(strategy, opts)
opts = opts or EMPTY

local self = {
strategy = strategy,
-- fall back to an assumed default so pagination arithmetic never sees nil
page_size = opts.page_size or FULL_SYNC_PAGE_SIZE,
}

return setmetatable(self, _MT)
@@ -46,20 +94,78 @@ local function inc_sync_result(res)
end


local function full_sync_result()
local deltas, err = declarative.export_config_sync()
if not deltas then
return nil, err
function _M:full_sync_result(full_sync_status)
local target_version, page_size, next_page = decode_pagination_status(full_sync_status)

-- a nil version with a non-nil second value means decoding failed
if (not target_version) and page_size then
return nil, "communication error: " .. page_size
end

-- try fetch from cache
local config_deltas
if target_version then
config_deltas = self.full_sync_cache:get(target_version)
-- the DP asked for an unknown version, or the cache entry expired or was
-- evicted; treat it as the first call of a full sync session
if not config_deltas then
ngx_log(ngx_NOTICE, "full sync cache miss for version: ", target_version)
end
end

-- first full sync call: wipe the DP and begin the full sync session
local first_time = not config_deltas
if first_time then
-- export the config; on success the second return value is the version,
-- on failure it is the error message
config_deltas, target_version = declarative.export_config_sync()
if not config_deltas then
return nil, target_version
end

-- cache the exported deltas for the follow-up pages
-- (resty.lrucache's set() has no return value and cannot fail)
self.full_sync_cache:set(target_version, config_deltas)
end

local begin = next_page or 1
page_size = page_size or self.page_size
next_page = begin + page_size

-- at this point,
-- config_deltas, target_version, page_size, next_page are all guaranteed to be non-nil

-- no more deltas; end the session for the DP
local last_time = next_page > #config_deltas

-- get the deltas for the current page
local deltas, n = {}, 1
for i = begin, next_page - 1 do
local delta = config_deltas[i]
if not delta then
break
end

deltas[n] = delta
n = n + 1
end

-- TODO: handle new deltas that arrive during the full sync

local full_sync_status
if not last_time then
full_sync_status = encode_pagination_status(target_version, page_size, next_page)
end

-- wipe dp lmdb, full sync
return { default = { deltas = deltas, wipe = true, }, }
return { default = { deltas = deltas, wipe = first_time, full_sync_done = last_time, full_sync_status = full_sync_status, }, }
end
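
-- Illustrative paging (assumed numbers: page_size = 512, 1000 exported deltas
-- at version v):
--   call 1: no status          -> wipe = true,  deltas[1..512],
--                                 full_sync_status = cursor(v, 512, 513)
--   call 2: cursor echoed back -> wipe = false, deltas[513..1000],
--                                 full_sync_done = true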


function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- cache the most recent full sync snapshots, keyed by version (up to 10)
self.full_sync_cache = lrucache.new(10)

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
@@ -103,7 +209,8 @@ function _M:init_cp(manager)

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace_version == 0 or
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD or
default_namespace.full_sync_status
then
-- we need a full sync: the DP is empty, too far behind, or resuming a paginated full sync

@@ -112,7 +219,7 @@
", current_version: ", default_namespace_version,
", forcing a full sync")

return full_sync_result()
return self:full_sync_result(default_namespace.full_sync_status)
end

-- do we need an incremental sync?
@@ -199,14 +306,19 @@ local function is_rpc_ready()
end


local function do_sync()
local function do_sync(dp_status)
if not is_rpc_ready() then
return nil, "rpc is not ready"
end

-- while a paged full sync is only partially done, an update notification may
-- trigger another sync; it is serialized by the mutex and then continues the
-- rest of the paged sync
local in_full_sync = dp_status.full_sync_status

local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
full_sync_status = in_full_sync,
},
}

@@ -217,7 +329,7 @@ local function do_sync()
end

-- ns_deltas should look like:
-- { default = { deltas = { ... }, wipe = true, }, }
-- { default = { deltas = { ... }, wipe = true, full_sync_done = false, full_sync_status = ...}, }

local ns_delta = ns_deltas.default
if not ns_delta then
@@ -231,6 +343,11 @@
return true
end

if ns_delta.full_sync_status then
-- full sync is in progress
in_full_sync = true
end

-- we should find the correct default workspace
-- and replace the old one with it
local default_ws_changed
@@ -247,9 +364,15 @@

local t = txn.begin(512)

-- beginning of the full sync session, wipe the lmdb and purge the cache
local wipe = ns_delta.wipe
if wipe then
t:db_drop(false)
kong.core_cache:purge()
kong.cache:purge()
-- we are in an unready state;
-- consider the config empty
t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH)
end

local db = kong.db
@@ -298,8 +421,8 @@ local function do_sync()
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end

@@ -310,8 +433,8 @@
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
-- during a full sync the lmdb starts from a wipe, so there is nothing to delete
if old_entity and not in_full_sync then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
@@ -323,14 +446,14 @@
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
ev = { delta_type, "delete", old_entity, }
end
end -- if delta_entity ~= nil and delta_entity ~= ngx_null

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
end
@@ -343,9 +466,12 @@
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))

-- only update the sync version when not in a full sync, or once the full sync is done
if (not in_full_sync) or ns_delta.full_sync_done then
-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
end

-- store the correct default workspace uuid
if default_ws_changed then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
@@ -356,32 +482,32 @@
return nil, err
end

if wipe then
kong.core_cache:purge()
kong.cache:purge()
dp_status.full_sync_status = ns_delta.full_sync_status

-- the full sync is done
if ns_delta.full_sync_done then
-- Trigger other workers' callbacks like reconfigure_handler.
--
-- A full sync could rebuild routes, plugins and balancer state, so their
-- hashes are nil.
-- Until this point, the dataplane is not ready to serve requests or to
-- do delta syncs.
local reconfigure_data = { kong.default_workspace, nil, nil, nil, }
local ok, err = events.declarative_reconfigure_notify(reconfigure_data)
if not ok then
return nil, err
end
return events.declarative_reconfigure_notify(reconfigure_data)
end

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
-- emit the CRUD events
-- if in_full_sync, no events should be added into the queue
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end

return true
end


local function sync_handler(premature)
local function sync_handler(premature, dp_status)
if premature then
return
end
@@ -391,7 +517,7 @@ local function sync_handler(premature)
-- to CP quickly after sync. (`kong.sync.v2.get_delta` is used both for pulling
-- deltas and for status reporting)
for _ = 1, 2 do
local ok, err = do_sync()
local ok, err = do_sync(dp_status)
if not ok then
return nil, err
end
@@ -402,11 +528,17 @@
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end

if dp_status.full_sync_status then
-- a full sync is in progress; schedule another round to fetch the next page
return dp_status:sync_once()
end
end
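
-- Assumed lifecycle of dp_status.full_sync_status on the DP: dp_status is the
-- sync instance itself, so the cursor survives across timer invocations:
--
--   nil               -- steady state: incremental syncs only
--   "<base64 cursor>" -- CP returned a page; echo it back and sync again at once
--   nil               -- CP set full_sync_done; version stored, caches rebuilt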


local function start_sync_timer(timer_func, delay)
local hdl, err = timer_func(delay, sync_handler)
local function start_sync_timer(timer_func, delay, dp_status)
local hdl, err = timer_func(delay, sync_handler, dp_status)

if not hdl then
return nil, err
@@ -417,12 +549,12 @@


function _M:sync_once(delay)
return start_sync_timer(ngx.timer.at, delay or 0)
return start_sync_timer(ngx.timer.at, delay or 0, self)
end


function _M:sync_every(delay)
return start_sync_timer(ngx.timer.every, delay)
return start_sync_timer(ngx.timer.every, delay, self)
end
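
-- Assumed usage sketch (constructor options are illustrative):
--
--   local sync_rpc = require("kong.clustering.services.sync.rpc")
--   local sync = sync_rpc.new(strategy, { page_size = 512, })
--   sync:init_cp(manager)  -- CP side: serves paginated kong.sync.v2.get_delta
--   sync:sync_every(30)    -- DP side: periodic sync; full sync pages chain
--                          -- via sync_once() until full_sync_done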

