diff --git a/pkg/bkmonitorbeat/go.sum b/pkg/bkmonitorbeat/go.sum index e354b6a85..3fcb46f0b 100644 --- a/pkg/bkmonitorbeat/go.sum +++ b/pkg/bkmonitorbeat/go.sum @@ -219,6 +219,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+v github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pkg/influxdb-proxy/go.mod b/pkg/influxdb-proxy/go.mod index 4d7c31beb..29dc4f8f5 100644 --- a/pkg/influxdb-proxy/go.mod +++ b/pkg/influxdb-proxy/go.mod @@ -1,6 +1,6 @@ module github.com/TencentBlueKing/bkmonitor-datalink/pkg/influxdb-proxy -go 1.21 +go 1.21.0 require ( github.com/Shopify/sarama v1.22.1 @@ -38,7 +38,7 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/fatih/color v1.9.0 // indirect - github.com/frankban/quicktest v1.11.0 // indirect + github.com/frankban/quicktest v1.14.6 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/gogap/env_json v0.0.0-20150503135429-86150085ddbe // indirect github.com/gogap/env_strings v0.0.1 // indirect @@ -77,7 +77,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/spf13/afero v1.9.2 // indirect - github.com/spf13/cast v1.3.0 // indirect + github.com/spf13/cast v1.10.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/tebeka/strftime v0.1.5 // indirect diff --git a/pkg/influxdb-proxy/go.sum b/pkg/influxdb-proxy/go.sum index e3caf8fc6..6944326ba 100644 --- a/pkg/influxdb-proxy/go.sum +++ b/pkg/influxdb-proxy/go.sum @@ -97,6 +97,7 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -128,6 +129,7 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/frankban/quicktest v1.11.0 h1:Yyrghcw93e1jKo4DTZkRFTTFvBsVhzbblBUPNU1vW6Q= github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= 
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -200,6 +202,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -325,6 +328,7 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4= @@ -394,6 +398,7 @@ github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0je github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -438,6 +443,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -461,6 +467,7 @@ github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw= github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= 
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= diff --git a/pkg/sliwebhook/go.mod b/pkg/sliwebhook/go.mod index 062250b51..759d67044 100644 --- a/pkg/sliwebhook/go.mod +++ b/pkg/sliwebhook/go.mod @@ -1,6 +1,6 @@ module github.com/TencentBlueKing/bkmonitor-datalink/pkg/sliwebhook -go 1.21 +go 1.21.0 require ( github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils v0.0.0-00010101000000-000000000000 diff --git a/pkg/unify-query/go.mod b/pkg/unify-query/go.mod index ce752fa7f..16acf87ec 100755 --- a/pkg/unify-query/go.mod +++ b/pkg/unify-query/go.mod @@ -19,11 +19,10 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt/v4 v4.5.2 github.com/golang/mock v1.6.0 - github.com/google/go-cmp v0.7.0 github.com/google/gops v0.3.26 github.com/google/uuid v1.6.0 github.com/hashicorp/consul/api v1.18.0 - github.com/influxdata/influxdb v1.10.0 + github.com/influxdata/influxdb v1.11.5 github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385 github.com/jarcoal/httpmock v1.3.1 @@ -36,11 +35,11 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/prashantv/gostub v1.1.0 - github.com/prometheus/client_golang v1.16.0 - github.com/prometheus/common v0.42.0 + github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/common v0.48.0 github.com/prometheus/prometheus v0.42.0 github.com/samber/lo v1.51.0 - github.com/sirupsen/logrus v1.9.3 + github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/cast v1.10.0 github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.15.0 @@ -105,8 +104,8 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.5 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -127,7 +126,6 @@ require ( github.com/mailru/easyjson v0.9.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -139,7 +137,8 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect - github.com/prometheus/procfs v0.10.1 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/afero v1.15.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/pkg/unify-query/go.sum b/pkg/unify-query/go.sum index cab295277..e1a226a1a 100644 --- a/pkg/unify-query/go.sum +++ 
b/pkg/unify-query/go.sum @@ -498,8 +498,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04nTH68g= github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= @@ -578,8 +576,8 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= -github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= -github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -591,8 +589,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -600,8 +598,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= 
-github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/prometheus/prometheus v0.42.0 h1:G769v8covTkOiNckXFIwLx01XE04OE6Fr0JPA0oR2nI= github.com/prometheus/prometheus v0.42.0/go.mod h1:Pfqb/MLnnR2KK+0vchiaH39jXxvLMBk+3lnIGP4N7Vk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -623,6 +621,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= diff --git a/pkg/unify-query/redis/redis.go b/pkg/unify-query/redis/redis.go index 486c8a425..002148a6a 100644 --- a/pkg/unify-query/redis/redis.go +++ b/pkg/unify-query/redis/redis.go @@ -11,6 +11,7 @@ package redis import ( "context" + "errors" "sync" "time" @@ -105,6 +106,29 @@ var Set = func(ctx context.Context, key, val string, expiration time.Duration) ( return res.Result() } +// SetNx sets key to hold string value if key does not exist. +// - returns true if the key was set. +// - returns false if the key was not set.(already exists) +var SetNX = func(ctx context.Context, key, val string, expiration time.Duration) (bool, error) { + if key == "" { + key = globalInstance.serviceName + } + log.Debugf(ctx, "[redis] setnx %s", key) + res := globalInstance.client.SetNX(ctx, key, val, expiration) + return res.Result() +} + +var Delete = func(ctx context.Context, keys ...string) (int64, error) { + log.Debugf(ctx, "[redis] del %s", keys) + res := globalInstance.client.Del(ctx, keys...) + return res.Result() +} + +var TxPipeline = func(ctx context.Context) goRedis.Pipeliner { + log.Debugf(ctx, "[redis] txpipeline") + return globalInstance.client.TxPipeline() +} + var Get = func(ctx context.Context, key string) (string, error) { if key == "" { key = globalInstance.serviceName @@ -114,6 +138,10 @@ var Get = func(ctx context.Context, key string) (string, error) { return res.Result() } +var IsNil = func(err error) bool { + return errors.Is(err, goRedis.Nil) +} + var MGet = func(ctx context.Context, key string) ([]any, error) { if key == "" { key = globalInstance.serviceName @@ -129,8 +157,30 @@ var SMembers = func(ctx context.Context, key string) ([]string, error) { return res.Result() } -var Subscribe = func(ctx context.Context, channels ...string) <-chan *goRedis.Message { +var Subscribe = func(ctx context.Context, channels ...string) (ch <-chan *goRedis.Message, close func() error) { log.Debugf(ctx, "[redis] subscribe %s", channels) p := globalInstance.client.Subscribe(ctx, channels...) 
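The new package-level wrappers introduced in redis.go here (SetNX, Delete, IsNil, alongside the existing Get) are the primitives the cache middleware later builds on. A minimal usage sketch, assuming only the signatures shown in this diff and an already-initialized client (via redis.SetInstance, as the tests in this PR do); the lock key and TTL below are illustrative:

```go
package main

import (
	"context"
	"time"

	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis"
)

// tryWithLock runs fn only on the instance that wins the SETNX race;
// key name and TTL are illustrative, not taken from this PR.
func tryWithLock(ctx context.Context, lockKey string, fn func() error) error {
	acquired, err := redis.SetNX(ctx, lockKey, "1", 30*time.Second)
	if err != nil {
		return err
	}
	if !acquired {
		return nil // another instance holds the lock
	}
	defer redis.Delete(ctx, lockKey) // best-effort release

	return fn()
}

// getOrMiss distinguishes "key absent" from a transport error using IsNil.
func getOrMiss(ctx context.Context, key string) (val string, hit bool, err error) {
	val, err = redis.Get(ctx, key)
	if err != nil {
		if redis.IsNil(err) {
			return "", false, nil // cache miss, not an error
		}
		return "", false, err
	}
	return val, true, nil
}

func main() {
	_ = tryWithLock(context.Background(), "demo:lock", func() error {
		_, _, err := getOrMiss(context.Background(), "demo:key")
		return err
	})
}
```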
- return p.Channel() + + close = func() error { + return p.Close() + } + return p.Channel(), close +} + +var ExecLua = func(ctx context.Context, script *goRedis.Script, keys []string, args ...any) (any, error) { + log.Debugf(ctx, "[redis] exec lua %s", script) + res := script.Run(ctx, globalInstance.client, keys, args...) + return res.Result() +} + +var Expire = func(ctx context.Context, key string, expiration time.Duration) (bool, error) { + log.Debugf(ctx, "[redis] expire %s", key) + res := globalInstance.client.Expire(ctx, key, expiration) + return res.Result() +} + +var Publish = func(ctx context.Context, channel string, message any) (int64, error) { + log.Debugf(ctx, "[redis] publish %s", channel) + res := globalInstance.client.Publish(ctx, channel, message) + return res.Result() } diff --git a/pkg/unify-query/redis/router.go b/pkg/unify-query/redis/router.go index e113fdebf..6b6f03049 100644 --- a/pkg/unify-query/redis/router.go +++ b/pkg/unify-query/redis/router.go @@ -80,5 +80,26 @@ var GetSpace = func(ctx context.Context, spaceUid string) (Space, error) { } var SubscribeSpace = func(ctx context.Context) <-chan *redis.Message { - return Subscribe(ctx, globalInstance.serviceName) + inCh, closeFn := Subscribe(ctx, globalInstance.serviceName) + outCh := make(chan *redis.Message) + go func() { + defer closeFn() + defer close(outCh) + for { + select { + case msg, ok := <-inCh: + if !ok { + return + } + select { + case outCh <- msg: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + return outCh } diff --git a/pkg/unify-query/service/http/hook.go b/pkg/unify-query/service/http/hook.go index 5f1751f0e..1d57fcb77 100644 --- a/pkg/unify-query/service/http/hook.go +++ b/pkg/unify-query/service/http/hook.go @@ -79,6 +79,12 @@ func setDefaultConfig() { viper.SetDefault(ScrollSliceLimitConfigPath, 10000) viper.SetDefault(ScrollSessionLockTimeoutConfigPath, "60s") viper.SetDefault(ScrollWindowTimeoutConfigPath, "3m") + + // cache + viper.SetDefault(QueryCacheEnabledConfigPath, true) + viper.SetDefault(QueryCacheDefaultLimitConfigPath, 10000) + viper.SetDefault(QueryCacheSkipPathsConfigPath, []string{}) + viper.SetDefault(QueryCacheSkipMethodsConfigPath, []string{"PUT", "DELETE"}) } // LoadConfig @@ -108,6 +114,8 @@ func LoadConfig() { JwtPublicKey = viper.GetString(JwtPublicKeyConfigPath) JwtBkAppCodeSpaces = viper.GetStringMapStringSlice(JwtBkAppCodeSpacesConfigPath) JwtEnabled = viper.GetBool(JwtEnabledConfigPath) + + QueryCacheEnabled = viper.GetBool(QueryCacheEnabledConfigPath) } // init diff --git a/pkg/unify-query/service/http/middleware/cache/cache.go b/pkg/unify-query/service/http/middleware/cache/cache.go new file mode 100644 index 000000000..2d67b252b --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/cache.go @@ -0,0 +1,506 @@ +package cache + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/memcache" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/trace" + "github.com/dgraph-io/ristretto" + "github.com/gin-gonic/gin" + "github.com/spf13/viper" +) + +const ( + locked = "1" + doneMsg = "done" +) + +func initConf() Config { + writeTimeout := viper.GetDuration("http.write_timeout") + skipPaths := viper.GetStringSlice("http.query_cache.skip_paths") + + return Config{ + 
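Subscribe now returns the message channel together with a close function so callers can release the underlying PubSub (SubscribeSpace above uses it through a forwarding goroutine). A consumer-side sketch under the same assumptions as before; the channel name and timeout are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis"
)

// consume drains messages until ctx is cancelled, then releases the PubSub
// through the close function that Subscribe now returns.
func consume(ctx context.Context, channel string) {
	msgCh, closeFn := redis.Subscribe(ctx, channel)
	defer closeFn()

	for {
		select {
		case msg, ok := <-msgCh:
			if !ok {
				return
			}
			fmt.Println("received:", msg.Channel, msg.Payload)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	consume(ctx, "demo:channel") // channel name is illustrative
}
```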
executeTTL: writeTimeout, + payloadTTL: writeTimeout, + lockTTL: writeTimeout * 2, + freshLock: writeTimeout / 2, + skipMethods: viper.GetStringSlice("http.query_cache.skip_methods"), + skipPaths: skipPaths, + bucketLimit: viper.GetInt64("http.query_cache.default_limit"), + cacheEnabled: viper.GetBool("http.query_cache.enabled"), + } +} + +func NewInstance(ctx context.Context) (*Service, error) { + service := &Service{} + err := service.initialize(ctx, initConf()) + if err != nil { + return nil, err + } + log.Infof(ctx, "cache middleware initialized successfully") + return service, nil +} + +type Service struct { + ctx context.Context + conf Config + + localCache *ristretto.Cache + + winnerMap map[string]string + winnerLock sync.RWMutex + waiterMap map[string]*WaitGroupValue + waiterLock sync.RWMutex +} + +type Config struct { + // executeTTL 是执行函数的最大允许时间 + executeTTL time.Duration + // payloadTTL 是缓存数据的 TTL + payloadTTL time.Duration + // lockTTL 是分布式锁的 TTL + lockTTL time.Duration + // freshLock 是作为配置的锁的续期时间. 真实的刷新频率会是该值的一半 + freshLock time.Duration + skipMethods []string + skipPaths []string + bucketLimit int64 + cacheEnabled bool +} + +type WaitGroupValue struct { + mu sync.Mutex + channels []chan struct{} +} + +func (d *Service) getCacheFromLocal(key string) (interface{}, bool) { + return d.localCache.Get(key) +} + +func (d *Service) setCacheToLocal(key string, value interface{}) { + d.localCache.SetWithTTL(key, value, 1, d.conf.payloadTTL) +} + +func (d *Service) doDistributed(ctx context.Context, key string, doQuery func() (interface{}, string, error)) (result interface{}, hit bool, err error) { + var acquired bool + + ctx, span := trace.NewSpan(ctx, "cache-middleware-do-distributed") + defer span.End(&err) + + lockKey := cacheKeyMap(lockKeyType, key) + + span.Set("lock-key", lockKey) + + result, hit, err = d.getFromDistributedCache(ctx, key) + if err != nil { + return result, hit, err + } + if hit { + d.setCacheToLocal(key, result) + return result, hit, err + } + + // 1. 尝试从Redis获取分布式锁 + acquired, err = redis.SetNX(ctx, lockKey, locked, d.conf.lockTTL) + if err != nil { + return result, hit, err + } + + span.Set("distributed-lock-acquired", acquired) + + if acquired { + // 2.1 锁获取成功,成为 Cluster Winner + d.winnerLock.Lock() + d.winnerMap[key] = key + d.winnerLock.Unlock() + + defer func() { + d.winnerLock.Lock() + delete(d.winnerMap, key) + d.winnerLock.Unlock() + }() + + // 2.1.1 执行函数并通知等待者 + result, err = d.runAndNotify(ctx, key, doQuery) + return result, hit, err + } else { + // 2.2 锁获取失败,成为 Cluster Waiter + // 2.2.1 进入等待循环 + result, err = d.waiterLoop(ctx, key) + return result, hit, err + } +} + +func (d *Service) runAndNotify(ctx context.Context, key string, doQuery func() (interface{}, string, error)) (result interface{}, err error) { + ctx, span := trace.NewSpan(ctx, "cache-middleware-run-and-notify") + defer span.End(&err) + + dataKey := cacheKeyMap(dataKeyType, key) + channelKey := cacheKeyMap(channelKeyType, key) + + span.Set("data-key", dataKey) + span.Set("channel-key", channelKey) + + // 1. 执行函数获取结果 + result, _, err = doQuery() + if err != nil { + return result, err + } + + bts, err := json.Marshal(result) + if err != nil { + return nil, err + } + + // 2. 回写缓存 + err = d.writeLimitedDistributedCache(ctx, dataKey, bts) + if err != nil { + log.Warnf(ctx, "failed to write cache with limit control: %v", err) + return nil, err + } + + span.Set("cache-written", true) + + // 3. 
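All four durations in Config are derived from the existing http.write_timeout setting, so the ordering noted in initialize below (freshLock < executeTTL < lockTTL) holds by construction. A worked example of the derivation, assuming a write_timeout of 30s (the actual default is not part of this diff):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed value only; http.write_timeout's real default is configured elsewhere.
	writeTimeout := 30 * time.Second

	executeTTL := writeTimeout    // 30s: budget for the lock winner to produce a response
	payloadTTL := writeTimeout    // 30s: lifetime of the cached response payload
	lockTTL := writeTimeout * 2   // 60s: expiry of the distributed SETNX lock
	freshLock := writeTimeout / 2 // 15s: ticker interval ttlKeeper (below) uses to re-EXPIRE held locks

	fmt.Println(executeTTL, payloadTTL, lockTTL, freshLock) // 30s 30s 1m0s 15s
}
```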
通知等待者 + _, err = redis.Publish(ctx, channelKey, doneMsg) + if err != nil { + log.Warnf(ctx, "failed to publish completion for key %s: %v", dataKey, err) + return nil, err + } + + span.Set("cache-notify-published", true) + + return result, nil +} + +func (d *Service) waiterLoop(ctx context.Context, key string) (result interface{}, err error) { + ctx, span := trace.NewSpan(ctx, "cache-middleware-waiter-loop") + defer span.End(&err) + + dataKey := cacheKeyMap(dataKeyType, key) + + span.Set("data-key", dataKey) + + // 1.阻塞进入等待 + err = d.waitForNotify(ctx, dataKey) + if err != nil { + return result, err + } + // 3. 通知到达,读取缓存并返回 + result, _, err = d.getFromDistributedCache(ctx, dataKey) + + return result, err +} + +func (d *Service) ttlKeeper(ctx context.Context) { + refreshInterval := d.conf.freshLock + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var lockKeys []string + d.winnerLock.RLock() + for keyStr := range d.winnerMap { + lockKeys = append(lockKeys, cacheKeyMap(lockKeyType, keyStr)) + } + + if len(lockKeys) > 0 { + client := redis.Client() + pipe := client.Pipeline() + + for _, lockKey := range lockKeys { + pipe.Expire(ctx, lockKey, d.conf.lockTTL) + } + + _, err := pipe.Exec(ctx) + if err != nil { + log.Warnf(ctx, "failed to batch refresh lock TTL for %d keys: %v", len(lockKeys), err) + } else { + log.Debugf(ctx, "successfully refreshed TTL for %d lock keys", len(lockKeys)) + } + } + d.winnerLock.RUnlock() + case <-ctx.Done(): + return + } + } +} + +func (d *Service) getFromDistributedCache(ctx context.Context, key string) (result interface{}, hit bool, err error) { + ctx, span := trace.NewSpan(ctx, "cache-middleware-get-distributed-cache") + defer span.End(&err) + cacheKey := cacheKeyMap(dataKeyType, key) + + span.Set("data-key", cacheKey) + + valStr, err := redis.Get(ctx, cacheKey) + if err != nil { + // 如果是缓存未命中,则不返回错误 + missing := redis.IsNil(err) + err = nil + + if missing { + err = nil + } + + return result, hit, err + } + if redis.IsNil(err) { + span.Set("hit-distributed", false) + + // 缓存未命中 + err = nil + return result, hit, err + } + + if err != nil { + return result, hit, err + } + + span.Set("hit-distributed", true) + err = json.Unmarshal([]byte(valStr), &result) + + hit = true + return result, hit, err +} + +func (d *Service) initialize(ctx context.Context, conf Config) error { + localCache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: viper.GetInt64(memcache.RistrettoNumCountersPath), + MaxCost: viper.GetInt64(memcache.RistrettoMaxCostPath), + BufferItems: viper.GetInt64(memcache.RistrettoBufferItemsPath), + IgnoreInternalCost: viper.GetBool(memcache.RistrettoIgnoreInternalCostPath), + }) + if err != nil { + return err + } + + d.conf = conf + // freshLock < executeTTL < lockTTL + + d.ctx = ctx + d.localCache = localCache + d.winnerMap = make(map[string]string) + d.waiterMap = make(map[string]*WaitGroupValue) + go d.ttlKeeper(ctx) + go d.subLoop(ctx) + return nil +} + +func (d *Service) do(ctx context.Context, key string, doQuery func() (interface{}, string, error)) (result interface{}, crossReason string, err error) { + ctx, span := trace.NewSpan(ctx, "cache-middleware-do") + defer span.End(&err) + + var ( + hitLocal bool + hitDistributed bool + ) + + // 1. 尝试从本地缓存获取 + result, hitLocal = d.getCacheFromLocal(key) + + span.Set("key", key) + span.Set("hit-local", hitLocal) + + if hitLocal { + crossReason = crossByL1CacheHit + return result, crossReason, err + } + + // 2. 
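ttlKeeper above batches the lock renewals for all current winners through a single pipeline round-trip. A standalone sketch of the same idiom with go-redis v8, additionally checking each EXPIRE result; client construction, key names, and the Redis address are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	goRedis "github.com/go-redis/redis/v8"
)

// refreshLocks batches EXPIRE calls for a set of lock keys and reports any
// key whose TTL could not be extended (i.e. the lock already expired).
func refreshLocks(ctx context.Context, client goRedis.UniversalClient, lockKeys []string, ttl time.Duration) error {
	pipe := client.Pipeline()
	cmds := make([]*goRedis.BoolCmd, 0, len(lockKeys))
	for _, k := range lockKeys {
		cmds = append(cmds, pipe.Expire(ctx, k, ttl))
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return err
	}
	for i, cmd := range cmds {
		if !cmd.Val() { // key vanished before it could be refreshed
			fmt.Printf("lock %s expired before refresh\n", lockKeys[i])
		}
	}
	return nil
}

func main() {
	client := goRedis.NewUniversalClient(&goRedis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}}) // address illustrative
	_ = refreshLocks(context.Background(), client, []string{"dsg:lock:demo"}, time.Minute)
}
```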
尝试从分布式缓存获取 + result, hitDistributed, err = d.doDistributed(d.ctx, key, doQuery) + if err != nil { + return result, crossReason, err + } + + if hitDistributed { + crossReason = crossByL2CacheHit + } + + return result, crossReason, err +} + +type CachedResponse struct { + CacheKey string `json:"cache_key"` + StatusCode int `json:"status_code"` + Headers map[string][]string `json:"headers"` + Body []byte `json:"body"` +} + +type Payload struct { + req interface{} + spaceID string + path string +} + +type responseWriter struct { + gin.ResponseWriter + buffer *bytes.Buffer +} + +func (w *responseWriter) Write(data []byte) (int, error) { + w.buffer.Write(data) + return w.ResponseWriter.Write(data) +} + +func skipPath(path string, skipPaths []string) bool { + for _, p := range skipPaths { + if path == p { + return true + } + + if strings.Contains(p, "*") { + if matchWildcard(path, p) { + return true + } + } + } + return false +} + +func matchWildcard(path, pattern string) bool { + regexPattern := "^" + strings.ReplaceAll(regexp.QuoteMeta(pattern), "\\*", ".*") + "$" + matched, _ := regexp.MatchString(regexPattern, path) + return matched +} + +func (d *Service) isSkipPath(path string) bool { + return skipPath(path, d.conf.skipPaths) +} + +func (d *Service) isSkipMethod(method string) bool { + for _, skipMethod := range d.conf.skipMethods { + if method == skipMethod { + return true + } + } + return false +} + +const ( + crossByNotEnabled = "cache-cross-by-not-enabled" + crossBySkipPath = "cache-cross-by-skip-path" + crossBySkipMethod = "cache-cross-by-skip-method" + crossByL1CacheHit = "cache-cross-by-cache-l1-hit" + crossByL2CacheHit = "cache-cross-by-cache-l2-hit" + crossByClientError = "cache-cross-by-client-error" + crossByServerError = "cache-cross-by-server-error" + crossBySuccess = "cache-cross-by-success" +) + +// CacheMiddleware 返回缓存中间件 +func (d *Service) CacheMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + var ( + result interface{} + err error + crossReason string + ) + + ctx, span := trace.NewSpan(c.Request.Context(), "cache-middleware") + defer span.End(&err) + + defer func() { + span.Set("cache-middleware-cross-reason", crossReason) + }() + + span.Set("cache-enabled", d.conf.cacheEnabled) + span.Set("cache-skip-path", d.conf.skipPaths) + span.Set("cache-skip-methods", d.conf.skipMethods) + + if !d.conf.cacheEnabled { + crossReason = crossByNotEnabled + c.Next() + return + } + + if d.isSkipPath(c.Request.URL.Path) { + crossReason = crossBySkipPath + c.Next() + return + } + + if d.isSkipMethod(c.Request.Method) { + crossReason = crossBySkipMethod + c.Next() + return + } + + doQuery := func(key string, c *gin.Context) (result interface{}, crossReason string, err error) { + writer := &responseWriter{ + ResponseWriter: c.Writer, + buffer: bytes.NewBuffer(nil), + } + originWriter := c.Writer + c.Writer = writer + c.Next() + c.Writer = originWriter + isSuccess := c.Writer.Status() >= 200 && c.Writer.Status() < 300 + if isSuccess { + result = CachedResponse{ + CacheKey: key, + StatusCode: c.Writer.Status(), + Headers: c.Writer.Header(), + Body: writer.buffer.Bytes(), + } + crossReason = crossBySuccess + return result, crossReason, err + } else { + if c.Writer.Status() >= 400 && c.Writer.Status() < 500 { + crossReason = crossByClientError + } + if c.Writer.Status() >= 500 { + crossReason = crossByServerError + } + + return result, crossReason, err + } + } + + cacheKey, err := generateCacheKey(c) + if err != nil { + log.Warnf(ctx, "failed to generate cache key: %v", err) + 
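skipPath/matchWildcard above turn a `*` glob from http.query_cache.skip_paths into an anchored regular expression. A small sketch of the same translation with a couple of expected results:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Same translation as matchWildcard above: escape the pattern, then turn the
// escaped "*" back into ".*" and anchor both ends.
func globToRegexp(pattern string) string {
	return "^" + strings.ReplaceAll(regexp.QuoteMeta(pattern), "\\*", ".*") + "$"
}

func main() {
	fmt.Println(globToRegexp("/a/b*")) // ^/a/b.*$
	fmt.Println(regexp.MustCompile(globToRegexp("/a/b*")).MatchString("/a/b/c/d")) // true
	fmt.Println(regexp.MustCompile(globToRegexp("/*")).MatchString("/any/path"))   // true
	// Exact entries such as "/a/b" never reach the regexp branch; skipPath
	// compares them directly first.
}
```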
c.AbortWithError(400, fmt.Errorf("failed to generate cache key: %v", err)) + return + } + + span.Set("cache-key", cacheKey) + + result, crossReason, err = d.do(c.Request.Context(), cacheKey, func() (interface{}, string, error) { + return doQuery(cacheKey, c) + }) + if err != nil { + c.Next() + return + } + + if cachedResp, ok := result.(*CachedResponse); ok { + d.serveCachedResponse(c, cachedResp) + return + } + + c.Next() + } +} + +func (d *Service) serveCachedResponse(c *gin.Context, cachedResp *CachedResponse) { + for key, values := range cachedResp.Headers { + for _, value := range values { + c.Header(key, value) + } + } + + c.Status(cachedResp.StatusCode) + c.Writer.Write(cachedResp.Body) +} diff --git a/pkg/unify-query/service/http/middleware/cache/cache_test.go b/pkg/unify-query/service/http/middleware/cache/cache_test.go new file mode 100644 index 000000000..49cd18fe3 --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/cache_test.go @@ -0,0 +1,52 @@ +package cache + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestService_isSkipPath(t *testing.T) { + type shouldPathTestStruct struct { + name string + skipPaths []string + input string + shouldPass bool + } + tests := []shouldPathTestStruct{ + { + name: "should pass abs equal", + skipPaths: []string{"/a/b"}, + input: "/a/b", + shouldPass: true, + }, + { + name: "should pass likely equal", + skipPaths: []string{"/a/b*"}, + input: "/a/b/c/d", + shouldPass: true, + }, + { + name: "should not pass different", + skipPaths: []string{"/a/b"}, + input: "/a/b/c", + shouldPass: false, + }, + { + name: "should pass root wildcard", + skipPaths: []string{"/*"}, + input: "/any/path/here", + shouldPass: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal( + t, + skipPath(tt.input, tt.skipPaths), + tt.shouldPass, + ) + }) + } +} diff --git a/pkg/unify-query/service/http/middleware/cache/keys.go b/pkg/unify-query/service/http/middleware/cache/keys.go new file mode 100644 index 000000000..55e07512a --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/keys.go @@ -0,0 +1,95 @@ +package cache + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" + "github.com/gin-gonic/gin" + "github.com/spaolacci/murmur3" +) + +const ( + // dataKeyType 缓存数据键类型,用于存储实际的数据内容 + // 格式: dsg:data:{cache_key} + dataKeyType = "data_key" + + // indexKeyType LRU 索引键类型,用于维护缓存项的时间戳排序 + // 格式: dsg:sys:index (全局唯一) + indexKeyType = "index_key" + + // limitKeyType 缓存限制配置键类型,用于动态设置缓存容量限制 + // 格式: dsg:conf:limit (全局唯一) + limitKeyType = "limit_key" + + // lockKeyType 分布式锁键类型 + // 格式: dsg:lock:{cache_key} + lockKeyType = "lock_key" + + // channelKeyType 通知频道键类型 + // 格式: dsg:chan:{cache_key} + channelKeyType = "channel_key" +) + +var ( + trans = map[string]string{ + dataKeyType: `dsg:data:%s`, + indexKeyType: `dsg:sys:index`, + limitKeyType: `dsg:conf:limit`, + lockKeyType: `dsg:lock:%s`, + channelKeyType: `dsg:chan:%s`, + } +) + +func generateCacheKey(c *gin.Context) (string, error) { + ctx := c.Request.Context() + user := metadata.GetUser(ctx) + payload := Payload{ + req: c.Request, + spaceID: user.SpaceUID, + path: c.Request.URL.Path, + } + + pStr, err := json.Marshal(payload) + return hash(pStr), err +} + +func hash(key []byte) string { + hasher := murmur3.New128() + result := hasher.Sum(key) + return string(result) +} + +func cacheKeyMap(key string, subject string) string { + if format, ok := trans[key]; ok { + 
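generateCacheKey above hashes the marshalled request payload with murmur3 to obtain a fixed-width cache key. For reference, a sketch of the usual murmur3-128 idiom from github.com/spaolacci/murmur3 (the dependency added to go.mod in this PR); it illustrates the approach and is not guaranteed to be byte-identical to the hash() helper above:

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"

	"github.com/spaolacci/murmur3"
)

// digest128 produces a fixed-width hex key from arbitrary request bytes.
func digest128(payload []byte) string {
	h1, h2 := murmur3.Sum128(payload)
	buf := make([]byte, 16)
	binary.BigEndian.PutUint64(buf[:8], h1)
	binary.BigEndian.PutUint64(buf[8:], h2)
	return hex.EncodeToString(buf)
}

func main() {
	// Payload content is illustrative.
	fmt.Println(digest128([]byte(`{"space_uid":"demo","path":"/query/ts"}`)))
}
```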
return fmt.Sprintf(format, subject) + } + return "" +} + +func subscribeAll() string { + channelFormat := trans[channelKeyType] + return fmt.Sprintf(channelFormat, "*") +} + +func extractKeyFromChannel(channel string) string { + // 1. 使用现有的channelKey格式获取前缀 + channelPrefix := cacheKeyMap(channelKeyType, "") + if channelPrefix == "" { + return "" + } + + // 2. 检查频道前缀 + if !strings.HasPrefix(channel, channelPrefix) { + return "" + } + + // 3. 提取key部分 + key := strings.TrimPrefix(channel, channelPrefix) + if key == "" { + return "" + } + + return key +} diff --git a/pkg/unify-query/service/http/middleware/cache/lua_scripts.go b/pkg/unify-query/service/http/middleware/cache/lua_scripts.go new file mode 100644 index 000000000..41afb56a3 --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/lua_scripts.go @@ -0,0 +1,71 @@ +package cache + +import ( + "context" + "time" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/trace" + goRedis "github.com/go-redis/redis/v8" +) + +const ( + cacheWriteWithLimitScript = ` +-- 变量定义 +local data_key = KEYS[1] +local index_key = KEYS[2] +local limit_config_key = KEYS[3] +local value = ARGV[1] +local ttl = tonumber(ARGV[2]) +local timestamp = tonumber(ARGV[3]) +local default_limit = tonumber(ARGV[4]) + +-- 1. 动态获取容量限制 +local limit = tonumber(redis.call('GET', limit_config_key)) or default_limit + +-- 2. 维护 LRU 索引 +redis.call('ZADD', index_key, timestamp, data_key) +redis.call('SET', data_key, value, 'EX', ttl) + +-- 3. 水位检查和清理 +local count = redis.call('ZCARD', index_key) +if count > limit then + local eviction_count = count - limit + local candidates = redis.call('ZRANGE', index_key, 0, eviction_count - 1) + + if #candidates > 0 then + redis.call('DEL', unpack(candidates)) + redis.call('ZREM', index_key, unpack(candidates)) + end +end + +return 1 +` +) + +func (d *Service) writeLimitedDistributedCache(ctx context.Context, dataKey string, data []byte) (err error) { + ctx, span := trace.NewSpan(ctx, "write-limited-distributed-cache") + defer span.End(&err) + + // 1. 生成正确的Redis键名 + redisDataKey := cacheKeyMap(dataKeyType, dataKey) + indexKey := cacheKeyMap(indexKeyType, "") + limitConfigKey := cacheKeyMap(limitKeyType, "") + timestamp := time.Now().UnixNano() + + span.Set("data-key", redisDataKey) + span.Set("index-key", indexKey) + span.Set("limit-config-key", limitConfigKey) + span.Set("timestamp", timestamp) + + script := goRedis.NewScript(cacheWriteWithLimitScript) + // 2. 
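With cacheWriteWithLimitScript defined, the Go-side call (continued just below) passes three KEYS and four ARGV values. An equivalent standalone invocation with go-redis v8, spelling out how each slot maps to the script's local variables; key names and values here are illustrative:

```go
package main

import (
	"context"
	"time"

	goRedis "github.com/go-redis/redis/v8"
)

// writeExample is illustrative only: it shows which KEYS/ARGV slot each value lands in.
func writeExample(ctx context.Context, client goRedis.UniversalClient, script *goRedis.Script) error {
	_, err := script.Run(ctx, client,
		[]string{
			"dsg:data:<cache_key>", // KEYS[1] data_key: where the cached payload is SET
			"dsg:sys:index",        // KEYS[2] index_key: ZSET ordering data keys by timestamp
			"dsg:conf:limit",       // KEYS[3] limit_config_key: optional dynamic capacity override
		},
		`{"cached":"payload"}`, // ARGV[1] value
		60,                     // ARGV[2] ttl in seconds for the data key
		time.Now().UnixNano(),  // ARGV[3] timestamp used as the ZSET score
		10000,                  // ARGV[4] default_limit used when the config key is unset
	).Result()
	return err
}

func main() {
	client := goRedis.NewUniversalClient(&goRedis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}}) // illustrative
	script := goRedis.NewScript("return 1") // stand-in; the real source is cacheWriteWithLimitScript above
	_ = writeExample(context.Background(), client, script)
}
```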
修复参数列表,使用正确的键名 + _, err = redis.ExecLua(ctx, script, []string{redisDataKey, indexKey, limitConfigKey}, + string(data), int(d.conf.payloadTTL.Seconds()), timestamp, d.conf.bucketLimit) + + if err != nil { + return + } + + return +} diff --git a/pkg/unify-query/service/http/middleware/cache/sub_notify.go b/pkg/unify-query/service/http/middleware/cache/sub_notify.go new file mode 100644 index 000000000..7383ad486 --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/sub_notify.go @@ -0,0 +1,149 @@ +package cache + +import ( + "context" + "fmt" + "time" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/trace" +) + +func (d *Service) subLoop(ctx context.Context) { + var ( + err error + ) + ctx, span := trace.NewSpan(ctx, "cache-middleware-sub-loop") + defer span.End(&err) + + channelName := subscribeAll() + + span.Set("subscribe-channel", channelName) + + // 1. 监听channel + msgCh, closeFn := redis.Subscribe(ctx, channelName) + if msgCh == nil { + log.Errorf(ctx, "failed to subscribe to pattern %s", channelName) + return + } + defer func() { + // 2. 确保连接正确关闭 + if closeFn != nil { + closeFn() + } + }() + + // 3. 处理所有接收到的消息 + for { + select { + case <-ctx.Done(): + log.Infof(ctx, "global subscription loop received context cancel signal") + return + case msg := <-msgCh: + if msg != nil { + // 3.1 从Redis频道名称中提取缓存key(使用keys.go中的通用函数) + key := extractKeyFromChannel(msg.Channel) + span.Set("key", key) + if key != "" { + // 3.2 广播给本地等待者 + d.broadcastLocal(ctx, key) + } + } + } + } +} + +func (d *Service) broadcastLocal(ctx context.Context, key string) { + var ( + err error + ) + ctx, span := trace.NewSpan(ctx, "cache-middleware-broadcast-local") + defer span.End(&err) + + d.waiterLock.Lock() + defer d.waiterLock.Unlock() + // 1. 从本地waiters中查找等待者 + wg, existWaiter := d.waiterMap[key] + + span.Set("key", key) + span.Set("waiter-exist", existWaiter) + span.Set("waiter-exist-count", len(wg.channels)) + + if existWaiter { + // 2. 从map中删除,防止新的等待者加入 + delete(d.waiterMap, key) + + // 3. 原子性地获取所有channels并标记为已广播 + wg.mu.Lock() + channels := wg.channels + wg.channels = make([]chan struct{}, 0) + wg.mu.Unlock() + + // 4. 
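subLoop above listens on subscribeAll(), i.e. the glob dsg:chan:*. As a point of reference, Redis SUBSCRIBE treats `*` in a channel name literally; glob matching over channel names is what the PSUBSCRIBE command provides, exposed by go-redis v8 as PSubscribe. A standalone sketch of a pattern subscription; client construction and the address are illustrative:

```go
package main

import (
	"context"
	"fmt"

	goRedis "github.com/go-redis/redis/v8"
)

// listen performs a Redis PSUBSCRIBE, so "dsg:chan:*" is interpreted as a
// glob over channel names rather than a literal channel.
func listen(ctx context.Context, client goRedis.UniversalClient) {
	sub := client.PSubscribe(ctx, "dsg:chan:*")
	defer sub.Close()

	for msg := range sub.Channel() {
		// msg.Channel carries the concrete channel, e.g. "dsg:chan:<cache_key>"
		fmt.Println(msg.Channel, msg.Payload)
	}
}

func main() {
	client := goRedis.NewUniversalClient(&goRedis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}}) // illustrative
	listen(context.Background(), client)
}
```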
关闭所有channel,唤醒等待的goroutine + for _, ch := range channels { + close(ch) + } + + } +} + +func (d *Service) waitForNotify(ctx context.Context, key string) (err error) { + ctx, span := trace.NewSpan(ctx, "cache-middleware-wait-notify") + defer span.End(&err) + + timeoutCh := time.After(d.conf.executeTTL) + + span.Set("time-out-duration", d.conf.executeTTL.String()) + + select { + // case:1 等待直到收到 channel 的关闭通知 + case <-d.waitLoop(ctx, key): + return nil + // case:2 超时处理 + case <-timeoutCh: + return fmt.Errorf("timeout waiting for cache notification: %s", key) + // case:3 上下文取消处理 + case <-ctx.Done(): + return ctx.Err() + } +} + +func (d *Service) waitLoop(ctx context.Context, key string) <-chan struct{} { + var ( + err error + ) + + ctx, span := trace.NewSpan(ctx, "cache-middleware-wait-loop") + defer span.End(&err) + + span.Set("key", key) + + d.waiterLock.Lock() + defer d.waiterLock.Unlock() + + ch := make(chan struct{}) + + var wg *WaitGroupValue + rev, existWaiter := d.waiterMap[key] + + if !existWaiter || rev == nil { + wg = &WaitGroupValue{ + channels: []chan struct{}{ch}, + } + d.waiterMap[key] = wg + } else { + wg = rev + wg.channels = append(wg.channels, ch) + existWaiter = true + } + + span.Set("exist-waiter", existWaiter) + var channelsCount int + if rev != nil { + channelsCount = len(rev.channels) + } + span.Set("exist-waiter-count", channelsCount) + + return ch +} diff --git a/pkg/unify-query/service/http/middleware/cache/sub_notify_test.go b/pkg/unify-query/service/http/middleware/cache/sub_notify_test.go new file mode 100644 index 000000000..7dd2881f0 --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/sub_notify_test.go @@ -0,0 +1,104 @@ +package cache + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNotifyWatcher_Awakening(t *testing.T) { + const ( + singleKeyWaiters = 10 // 同一个 key 的等待者数量 + differentKeys = 3 // 不同 key 的数量 + waitersPerKey = 5 // 每个 key 的等待者数量 + ) + + t.Run("SingleKeySubscription", func(t *testing.T) { + sidecar := &Service{ + ctx: context.Background(), + waiterMap: make(map[string]*WaitGroupValue), + waiterLock: sync.RWMutex{}, + } + + testKey := "single_test_key" + var wg sync.WaitGroup + results := make([]bool, singleKeyWaiters) + + // 假设有 singleKeyWaiters 个协程在等待同一个 key + for i := 0; i < singleKeyWaiters; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + // 都在等待同一个 key + ch := sidecar.waitLoop(t.Context(), testKey) + + select { + case <-ch: + results[index] = true // 成功被唤醒 + case <-time.After(1 * time.Second): + results[index] = false // 超时未被唤醒 + } + }(i) + } + + // 确保所有 waiter 都已启动并在等待 + time.Sleep(100 * time.Millisecond) + + sidecar.broadcastLocal(t.Context(), testKey) + wg.Wait() + + // 验证:所有 10 个协程都被唤醒(无死锁) + for i, result := range results { + assert.True(t, result, "协程 %d 应该被唤醒", i) + } + }) + + t.Run("MultipleKeysAwakening", func(t *testing.T) { + sidecar := &Service{ + ctx: context.Background(), + waiterMap: make(map[string]*WaitGroupValue), + waiterLock: sync.RWMutex{}, + } + + testKeys := []string{"key1", "key2", "key3"} + var wg sync.WaitGroup + successCount := 0 + var successMu sync.Mutex + + // waiter + for _, key := range testKeys { + for i := 0; i < waitersPerKey; i++ { + wg.Add(1) + go func(waitKey string) { + defer wg.Done() + ch := sidecar.waitLoop(t.Context(), waitKey) + + select { + case <-ch: + successMu.Lock() + successCount++ + successMu.Unlock() + case <-time.After(1 * time.Second): + // 超时,未唤醒 + } + }(key) + } + } + + // 确保所有 waiter 都已启动并在等待 + 
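waitLoop registers one channel per local waiter and broadcastLocal wakes them all by closing those channels; close is the natural broadcast primitive here because every blocked receiver unblocks exactly once. A minimal standalone illustration of that wake-up pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	waiters := make([]chan struct{}, 5)

	for i := range waiters {
		waiters[i] = make(chan struct{})
		wg.Add(1)
		go func(i int, ch <-chan struct{}) {
			defer wg.Done()
			<-ch // blocks until the channel is closed by the broadcaster
			fmt.Printf("waiter %d woken\n", i)
		}(i, waiters[i])
	}

	for _, ch := range waiters { // broadcast: closing wakes every receiver at once
		close(ch)
	}
	wg.Wait()
}
```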
time.Sleep(100 * time.Millisecond) + + // 模拟收到 Notify 信号,开始唤醒本地的waiter + for _, key := range testKeys { + sidecar.broadcastLocal(t.Context(), key) + } + + wg.Wait() + + // 验证:所有等待者都被唤醒 + assert.Equal(t, differentKeys*waitersPerKey, successCount, "所有等待者都应该被唤醒") + }) +} diff --git a/pkg/unify-query/service/http/middleware/cache/zset_test.go b/pkg/unify-query/service/http/middleware/cache/zset_test.go new file mode 100644 index 000000000..202c623f1 --- /dev/null +++ b/pkg/unify-query/service/http/middleware/cache/zset_test.go @@ -0,0 +1,90 @@ +package cache + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + goRedis "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/config" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis" +) + +// testSetup 设置测试环境 +func testSetup(t *testing.T, limit int) (*Service, *miniredis.Miniredis) { + config.InitConfig() + + s, err := miniredis.Run() + require.NoError(t, err) + t.Cleanup(s.Close) + + ctx := context.Background() + err = redis.SetInstance(ctx, "test", &goRedis.UniversalOptions{ + Addrs: []string{s.Addr()}, + }) + require.NoError(t, err) + + client := redis.Client() + limitKey := cacheKeyMap(limitKeyType, "") + _, err = client.Set(ctx, limitKey, fmt.Sprintf("%d", limit), 0).Result() + require.NoError(t, err) + svc, err := NewInstance(ctx) + require.NoError(t, err) + + return svc, s +} + +// TestZSet_LRU_Eviction 验证 ZSet 的 LRU 淘汰机制 +func TestZSet_LRU_Eviction(t *testing.T) { + const ( + limit = 3 + totalItems = 5 + sleepTime = 1 * time.Millisecond + ) + + svc, _ := testSetup(t, limit) + ctx := context.Background() + client := redis.Client() + + // 写入超过限制的数据项 + dataKeys := make([]string, totalItems) + for i := 0; i < totalItems; i++ { + dataKey := fmt.Sprintf("test_key_%d", i) + testData := fmt.Sprintf(`{"value": "data_%d"}`, i) + + err := svc.writeLimitedDistributedCache(ctx, dataKey, []byte(testData)) + require.NoError(t, err) + dataKeys[i] = dataKey + time.Sleep(sleepTime) // 确保时间戳不同 + } + + // 1. 验证超出限制后 ZSet 大小维持不变 + indexKey := cacheKeyMap(indexKeyType, "") + // ZCard 可以根据传递进来的indexKey参数获取对应ZSet的大小 + zsetSize, err := client.ZCard(ctx, indexKey).Result() + require.NoError(t, err) + assert.Equal(t, int64(limit), zsetSize, "ZSet 大小应该维持在限制值") + + // 2. 验证最早的数据被直接淘汰 + // 前 totalItems - limit 个数据应该被淘汰 + evictedCount := totalItems - limit + for i := 0; i < evictedCount; i++ { + redisDataKey := cacheKeyMap(dataKeyType, dataKeys[i]) + _, err := redis.Get(ctx, redisDataKey) + assert.Equal(t, goRedis.Nil, err, "最早的数据应该被淘汰") + } + + // 3. 
验证最后保留的是最新的数据 + // 从 evictedCount 到 totalItems - 1 的数据应该存在 + for i := evictedCount; i < totalItems; i++ { + redisDataKey := cacheKeyMap(dataKeyType, dataKeys[i]) + data, err := redis.Get(ctx, redisDataKey) + require.NoError(t, err) + assert.Contains(t, data, fmt.Sprintf("data_%d", i), "最新数据应该保留") + } +} diff --git a/pkg/unify-query/service/http/service.go b/pkg/unify-query/service/http/service.go index 7f7da7b76..904dd9668 100644 --- a/pkg/unify-query/service/http/service.go +++ b/pkg/unify-query/service/http/service.go @@ -17,6 +17,7 @@ import ( "strings" "sync" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/service/http/middleware/cache" "github.com/gin-gonic/gin" "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" @@ -70,6 +71,10 @@ func (s *Service) Reload(ctx context.Context) { gin.SetMode(gin.ReleaseMode) s.g = gin.New() + c, err := cache.NewInstance(ctx) + if err != nil { + log.Panicf(ctx, "failed to create cache instance for->[%s]", err) + } public := s.g.Group("/") // 注册默认路由 @@ -81,6 +86,7 @@ func (s *Service) Reload(ctx context.Context) { SlowQueryThreshold: SlowQueryThreshold, }), middleware.JwtAuthMiddleware(JwtEnabled, JwtPublicKey, JwtBkAppCodeSpaces), + c.CacheMiddleware(), ) publicRegisterHandler := endpoint.NewRegisterHandler(ctx, public) diff --git a/pkg/unify-query/service/http/settings.go b/pkg/unify-query/service/http/settings.go index e8e2037bf..359507605 100644 --- a/pkg/unify-query/service/http/settings.go +++ b/pkg/unify-query/service/http/settings.go @@ -72,6 +72,12 @@ const ( ScrollSliceLimitConfigPath = "scroll.slice_limit" ScrollSessionLockTimeoutConfigPath = "scroll.session_lock_timeout" ScrollWindowTimeoutConfigPath = "scroll.window_timeout" + + // 查询缓存配置 + QueryCacheEnabledConfigPath = "http.query_cache.enabled" + QueryCacheSkipPathsConfigPath = "http.query_cache.skip_paths" + QueryCacheSkipMethodsConfigPath = "http.query_cache.skip_methods" + QueryCacheDefaultLimitConfigPath = "http.query_cache.default_limit" // 集群指标查询配置 ClusterMetricQueryPrefixConfigPath = "http.cluster_metric.prefix" ClusterMetricQueryTimeoutConfigPath = "http.cluster_metric.timeout" @@ -107,4 +113,6 @@ var ( ScrollWindowTimeout string ScrollSessionLockTimeout string ScrollSliceLimit int + + QueryCacheEnabled bool )
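On the wiring side, Reload registers the middlewares in execution order (timer → JWT auth → cache), and the new settings are read from the http.query_cache.* viper keys declared in settings.go. A sketch of setting those keys programmatically for a local run; values other than the defaults registered in setDefaultConfig (enabled=true, default_limit=10000, skip_methods=["PUT","DELETE"], skip_paths=[]) are examples only:

```go
package main

import "github.com/spf13/viper"

// Example-only overrides for the new query-cache settings.
func configureQueryCacheForLocalRun() {
	viper.Set("http.query_cache.enabled", true)
	viper.Set("http.query_cache.default_limit", 500)               // example value
	viper.Set("http.query_cache.skip_paths", []string{"/healthz"}) // example value
	viper.Set("http.query_cache.skip_methods", []string{"PUT", "DELETE"})
}

func main() {
	configureQueryCacheForLocalRun()
}
```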