From 7f5b69e44ae40932dbb459ad2ffb7c9b7eee2a3d Mon Sep 17 00:00:00 2001 From: ywc689 Date: Tue, 27 Jun 2023 18:51:56 +0800 Subject: [PATCH] tools: add healthcheck tool and config.mk Signed-off-by: ywc689 --- .github/workflows/build.yaml | 4 +- Makefile | 6 + config.mk | 33 +++ doc/Worker-Performance-Tuning.md | 2 +- doc/tutorial.md | 2 +- scripts/dpvs_log_rotate.sh | 138 ++++++++++ src/Makefile | 4 +- src/config.mk | 128 ++++++--- tools/Makefile | 7 + tools/healthcheck/.gitignore | 3 + tools/healthcheck/Makefile | 22 ++ tools/healthcheck/go.mod | 10 + tools/healthcheck/go.sum | 6 + tools/healthcheck/license.txt | 13 + tools/healthcheck/main.go | 103 +++++++ tools/healthcheck/pkg/helthcheck/checker.go | 252 ++++++++++++++++++ tools/healthcheck/pkg/helthcheck/configs.go | 94 +++++++ .../pkg/helthcheck/ping_checker.go | 203 ++++++++++++++ .../pkg/helthcheck/ping_checker_test.go | 45 ++++ tools/healthcheck/pkg/helthcheck/server.go | 248 +++++++++++++++++ .../healthcheck/pkg/helthcheck/tcp_checker.go | 122 +++++++++ .../pkg/helthcheck/test/tcp_checker.go | 130 +++++++++ .../pkg/helthcheck/test/udp_checker.go | 119 +++++++++ tools/healthcheck/pkg/helthcheck/types.go | 225 ++++++++++++++++ .../healthcheck/pkg/helthcheck/udp_checker.go | 108 ++++++++ tools/healthcheck/pkg/lb/dpvs_agent.go | 228 ++++++++++++++++ tools/healthcheck/pkg/lb/dpvs_agent_test.go | 49 ++++ tools/healthcheck/pkg/lb/types.go | 65 +++++ tools/healthcheck/pkg/server/server.go | 72 +++++ tools/healthcheck/pkg/utils/net.go | 124 +++++++++ tools/healthcheck/test/README.md | 238 +++++++++++++++++ tools/healthcheck/test/dpvs-agent-api.sh | 31 +++ tools/healthcheck/test/stress-test.sh | 67 +++++ 33 files changed, 2859 insertions(+), 42 deletions(-) create mode 100644 config.mk create mode 100755 scripts/dpvs_log_rotate.sh create mode 100644 tools/healthcheck/.gitignore create mode 100644 tools/healthcheck/Makefile create mode 100644 tools/healthcheck/go.mod create mode 100644 tools/healthcheck/go.sum create 
mode 100644 tools/healthcheck/license.txt create mode 100644 tools/healthcheck/main.go create mode 100644 tools/healthcheck/pkg/helthcheck/checker.go create mode 100644 tools/healthcheck/pkg/helthcheck/configs.go create mode 100644 tools/healthcheck/pkg/helthcheck/ping_checker.go create mode 100644 tools/healthcheck/pkg/helthcheck/ping_checker_test.go create mode 100644 tools/healthcheck/pkg/helthcheck/server.go create mode 100644 tools/healthcheck/pkg/helthcheck/tcp_checker.go create mode 100644 tools/healthcheck/pkg/helthcheck/test/tcp_checker.go create mode 100644 tools/healthcheck/pkg/helthcheck/test/udp_checker.go create mode 100644 tools/healthcheck/pkg/helthcheck/types.go create mode 100644 tools/healthcheck/pkg/helthcheck/udp_checker.go create mode 100644 tools/healthcheck/pkg/lb/dpvs_agent.go create mode 100644 tools/healthcheck/pkg/lb/dpvs_agent_test.go create mode 100644 tools/healthcheck/pkg/lb/types.go create mode 100644 tools/healthcheck/pkg/server/server.go create mode 100644 tools/healthcheck/pkg/utils/net.go create mode 100644 tools/healthcheck/test/README.md create mode 100755 tools/healthcheck/test/dpvs-agent-api.sh create mode 100755 tools/healthcheck/test/stress-test.sh diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 1ac36e39f..360635bb4 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -22,13 +22,13 @@ jobs: - name: make run: make -j - build-debug: + build-all: runs-on: self-hosted env: PKG_CONFIG_PATH: /data/dpdk/dpdklib/lib64/pkgconfig steps: - uses: actions/checkout@v2 - name: config - run: sed -i 's/#CFLAGS +=/CFLAGS +=/' src/config.mk && sed -i 's/^#DEBUG := 1/DEBUG := 1/' src/Makefile + run: sed -i 's/=n$/=y/' config.mk - name: make run: make -j diff --git a/Makefile b/Makefile index 3172b2981..44f214765 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,7 @@ MAKE = make CC = gcc LD = ld +RM = rm SUBDIRS = src tools @@ -29,6 +30,8 @@ export INSDIR export KERNEL = $(shell 
/bin/uname -r) +include $(CURDIR)/config.mk + all: for i in $(SUBDIRS); do $(MAKE) -C $$i || exit 1; done @@ -43,3 +46,6 @@ distclean: install:all -mkdir -p $(INSDIR) for i in $(SUBDIRS); do $(MAKE) -C $$i install || exit 1; done + +uninstall: + -$(RM) -f $(TARGET) $(INSDIR)/* diff --git a/config.mk b/config.mk new file mode 100644 index 000000000..5d8648f12 --- /dev/null +++ b/config.mk @@ -0,0 +1,33 @@ +# configs +export CONFIG_DPVS_MAX_SOCKET=2 +export CONFIG_DPVS_MAX_LCORE=64 + +## modules +export CONFIG_DPVS_AGENT=n +export CONFIG_IXGEB_PMD=y +export CONFIG_DPVS_LOG=y +export CONFIG_PDUMP=y +export CONFIG_ICMP_REDIRECT_CORE=n + +# debugging and logging +export CONFIG_DEBUG=n +export CONFIG_DPVS_NEIGH_DEBUG=n +export CONFIG_RECORD_BIG_LOOP=n +export CONFIG_DPVS_SAPOOL_DEBUG=n +export CONFIG_DPVS_IPVS_DEBUG=n +export CONFIG_DPVS_SERVICE_DEBUG=n +export CONFIG_SYNPROXY_DEBUG=n +export CONFIG_TIMER_MEASURE=n +export CONFIG_TIMER_DEBUG=n +export DPVS_CFG_PARSER_DEBUG=n +export NETIF_BONDING_DEBUG=n +export CONFIG_TC_DEBUG=n +export CONFIG_DPVS_IPVS_STATS_DEBUG=n +export CONFIG_DPVS_IP_HEADER_DEBUG=n +export CONFIG_DPVS_MBUF_DEBUG=n +export CONFIG_DPVS_IPSET_DEBUG=n +export CONFIG_NDISC_DEBUG=n +export CONFIG_MSG_DEBUG=n +export CONFIG_DPVS_MP_DEBUG=n +export CONFIG_DPVS_NETIF_DEBUG=n +export CONFIG_DPVS_ICMP_DEBUG=n diff --git a/doc/Worker-Performance-Tuning.md b/doc/Worker-Performance-Tuning.md index 8616e2bf8..acbf892d6 100644 --- a/doc/Worker-Performance-Tuning.md +++ b/doc/Worker-Performance-Tuning.md @@ -30,7 +30,7 @@ In case of the following situations, you should consider this performance tuning * There exists big worker loops. - > To observe worker loop time, you should uncomment the macro "CONFIG_RECORD_BIG_LOOP" in src/config.mk,recompile DPVS program and run it. + > To observe worker loop time, you should set "CONFIG_RECORD_BIG_LOOP=y" in `config.mk`,recompile DPVS program and run it. 
> > Besides, macros "BIG_LOOP_THRESH_SLAVE" and "BIG_LOOP_THRESH_MASTER" define the threshold time of worker loop. Modify them if needed. diff --git a/doc/tutorial.md b/doc/tutorial.md index 9d5e89323..8ea24dea3 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -1326,7 +1326,7 @@ Firstly, DPVS runs with `WARNING` log level by default. You can change it in `/e Use low level log such as "INFO" or "DEBUG" may help find more clues to your problem. -Secondly, some modules support more detailed debug log only if you enable it when compile DPVS. The supported flags are defined but commented in [src/config.mk](../src/config.mk), some of which are listed below. Uncomment it and recompile DPVS if you need to debug the corresponding module. +Secondly, some modules support more detailed debug logs only if you enable them when compiling DPVS. The per-module debug options are available in [config.mk](../config.mk), some of which are listed below. Change the value to "y" and recompile DPVS if you want to debug a module. 
``` - CONFIG_DPVS_IPVS_DEBUG # for ipvs forwarding debug
diff --git a/scripts/dpvs_log_rotate.sh b/scripts/dpvs_log_rotate.sh
new file mode 100755
index 000000000..6bb38a84e
--- /dev/null
+++ b/scripts/dpvs_log_rotate.sh
@@ -0,0 +1,172 @@
+#!/usr/bin/env bash
+#
+# Delete the oldest rotated log files in a directory once the filesystem
+# that holds it is too full.
+#
+# Cleaning starts when disk usage exceeds --disk-usage-high. The oldest
+# file matching --filename-pattern is then removed, one per pass, until
+# usage is no longer above --disk-usage-low, no more than --min-files-kept
+# matching files remain, or --max-deletions files have been removed.
+#
+# Relies on util-linux getopt and GNU find (-printf).
+
+# Set target directory for cleaning
+TARGET_DIR="/var/log/healthcheck"
+
+# Set log file name pattern (passed to `find -name`, i.e. a glob)
+LOG_FILENAME_PATTERN="*\.log\.*"
+
+# Set the maximum usage percentage and the target usage percentage
+MAX_USAGE=80
+TARGET_USAGE=40
+
+# Set the minimum number of log files to keep
+MIN_FILES=12
+
+# Set the maximum number of files to delete in one run
+MAX_DELETE=10000
+
+# Print the usage percentage (digits only) of the filesystem holding $1.
+disk_usage() {
+    df -P -- "$1" | awk 'NR==2 {printf "%d", $5}'
+}
+
+# Succeed only if $1 consists solely of decimal digits.
+is_uint() {
+    [[ "$1" =~ ^[0-9]+$ ]]
+}
+
+# Exit on a getopt error instead of continuing with half-parsed options.
+OPTS=$(getopt -o d:p:u:l:K:D:h --long \
+log-directory:,filename-pattern:,disk-usage-high:,\
+disk-usage-low:,min-files-kept:,max-deletions:,help \
+    -n "$0" -- "$@") || exit 1
+eval set -- "$OPTS"
+while true
+do
+    case "$1" in
+        -d|--log-directory)
+            TARGET_DIR="$2"
+            shift 2
+            ;;
+        -p|--filename-pattern)
+            LOG_FILENAME_PATTERN="$2"
+            shift 2
+            ;;
+        -u|--disk-usage-high)
+            MAX_USAGE="$2"
+            shift 2
+            ;;
+        -l|--disk-usage-low)
+            TARGET_USAGE="$2"
+            shift 2
+            ;;
+        -K|--min-files-kept)
+            MIN_FILES="$2"
+            shift 2
+            ;;
+        -D|--max-deletions)
+            MAX_DELETE="$2"
+            shift 2
+            ;;
+        -h|--help)
+            echo "[usage] $0 [ OPTS ]"
+            echo "OPTS:"
+            echo "    -d|--log-directory      DIRECTORY"
+            echo "    -p|--filename-pattern   GLOB"
+            echo "    -u|--disk-usage-high    0-100"
+            echo "    -l|--disk-usage-low     0-100"
+            echo "    -K|--min-files-kept     NUM"
+            echo "    -D|--max-deletions      NUM"
+            echo "    -h|--help"
+            exit 0
+            ;;
+        --)
+            shift
+            break
+            ;;
+        *)
+            echo "Param Error!"
+            exit 1
+            ;;
+    esac
+done
+
+# getopt places non-option arguments after "--"; this script takes none.
+if [ $# -gt 0 ]; then
+    echo "Unrecognized Opts: $*"
+    exit 1
+fi
+
+echo "CONFIGS:"
+echo "    log-directory: ${TARGET_DIR}"
+echo "    filename-pattern: ${LOG_FILENAME_PATTERN}"
+echo "    disk-usage-high: ${MAX_USAGE}"
+echo "    disk-usage-low: ${TARGET_USAGE}"
+echo "    min-files-kept: ${MIN_FILES}"
+echo "    max-deletions: ${MAX_DELETE}"
+
+if [ ! -d "${TARGET_DIR}" ]; then
+    echo "invalid --log-directory \"${TARGET_DIR}\", not found!"
+    exit 1
+fi
+is_uint "${MAX_USAGE}" || { echo "invalid --disk-usage-high"; exit 1; }
+is_uint "${TARGET_USAGE}" || { echo "invalid --disk-usage-low"; exit 1; }
+is_uint "${MIN_FILES}" || { echo "invalid --min-files-kept"; exit 1; }
+is_uint "${MAX_DELETE}" || { echo "invalid --max-deletions"; exit 1; }
+[ "${MAX_USAGE}" -lt 100 ] || { echo "--disk-usage-high must less than 100"; exit 1; }
+[ "${MAX_USAGE}" -gt "${TARGET_USAGE}" ] || { echo "--disk-usage-high must greater than --disk-usage-low"; exit 1; }
+
+# Get the current disk usage
+CURRENT_USAGE=$(disk_usage "${TARGET_DIR}")
+# An empty or malformed value would make the comparisons below error out and be read as "no cleaning needed".
+is_uint "${CURRENT_USAGE}" || { echo "cannot determine disk usage of \"${TARGET_DIR}\""; exit 1; }
+
+# Initialize a counter for deleted files
+DELETED_FILES=0
+
+# Start the cleaning process if the disk usage is higher than the defined MAX_USAGE
+if [ "${CURRENT_USAGE}" -gt "${MAX_USAGE}" ]; then
+    echo "Disk usage is ${CURRENT_USAGE}%, starting cleaning process."
+
+    # Find and delete logs until disk usage reaches TARGET_USAGE or MAX_DELETE files are deleted
+    while [ "${CURRENT_USAGE}" -gt "${TARGET_USAGE}" ] && [ "${DELETED_FILES}" -lt "${MAX_DELETE}" ]; do
+        # Count the log files; one byte is printed per file
+        NUM_FILES=$(find "${TARGET_DIR}" -type f -name "${LOG_FILENAME_PATTERN}" -printf '.' | wc -c)
+
+        # Ensure that at least MIN_FILES log files remain
+        if [ "${NUM_FILES}" -le "${MIN_FILES}" ]; then
+            echo "Reached minimum number of log files (${MIN_FILES}), aborting."
+            exit 1
+        fi
+
+        # Find the oldest log file; "-f2-" keeps paths containing spaces whole
+        OLDEST_LOG=$(find "${TARGET_DIR}" -type f -name "${LOG_FILENAME_PATTERN}" -printf '%T+ %p\n' | sort | head -n1 | cut -d' ' -f2-)
+
+        echo "Deleting ${OLDEST_LOG}..."
+        # Stop if the removal fails; otherwise the same file would be selected on every pass
+        if [ -z "${OLDEST_LOG}" ] || ! rm -f -- "${OLDEST_LOG}"; then
+            echo "Failed to delete \"${OLDEST_LOG}\", aborting."
+            exit 1
+        fi
+        DELETED_FILES=$((DELETED_FILES+1))
+
+        # Update current disk usage
+        CURRENT_USAGE=$(disk_usage "${TARGET_DIR}")
+    done
+
+    # Decide by the usage actually reached, not by the deletion counter
+    if [ "${CURRENT_USAGE}" -gt "${TARGET_USAGE}" ]; then
+        echo "Reached the maximum number of deletions (${MAX_DELETE}), aborting."
+    else
+        echo "Cleaning process completed. Disk usage is now ${CURRENT_USAGE}%."
+    fi
+
+else
+    echo "Disk usage is ${CURRENT_USAGE}%, no cleaning needed."
+fi
+
+echo "Remaining Files in ${TARGET_DIR} (only show 30 entries):"
+ls -lh -- "${TARGET_DIR}" | head -n 30
+
+exit 0
diff --git a/src/Makefile b/src/Makefile index b546a0d67..82e75030d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -19,8 +19,6 @@ # Makefile for dpvs (DPVS main program). # -#DEBUG := 1 # enable for debug - TARGET := dpvs ifneq ("$(wildcard VERSION)","") @@ -59,7 +57,7 @@ ifeq ($(shell test $(GCC_VERSION) -ge 70 && echo 1), 1) CFLAGS += -Wstringop-overflow=0 endif -ifeq ($(DEBUG),) +ifneq ($(CONFIG_DEBUG), y) CFLAGS += -O3 else CFLAGS += -g -O0 -D DEBUG diff --git a/src/config.mk b/src/config.mk index 78fb68ba4..2a1ef13dc 100644 --- a/src/config.mk +++ b/src/config.mk @@ -15,52 +15,110 @@ # GNU General Public License for more details. # -# -# enable as needed. -# -# TODO: use standard way to define compile flags. 
-# - -CONFIG_IXGEB_PMD=y -CONFIG_PDUMP=y +CFLAGS += -D DPVS_MAX_SOCKET=$(CONFIG_DPVS_MAX_SOCKET) +CFLAGS += -D DPVS_MAX_LCORE=$(CONFIG_DPVS_MAX_LCORE) -CFLAGS += -D DPVS_MAX_SOCKET=2 -CFLAGS += -D DPVS_MAX_LCORE=64 - -CFLAGS += -D CONFIG_DPVS_LOG -#CFLAGS += -D CONFIG_ICMP_REDIRECT_CORE -#CFLAGS += -D CONFIG_DPVS_AGENT - -#CFLAGS += -D CONFIG_DPVS_NEIGH_DEBUG -#CFLAGS += -D CONFIG_RECORD_BIG_LOOP -#CFLAGS += -D CONFIG_DPVS_SAPOOL_DEBUG -#CFLAGS += -D CONFIG_DPVS_IPVS_DEBUG -#CFLAGS += -D CONFIG_DPVS_SERVICE_DEBUG -#CFLAGS += -D CONFIG_SYNPROXY_DEBUG -#CFLAGS += -D CONFIG_TIMER_MEASURE -#CFLAGS += -D CONFIG_TIMER_DEBUG -#CFLAGS += -D DPVS_CFG_PARSER_DEBUG -#CFLAGS += -D NETIF_BONDING_DEBUG -#CFLAGS += -D CONFIG_TC_DEBUG -#CFLAGS += -D CONFIG_DPVS_IPVS_STATS_DEBUG -#CFLAGS += -D CONFIG_DPVS_IP_HEADER_DEBUG -#CFLAGS += -D CONFIG_DPVS_MBUF_DEBUG -#CFLAGS += -D CONFIG_DPVS_IPSET_DEBUG -#CFLAGS += -D CONFIG_NDISC_DEBUG -#CFLAGS += -D CONFIG_MSG_DEBUG -#CFLAGS += -D CONFIG_DPVS_MP_DEBUG -#CFLAGS += -D CONFIG_DPVS_NETIF_DEBUG -#CFLAGS += -D CONFIG_DPVS_ICMP_DEBUG +ifeq ($(CONFIG_DPVS_AGENT), y) +CFLAGS += -D CONFIG_DPVS_AGENT +endif # for ixgbe nic ifeq ($(CONFIG_IXGEB_PMD), y) CFLAGS += -D CONFIG_DPVS_FDIR endif +ifeq ($(CONFIG_DPVS_LOG), y) +CFLAGS += -D CONFIG_DPVS_LOG +endif + ifeq ($(CONFIG_PDUMP), y) CFLAGS += -D CONFIG_DPVS_PDUMP endif +ifeq ($(CONFIG_ICMP_REDIRECT_CORE), y) +CFLAGS += -D CONFIG_ICMP_REDIRECT_CORE +endif + +ifeq ($(CONFIG_DPVS_NEIGH_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_NEIGH_DEBUG +endif + +ifeq ($(CONFIG_RECORD_BIG_LOOP), y) +CFLAGS += -D CONFIG_RECORD_BIG_LOOP +endif + +ifeq ($(CONFIG_DPVS_SAPOOL_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_SAPOOL_DEBUG +endif + +ifeq ($(CONFIG_DPVS_IPVS_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_IPVS_DEBUG +endif + +ifeq ($(CONFIG_DPVS_SERVICE_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_SERVICE_DEBUG +endif + +ifeq ($(CONFIG_SYNPROXY_DEBUG), y) +CFLAGS += -D CONFIG_SYNPROXY_DEBUG +endif + +ifeq ($(CONFIG_TIMER_MEASURE), y) +CFLAGS += 
-D CONFIG_TIMER_MEASURE +endif + +ifeq ($(CONFIG_TIMER_DEBUG), y) +CFLAGS += -D CONFIG_TIMER_DEBUG +endif + +ifeq ($(DPVS_CFG_PARSER_DEBUG), y) +CFLAGS += -D DPVS_CFG_PARSER_DEBUG +endif + +ifeq ($(NETIF_BONDING_DEBUG), y) +CFLAGS += -D NETIF_BONDING_DEBUG +endif + +ifeq ($(CONFIG_TC_DEBUG), y) +CFLAGS += -D CONFIG_TC_DEBUG +endif + +ifeq ($(CONFIG_DPVS_IPVS_STATS_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_IPVS_STATS_DEBUG +endif + +ifeq ($(CONFIG_DPVS_IP_HEADER_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_IP_HEADER_DEBUG +endif + +ifeq ($(CONFIG_DPVS_MBUF_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_MBUF_DEBUG +endif + +ifeq ($(CONFIG_DPVS_IPSET_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_IPSET_DEBUG +endif + +ifeq ($(CONFIG_NDISC_DEBUG), y) +CFLAGS += -D CONFIG_NDISC_DEBUG +endif + +ifeq ($(CONFIG_MSG_DEBUG), y) +CFLAGS += -D CONFIG_MSG_DEBUG +endif + +ifeq ($(CONFIG_DPVS_MP_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_MP_DEBUG +endif + +ifeq ($(CONFIG_DPVS_NETIF_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_NETIF_DEBUG +endif + +ifeq ($(CONFIG_DPVS_ICMP_DEBUG), y) +CFLAGS += -D CONFIG_DPVS_ICMP_DEBUG +endif + GCC_MAJOR = $(shell echo __GNUC__ | $(CC) -E -x c - | tail -n 1) GCC_MINOR = $(shell echo __GNUC_MINOR__ | $(CC) -E -x c - | tail -n 1) GCC_VERSION = $(GCC_MAJOR)$(GCC_MINOR) diff --git a/tools/Makefile b/tools/Makefile index 064670c31..832a568d1 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -20,6 +20,10 @@ # SUBDIRS = keepalived ipvsadm dpip +ifeq ($(CONFIG_DPVS_AGENT), y) +SUBDIRS += healthcheck +endif + all: config for i in $(SUBDIRS); do $(MAKE) -C $$i || exit 1; done @@ -42,3 +46,6 @@ install: install -m 744 keepalived/bin/keepalived $(INSDIR)/keepalived install -m 744 ipvsadm/ipvsadm $(INSDIR)/ipvsadm install -m 744 dpip/build/dpip $(INSDIR)/dpip +ifeq ($(CONFIG_DPVS_AGENT), y) + install -m 744 healthcheck/healthcheck $(INSDIR)/healthcheck +endif diff --git a/tools/healthcheck/.gitignore b/tools/healthcheck/.gitignore new file mode 100644 index 000000000..a8763e2d1 --- /dev/null +++ 
b/tools/healthcheck/.gitignore @@ -0,0 +1,3 @@ +healthcheck +*.log +log/* diff --git a/tools/healthcheck/Makefile b/tools/healthcheck/Makefile new file mode 100644 index 000000000..27e506bba --- /dev/null +++ b/tools/healthcheck/Makefile @@ -0,0 +1,22 @@ +TARGET := healthcheck + +GO ?= go +GO_BUILD = CGO_ENABLED=0 $(GO) build +GO_CLEAN = $(GO) clean + +.PHONY: all $(TARGET) clean + +all: $(TARGET) + +$(TARGET): + -$(GO) mod tidy + $(GO_BUILD) -o $@ + +clean: + $(GO_CLEAN) + +license: license.txt +ifeq ($(shell addlicense 2>&1|grep Usage),) + $(error "`addlicense` command not found. You can install it with `go install github.com/google/addlicense`") +endif + -addlicense -f license.txt -v . diff --git a/tools/healthcheck/go.mod b/tools/healthcheck/go.mod new file mode 100644 index 000000000..7c97f9d58 --- /dev/null +++ b/tools/healthcheck/go.mod @@ -0,0 +1,10 @@ +module github.com/iqiyi/dpvs/tools/healthcheck + +go 1.20 + +require ( + github.com/golang/glog v1.1.1 + github.com/google/gops v0.3.27 +) + +require golang.org/x/sys v0.8.0 // indirect diff --git a/tools/healthcheck/go.sum b/tools/healthcheck/go.sum new file mode 100644 index 000000000..dc5c3cfbf --- /dev/null +++ b/tools/healthcheck/go.sum @@ -0,0 +1,6 @@ +github.com/golang/glog v1.1.1 h1:jxpi2eWoU84wbX9iIEyAeeoac3FLuifZpY9tcNUD9kw= +github.com/golang/glog v1.1.1/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= +github.com/google/gops v0.3.27 h1:BDdWfedShsBbeatZ820oA4DbVOC8yJ4NI8xAlDFWfgI= +github.com/google/gops v0.3.27/go.mod h1:lYqabmfnq4Q6UumWNx96Hjup5BDAVc8zmfIy0SkNCSk= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/tools/healthcheck/license.txt b/tools/healthcheck/license.txt new file mode 100644 index 000000000..d61a47c47 --- /dev/null +++ b/tools/healthcheck/license.txt @@ -0,0 +1,13 @@ +Copyright 2023 IQiYi Inc. All Rights Reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/tools/healthcheck/main.go b/tools/healthcheck/main.go new file mode 100644 index 000000000..064cdabf2 --- /dev/null +++ b/tools/healthcheck/main.go @@ -0,0 +1,103 @@ +// Copyright 2023 IQiYi Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "flag" + "time" + + "github.com/golang/glog" + gops "github.com/google/gops/agent" + + hc "github.com/iqiyi/dpvs/tools/healthcheck/pkg/helthcheck" + "github.com/iqiyi/dpvs/tools/healthcheck/pkg/server" +) + +var ( + notifyChannelSize = flag.Uint("channel_size", + hc.DefaultServerConfig().NotifyChannelSize, + "The size of the notification channel") + + notifyInterval = flag.Duration("notify_interval", + hc.DefaultServerConfig().NotifyInterval, + "The time between notifications") + + fetchInterval = flag.Duration("fetch_interval", + hc.DefaultServerConfig().FetchInterval, + "The time between healthcheck config fetches from DPVS") + + checkInterval = flag.Duration("check_interval", + 3*time.Second, + "The default time interval to run a check") + + checkTimeout = flag.Duration("check_timeout", + 1*time.Second, + "The default timeout before a check fails") + + checkRetry = flag.Uint("check_retry", + 1, + "The default retry count when a check fails") + + dryRun = flag.Bool("dry_run", + hc.DefaultServerConfig().DryRun, + "Skips actual check and always return healthy as result") + + debug = flag.Bool("debug", + hc.DefaultServerConfig().Debug, + "Enable gops for debug") + + lbIfaceType = flag.String("lb_iface_type", + hc.DefaultServerConfig().LbIfaceType, + "Type of load-balancer interface via which to get check objects and update results") + + lbIfaceAddr = flag.String("lb_iface_addr", + hc.DefaultServerConfig().LbIfaceAddr, + "Address of load-balancer interface via which to get check objects and update results") + + lbAutoMethod = flag.Bool("lb_auto_method", + hc.DefaultServerConfig().LbAutoMethod, + "Use default check method for the backends if not specified") +) + +func main() { + flag.Parse() + defer glog.Flush() + + cfg := hc.DefaultServerConfig() + cfg.NotifyChannelSize = *notifyChannelSize + cfg.NotifyInterval = *notifyInterval + cfg.FetchInterval = *fetchInterval + cfg.LbIfaceType = *lbIfaceType + cfg.LbIfaceAddr = *lbIfaceAddr + 
cfg.LbAutoMethod = *lbAutoMethod + cfg.DryRun = *dryRun + cfg.Debug = *debug + + hc.DefaultCheckConfig.Interval = *checkInterval + hc.DefaultCheckConfig.Timeout = *checkTimeout + hc.DefaultCheckConfig.Retry = *checkRetry + + if cfg.Debug { + if err := gops.Listen(gops.Options{}); err != nil { + glog.Warningf("Unable to start gops: %v", err) + } else { + defer gops.Close() + } + } + + hcs := hc.NewServer(&cfg) + server.ShutdownHandler(hcs) + hcs.Run() +} diff --git a/tools/healthcheck/pkg/helthcheck/checker.go b/tools/healthcheck/pkg/helthcheck/checker.go new file mode 100644 index 000000000..52b2357a4 --- /dev/null +++ b/tools/healthcheck/pkg/helthcheck/checker.go @@ -0,0 +1,252 @@ +// Copyright 2023 IQiYi Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// The healthcheck package refers to the framework of "github.com/google/ +// seesaw/healthcheck" heavily, with only some adaption changes for DPVS. + +package hc + +import ( + "sync" + "time" + + log "github.com/golang/glog" +) + +const uweightDefault uint16 = 1 + +// Checks provides a map of healthcheck configurations. +type Checkers struct { + Configs map[Id]*CheckerConfig +} + +// Checker represents a healthcheck instance. 
+type Checker struct { + CheckerConfig + + lock sync.RWMutex + dryrun bool + uweight uint16 + start time.Time + successes uint64 + failures uint64 + failed uint64 + state State + result *Result + + update chan CheckerConfig + notify chan<- *Notification + quit chan bool +} + +// NewCheck returns an initialised Checker. +func NewChecker(notify chan<- *Notification, state State, weight uint16) *Checker { + // FIXME: how to obtain the original weight if the checker's + // initial state is unhealthy? + if state == StateUnhealthy && weight == 0 { + weight = uweightDefault + } + return &Checker{ + state: state, + uweight: weight, + notify: notify, + update: make(chan CheckerConfig, 1), + quit: make(chan bool, 1), + } +} + +// Status returns the current status for this healthcheck instance. +func (hc *Checker) Status() Status { + hc.lock.RLock() + defer hc.lock.RUnlock() + status := Status{ + LastCheck: hc.start, + Failures: hc.failures, + Successes: hc.successes, + State: hc.state, + } + if hc.state == StateHealthy { + status.Weight = hc.uweight + } + if hc.result != nil { + status.Duration = hc.result.Duration + status.Message = hc.result.String() + } + return status +} + +func (hc *Checker) updateConfig(conf *CheckerConfig) { + hc.CheckerConfig = *conf + if conf.State != StateUnhealthy { + hc.lock.Lock() + weight := hc.uweight + hc.uweight = conf.Weight + hc.lock.Unlock() + if weight != conf.Weight { + log.Infof("%v: user weight changed %d -> %d", hc.Id, weight, conf.Weight) + } + } +} + +// execute invokes the given healthcheck checker with the configured timeout. +func (hc *Checker) execute() *Result { + ch := make(chan *Result, 1) + checker := hc.CheckMethod + timeout := hc.Timeout + target := hc.Target + go func() { + // TODO: Determine a way to ensure that this go routine does not linger. 
+ ch <- checker.Check(target, timeout) + }() + select { + case result := <-ch: + return result + case <-time.After(timeout + time.Second): + return &Result{"Timed out", false, timeout, nil} + } +} + +// Notification generates a healthcheck notification for this checker. +func (hc *Checker) Notification() *Notification { + return &Notification{ + Id: hc.Id, + Target: hc.Target, + Status: hc.Status(), + } +} + +// Notify sends a healthcheck notification for this checker. +func (hc *Checker) Notify() { + hc.notify <- hc.Notification() +} + +// healthcheck executes the given checker. +func (hc *Checker) healthcheck() { + if hc.CheckMethod == nil { + return + } + start := time.Now() + + var result *Result + if hc.dryrun { + result = NewResult(start, "dryrun mode; always succeed", true, nil) + } else { + result = hc.execute() + } + + status := "SUCCESS" + if !result.Success { + status = "FAILURE" + } + log.Infof("%v: %s: %v", hc.Id, status, result) + + hc.lock.Lock() + + hc.start = start + hc.result = result + + var state State + if result.Success { + state = StateHealthy + hc.failed = 0 + hc.successes++ + } else { + hc.failed++ + hc.failures++ + state = StateUnhealthy + } + + if hc.state == StateHealthy && hc.failed > 0 && hc.failed <= uint64(hc.CheckerConfig.Retry) { + log.Infof("%v: Failure %d - retrying...", hc.Id, hc.failed) + state = StateHealthy + } + transition := (hc.state != state) + hc.state = state + + hc.lock.Unlock() + + if transition { + hc.Notify() + } +} + +// Run invokes a healthcheck. It waits for the initial configuration to be +// provided via the configuration channel, after which the configured +// healthchecker is invoked at the given interval. If a new configuration +// is provided the healthchecker is updated and checks are scheduled at the +// new interval. Notifications are generated and sent via the notification +// channel whenever a state transition occurs. Run will terminate once a +// value is received on the quit channel. 
+func (hc *Checker) Run(start <-chan time.Time) { + // Wait for initial configuration. + select { + case config := <-hc.update: + hc.updateConfig(&config) + case <-hc.quit: + return + } + + // Wait for a tick to avoid a thundering herd at startup and to + // stagger healthchecks that have the same interval. + if start != nil { + <-start + } + log.Infof("Starting healthchecker for %v", hc.Id) + + ticker := time.NewTicker(hc.Interval) + hc.healthcheck() + for { + select { + case <-hc.quit: + ticker.Stop() + log.Infof("Stopping healthchecker for %v", hc.Id) + return + + case config := <-hc.update: + if hc.Interval != config.Interval { + ticker.Stop() + if start != nil { + <-start + } + ticker = time.NewTicker(config.Interval) + } + hc.updateConfig(&config) + + case <-ticker.C: + hc.healthcheck() + } + } +} + +// Stop notifies a running healthcheck that it should quit. +func (hc *Checker) Stop() { + select { + case hc.quit <- true: + default: + } +} + +// SetDryrun enables or disables dryrun mode for a healthcheck. +func (hc *Checker) SetDryrun(dryrun bool) { + hc.dryrun = dryrun +} + +// Update queues a healthcheck configuration update for processing. +func (hc *Checker) Update(config *CheckerConfig) { + select { + case hc.update <- *config: + default: + log.Warningf("Unable to update %v, last update still queued", hc.Id) + } +} diff --git a/tools/healthcheck/pkg/helthcheck/configs.go b/tools/healthcheck/pkg/helthcheck/configs.go new file mode 100644 index 000000000..3d95a6ec1 --- /dev/null +++ b/tools/healthcheck/pkg/helthcheck/configs.go @@ -0,0 +1,94 @@ +// Copyright 2023 IQiYi Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// The healthcheck package refers to the framework of "github.com/google/ +// seesaw/healthcheck" heavily, with only some adaption changes for DPVS. + +package hc + +import ( + "fmt" + "time" +) + +// ServerConfig specifies the configuration for a healthcheck server. +type ServerConfig struct { + NotifyChannelSize uint + NotifyInterval time.Duration + FetchInterval time.Duration + LbIfaceType string + LbIfaceAddr string + LbAutoMethod bool + DryRun bool + Debug bool +} + +func (cfg *ServerConfig) String() string { + return fmt.Sprintf("notitfy-channel-size: %v, ", cfg.NotifyChannelSize) + + fmt.Sprintf("notify-interval: %v, ", cfg.NotifyInterval) + + fmt.Sprintf("fetch-interval: %v, ", cfg.FetchInterval) + + fmt.Sprintf("lb-auto-method: %v, ", cfg.LbAutoMethod) + + fmt.Sprintf("dryrun: %v, ", cfg.DryRun) + + fmt.Sprintf("debug: %v", cfg.Debug) +} + +var defaultServerConfig = ServerConfig{ + NotifyChannelSize: 1000, + NotifyInterval: 15 * time.Second, + FetchInterval: 15 * time.Second, + LbIfaceType: "dpvs-agent", // only type supported now + LbIfaceAddr: "localhost:6600", + LbAutoMethod: true, + DryRun: false, + Debug: false, +} + +// DefaultServerConfig returns the default server configuration. +func DefaultServerConfig() ServerConfig { + return defaultServerConfig +} + +// CheckerConfig contains the configuration for a healthcheck. 
+type CheckerConfig struct { + Id + + Target + State + Weight uint16 + CheckMethod + + Interval time.Duration + Timeout time.Duration + Retry uint +} + +var DefaultCheckConfig CheckerConfig + +// NewConfig returns an initialised Config. +func NewCheckerConfig(id *Id, checker CheckMethod, + target *Target, state State, weight uint16, + interval, timeout time.Duration, retry uint) *CheckerConfig { + config := CheckerConfig{ + Id: *id, + Target: *target, + State: state, + Weight: weight, + CheckMethod: checker, + Interval: interval, + Timeout: timeout, + Retry: retry, + } + config.BindConfig(&config) + return &config +} diff --git a/tools/healthcheck/pkg/helthcheck/ping_checker.go b/tools/healthcheck/pkg/helthcheck/ping_checker.go new file mode 100644 index 000000000..dae8eda92 --- /dev/null +++ b/tools/healthcheck/pkg/helthcheck/ping_checker.go @@ -0,0 +1,203 @@ +// Copyright 2023 IQiYi Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// The healthcheck package refers to the framework of "github.com/google/ +// seesaw/healthcheck" heavily, with only some adaption changes for DPVS. + +// ICMP ping healthcheck implementation. 
+
+package hc
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand"
+	"net"
+	"os"
+	"time"
+
+	"github.com/iqiyi/dpvs/tools/healthcheck/pkg/utils"
+)
+
+var _ CheckMethod = (*PingChecker)(nil)
+
+// nextPingCheckerID is the ICMP identifier handed to the next PingChecker.
+var nextPingCheckerID uint16
+
+// init derives the first ICMP identifier from a random source seeded with
+// the process id, keeping only the low 16 bits.
+func init() {
+	s := rand.NewSource(int64(os.Getpid()))
+	nextPingCheckerID = uint16(s.Int63() & 0xffff)
+}
+
+// PingChecker contains configuration specific to a ping healthcheck.
+type PingChecker struct {
+	Config *CheckerConfig
+
+	ID     uint16 // ICMP identifier used in every echo request of this checker
+	Seqnum uint16 // sequence number of the next echo request
+}
+
+// NewPingChecker returns an initialised PingChecker.
+// NOTE(review): nextPingCheckerID is read and incremented without any
+// synchronisation -- confirm all callers run on a single goroutine.
+func NewPingChecker() *PingChecker {
+	checker := PingChecker{ID: nextPingCheckerID}
+	nextPingCheckerID++
+	return &checker
+}
+
+// BindConfig attaches the checker configuration that owns this check method.
+func (hc *PingChecker) BindConfig(conf *CheckerConfig) {
+	hc.Config = conf
+}
+
+// String returns the string representation of a Ping healthcheck.
+func (hc *PingChecker) String() string {
+	return fmt.Sprintf("PING checker for %s", hc.Config.Id)
+}
+
+// Check executes a ping healthcheck.
+//
+// The protocol of target is overridden with ICMP or ICMPv6 depending on the
+// address family of target.IP (target is a copy, so the caller's value is not
+// touched). A single 64-byte echo request is sent and the check succeeds when
+// exchangeICMPEcho returns no error. A zero timeout falls back to
+// DefaultCheckConfig.Timeout.
+func (hc *PingChecker) Check(target Target, timeout time.Duration) *Result {
+	msg := fmt.Sprintf("ICMP ping to host %v", target.IP)
+	if target.IP.To4() != nil {
+		target.Proto = utils.IPProtoICMP
+	} else {
+		target.Proto = utils.IPProtoICMPv6
+	}
+	// Each check consumes one sequence number.
+	seq := hc.Seqnum
+	hc.Seqnum++
+	echo := newICMPEchoRequest(target.Proto, hc.ID, seq, 64, []byte("Healthcheck"))
+	start := time.Now()
+	if timeout == time.Duration(0) {
+		timeout = DefaultCheckConfig.Timeout
+	}
+	err := exchangeICMPEcho(target.Network(), target.IP, timeout, echo)
+	success := err == nil
+	if err != nil {
+		err = fmt.Errorf("ping target %v, %v", target.IP, err)
+	}
+	return NewResult(start, msg, success, err)
+}
+
+// NB: The code below borrows heavily from pkg/net/ipraw_test.go.
+
+// icmpMsg is a raw ICMP message: an 8-byte header followed by the payload.
+type icmpMsg []byte
+
+// ICMP message types used by the ping healthcheck.
+const (
+	ICMP4_ECHO_REQUEST = 8
+	ICMP4_ECHO_REPLY   = 0
+	ICMP6_ECHO_REQUEST = 128
+	ICMP6_ECHO_REPLY   = 129
+)
+
+// icmpHeaderLen is the size of the ICMP echo header (type, code, checksum,
+// identifier, sequence number).
+const icmpHeaderLen = 8
+
+// newICMPEchoRequest builds an echo request for the given protocol.
+// It returns nil when proto is neither ICMP nor ICMPv6.
+func newICMPEchoRequest(proto utils.IPProto, id, seqnum, msglen uint16, filler []byte) icmpMsg {
+	switch proto {
+	case utils.IPProtoICMP:
+		return newICMPv4EchoRequest(id, seqnum, msglen, filler)
+	case utils.IPProtoICMPv6:
+		return newICMPv6EchoRequest(id, seqnum, msglen, filler)
+	}
+	return nil
+}
+
+// newICMPv4EchoRequest builds an ICMPv4 echo request including its checksum.
+func newICMPv4EchoRequest(id, seqnum, msglen uint16, filler []byte) icmpMsg {
+	msg := newICMPInfoMessage(id, seqnum, msglen, filler)
+	msg[0] = ICMP4_ECHO_REQUEST
+	cs := icmpChecksum(msg)
+	// place checksum back in header; using ^= avoids the assumption that the
+	// checksum bytes are zero
+	msg[2] ^= uint8(cs & 0xff)
+	msg[3] ^= uint8(cs >> 8)
+	return msg
+}
+
+// icmpChecksum returns the one's complement checksum of msg. The 16-bit words
+// are summed with the first byte of each pair as the low byte, which matches
+// the byte order newICMPv4EchoRequest uses to store the result. A message
+// that carries a valid checksum therefore yields 0.
+func icmpChecksum(msg icmpMsg) uint16 {
+	cklen := len(msg)
+	s := uint32(0)
+	for i := 0; i < cklen-1; i += 2 {
+		s += uint32(msg[i+1])<<8 | uint32(msg[i])
+	}
+	if cklen&1 == 1 {
+		s += uint32(msg[cklen-1])
+	}
+	s = (s >> 16) + (s & 0xffff)
+	s = s + (s >> 16)
+	return uint16(^s)
+}
+
+// newICMPv6EchoRequest builds an ICMPv6 echo request with a zero checksum.
+func newICMPv6EchoRequest(id, seqnum, msglen uint16, filler []byte) icmpMsg {
+	msg := newICMPInfoMessage(id, seqnum, msglen, filler)
+	msg[0] = ICMP6_ECHO_REQUEST
+	// Note: For IPv6, the OS will compute and populate the ICMP checksum bytes.
+	return msg
+}
+
+// newICMPInfoMessage allocates a message of msglen bytes, fills the payload
+// with repetitions of filler and stores id and seqnum in network byte order.
+// Type, code and checksum are left zero for the caller to set. A msglen
+// smaller than the header is raised to the header size instead of panicking.
+func newICMPInfoMessage(id, seqnum, msglen uint16, filler []byte) icmpMsg {
+	if msglen < icmpHeaderLen {
+		msglen = icmpHeaderLen
+	}
+	b := make([]byte, msglen)
+	copy(b[icmpHeaderLen:], bytes.Repeat(filler, (int(msglen)-icmpHeaderLen)/(len(filler)+1)))
+	b[0] = 0                    // type
+	b[1] = 0                    // code
+	b[2] = 0                    // checksum
+	b[3] = 0                    // checksum
+	b[4] = uint8(id >> 8)       // identifier
+	b[5] = uint8(id & 0xff)     // identifier
+	b[6] = uint8(seqnum >> 8)   // sequence number
+	b[7] = uint8(seqnum & 0xff) // sequence number
+	return b
+}
+
+// parseICMPEchoReply extracts identifier, sequence number and checksum from
+// the header of msg. msg must hold at least 8 bytes.
+func parseICMPEchoReply(msg icmpMsg) (id, seqnum, chksum uint16) {
+	id = uint16(msg[4])<<8 | uint16(msg[5])
+	seqnum = uint16(msg[6])<<8 | uint16(msg[7])
+	chksum = uint16(msg[2])<<8 | uint16(msg[3])
+	return
+}
+
+// exchangeICMPEcho sends echo to ip over a raw socket of the given network
+// and waits until a matching echo reply arrives or timeout expires.
+//
+// Packets from other hosts, packets that are not echo replies, and replies
+// whose identifier or sequence number differ from the request are ignored.
+// It returns nil on a matching reply, an error when the socket cannot be
+// used, when the deadline expires, or when an ICMPv4 reply carries a bad
+// checksum.
+func exchangeICMPEcho(network string, ip net.IP, timeout time.Duration, echo icmpMsg) error {
+	if len(echo) < icmpHeaderLen {
+		return fmt.Errorf("ICMP echo request too short: %d bytes", len(echo))
+	}
+	// The request never changes, so decode the expected values only once.
+	xid, xseqnum, _ := parseICMPEchoReply(echo)
+
+	c, err := net.ListenPacket(network, "")
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
+	// One deadline bounds both the write and all subsequent reads.
+	if err = c.SetDeadline(time.Now().Add(timeout)); err != nil {
+		return err
+	}
+	if _, err = c.WriteTo(echo, &net.IPAddr{IP: ip}); err != nil {
+		return err
+	}
+
+	reply := make([]byte, 256)
+	for {
+		n, addr, err := c.ReadFrom(reply)
+		if err != nil {
+			return err
+		}
+		// Prefer the structured address: its string form may carry an IPv6
+		// zone ("fe80::1%eth0") which net.ParseIP cannot parse.
+		var from net.IP
+		if ipAddr, ok := addr.(*net.IPAddr); ok {
+			from = ipAddr.IP
+		} else {
+			from = net.ParseIP(addr.String())
+		}
+		if !ip.Equal(from) {
+			continue
+		}
+		if n < icmpHeaderLen {
+			continue
+		}
+		// Only look at the bytes of this packet; the rest of the buffer may
+		// hold leftovers from an earlier, longer packet.
+		pkt := icmpMsg(reply[:n])
+		if pkt[0] != ICMP4_ECHO_REPLY && pkt[0] != ICMP6_ECHO_REPLY {
+			continue
+		}
+		rid, rseqnum, rchksum := parseICMPEchoReply(pkt)
+		if rid != xid || rseqnum != xseqnum {
+			continue
+		}
+		if pkt[0] == ICMP4_ECHO_REPLY {
+			cs := icmpChecksum(pkt)
+			if cs != 0 {
+				return fmt.Errorf("Bad ICMP checksum: %x", rchksum)
+			}
+		}
+		// TODO(angusc): Validate checksum for IPv6
+		return nil
+	}
+}
diff --git a/tools/healthcheck/pkg/helthcheck/ping_checker_test.go b/tools/healthcheck/pkg/helthcheck/ping_checker_test.go
new file mode 100644
index 000000000..686ff15cf
--- /dev/null
+++ b/tools/healthcheck/pkg/helthcheck/ping_checker_test.go
@@ -0,0 +1,45 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// The healthcheck package refers to the framework of "github.com/google/
+// seesaw/healthcheck" heavily, with only some adaption changes for DPVS.
+
+package hc
+
+import (
+	"net"
+	"testing"
+	"time"
+)
+
+// targets lists the IPv4 and IPv6 hosts pinged by TestPingChecker. Port and
+// protocol are left zero; PingChecker.Check sets the protocol itself.
+var targets = []Target{
+	{net.ParseIP("127.0.0.1"), 0, 0},
+	{net.ParseIP("192.168.88.30"), 0, 0},
+	{net.ParseIP("11.22.33.44"), 0, 0},
+	{net.ParseIP("::1"), 0, 0},
+	{net.ParseIP("2001::1"), 0, 0},
+	{net.ParseIP("2001::68"), 0, 0},
+}
+
+// TestPingChecker pings every entry of targets once and logs the result.
+// It only logs; no outcome makes the test fail.
+func TestPingChecker(t *testing.T) {
+	for _, target := range targets {
+		checker := NewPingChecker()
+		id := Id(target.IP.String())
+		config := NewCheckerConfig(&id, checker,
+			&target, StateUnknown, 0,
+			3*time.Second, 1*time.Second, 3)
+		result := checker.Check(target, config.Timeout)
+		t.Logf("%v", result)
+	}
+}
diff --git a/tools/healthcheck/pkg/helthcheck/server.go b/tools/healthcheck/pkg/helthcheck/server.go
new file mode 100644
index 000000000..bd26265cb
--- /dev/null
+++ b/tools/healthcheck/pkg/helthcheck/server.go
@@ -0,0 +1,248 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// The healthcheck package refers to the framework of "github.com/google/
+// seesaw/healthcheck" heavily, with only some adaption changes for DPVS.
+
+package hc
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	log "github.com/golang/glog"
+	"github.com/iqiyi/dpvs/tools/healthcheck/pkg/lb"
+	"github.com/iqiyi/dpvs/tools/healthcheck/pkg/utils"
+)
+
+// Server contains the data needed to run a healthcheck server.
+type Server struct {
+	config *ServerConfig
+	comm   lb.Comm // interface to the load balancer
+
+	healthchecks map[Id]*Checker            // running checkers, keyed by healthcheck id
+	configs      chan map[Id]*CheckerConfig // fetched configurations, updater -> manager
+	notify       chan *Notification         // pending notifications, consumed by notifier
+
+	quit chan bool // receives a value when the server shall shut down
+}
+
+// NewServer returns an initialised healthcheck server.
+func NewServer(cfg *ServerConfig) *Server { + rand.Seed(time.Now().UnixNano()) + + if cfg == nil { + defaultCfg := DefaultServerConfig() + cfg = &defaultCfg + } + + var comm lb.Comm + switch cfg.LbIfaceType { + case "dpvs-agent": + comm = lb.NewDpvsAgentComm(cfg.LbIfaceAddr) + default: + panic(fmt.Sprintf("lb_iface_type %q not supported", cfg.LbIfaceType)) + } + + return &Server{ + config: cfg, + comm: comm, + + healthchecks: make(map[Id]*Checker), + notify: make(chan *Notification, cfg.NotifyChannelSize), + configs: make(chan map[Id]*CheckerConfig), + + quit: make(chan bool, 1), + } +} + +func (s *Server) NewChecker(typ lb.Checker, proto utils.IPProto) CheckMethod { + // TODO: support user specified Send/Receive data for TCP/UDP checker + var checker CheckMethod + switch typ { + case lb.CheckerTCP: + checker = NewTCPChecker("", "") + case lb.CheckerUDP: + checker = NewUDPChecker("", "") + case lb.CheckerPING: + checker = NewPingChecker() + case lb.CheckerNone: + if s.config.LbAutoMethod { + switch proto { + case utils.IPProtoTCP: + checker = NewTCPChecker("", "") + case utils.IPProtoUDP: + checker = NewUDPChecker("", "") + } + } + } + return checker +} + +// getHealthchecks attempts to get the current healthcheck configurations from DPVS +func (s *Server) getHealthchecks() (*Checkers, error) { + vss, err := s.comm.ListVirtualServices() + if err != nil { + return nil, err + } + results := &Checkers{Configs: make(map[Id]*CheckerConfig)} + for _, vs := range vss { + for _, rs := range vs.RSs { + target := &Target{rs.IP, rs.Port, vs.Protocol} + id := NewId(vs.Id, target) + checker := s.NewChecker(vs.Checker, vs.Protocol) + if checker == nil { + log.Info("Skip checking vs %v with %v", vs.Id, vs.Checker) + continue + } + weight := rs.Weight + state := StateUnknown + if weight > 0 { + state = StateHealthy + } else if rs.Inhibited { + state = StateUnhealthy + } + // TODO: allow users to specify check interval, timeout and retry + config := NewCheckerConfig(id, checker, + 
target, state, weight, + DefaultCheckConfig.Interval, + DefaultCheckConfig.Timeout, + DefaultCheckConfig.Retry) + results.Configs[*id] = config + } + } + return results, nil +} + +// updater attempts to fetch healthcheck configurations at regular intervals. +// When configurations are successfully retrieved they are provided to the +// manager via the configs channel. +func (s *Server) updater() { + for { + log.Info("Getting healthchecks from DPVS ...") + checkers, err := s.getHealthchecks() + if err != nil { + log.Errorf("Getting healthchecks failed: %v, retry later", err) + time.Sleep(5 * time.Second) + } else if checkers != nil { + log.Infof("DPVS returned %d healthcheck(s)", len(checkers.Configs)) + s.configs <- checkers.Configs + time.Sleep(s.config.FetchInterval) + } else { // It should not happen. + log.Warning("No healthcheck returned from DPVS") + time.Sleep(s.config.FetchInterval) + } + } +} + +// notifier batches healthcheck notifications and sends them to DPVS. +func (s *Server) notifier() { + // TODO: support more concurrency and rate limit + for { + select { + case notification := <-s.notify: + log.Infof("Sending notification >>> %v", notification) + //fmt.Println("Sending notification >>>", notification) + inhibited := false + if notification.Status.State == StateUnhealthy { + inhibited = true + } + vs := &lb.VirtualService{ + Id: notification.Id.Vs(), + Protocol: notification.Target.Proto, + RSs: []lb.RealServer{{ + IP: notification.Target.IP, + Port: notification.Target.Port, + Weight: notification.Status.Weight, + Inhibited: inhibited, + }}, + } + + if err := s.comm.UpdateByChecker([]lb.VirtualService{*vs}); err != nil { + log.Warningf("Failed to Update %v healthy status to %v(weight: %d): %v", + notification.Id, notification.State, notification.Status.Weight, err) + } + } + } +} + +// manager is responsible for controlling the healthchecks that are currently +// running. 
When healthcheck configurations become available, the manager will +// stop and remove deleted healthchecks, spawn new healthchecks and provide +// the current configurations to each of the running healthchecks. +func (s *Server) manager() { + notifyTicker := time.NewTicker(s.config.NotifyInterval) + var configs map[Id]*CheckerConfig + for { + select { + case configs = <-s.configs: + + // Remove healthchecks that have been deleted. + for id, hc := range s.healthchecks { + if configs[id] == nil { + hc.Stop() + delete(s.healthchecks, id) + } + } + + // Spawn new healthchecks. + for id, conf := range configs { + if s.healthchecks[id] == nil { + hc := NewChecker(s.notify, conf.State, conf.Weight) + hc.SetDryrun(s.config.DryRun) + s.healthchecks[id] = hc + checkTicker := time.NewTicker(time.Duration((1 + rand.Intn( + int(DefaultCheckConfig.Interval.Milliseconds())))) * time.Millisecond) + go hc.Run(checkTicker.C) + } + } + + // Update configurations. + for id, hc := range s.healthchecks { + hc.Update(configs[id]) + } + + case <-notifyTicker.C: + log.Infof("Total checkers: %d", len(s.healthchecks)) + // Send notifications when status changed. + for id, hc := range s.healthchecks { + notification := hc.Notification() + if configs[id].State != notification.State { + // FIXME: Don't resend the notification after a successful one. + hc.notify <- notification + } + } + } + } +} + +// Run runs a healthcheck server. +func (s *Server) Run() { + log.Infof("Starting healthcheck server (%v) ...", s.config) + go s.updater() + go s.notifier() + go s.manager() + + <-s.quit +} + +// Shutdown notifies a healthcheck server to shutdown. 
+// The send is non-blocking: when a quit request is already pending, the
+// call returns without queueing another one.
+func (s *Server) Shutdown() {
+	log.Info("Closing healthcheck server ...")
+	select {
+	case s.quit <- true:
+	default:
+	}
+}
diff --git a/tools/healthcheck/pkg/helthcheck/tcp_checker.go b/tools/healthcheck/pkg/helthcheck/tcp_checker.go
new file mode 100644
index 000000000..5b6026cc0
--- /dev/null
+++ b/tools/healthcheck/pkg/helthcheck/tcp_checker.go
@@ -0,0 +1,122 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// The healthcheck package refers to the framework of "github.com/google/
+// seesaw/healthcheck" heavily, with only some adaption changes for DPVS.
+
+package hc
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"time"
+)
+
+var _ CheckMethod = (*TCPChecker)(nil)
+
+// TCPChecker contains configuration specific to a TCP healthcheck.
+type TCPChecker struct {
+	Config *CheckerConfig
+
+	Receive string // expected response; empty means nothing is read
+	Send    string // request payload; empty means nothing is sent
+}
+
+// NewTCPChecker returns an initialised TCPChecker.
+// Note the argument order: the expected response comes first, then the
+// request to send.
+func NewTCPChecker(recv, send string) *TCPChecker {
+	return &TCPChecker{Receive: recv, Send: send}
+}
+
+// BindConfig attaches the checker configuration that owns this check method.
+func (hc *TCPChecker) BindConfig(conf *CheckerConfig) {
+	hc.Config = conf
+}
+
+// String returns the string representation of a TCP healthcheck.
+func (hc *TCPChecker) String() string {
+	return fmt.Sprintf("TCP checker for %v", hc.Config.Id)
+}
+
+// Check executes a TCP healthcheck.
+//
+// The target is dialled with the given timeout (DefaultCheckConfig.Timeout
+// when zero). With neither Send nor Receive configured, a successful connect
+// is a successful check. Otherwise Send is written, exactly len(Receive)
+// bytes are read and compared with Receive, all before the deadline.
+func (hc *TCPChecker) Check(target Target, timeout time.Duration) *Result {
+	msg := fmt.Sprintf("TCP connect to %s", target.Addr())
+	start := time.Now()
+	if timeout == time.Duration(0) {
+		timeout = DefaultCheckConfig.Timeout
+	}
+	deadline := start.Add(timeout)
+
+	dial := net.Dialer{
+		Timeout: timeout,
+	}
+	conn, err := dial.Dial(target.Network(), target.Addr())
+	if err != nil {
+		msg = fmt.Sprintf("%s: failed to dail", msg)
+		return NewResult(start, msg, false, err)
+	}
+	defer conn.Close()
+
+	tcpConn, ok := conn.(*net.TCPConn)
+	if !ok {
+		// err is nil here; the failure is reported through msg only.
+		msg = fmt.Sprintf("%s: failed to create tcp socket", msg)
+		return NewResult(start, msg, false, err)
+	}
+
+	if hc.Send == "" && hc.Receive == "" {
+		msg = fmt.Sprintf("%s succeed", msg)
+		return NewResult(start, msg, true, err)
+	}
+
+	err = tcpConn.SetDeadline(deadline)
+	if err != nil {
+		msg = fmt.Sprintf("%s: failed to set deadline", msg)
+		return NewResult(start, msg, false, err)
+	}
+
+	if hc.Send != "" {
+		err = writeFull(tcpConn, []byte(hc.Send))
+		if err != nil {
+			msg = fmt.Sprintf("%s: failed to send request", msg)
+			return NewResult(start, msg, false, err)
+		}
+	}
+
+	if hc.Receive != "" {
+		buf := make([]byte, len(hc.Receive))
+		n, err := io.ReadFull(tcpConn, buf)
+		if err != nil {
+			msg = fmt.Sprintf("%s: failed to read response", msg)
+			return NewResult(start, msg, false, err)
+		}
+		got := string(buf[0:n])
+		if got != hc.Receive {
+			// err is nil here; the mismatch is reported through msg only.
+			msg = fmt.Sprintf("%s: unexpected response %q", msg, got)
+			return NewResult(start, msg, false, err)
+		}
+	}
+
+	msg = fmt.Sprintf("%s succeed", msg)
+	return NewResult(start, msg, true, err)
+}
+
+// writeFull writes all of b to conn, retrying after short writes, and
+// returns the first write error.
+func writeFull(conn net.Conn, b []byte) error {
+	for len(b) > 0 {
+		n, err := conn.Write(b)
+		if err != nil {
+			return err
+		}
+		b = b[n:]
+	}
+	return nil
+}
diff --git a/tools/healthcheck/pkg/helthcheck/test/tcp_checker.go b/tools/healthcheck/pkg/helthcheck/test/tcp_checker.go
new file mode 100644
index 000000000..cd8f1707b
--- /dev/null
+++ b/tools/healthcheck/pkg/helthcheck/test/tcp_checker.go
@@ -0,0 +1,130 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// The healthcheck package refers to the framework of "github.com/google/
+// seesaw/healthcheck" heavily, with only some adaption changes for DPVS.
+
+package main
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"time"
+)
+
+// Check runs a standalone TCP healthcheck against addr on the given network
+// ("tcp4" or "tcp6"). A zero timeout defaults to three seconds. With neither
+// send nor recv set, a successful connect is a successful check; otherwise
+// send is written and exactly len(recv) bytes are read and compared with recv.
+func Check(addr, network, send, recv string, timeout time.Duration) *Result {
+	msg := fmt.Sprintf("TCP connect to %s", addr)
+	start := time.Now()
+	if timeout == time.Duration(0) {
+		timeout = 3 * time.Second
+	}
+	deadline := start.Add(timeout)
+
+	dial := net.Dialer{
+		Timeout: timeout,
+	}
+	conn, err := dial.Dial(network, addr)
+	if err != nil {
+		msg = fmt.Sprintf("%s: failed to dail", msg)
+		return NewResult(start, msg, false, err)
+	}
+	defer conn.Close()
+
+	tcpConn, ok := conn.(*net.TCPConn)
+	if !ok {
+		msg = fmt.Sprintf("%s: not a TCP conn", msg)
+		err = errors.New("failed to create tcp socket")
+		return NewResult(start, msg, false, err)
+	}
+
+	if send == "" && recv == "" {
+		msg = fmt.Sprintf("%s: succeed", msg)
+		return NewResult(start, msg, true, err)
+	}
+
+	err = tcpConn.SetDeadline(deadline)
+	if err != nil {
+		msg = fmt.Sprintf("%s: failed to set deadline", msg)
+		return NewResult(start, msg, false, err)
+	}
+
+	if send != "" {
+		err = writeFull(tcpConn, []byte(send))
+		if err != nil {
+			msg = fmt.Sprintf("%s: failed to send request", msg)
+			return NewResult(start, msg, false, err)
+		}
+	}
+
+	if recv != "" {
+		buf := make([]byte, len(recv))
+		n, err := io.ReadFull(tcpConn, buf)
+		if err != nil {
+			msg = fmt.Sprintf("%s: failed to read response", msg)
+			return NewResult(start, msg, false, err)
+		}
+		got := string(buf[0:n])
+		if got != recv {
+			msg = fmt.Sprintf("%s: unexpected response %q", msg, got)
+			return NewResult(start, msg, false, err)
+		}
+	}
+
+	msg = fmt.Sprintf("%s: succeed", msg)
+	return NewResult(start, msg, true, err)
+}
+
+// writeFull writes all of b to conn, retrying after short writes, and
+// returns the first write error.
+func writeFull(conn net.Conn, b []byte) error {
+	for len(b) > 0 {
+		n, err := conn.Write(b)
+		if err != nil {
+			return err
+		}
+		b = b[n:]
+	}
+	return nil
+}
+
+// Result stores the outcome of one check.
+type Result struct {
+	Message string
+	Success bool
+	time.Duration
+	Err error
+}
+
+// String renders the result; the error text replaces the message when an
+// error is present.
+func (r *Result) String() string {
+	msg := fmt.Sprintf("[result: %v, duration: %v] ", r.Success, r.Duration)
+	if r.Err != nil {
+		return msg + r.Err.Error()
+	}
+	return msg + r.Message
+}
+
+// NewResult builds a Result whose duration is the time elapsed since start.
+func NewResult(start time.Time, msg string, success bool, err error) *Result {
+	duration := time.Since(start)
+	return &Result{msg, success, duration, err}
+}
+
+// main runs Check against a fixed set of sample addresses and prints each
+// result.
+func main() {
+	fmt.Println(Check("192.168.88.30:80", "tcp4", "", "", 10*time.Second))
+	fmt.Println(Check("192.168.88.30:80", "tcp4", "1", "cds1sfdafasdfasdfafafasssssssssssssssssssssssssss", 1*time.Second))
+	fmt.Println(Check("192.168.88.31:80", "tcp4", "", "", 10*time.Second))
+	fmt.Println(Check("10.130.133.208:80", "tcp4", "", "", 0))
+	fmt.Println(Check("1.2.1.2:12123", "tcp4", "", "", 0))
+	fmt.Println(Check("[2001::30]:80", "tcp6", "", "", 0))
+	fmt.Println(Check("[2001::30]:80", "tcp6", "a", "HTTP", 0))
+	fmt.Println(Check("[2001::33]:81", "tcp6", "", "", 0))
+}
diff --git a/tools/healthcheck/pkg/helthcheck/test/udp_checker.go b/tools/healthcheck/pkg/helthcheck/test/udp_checker.go
new file mode 100644
index 000000000..150587126
--- /dev/null
+++ b/tools/healthcheck/pkg/helthcheck/test/udp_checker.go
@@ -0,0 +1,119 @@
+// Copyright 2023 IQiYi Inc.
All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// The healthcheck package refers to the framework of "github.com/google/ +// seesaw/healthcheck" heavily, with only some adaption changes for DPVS. + +package main + +import ( + "errors" + "fmt" + "net" + "os" + "time" +) + +func udpPortCheck(udpConn *net.UDPConn) (bool, error) { + return true, nil +} + +func Check(addr, network, send, recv string, timeout time.Duration) *Result { + msg := fmt.Sprintf("UDP check to %s", addr) + start := time.Now() + if timeout == time.Duration(0) { + timeout = 3 + } + deadline := start.Add(timeout) + + dial := net.Dialer{Timeout: timeout} + conn, err := dial.Dial(network, addr) + if err != nil { + msg = fmt.Sprintf("%s: failed to dail", msg) + return NewResult(start, msg, false, err) + } + defer conn.Close() + + udpConn, ok := conn.(*net.UDPConn) + if !ok { + msg = fmt.Sprintf("%s: not an UDP conn", msg) + err = errors.New("failed to create udp socket") + return NewResult(start, msg, false, err) + } + + err = udpConn.SetDeadline(deadline) + if err != nil { + msg = fmt.Sprintf("%s: failed to set deadline", msg) + return NewResult(start, msg, false, err) + } + + if _, err = udpConn.Write([]byte(send)); err != nil { + msg = fmt.Sprintf("%s: failed to send request", msg) + return NewResult(start, msg, false, err) + } + + buf := make([]byte, len(recv)+1) + n, _, err := udpConn.ReadFrom(buf) + if err != nil { + if send == "" && recv == "" { + if neterr, ok 
:= err.(net.Error); ok { + // When Send and Recv is none and i/o timeout, the dest port state + // is undetermined. Check shall return success in the case. + if neterr.Timeout() { + msg = fmt.Sprintf("%s: %s, port state unkown", msg, err) + return NewResult(start, msg, true, nil) + } + } + } + msg = fmt.Sprintf("%s: failed to read response", msg) + return NewResult(start, msg, false, err) + } + + got := string(buf[0:n]) + if got != recv { + msg = fmt.Sprintf("%s: unexpected response %q", msg, got) + return NewResult(start, msg, false, err) + } + msg = fmt.Sprintf("%s: succeed", msg) + return NewResult(start, msg, true, err) +} + +type Result struct { + Message string + Success bool + time.Duration + Err error +} + +func (r *Result) String() string { + msg := fmt.Sprintf("[result: %v, duration: %v] ", r.Success, r.Duration) + if r.Err != nil { + return msg + r.Err.Error() + } + return msg + r.Message +} + +func NewResult(start time.Time, msg string, success bool, err error) *Result { + duration := time.Since(start) + return &Result{msg, success, duration, err} +} + +func main() { + if len(os.Args) != 6 { + fmt.Printf("%s addr network send recv timeout\n", os.Args[0]) + return + } + timeout, _ := time.ParseDuration(os.Args[5]) + fmt.Println(Check(os.Args[1], os.Args[2], os.Args[3], os.Args[4], timeout)) +} diff --git a/tools/healthcheck/pkg/helthcheck/types.go b/tools/healthcheck/pkg/helthcheck/types.go new file mode 100644 index 000000000..560ebc84e --- /dev/null +++ b/tools/healthcheck/pkg/helthcheck/types.go @@ -0,0 +1,225 @@ +// Copyright 2023 IQiYi Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// The healthcheck package refers to the framework of "github.com/google/
+// seesaw/healthcheck" heavily, with only some adaption changes for DPVS.
+
+package hc
+
+import (
+	"fmt"
+	"net"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/iqiyi/dpvs/tools/healthcheck/pkg/utils"
+)
+
+// Id provides the unique identifier of a given healthcheck. It has format
+// of {vs}/{rs}, where {vs} is service id confined by the lb_interface_type,
+// and {rs} is the backend id within the service of format ip:proto:port.
+// Id should be created using NewId() function.
+type Id string
+
+// NewId joins the service id vs and the string form of the backend rs with
+// a '/' separator.
+func NewId(vs string, rs *Target) *Id {
+	id := Id(fmt.Sprintf("%s/%v", vs, rs))
+	return &id
+}
+
+// Vs returns the service part of the id, i.e. everything before the last
+// '/'. It returns "" when the id contains no '/'.
+func (id Id) Vs() string {
+	strId := string(id)
+	idx := strings.LastIndexByte(strId, '/')
+	if idx < 0 {
+		return ""
+	}
+	return strId[:idx]
+}
+
+// Rs parses the backend part of the id, i.e. everything after the last '/',
+// with NewTargetFromStr. It returns nil when the id contains no '/'.
+func (id Id) Rs() *Target {
+	strId := string(id)
+	idx := strings.LastIndexByte(strId, '/')
+	if idx < 0 {
+		return nil
+	}
+	return NewTargetFromStr(strId[idx+1:])
+}
+
+// MethodType is the type of check method supported for now.
+type MethodType int
+
+const (
+	MethodTypeNone MethodType = iota
+	MethodTypeTCP
+	MethodTypeUDP
+	MethodTypePING
+)
+
+// String returns the name for the given MethodType.
+// MethodTypeNone and undefined values yield "(unknown)".
+func (h MethodType) String() string {
+	switch h {
+	case MethodTypeTCP:
+		return "TCP"
+	case MethodTypeUDP:
+		return "UDP"
+	case MethodTypePING:
+		return "PING"
+	}
+	return "(unknown)"
+}
+
+// CheckMethod is the interface that must be implemented by a healthcheck.
+type CheckMethod interface { + Check(target Target, timeout time.Duration) *Result + BindConfig(conf *CheckerConfig) + String() string +} + +// State represents the current state of a healthcheck. +type State int + +const ( + StateUnknown State = iota + StateUnhealthy + StateHealthy +) + +var stateNames = map[State]string{ + StateUnknown: "Unknown", + StateUnhealthy: "Unhealthy", + StateHealthy: "Healthy", +} + +// String returns the string representation for the given healthcheck state. +func (s State) String() string { + if name, ok := stateNames[s]; ok { + return name + } + return "" +} + +// Target specifies the target for a healthcheck. +type Target struct { + IP net.IP // IP address of the healthcheck target. + Port uint16 + Proto utils.IPProto +} + +// Create a Target from str of format "IPv4:Proto:Port" or "[IPv6]:Proto:Port". +func NewTargetFromStr(str string) *Target { + idx2 := strings.LastIndexByte(str, ':') + idx1 := strings.LastIndexByte(str[:idx2], ':') + if idx1 < 0 || idx2 < 0 || idx1 >= idx2 { + return nil + } + port, err := strconv.ParseUint(str[idx2:], 10, 16) + if err != nil { + return nil + } + proto := utils.IPProtoFromStr(str[idx1:idx2]) + if proto == 0 { + return nil + } + ip := net.ParseIP(strings.TrimRight(strings.TrimLeft(str[:idx1], "["), "]")) + if ip == nil { + return nil + } + return &Target{ip, uint16(port), proto} +} + +// String returns the string representation of a healthcheck target. +func (t Target) String() string { + if t.IP.To4() != nil { + return fmt.Sprintf("%v:%v:%d", t.IP, t.Proto, t.Port) + } + return fmt.Sprintf("[%v]:%v:%d", t.IP, t.Proto, t.Port) +} + +// Addr returns the IP:Port representation of a healthcheck target +func (t Target) Addr() string { + if t.IP.To4() != nil { + return fmt.Sprintf("%v:%d", t.IP, t.Port) + } + return fmt.Sprintf("[%v]:%d", t.IP, t.Port) +} + +// Network returns the network name for the healthcheck target. 
+func (t *Target) Network() string { + var network string + version := 4 + if t.IP.To4() == nil { + version = 6 + } + switch t.Proto { + case utils.IPProtoICMP: + network = "ip4:icmp" + case utils.IPProtoICMPv6: + network = "ip6:ipv6-icmp" + case utils.IPProtoTCP: + network = fmt.Sprintf("tcp%d", version) + case utils.IPProtoUDP: + network = fmt.Sprintf("udp%d", version) + default: + return "(unknown)" + } + return network +} + +// Result stores the result of a healthcheck performed by a checker. +type Result struct { + Message string + Success bool + time.Duration + Err error +} + +// String returns the string representation of a healthcheck result. +func (r *Result) String() string { + msg := fmt.Sprintf("[result: %v, duration: %v] ", r.Success, r.Duration) + if r.Err != nil { + return msg + r.Err.Error() + } + return msg + r.Message +} + +func NewResult(start time.Time, msg string, success bool, err error) *Result { + // TODO: Make this clock skew safe. + duration := time.Since(start) + return &Result{msg, success, duration, err} +} + +// Status represents the current status of a healthcheck instance. +type Status struct { + LastCheck time.Time + Duration time.Duration + Failures uint64 + Successes uint64 + State + Weight uint16 + Message string +} + +// Notification stores a status notification for a healthcheck. +type Notification struct { + Id + Target + Status +} + +// String returns the string representation for the given notification. 
+// It lists the id, state name, weight, failure and success counters, and
+// the time and duration of the last check.
+func (n *Notification) String() string {
+	return fmt.Sprintf("ID %v, %v, Weight %d, Fail %v, Success %v, Last check %s in %v", n.Id,
+		stateNames[n.Status.State], n.Status.Weight, n.Status.Failures, n.Status.Successes,
+		n.Status.LastCheck.Format("2006-01-02 15:04:05.000"), n.Status.Duration)
+}
diff --git a/tools/healthcheck/pkg/helthcheck/udp_checker.go b/tools/healthcheck/pkg/helthcheck/udp_checker.go
new file mode 100644
index 000000000..ceb6d4467
--- /dev/null
+++ b/tools/healthcheck/pkg/helthcheck/udp_checker.go
@@ -0,0 +1,108 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// The healthcheck package refers to the framework of "github.com/google/
+// seesaw/healthcheck" heavily, with only some adaption changes for DPVS.
+
+package hc
+
+import (
+	"fmt"
+	"net"
+	"time"
+)
+
+var _ CheckMethod = (*UDPChecker)(nil)
+
+// UDPChecker contains configuration specific to a UDP healthcheck.
+type UDPChecker struct {
+	Config *CheckerConfig
+
+	Receive string // expected response
+	Send    string // request payload
+}
+
+// NewUDPChecker returns an initialised UDPChecker.
+// Note the argument order: the expected response comes first, then the
+// request to send.
+func NewUDPChecker(recv, send string) *UDPChecker {
+	return &UDPChecker{Receive: recv, Send: send}
+}
+
+// BindConfig attaches the checker configuration that owns this check method.
+func (hc *UDPChecker) BindConfig(conf *CheckerConfig) {
+	hc.Config = conf
+}
+
+// String returns the string representation of a UDP healthcheck.
+func (hc *UDPChecker) String() string {
+	return fmt.Sprintf("UDP checker for %v", hc.Config.Id)
+}
+
+// Check executes a UDP healthcheck.
+//
+// It dials the target, writes hc.Send and compares the reply with hc.Receive.
+// A zero timeout is replaced by DefaultCheckConfig.Timeout. The returned
+// Result is successful when the reply equals hc.Receive, or when both Send and
+// Receive are empty and the read merely times out (the port state cannot be
+// determined in that case, so the target is not penalised).
+func (hc *UDPChecker) Check(target Target, timeout time.Duration) *Result {
+	msg := fmt.Sprintf("UDP check to %s", target.Addr())
+	start := time.Now()
+	if timeout == time.Duration(0) {
+		timeout = DefaultCheckConfig.Timeout
+	}
+	deadline := start.Add(timeout)
+
+	dial := net.Dialer{Timeout: timeout}
+	conn, err := dial.Dial(target.Network(), target.Addr())
+	if err != nil {
+		msg = fmt.Sprintf("%s: failed to dial", msg)
+		return NewResult(start, msg, false, err)
+	}
+	defer conn.Close()
+
+	udpConn, ok := conn.(*net.UDPConn)
+	if !ok {
+		// No error value exists here; the failure is carried by the message.
+		msg = fmt.Sprintf("%s: failed to create udp socket", msg)
+		return NewResult(start, msg, false, nil)
+	}
+
+	err = udpConn.SetDeadline(deadline)
+	if err != nil {
+		msg = fmt.Sprintf("%s: failed to set deadline", msg)
+		return NewResult(start, msg, false, err)
+	}
+
+	if _, err = udpConn.Write([]byte(hc.Send)); err != nil {
+		msg = fmt.Sprintf("%s: failed to send request", msg)
+		return NewResult(start, msg, false, err)
+	}
+
+	// One spare byte so that a reply longer than hc.Receive cannot compare equal.
+	buf := make([]byte, len(hc.Receive)+1)
+	n, _, err := udpConn.ReadFrom(buf)
+	if err != nil {
+		if hc.Send == "" && hc.Receive == "" {
+			if neterr, ok := err.(net.Error); ok {
+				// When hc.Send and hc.Receive is none and i/o timeout, the dest port state
+				// is undetermined. Check shall return success in the case.
+				if neterr.Timeout() {
+					msg = fmt.Sprintf("%s: %s, port state unknown", msg, err)
+					return NewResult(start, msg, true, nil)
+				}
+			}
+		}
+		msg = fmt.Sprintf("%s: failed to read response", msg)
+		return NewResult(start, msg, false, err)
+	}
+
+	got := string(buf[0:n])
+	if got != hc.Receive {
+		msg = fmt.Sprintf("%s: unexpected response %q", msg, got)
+		return NewResult(start, msg, false, nil)
+	}
+	msg = fmt.Sprintf("%s: succeed", msg)
+	return NewResult(start, msg, true, nil)
+}
diff --git a/tools/healthcheck/pkg/lb/dpvs_agent.go b/tools/healthcheck/pkg/lb/dpvs_agent.go
new file mode 100644
index 000000000..0e0072785
--- /dev/null
+++ b/tools/healthcheck/pkg/lb/dpvs_agent.go
@@ -0,0 +1,228 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lb
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/iqiyi/dpvs/tools/healthcheck/pkg/utils"
+)
+
+// Compile-time assertion that *DpvsAgentComm satisfies Comm.
+var _ Comm = (*DpvsAgentComm)(nil)
+
+var (
+	// serverDefault is used by NewDpvsAgentComm when no server is given.
+	serverDefault = "localhost:6600"
+	// listUri lists the virtual services known to the agent.
+	listUri = LbApi{"/v2/vs", http.MethodGet}
+	// noticeUri reports real-server state; %s is replaced by the service id.
+	noticeUri = LbApi{"/v2/vs/%s/rs?healthcheck=true", http.MethodPut}
+
+	// client is shared by every request issued from this package.
+	client *http.Client = &http.Client{Timeout: httpClientTimeout}
+)
+
+const httpClientTimeout = 10 * time.Second
+
+// DpvsAgentComm talks to a dpvs-agent instance over its HTTP API.
+type DpvsAgentComm struct {
+	listApi    LbApi
+	noticeApis []LbApi
+}
+
+// LbApi describes one HTTP endpoint of the load balancer API.
+type LbApi struct {
+	Url        string
+	HttpMethod string // http.MethodGet, http.MethodPut, ...
+}
+
+// DpvsAgentRs is the JSON form of a real server exchanged with dpvs-agent.
+type DpvsAgentRs struct {
+	IP        string `json:"ip"`
+	Port      uint16 `json:"port"`
+	Weight    uint16 `json:"weight"`
+	Inhibited bool   `json:"inhibited,omitempty"`
+}
+
+// DpvsAgentRsItem wraps a real server as found in the list response.
+type DpvsAgentRsItem struct {
+	Spec DpvsAgentRs
+}
+
+// DpvsAgentRsList is the real-server list nested in a listed virtual service.
+type DpvsAgentRsList struct {
+	Items []DpvsAgentRsItem
+}
+
+// DpvsAgentRsListPut is the request body sent by UpdateByChecker.
+type DpvsAgentRsListPut struct {
+	Items []DpvsAgentRs
+}
+
+// DpvsAgentVs is the JSON form of a virtual service returned by the list API.
+type DpvsAgentVs struct {
+	Addr      string
+	Port      uint16
+	Proto     uint16
+	DestCheck []string
+	Rss       DpvsAgentRsList `json:"RSs"`
+}
+
+// DpvsAgentVsList is the top-level document returned by the list API.
+type DpvsAgentVsList struct {
+	Items []DpvsAgentVs
+}
+
+// serviceId builds the lower-cased "<addr>-<port>-<proto>" identifier that is
+// substituted into the notice URL.
+func (avs *DpvsAgentVs) serviceId() string {
+	return strings.ToLower(fmt.Sprintf("%s-%d-%s", avs.Addr, avs.Port,
+		utils.IPProto(avs.Proto)))
+}
+
+// toVs converts the agent representation into a VirtualService. It fails on an
+// unparsable service or real-server address and on protocols other than TCP
+// and UDP.
+func (avs *DpvsAgentVs) toVs() (*VirtualService, error) {
+	vip := net.ParseIP(avs.Addr)
+	if vip == nil {
+		return nil, fmt.Errorf("invalid Vs Addr %q", avs.Addr)
+	}
+	vport := avs.Port
+	proto := utils.IPProto(avs.Proto)
+	if proto != utils.IPProtoTCP && proto != utils.IPProtoUDP {
+		// Report the offending protocol number, not the port.
+		return nil, fmt.Errorf("Vs protocol type 0x%x not supported", avs.Proto)
+	}
+	// The last recognised name in DestCheck wins; unknown names are ignored.
+	checker := CheckerNone
+	for _, name := range avs.DestCheck {
+		name = strings.ToLower(name)
+		switch name {
+		case "tcp":
+			checker = CheckerTCP
+		case "udp":
+			checker = CheckerUDP
+		case "ping":
+			checker = CheckerPING
+		}
+	}
+	vs := &VirtualService{
+		Checker:  checker,
+		IP:       vip,
+		Port:     vport,
+		Protocol: proto,
+		RSs:      make([]RealServer, len(avs.Rss.Items)),
+	}
+	vs.Id = avs.serviceId()
+
+	for i, ars := range avs.Rss.Items {
+		rip := net.ParseIP(ars.Spec.IP)
+		if rip == nil {
+			return nil, fmt.Errorf("%s: invalid Rs IP %q", vs.Id, ars.Spec.IP)
+		}
+		vs.RSs[i] = RealServer{
+			IP:        rip,
+			Port:      ars.Spec.Port,
+			Weight:    ars.Spec.Weight,
+			Inhibited: ars.Spec.Inhibited,
+		}
+	}
+	return vs, nil
+}
+
+// toVsList converts every listed service; the first conversion error aborts
+// the whole list. An empty list yields (nil, nil).
+func (avslist *DpvsAgentVsList) toVsList() ([]VirtualService, error) {
+	if len(avslist.Items) == 0 {
+		return nil, nil
+	}
+	vslist := make([]VirtualService, len(avslist.Items))
+	for i, avs := range avslist.Items {
+		vs, err := avs.toVs()
+		if err != nil {
+			return nil, err
+		}
+		vslist[i] = *vs
+	}
+	return vslist, nil
+}
+
+// NewDpvsAgentComm returns a communicator for the dpvs-agent at server
+// ("host:port"); an empty server selects serverDefault.
+func NewDpvsAgentComm(server string) *DpvsAgentComm {
+	if len(server) == 0 {
+		server = serverDefault
+	}
+	addr := "http://" + server
+	return &DpvsAgentComm{
+		listApi:    LbApi{addr + listUri.Url, listUri.HttpMethod},
+		noticeApis: []LbApi{{addr + noticeUri.Url, noticeUri.HttpMethod}},
+	}
+}
+
+// ListVirtualServices fetches and decodes the virtual services configured on
+// the agent. An empty response body yields (nil, nil).
+// NOTE(review): the HTTP status code is not inspected here; confirm how the
+// agent reports errors before treating a decodable non-200 body as data.
+func (comm *DpvsAgentComm) ListVirtualServices() ([]VirtualService, error) {
+	req, err := http.NewRequest(comm.listApi.HttpMethod, comm.listApi.Url, nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	resp, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	data, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	if len(data) == 0 {
+		return nil, nil
+	}
+	//fmt.Printf("Code: %v, Resp: %v, Err: %v\n", resp.StatusCode, string(data), err)
+	var svcs DpvsAgentVsList
+	if err = json.Unmarshal(data, &svcs); err != nil {
+		return nil, err
+	}
+	vslist, err := svcs.toVsList()
+	if err != nil {
+		return nil, err
+	}
+	//fmt.Println(vslist)
+	return vslist, nil
+}
+
+// putRsUpdate sends one encoded real-server update for service vsId to a
+// single notice endpoint. The response body is always closed; any status other
+// than 200 is returned as an error carrying the response text.
+func putRsUpdate(notice LbApi, vsId string, payload []byte) error {
+	url := fmt.Sprintf(notice.Url, vsId)
+	req, err := http.NewRequest(notice.HttpMethod, url, bytes.NewBuffer(payload))
+	if err != nil {
+		// req is nil on failure and must not be dereferenced.
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	//fmt.Println("Code:", resp.Status)
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("CODE: %v, ERROR: %s", resp.StatusCode, strings.TrimSpace(string(body)))
+	}
+	// Drain the body so the keep-alive connection can be reused.
+	io.Copy(io.Discard, resp.Body)
+	return nil
+}
+
+// UpdateByChecker pushes the weight and inhibited state of every real server
+// in targets to each notice endpoint, one request per real server. It stops at
+// the first failure.
+func (comm *DpvsAgentComm) UpdateByChecker(targets []VirtualService) error {
+	// TODO: support batch operation
+	for _, vs := range targets {
+		for _, rs := range vs.RSs {
+			ars := &DpvsAgentRsListPut{
+				Items: []DpvsAgentRs{
+					{
+						IP:        rs.IP.String(),
+						Port:      rs.Port,
+						Weight:    rs.Weight,
+						Inhibited: rs.Inhibited,
+					},
+				},
+			}
+			data, err := json.Marshal(ars)
+			if err != nil {
+				return err
+			}
+			for _, notice := range comm.noticeApis {
+				if err := putRsUpdate(notice, vs.Id, data); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/tools/healthcheck/pkg/lb/dpvs_agent_test.go b/tools/healthcheck/pkg/lb/dpvs_agent_test.go
new file mode 100644
index 000000000..2d8e00122
--- /dev/null
+++ b/tools/healthcheck/pkg/lb/dpvs_agent_test.go
@@ -0,0 +1,49 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lb
+
+import (
+	"testing"
+	"time"
+)
+
+// TestListAndUpdate lists the services of the default agent, inhibits the
+// first real server of the second service, then restores it. It returns early
+// when fewer than two services, or no real server, are available.
+func TestListAndUpdate(t *testing.T) {
+	comm := NewDpvsAgentComm("")
+	vss, err := comm.ListVirtualServices()
+	if err != nil {
+		t.Errorf("list error: %v", err)
+	}
+	t.Logf("list Results: %v", vss)
+	if len(vss) < 2 {
+		return
+	}
+	// Indexing RSs[0] below would panic on a service without real servers.
+	if len(vss[1].RSs) == 0 {
+		return
+	}
+	t.Logf("Updating %v", vss[1])
+	vss[1].RSs[0].Weight = 0
+	vss[1].RSs[0].Inhibited = true
+	//vss[1].RSs[0].Port = 8081
+	//vss[1].RSs[1].Weight = 100
+	//vss[1].RSs[1].Inhibited = false
+	//vss[1].RSs[1].IP = net.ParseIP("1.2.3.4")
+	if err = comm.UpdateByChecker(vss[1:2]); err != nil {
+		t.Errorf("inhibit rs error: %v", err)
+	}
+	time.Sleep(3 * time.Second)
+	t.Logf("Restoring %v", vss[1])
+	vss[1].RSs[0].Weight = 100
+	vss[1].RSs[0].Inhibited = false
+	if err = comm.UpdateByChecker(vss[1:2]); err != nil {
+		t.Errorf("restore rs error: %v", err)
+	}
+}
diff --git a/tools/healthcheck/pkg/lb/types.go b/tools/healthcheck/pkg/lb/types.go
new file mode 100644
index 000000000..bc3f8a02e
--- /dev/null
+++ b/tools/healthcheck/pkg/lb/types.go
@@ -0,0
+1,65 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lb
+
+import (
+	"net"
+
+	"github.com/iqiyi/dpvs/tools/healthcheck/pkg/utils"
+)
+
+// Checker identifies the healthcheck method selected for a virtual service.
+type Checker uint16
+
+const (
+	CheckerNone Checker = iota
+	CheckerTCP
+	CheckerUDP
+	CheckerPING
+)
+
+// RealServer is one backend of a virtual service together with the weight and
+// inhibited flag exchanged with the load balancer.
+type RealServer struct {
+	IP        net.IP
+	Port      uint16
+	Weight    uint16
+	Inhibited bool
+}
+
+// VirtualService is a load-balanced service and its real servers.
+type VirtualService struct {
+	Id       string
+	Checker  Checker
+	Protocol utils.IPProto
+	Port     uint16
+	IP       net.IP
+	RSs      []RealServer
+}
+
+// Comm is the interface through which services are listed from, and checker
+// results are reported to, a load balancer.
+type Comm interface {
+	ListVirtualServices() ([]VirtualService, error)
+	UpdateByChecker(targets []VirtualService) error
+}
+
+// String returns the "checker_*" name of the checker, or "checker_unknown" for
+// values outside the declared constants.
+func (checker Checker) String() string {
+	names := [...]string{
+		CheckerNone: "checker_none",
+		CheckerTCP:  "checker_tcp",
+		CheckerUDP:  "checker_udp",
+		CheckerPING: "checker_ping",
+	}
+	if int(checker) >= len(names) {
+		return "checker_unknown"
+	}
+	return names[checker]
+}
diff --git a/tools/healthcheck/pkg/server/server.go b/tools/healthcheck/pkg/server/server.go
new file mode 100644
index 000000000..d6591d0da
--- /dev/null
+++ b/tools/healthcheck/pkg/server/server.go
@@ -0,0 +1,72 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"fmt"
+	"os"
+	"os/signal"
+	"runtime"
+	"syscall"
+
+	log "github.com/golang/glog"
+)
+
+// Shutdowner is an interface for a server that can be shutdown.
+type Shutdowner interface {
+	Shutdown()
+}
+
+// signalNames maps the signals this package knows by name to their
+// conventional spelling.
+var signalNames = map[syscall.Signal]string{
+	syscall.SIGINT:  "SIGINT",
+	syscall.SIGQUIT: "SIGQUIT",
+	syscall.SIGTERM: "SIGTERM",
+	syscall.SIGUSR1: "SIGUSR1",
+}
+
+// signalName returns a string containing the standard name for a given signal,
+// or "SIG <number>" for signals missing from signalNames.
+func signalName(s syscall.Signal) string {
+	name, known := signalNames[s]
+	if !known {
+		name = fmt.Sprintf("SIG %d", s)
+	}
+	return name
+}
+
+// ShutdownHandler configures signal handling and initiates a shutdown if a
+// SIGINT, SIGQUIT or SIGTERM is received by the process.
+func ShutdownHandler(server Shutdowner) {
+	sigc := make(chan os.Signal, 3)
+	signal.Notify(sigc, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGUSR1)
+
+	// handle processes one delivered signal: SIGUSR1 only dumps the goroutine
+	// stacks, every other signal logs its name and triggers the shutdown.
+	handle := func(s os.Signal) {
+		sig, isSyscall := s.(syscall.Signal)
+		if isSyscall && sig == syscall.SIGUSR1 {
+			dumpStacks()
+			return
+		}
+		name := s.String()
+		if isSyscall {
+			name = signalName(sig)
+		}
+		log.Infof("Received %v, initiating shutdown...", name)
+		server.Shutdown()
+	}
+
+	go func() {
+		for s := range sigc {
+			handle(s)
+		}
+	}()
+}
+
+// dumpStacks logs the stacks of all goroutines, limited to what fits into a
+// 16384-byte buffer.
+func dumpStacks() {
+	stack := make([]byte, 16384)
+	n := runtime.Stack(stack, true)
+	log.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", stack[:n])
+}
diff --git a/tools/healthcheck/pkg/utils/net.go b/tools/healthcheck/pkg/utils/net.go
new file mode 100644
index 000000000..a82e36743
--- /dev/null
+++ b/tools/healthcheck/pkg/utils/net.go
@@ -0,0 +1,124 @@
+// Copyright 2023 IQiYi Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+	"fmt"
+	"net"
+	"syscall"
+)
+
+// AF represents a network address family.
+type AF int
+
+const (
+	IPv4 AF = syscall.AF_INET
+	IPv6 AF = syscall.AF_INET6
+)
+
+// String returns the string representation of an AF; values other than IPv4
+// and IPv6 are rendered as "(unknown)".
+func (af AF) String() string {
+	if af == IPv4 {
+		return "IPv4"
+	}
+	if af == IPv6 {
+		return "IPv6"
+	}
+	return "(unknown)"
+}
+
+// AFs returns the supported address families.
+func AFs() []AF {
+	families := make([]AF, 0, 2)
+	families = append(families, IPv4, IPv6)
+	return families
+}
+
+// IP specifies an IP address.
+type IP [net.IPv6len]byte
+
+// NewIP returns a healthcheck IP initialised from a net.IP address. The
+// 16-byte form of nip is copied into the array.
+func NewIP(nip net.IP) IP {
+	var addr IP
+	copy(addr[:], nip.To16())
+	return addr
+}
+
+// ParseIP parses the given string and returns a healthcheck IP initialised
+// with the resulting IP address.
+func ParseIP(ip string) IP {
+	parsed := net.ParseIP(ip)
+	return NewIP(parsed)
+}
+
+// Equal returns true if the given IP addresses are equal, as determined by
+// net.IP.Equal().
+func (ip IP) Equal(eip IP) bool {
+	lhs, rhs := ip.IP(), eip.IP()
+	return lhs.Equal(rhs)
+}
+
+// IP returns the net.IP representation of a healthcheck IP address.
+func (ip IP) IP() net.IP {
+	return net.IP(ip[:])
+}
+
+// AF returns the address family of a healthcheck IP address: IPv4 when the
+// address has a 4-byte form, IPv6 otherwise.
+func (ip IP) AF() AF {
+	if ip.IP().To4() == nil {
+		return IPv6
+	}
+	return IPv4
+}
+
+// String returns the string representation of an IP address.
+func (ip IP) String() string {
+	return ip.IP().String()
+}
+
+// IPProto specifies an IP protocol.
+type IPProto uint16
+
+const (
+	IPProtoICMP   IPProto = syscall.IPPROTO_ICMP
+	IPProtoICMPv6 IPProto = syscall.IPPROTO_ICMPV6
+	IPProtoTCP    IPProto = syscall.IPPROTO_TCP
+	IPProtoUDP    IPProto = syscall.IPPROTO_UDP
+)
+
+// String returns the name for the given protocol value.
+func (proto IPProto) String() string {
+	// Protocols without a dedicated name are rendered as "IP(<number>)".
+	var name string
+	switch proto {
+	case IPProtoICMP:
+		name = "ICMP"
+	case IPProtoICMPv6:
+		name = "ICMPv6"
+	case IPProtoTCP:
+		name = "TCP"
+	case IPProtoUDP:
+		name = "UDP"
+	default:
+		name = fmt.Sprintf("IP(%d)", proto)
+	}
+	return name
+}
+
+// IPProtoFromStr is the inverse of IPProto.String for the four named
+// protocols; the match is case-sensitive and any other string yields 0.
+func IPProtoFromStr(str string) IPProto {
+	if str == "TCP" {
+		return IPProtoTCP
+	}
+	if str == "UDP" {
+		return IPProtoUDP
+	}
+	if str == "ICMP" {
+		return IPProtoICMP
+	}
+	if str == "ICMPv6" {
+		return IPProtoICMPv6
+	}
+	return 0
+}
diff --git a/tools/healthcheck/test/README.md b/tools/healthcheck/test/README.md
new file mode 100644
index 000000000..d8977e4b6
--- /dev/null
+++ b/tools/healthcheck/test/README.md
@@ -0,0 +1,238 @@
+DPVS 健康检查程序压力测试
+---
+
+# 测试方法
+
+首先,使用 [stress-test.sh](./stress-test.sh) 脚本生成测试业务。
+
+* 测试业务列表只需要有 VS 和 RS 配置,不需要配置 VIP、local IP 等。
+* 通过调整脚本 `Step 2` 中的 i, j 两个循环控制变量的范围控制产生的健康检查配置数量的多少。
+* 通过调整脚本 `Step 2` 中循环控制变量 k 的范围控制每个 VS 下配置的 RS 数量的多少,默认每个 VS 配置 5 个 RS。
+
+测试业务生成后 RS 初始状态都是 UP(权重为1):
+
+```
+## 未启动健康检查服务时测试业务状态
+IP Virtual Server version 1.9.4 (size=0)
+Prot LocalAddress:Port Scheduler Flags
+ -> RemoteAddress:Port Forward Weight ActiveConn InActConn
+TCP 192.168.0.1:80 wlc
+ -> 192.168.19.155:8080 FullNat 1 0 0
+ -> 192.168.19.156:8080 FullNat 1 0 0
+ -> 192.168.19.157:8080 FullNat 1 0 0
+ -> 192.168.19.158:8080 FullNat 1 0 0
+ -> 192.168.19.159:8080 FullNat 1 0 0
+TCP 192.168.0.2:80 wlc
+ -> 192.168.19.160:8080 FullNat 1 0 0
+ -> 192.168.19.161:8080 FullNat 1 0 0
+ -> 192.168.19.162:8080 FullNat 1 0 0
+ -> 192.168.19.163:8080 FullNat 1 0 0
+ -> 192.168.19.164:8080 FullNat 1 0 0
+TCP 192.168.0.3:80 wlc
+ -> 192.168.19.165:8080 FullNat 1 0 0
+ -> 192.168.19.166:8080 FullNat 1 0 0
+...
+``` + +但实际上这些 RS 是不通的,后续健康检查程序会把所有的 RS 设置为 DOWN(权重为 0,并添加 inhibited 标志)。 + +``` +## 健康检查完成测试业务状态 +IP Virtual Server version 1.9.4 (size=0) +Prot LocalAddress:Port Scheduler Flags + -> RemoteAddress:Port Forward Weight ActiveConn InActConn +TCP 192.168.0.1:80 wlc + -> 192.168.19.155:8080 FullNat 0 0 0 inhibited + -> 192.168.19.156:8080 FullNat 0 0 0 inhibited + -> 192.168.19.157:8080 FullNat 0 0 0 inhibited + -> 192.168.19.158:8080 FullNat 0 0 0 inhibited + -> 192.168.19.159:8080 FullNat 0 0 0 inhibited +TCP 192.168.0.2:80 wlc + -> 192.168.19.160:8080 FullNat 0 0 0 inhibited + -> 192.168.19.161:8080 FullNat 0 0 0 inhibited + -> 192.168.19.162:8080 FullNat 0 0 0 inhibited + -> 192.168.19.163:8080 FullNat 0 0 0 inhibited + -> 192.168.19.164:8080 FullNat 0 0 0 inhibited +TCP 192.168.0.3:80 wlc + -> 192.168.19.165:8080 FullNat 0 0 0 inhibited + -> 192.168.19.166:8080 FullNat 0 0 0 inhibited +... +``` + +测试业务创建后,`stress-test.sh` 脚本会循环检测当前的 RS 总数和被置为 DOWN 状态的 RS 数量。当我们启动健康检查程序后, + +```sh +./healthcheck -log_dir=./log +``` + +测试业务的 RS 会陆续由初始的 UP 状态而转为 DOWN 状态,我们根据 RS 被置为 DOWN 的数量的增长速度即可评估健康检查程序的并发性能。 + +# 测试数据 + +* RS 数量:测试脚本自动创建的 RS 总量 +* 初始探测耗时:健康检查程序启动到看到第一个 RS 被置为 DOWN 的时间。 +* 耗时(5分位):从第一个 RS 被置为 DOWN 到 50% 的 RS 被置为 DOWN 的时间。 +* 耗时(9分位):从第一个 RS 被置为 DOWN 到 90% 的 RS 被置为 DOWN 的时间。 +* 总耗时:从第一个 RS 被置为 DOWN 到所有的 RS 被置为 DOWN 的时间。 +* CPU 占用:健康检查程序的 CPU 使用量(用 iftop 命令观测得到)。 +* 内存占用:健康检查程序的内存使用量(用 iftop 命令观测得到)。 + +| RS数量 | 初始探测耗时 | 耗时(5分位) | 耗时(9分位) | 总耗时 | CPU占用 | 内存占用 | +| ------ | ------------ | ------------ | ----------- | ------ | ------- | -------- | +| 0 | 0 | 0 | 0 | 0 | 0.1核 | 100MB | +| 1040 | 6s | 1s | 2s | 2s | 0.1核 | 110MB | +| 5080 | 6s | 1s | 2s | 3s | 0.4核 | 160MB | +| 10160 | 5s | 4s | 6s | 8s | 0.8核 | 200MB | +| 26670 | 5s | 9s | 16s | 34s | 1.8核 | 560MB | +| 52070 | 7s | 7s | 33s | 90s | 4.4核 | 1120MB | + +> 说明: 健康检查程序默认配置的 retry 为 1 次、timeout 为 1s、周期为 3s,因此初始探测时间理论上为 1s(timeout) + 3s (delay loop) + 1s (timeout) = 5s。该数据和我们测试数据基本一致,不计入性能延迟。 + + +# 测试日志 + +**1040 个
RS** + +``` +[2023-06-02.17:08:51] total: 1040, inhibited: 0 +[2023-06-02.17:08:55] total: 1040, inhibited: 0 +[2023-06-02.17:08:56] total: 1040, inhibited: 0 +[2023-06-02.17:08:57] total: 1040, inhibited: 0 +[2023-06-02.17:08:58] total: 1040, inhibited: 321 +[2023-06-02.17:08:59] total: 1040, inhibited: 709 +[2023-06-02.17:09:00] total: 1040, inhibited: 1040 +[2023-06-02.17:09:01] total: 1040, inhibited: 1040 +[2023-06-02.17:09:02] total: 1040, inhibited: 1040 +[2023-06-02.17:09:03] total: 1040, inhibited: 1040 +``` + +**5080 个 RS** + +``` +[2023-06-02.17:02:17] total: 5080, inhibited: 0 +[2023-06-02.17:02:18] total: 5080, inhibited: 0 +[2023-06-02.17:02:19] total: 5080, inhibited: 0 +[2023-06-02.17:02:20] total: 5080, inhibited: 0 +[2023-06-02.17:02:21] total: 5080, inhibited: 1474 +[2023-06-02.17:02:22] total: 5080, inhibited: 3340 +[2023-06-02.17:02:23] total: 5080, inhibited: 5078 +[2023-06-02.17:02:25] total: 5080, inhibited: 5080 +[2023-06-02.17:02:26] total: 5080, inhibited: 5080 +[2023-06-02.17:02:27] total: 5080, inhibited: 5080 +[2023-06-02.17:02:28] total: 5080, inhibited: 5080 +``` + +**10160 个 RS** + +``` +[2023-06-02.16:51:21] total: 10160, inhibited: 0 +[2023-06-02.16:51:23] total: 10160, inhibited: 0 +[2023-06-02.16:51:24] total: 10160, inhibited: 0 +[2023-06-02.16:51:25] total: 10160, inhibited: 0 +[2023-06-02.16:51:27] total: 10160, inhibited: 0 +[2023-06-02.16:51:28] total: 10160, inhibited: 52 +[2023-06-02.16:51:29] total: 10160, inhibited: 2050 +[2023-06-02.16:51:30] total: 10160, inhibited: 4021 +[2023-06-02.16:51:32] total: 10160, inhibited: 6027 +[2023-06-02.16:51:33] total: 10160, inhibited: 8094 +[2023-06-02.16:51:34] total: 10160, inhibited: 10116 +[2023-06-02.16:51:36] total: 10160, inhibited: 10160 +[2023-06-02.16:51:37] total: 10160, inhibited: 10160 +[2023-06-02.16:51:38] total: 10160, inhibited: 10160 +[2023-06-02.16:51:39] total: 10160, inhibited: 10160 +``` + +**26670 个 RS** + +``` +[2023-06-02.16:44:45] total: 26670, inhibited: 0 
+[2023-06-02.16:44:46] total: 26670, inhibited: 0 +[2023-06-02.16:44:48] total: 26670, inhibited: 0 +[2023-06-02.16:44:50] total: 26670, inhibited: 0 +[2023-06-02.16:44:51] total: 26670, inhibited: 0 +[2023-06-02.16:44:53] total: 26670, inhibited: 1857 +[2023-06-02.16:44:55] total: 26670, inhibited: 4389 +[2023-06-02.16:44:56] total: 26670, inhibited: 6887 +[2023-06-02.16:44:58] total: 26670, inhibited: 9388 +[2023-06-02.16:45:00] total: 26670, inhibited: 12166 +[2023-06-02.16:45:02] total: 26670, inhibited: 15079 +[2023-06-02.16:45:03] total: 26670, inhibited: 17741 +[2023-06-02.16:45:05] total: 26670, inhibited: 20307 +[2023-06-02.16:45:07] total: 26670, inhibited: 23046 +[2023-06-02.16:45:09] total: 26670, inhibited: 25967 +[2023-06-02.16:45:10] total: 26670, inhibited: 26665 +[2023-06-02.16:45:12] total: 26670, inhibited: 26665 +[2023-06-02.16:45:14] total: 26670, inhibited: 26666 +[2023-06-02.16:45:16] total: 26670, inhibited: 26667 +[2023-06-02.16:45:18] total: 26670, inhibited: 26667 +[2023-06-02.16:45:19] total: 26670, inhibited: 26667 +[2023-06-02.16:45:21] total: 26670, inhibited: 26668 +[2023-06-02.16:45:23] total: 26670, inhibited: 26669 +[2023-06-02.16:45:25] total: 26670, inhibited: 26669 +[2023-06-02.16:45:26] total: 26670, inhibited: 26670 +[2023-06-02.16:45:28] total: 26670, inhibited: 26670 +[2023-06-02.16:45:30] total: 26670, inhibited: 26670 +[2023-06-02.16:45:32] total: 26670, inhibited: 26670 +[2023-06-02.16:45:34] total: 26670, inhibited: 26670 +``` + +**52070 个 RS** + +``` +[2023-06-02.16:37:39] total: 52070, inhibited: 0 +[2023-06-02.16:37:42] total: 52070, inhibited: 0 +[2023-06-02.16:37:44] total: 52070, inhibited: 0 +[2023-06-02.16:37:46] total: 52070, inhibited: 0 +[2023-06-02.16:37:48] total: 52070, inhibited: 1 +[2023-06-02.16:37:51] total: 52070, inhibited: 3032 +[2023-06-02.16:37:53] total: 52070, inhibited: 6743 +[2023-06-02.16:37:55] total: 52070, inhibited: 10129 +[2023-06-02.16:37:58] total: 52070, inhibited: 13849 
+[2023-06-02.16:38:00] total: 52070, inhibited: 17478 +[2023-06-02.16:38:03] total: 52070, inhibited: 21177 +[2023-06-02.16:38:05] total: 52070, inhibited: 25168 +[2023-06-02.16:38:08] total: 52070, inhibited: 28867 +[2023-06-02.16:38:10] total: 52070, inhibited: 32909 +[2023-06-02.16:38:13] total: 52070, inhibited: 37034 +[2023-06-02.16:38:16] total: 52070, inhibited: 40798 +[2023-06-02.16:38:18] total: 52070, inhibited: 44471 +[2023-06-02.16:38:21] total: 52070, inhibited: 48355 +[2023-06-02.16:38:23] total: 52070, inhibited: 51830 +[2023-06-02.16:38:26] total: 52070, inhibited: 52025 +[2023-06-02.16:38:29] total: 52070, inhibited: 52030 +[2023-06-02.16:38:31] total: 52070, inhibited: 52034 +[2023-06-02.16:38:34] total: 52070, inhibited: 52035 +[2023-06-02.16:38:36] total: 52070, inhibited: 52035 +[2023-06-02.16:38:39] total: 52070, inhibited: 52035 +[2023-06-02.16:38:41] total: 52070, inhibited: 52035 +[2023-06-02.16:38:44] total: 52070, inhibited: 52035 +[2023-06-02.16:38:46] total: 52070, inhibited: 52035 +[2023-06-02.16:38:49] total: 52070, inhibited: 52035 +[2023-06-02.16:38:51] total: 52070, inhibited: 52037 +[2023-06-02.16:38:53] total: 52070, inhibited: 52042 +[2023-06-02.16:38:56] total: 52070, inhibited: 52042 +[2023-06-02.16:38:58] total: 52070, inhibited: 52047 +[2023-06-02.16:39:01] total: 52070, inhibited: 52049 +[2023-06-02.16:39:03] total: 52070, inhibited: 52052 +[2023-06-02.16:39:06] total: 52070, inhibited: 52053 +[2023-06-02.16:39:08] total: 52070, inhibited: 52057 +[2023-06-02.16:39:11] total: 52070, inhibited: 52060 +[2023-06-02.16:39:13] total: 52070, inhibited: 52060 +[2023-06-02.16:39:16] total: 52070, inhibited: 52065 +[2023-06-02.16:39:18] total: 52070, inhibited: 52070 +[2023-06-02.16:39:21] total: 52070, inhibited: 52070 +[2023-06-02.16:39:23] total: 52070, inhibited: 52070 +[2023-06-02.16:39:25] total: 52070, inhibited: 52070 +[2023-06-02.16:39:28] total: 52070, inhibited: 52070 +``` + +# 结论 + +1. 健康检查程序处理能力约为 3000 RS/s; +2. 
RS 状态变化不超过 5000 RS/s 时,健康检查程序能够快速摘除或恢复故障的 RS; +3. RS 状态变化 10000 RS/s 时,健康检查程序可以在 8s 内摘除或恢复故障的 RS; +4. 可以支持 50000+ RS/s 的 RS 状态变化,但约 50% 的 RS 故障可能得不到及时发现。 + +此外,在 50000+ 个 RS 配置处于健康状态稳定的场景下,单个 RS 故障的摘除和恢复时间约为 5s,与只有此 1 个 RS 配置时的处理时间区别不大。 + +> 说明: 以上结论采用健康检查程序默认配置时测试得到,测试过程中 DPVS 服务无数据面流量。 diff --git a/tools/healthcheck/test/dpvs-agent-api.sh b/tools/healthcheck/test/dpvs-agent-api.sh new file mode 100755 index 000000000..a5013c206 --- /dev/null +++ b/tools/healthcheck/test/dpvs-agent-api.sh @@ -0,0 +1,31 @@ +# Copyright 2023 IQiYi Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests +import json + +def UpdateService(): + payload="{\"Items\":[{\"inhibited\":false,\"ip\":\"192.168.88.68\",\"port\":80,\"weight\":100}]}" + url = "http://127.0.0.1:6600/v2/vs/192.168.88.1-80-TCP/rs?healthcheck=true" + headers = {'content-type': 'application/json'} + r = requests.put(url, headers=headers, data=payload) + print(r, r.json()) + + url = "http://127.0.0.1:6600/v2/vs/192.168.88.1-80-TCP" + headers = {'content-type': 'application/json'} + r = requests.get(url, headers=headers, data=payload) + print(r, r.json()) + +if __name__ == '__main__': + UpdateService() diff --git a/tools/healthcheck/test/stress-test.sh b/tools/healthcheck/test/stress-test.sh new file mode 100755 index 000000000..6ea848f85 --- /dev/null +++ b/tools/healthcheck/test/stress-test.sh @@ -0,0 +1,67 @@ +#!/bin/sh +# Copyright 2023 IQiYi Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+## Step 1.
+# Plain echo is used throughout: none of the messages contain escape
+# sequences, and "echo -e" is not portable to every /bin/sh.
+echo "Cleaning existing services ..."
+ipvsadm -C
+sleep 5
+
+now=$(date +%F.%T)
+echo "[$now] Start"
+
+## Step 2.
+# i and j enumerate the virtual services 192.168.<i>.<j>:80; k is the number of
+# real servers added to each of them. Real server addresses are derived from
+# the running counter rsid.
+echo "Adding test services ..."
+rsid=5000
+for i in $(seq 0 32)
+do
+    for j in $(seq 1 255)
+    do
+        vip="192.168.${i}.${j}"
+        flag="-t"
+        #udp=$((j%2))
+        #[ "$udp" -eq 1 ] && flag="-u"
+        #echo $vip $flag
+        ipvsadm -A "$flag" "${vip}:80"
+        for k in $(seq 5)
+        do
+            seg3=$((rsid/255))
+            seg4=$((rsid%255))
+            rsid=$((rsid+1))
+            rip="192.168.${seg3}.${seg4}"
+            #echo "-> $rip"
+            ipvsadm -a "$flag" "${vip}:80" -r "${rip}:8080" -b -w 100
+        done
+        #dpip addr add $vip/32 dev dpdk0
+    done
+done
+
+## Step 3.
+echo ""
+echo "****************************************"
+echo "Start healthcheck program on your own."
+echo "****************************************"
+echo ""
+
+## Step 4.
+# Runs until interrupted, printing one progress line per round.
+echo "Do Checking ..."
+while true
+do
+    now=$(date +%F.%T)
+    # Take a single snapshot per round so both counters describe the same state.
+    services=$(ipvsadm -ln)
+    total=$(printf '%s\n' "$services" | grep -c FullNat)
+    down=$(printf '%s\n' "$services" | grep -c inhibited)
+    echo "[$now] total: $total, inhibited: $down"
+    sleep 1
+done