Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prometheus): expose controlplane connectivity state as a gauge #14020

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
message: |
  **Prometheus**: Added a gauge to expose the connectivity state to the control plane.
type: feature
scope: Plugin
27 changes: 23 additions & 4 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,20 @@ function _M.new(clustering)
end


--- Publish the control-plane connectivity flag to the `kong` shared dict.
-- The flag is written under the "control_plane_connected" key with
-- PING_WAIT passed as the expiry argument, so the entry lapses unless it is
-- refreshed. A failed write is logged at ERR level and otherwise ignored.
-- @param reachable value to store (callers in this file pass true/false)
local function set_control_plane_connected(reachable)
  local stored, set_err = ngx.shared.kong:safe_set("control_plane_connected", reachable, PING_WAIT)
  if stored then
    return
  end

  ngx_log(ngx_ERR, _log_prefix, "failed to set control_plane_connected key in shm to ", reachable, " :", set_err)
end


function _M:init_worker(basic_info)
-- ROLE = "data_plane"

self.plugins_list = basic_info.plugins
self.filters = basic_info.filters
set_control_plane_connected(false)

local function start_communicate()
assert(ngx.timer.at(0, function(premature)
Expand Down Expand Up @@ -139,13 +148,17 @@ local function send_ping(c, log_suffix)

local _, err = c:send_ping(hash)
if err then
set_control_plane_connected(false)
ngx_log(is_timeout(err) and ngx_NOTICE or ngx_WARN, _log_prefix,
"unable to send ping frame to control plane: ", err, log_suffix)

-- only log a ping if the hash changed
elseif hash ~= prev_hash then
prev_hash = hash
ngx_log(ngx_INFO, _log_prefix, "sent ping frame to control plane with hash: ", hash, log_suffix)
else
set_control_plane_connected(true)
-- only log a ping if the hash changed
if hash ~= prev_hash then
aryan9600 marked this conversation as resolved.
Show resolved Hide resolved
prev_hash = hash
ngx_log(ngx_INFO, _log_prefix, "sent ping frame to control plane with hash: ", hash, log_suffix)
end
end
end

Expand Down Expand Up @@ -197,6 +210,7 @@ function _M:communicate(premature)

local c, uri, err = clustering_utils.connect_cp(self, "/v1/outlet")
if not c then
set_control_plane_connected(false)
ngx_log(ngx_WARN, _log_prefix, "connection to control plane ", uri, " broken: ", err,
" (retrying after ", reconnection_delay, " seconds)", log_suffix)

Expand Down Expand Up @@ -229,6 +243,7 @@ function _M:communicate(premature)
filters = self.filters,
labels = labels, }))
if err then
set_control_plane_connected(false)
ngx_log(ngx_ERR, _log_prefix, "unable to send basic information to control plane: ", uri,
" err: ", err, " (retrying after ", reconnection_delay, " seconds)", log_suffix)

Expand All @@ -238,6 +253,7 @@ function _M:communicate(premature)
end))
return
end
set_control_plane_connected(true)

local config_semaphore = semaphore.new(0)

Expand Down Expand Up @@ -344,16 +360,19 @@ function _M:communicate(premature)
local data, typ, err = c:recv_frame()
if err then
if not is_timeout(err) then
set_control_plane_connected(false)
return nil, "error while receiving frame from control plane: " .. err
end

local waited = ngx_time() - last_seen
if waited > PING_WAIT then
set_control_plane_connected(false)
return nil, "did not receive pong frame from control plane within " .. PING_WAIT .. " seconds"
end

goto continue
end
set_control_plane_connected(true)

if typ == "close" then
ngx_log(ngx_DEBUG, _log_prefix, "received close frame from control plane", log_suffix)
Expand Down
18 changes: 18 additions & 0 deletions kong/plugins/prometheus/exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ local lower = string.lower
local ngx_timer_pending_count = ngx.timer.pending_count
local ngx_timer_running_count = ngx.timer.running_count
local get_all_upstreams = balancer.get_all_upstreams

-- Compatibility shim: the API changed after Kong 2.5. When the balancer
-- module no longer exposes `get_all_upstreams`, take it from
-- `kong.runloop.balancer.upstreams` instead.
if not balancer.get_all_upstreams then
get_all_upstreams = require("kong.runloop.balancer.upstreams").get_all_upstreams
end
Expand Down Expand Up @@ -65,6 +66,14 @@ local function init()
"0 is unreachable",
nil,
prometheus.LOCAL_STORAGE)
if role == "data_plane" then
metrics.cp_connected = prometheus:gauge("control_plane_connected",
"Kong connected to control plane, " ..
"0 is unconnected",
nil,
prometheus.LOCAL_STORAGE)
end

metrics.node_info = prometheus:gauge("node_info",
"Kong Node metadata information",
{"node_id", "version"},
Expand Down Expand Up @@ -449,6 +458,15 @@ local function metric_data(write_fn)
kong.log.err("prometheus: failed to reach database while processing",
"/metrics endpoint: ", err)
end

if role == "data_plane" then
local cp_reachable = ngx.shared.kong:get("control_plane_connected")
if cp_reachable then
metrics.cp_connected:set(1)
else
metrics.cp_connected:set(0)
end
end
end

local phase = get_phase()
Expand Down
87 changes: 87 additions & 0 deletions spec/03-plugins/26-prometheus/04-status_api_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,90 @@ describe("Plugin: prometheus (access) granular metrics switch", function()

end)
end

-- Checks that a data plane exports the `kong_control_plane_connected` gauge
-- on its status API and that the value tracks control plane availability:
-- 0 before the CP is started, 1 once the DP has connected, 0 again after the
-- CP is stopped.
describe("CP/DP connectivity state #", function ()
  local status_client
  local cp_running

  -- Fetch /metrics from the DP status listener and return the response body.
  -- The client is created here (and only here) so that it always carries
  -- `reopen = true`; the polls below can outlive a keep-alive connection.
  local function get_metrics()
    if not status_client then
      status_client = helpers.http_client("127.0.0.1", tcp_status_port, 20000)
      status_client.reopen = true -- retry on a closed connection
    end

    local res, err = status_client:get("/metrics")

    assert.is_nil(err, "failed GET /metrics: " .. tostring(err))
    return assert.res_status(200, res)
  end

  setup(function()
    local bp = helpers.get_db_utils()

    bp.plugins:insert {
      protocols = { "http", "https", "grpc", "grpcs", "tcp", "tls" },
      name = "prometheus",
    }

    -- Only the data plane is started here; the control plane is brought up
    -- inside the test so the "disconnected" state can be observed first.
    assert(helpers.start_kong({
      role = "data_plane",
      database = "off",
      prefix = "prom_dp",
      cluster_cert = "spec/fixtures/kong_clustering.crt",
      cluster_cert_key = "spec/fixtures/kong_clustering.key",
      cluster_control_plane = "127.0.0.1:9005",
      proxy_listen = "0.0.0.0:9000",
      worker_state_update_frequency = 1,
      status_listen = "0.0.0.0:" .. tcp_status_port,
      nginx_worker_processes = 1,
      dedicated_config_processing = "on",
      plugins = "bundled, prometheus",
    }))
  end)

  teardown(function()
    if status_client then
      status_client:close()
    end

    helpers.stop_kong("prom_dp")
    -- The CP is normally stopped by the test itself; this covers the case
    -- where the test failed between starting and stopping it.
    if cp_running then
      helpers.stop_kong("prom_cp")
    end
  end)

  it("exposes control plane connectivity status", function ()
    assert.eventually(function()
      local body = get_metrics()
      assert.matches('kong_control_plane_connected 0', body, nil, true)
    end).has_no_error("metric kong_control_plane_connected => 0")

    assert(helpers.start_kong({
      role = "control_plane",
      prefix = "prom_cp",
      cluster_cert = "spec/fixtures/kong_clustering.crt",
      cluster_cert_key = "spec/fixtures/kong_clustering.key",
      cluster_listen = "127.0.0.1:9005",
      plugins = "bundled, prometheus",
    }))
    cp_running = true

    -- It takes some time for the cp<->dp connection to get established and
    -- for the metric to reflect that. On failure, re-connection attempts are
    -- spaced out in `math.random(5, 10)` second intervals, so a generous
    -- timeout is used in case we get unlucky and have to wait multiple retry
    -- cycles.
    assert.with_timeout(30).eventually(function()
      local body = get_metrics()
      assert.matches('kong_control_plane_connected 1', body, nil, true)
    end).has_no_error("metric kong_control_plane_connected => 1")

    helpers.stop_kong("prom_cp")
    cp_running = false

    assert.eventually(function()
      local body = get_metrics()
      assert.matches('kong_control_plane_connected 0', body, nil, true)
    end).has_no_error("metric kong_control_plane_connected => 0")
  end)
end)
Loading