diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c60e526185..48a75fa92f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.23.2] + go-version: [1.23.4] steps: - name: Install Go uses: actions/setup-go@v5 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f39cb6d4e7..733f65c48c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository strategy: matrix: - go-version: [1.23.2] + go-version: [1.23.4] steps: - name: Install Go uses: actions/setup-go@v5 diff --git a/Dockerfile b/Dockerfile index 1942fe053f..5d972522f5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,20 @@ -FROM alpine:3.18 +FROM alpine:3.21 -RUN apk --no-cache upgrade && apk --no-cache add ca-certificates +ARG USER=centrifugo +ARG UID=1000 +ARG GID=1000 -COPY centrifugo /usr/local/bin/centrifugo +RUN addgroup -S -g $GID $USER && \ + adduser -S -G $USER -u $UID $USER + +RUN apk --no-cache upgrade && \ + apk --no-cache add ca-certificates && \ + update-ca-certificates + +USER $USER WORKDIR /centrifugo +COPY centrifugo /usr/local/bin/centrifugo + CMD ["centrifugo"] diff --git a/go.mod b/go.mod index 21548f331f..8011df35ab 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.0 require ( github.com/FZambia/eagle v0.1.0 github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b - github.com/centrifugal/centrifuge v0.33.5-0.20241111162802-ddd7cc1e7267 + github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e github.com/cristalhq/jwt/v5 v5.4.0 github.com/go-viper/mapstructure/v2 v2.2.1 @@ -17,34 +17,34 @@ require ( github.com/jackc/pgx/v5 v5.7.1 github.com/justinas/alice v1.2.0 github.com/mattn/go-isatty v0.0.20 - 
github.com/nats-io/nats.go v1.37.0 + github.com/nats-io/nats.go v1.38.0 github.com/pelletier/go-toml/v2 v2.2.3 github.com/prometheus/client_golang v1.20.5 - github.com/quic-go/quic-go v0.48.1 + github.com/quic-go/quic-go v0.48.2 github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 github.com/rakutentech/jwk-go v1.1.3 github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.20.0-alpha.6.0.20240903103719-273543ce8237 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.18.0 github.com/twmb/franz-go/pkg/kadm v1.14.0 github.com/twmb/franz-go/pkg/kmsg v1.9.0 github.com/valyala/fasttemplate v1.2.2 - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 - go.opentelemetry.io/otel v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 - go.opentelemetry.io/otel/sdk v1.31.0 - go.opentelemetry.io/otel/trace v1.31.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 + go.opentelemetry.io/otel v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 + go.opentelemetry.io/otel/sdk v1.32.0 + go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.28.0 - golang.org/x/sync v0.8.0 - golang.org/x/time v0.7.0 - google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + golang.org/x/crypto v0.31.0 + golang.org/x/sync v0.10.0 + golang.org/x/time v0.8.0 + google.golang.org/grpc v1.69.2 + google.golang.org/protobuf v1.35.2 ) require ( @@ -53,7 +53,7 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // 
indirect - github.com/maypok86/otter v1.2.3 // indirect + github.com/maypok86/otter v1.2.4 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect @@ -80,13 +80,13 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo/v2 v2.12.1 // indirect @@ -95,23 +95,23 @@ require ( github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/qpack v0.5.1 // indirect - github.com/redis/rueidis v1.0.48 // indirect + github.com/redis/rueidis v1.0.51 // indirect github.com/segmentio/asm v1.2.0 // indirect - github.com/segmentio/encoding v0.4.0 // indirect + github.com/segmentio/encoding v0.4.1 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 - go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc 
v1.32.0 + go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.30.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.23.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index afe0f5fc3f..2c391e0baf 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/centrifugal/centrifuge v0.33.5-0.20241111162802-ddd7cc1e7267 h1:qCDl370NqiN1XsVnpaHnnH3ixLM0oNBujHIphoI/4/Q= -github.com/centrifugal/centrifuge v0.33.5-0.20241111162802-ddd7cc1e7267/go.mod h1:enLQkNNo05bv/a2fKWHS2IyhrE91TQJghpygTKtqfmM= +github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d h1:xr1lVKyxwcsfXtGBWLuiF0N7bRR5XuS3fN/lVuzImss= +github.com/centrifugal/centrifuge v0.33.5-0.20241222090232-3a3fceb66a3d/go.mod h1:hKy1+IjduZJge3EDS3NSSZdTpWd4qhz+AlANNfyv/jE= github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e h1:+GbuEwJybDuHz6e8S17t/f0I4aTDnZjk37c0aGNFbwc= github.com/centrifugal/protocol v0.13.5-0.20241111155425-6c360178091e/go.mod 
h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -54,6 +54,8 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -69,8 +71,8 @@ github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kX github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdmPSDFPY= github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc= github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -104,14 +106,14 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/maypok86/otter v1.2.3 h1:jxyPD4ofCwtrQM5is5JNrdAs+6+JQkf/PREZd7JCVgg= -github.com/maypok86/otter v1.2.3/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4= +github.com/maypok86/otter v1.2.4 h1:HhW1Pq6VdJkmWwcZZq19BlEQkHtI8xgsQzBVXJU0nfc= +github.com/maypok86/otter v1.2.4/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= -github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= +github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -151,14 +153,14 @@ github.com/prometheus/procfs v0.15.1 
h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= -github.com/quic-go/quic-go v0.48.1 h1:y/8xmfWI9qmGTc+lBr4jKRUWLGSlSigv847ULJ4hYXA= -github.com/quic-go/quic-go v0.48.1/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= +github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE= +github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw= github.com/rakutentech/jwk-go v1.1.3 h1:PiLwepKyUaW+QFG3ki78DIO2+b4IVK3nMhlxM70zrQ4= github.com/rakutentech/jwk-go v1.1.3/go.mod h1:LtzSv4/+Iti1nnNeVQiP6l5cI74GBStbhyXCYvgPZFk= -github.com/redis/rueidis v1.0.48 h1:ggZHjEtc/echUmPkGTfssRisnc3p/mIUEwrpbNsZ1mQ= -github.com/redis/rueidis v1.0.48/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= +github.com/redis/rueidis v1.0.51 h1:NZ1KIncPIQtjrp+GDLynrLKBiPU106EN5cJHOFSqvDM= +github.com/redis/rueidis v1.0.51/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -169,8 +171,8 @@ github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3 github.com/sagikazarmark/locafero v0.6.0/go.mod h1:77OmuIc6VTraTXKXIs/uvUxKGUXjE1GbemJYHqdNjX0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod 
h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= -github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8= -github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= +github.com/segmentio/encoding v0.4.1 h1:KLGaLSW0jrmhB58Nn4+98spfvPvmo4Ci1P/WIQ9wn7w= +github.com/segmentio/encoding v0.4.1/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= github.com/shadowspore/fossil-delta v0.0.0-20241003175239-d3b7ce6bda62 h1:2OBZQ+j6pNlvhcUJqV5WtoyMEv32vpZyKE9+ta2Sr+Y= github.com/shadowspore/fossil-delta v0.0.0-20241003175239-d3b7ce6bda62/go.mod h1:TNFF98m0aYYsGHXAKrlG2T6Tr/hUV2MbNHPdmyA6PFg= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= @@ -190,8 +192,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -214,24 +216,26 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= 
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 h1:yMkBS9yViCc7U7yeLzJPM2XizlfdVvBRSmsQDWu6qc0= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 h1:lUsI2TYsQw2r1IASwoROaCnjdj2cvC2+Jbxvk6nHnWU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0/go.mod h1:2HpZxxQurfGxJlJDblybejHB6RX6pmExPNe517hREw4= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod 
h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 h1:qtFISDHKolvIxzSs0gIaiPUPR0Cucb0F2coHC7ZLdps= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0/go.mod h1:Y+Pop1Q6hCOnETWTW4NROK/q1hv50hM7yDaUTjG8lp8= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 h1:DheMAlT6POBP+gh8RUH19EOTnQIor5QE0uSRPtzCpSw= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0/go.mod h1:wZcGmeVO9nzP67aYSLDqXNWK87EZWhi7JWj1v7ZXf94= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 h1:9kV11HXBHZAvuPUZxmMWrH8hZn/6UnHX4K0mu36vNsU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0/go.mod h1:JyA0FHXe22E1NeNiHmVp7kFHglnexDQ7uRWDiiJ1hKQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod 
h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= @@ -248,8 +252,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -265,8 +269,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 
h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -279,14 +283,14 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= -golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= 
+golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -296,20 +300,20 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 h1:T6rh4haD3GVYsgEfWExoCZA2o2FmbNyKpTuAxbEFPTg= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:wp2WsuBYj6j8wUdo3ToZsdxxixbvQNAHqVJrTgi5E5M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= -google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod 
h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/app/node.go b/internal/app/node.go index c036c35673..baaf990eb3 100644 --- a/internal/app/node.go +++ b/internal/app/node.go @@ -15,7 +15,9 @@ func centrifugeNodeConfig(version string, cfgContainer *config.Container) centri appCfg := cfgContainer.Config() cfg := centrifuge.Config{} cfg.Version = version - cfg.MetricsNamespace = "centrifugo" + cfg.Metrics = centrifuge.MetricsConfig{ + MetricsNamespace: "centrifugo", + } cfg.Name = 
nodeName(appCfg) cfg.ChannelMaxLength = appCfg.Channel.MaxLength cfg.ClientPresenceUpdateInterval = appCfg.Client.PresenceUpdateInterval.ToDuration() diff --git a/internal/configtypes/types.go b/internal/configtypes/types.go index 76106304aa..16eb84ca93 100644 --- a/internal/configtypes/types.go +++ b/internal/configtypes/types.go @@ -620,6 +620,10 @@ type KafkaConsumerConfig struct { // Set to -1 to use non-buffered channel. PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size" envconfig:"partition_buffer_size" default:"16" yaml:"partition_buffer_size" toml:"partition_buffer_size"` + // FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request. + // If not set the default 50MB is used. + FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes" envconfig:"fetch_max_bytes" yaml:"fetch_max_bytes" toml:"fetch_max_bytes"` + // PublicationDataMode is a configuration for the mode where message payload already // contains data ready to publish into channels, instead of API command. 
PublicationDataMode KafkaPublicationDataModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"` diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index bd73f57086..adc8279114 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -25,21 +25,28 @@ import ( type KafkaConfig = configtypes.KafkaConsumerConfig +type testOnlyConfig struct { + topicPartitionBeforePauseCh chan topicPartition + topicPartitionPauseProceedCh chan struct{} + fetchTopicPartitionSubmittedCh chan kgo.FetchTopicPartition +} + type topicPartition struct { - t string - p int32 + topic string + partition int32 } type KafkaConsumer struct { - name string - client *kgo.Client - nodeID string - logger Logger - dispatcher Dispatcher - config KafkaConfig - consumers map[topicPartition]*partitionConsumer - doneCh chan struct{} - metrics *commonMetrics + name string + client *kgo.Client + nodeID string + logger Logger + dispatcher Dispatcher + config KafkaConfig + consumers map[topicPartition]*partitionConsumer + doneCh chan struct{} + metrics *commonMetrics + testOnlyConfig testOnlyConfig } // JSONRawOrString can decode payload from bytes and from JSON string. 
This gives @@ -90,6 +97,9 @@ func NewKafkaConsumer( if config.MaxPollRecords == 0 { config.MaxPollRecords = 100 } + if config.PartitionBufferSize < 0 { + return nil, errors.New("partition buffer size can't be negative") + } consumer := &KafkaConsumer{ name: name, nodeID: nodeID, @@ -127,6 +137,9 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) { kgo.ClientID(kafkaClientID), kgo.InstanceID(c.getInstanceID()), } + if c.config.FetchMaxBytes > 0 { + opts = append(opts, kgo.FetchMaxBytes(c.config.FetchMaxBytes)) + } if c.config.TLS.Enabled { tlsConfig, err := c.config.TLS.ToGoTLSConfig("kafka:" + c.name) if err != nil { @@ -265,12 +278,19 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error { return fmt.Errorf("poll error: %w", errors.Join(errs...)) } + pausedTopicPartitions := map[topicPartition]struct{}{} fetches.EachPartition(func(p kgo.FetchTopicPartition) { if len(p.Records) == 0 { return } tp := topicPartition{p.Topic, p.Partition} + if _, paused := pausedTopicPartitions[tp]; paused { + // We have already paused this partition during this poll, so we should not + // process records from it anymore. We will resume partition processing with the + // correct offset soon, after we have space in recs buffer. + return + } // Since we are using BlockRebalanceOnPoll, we can be // sure this partition consumer exists: @@ -284,13 +304,43 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error { case <-c.consumers[tp].quit: return case c.consumers[tp].recs <- p: + if c.testOnlyConfig.fetchTopicPartitionSubmittedCh != nil { // Only set in tests. + c.testOnlyConfig.fetchTopicPartitionSubmittedCh <- p + } default: + if c.testOnlyConfig.topicPartitionBeforePauseCh != nil { // Only set in tests. + c.testOnlyConfig.topicPartitionBeforePauseCh <- tp + } + if c.testOnlyConfig.topicPartitionPauseProceedCh != nil { // Only set in tests. 
+ <-c.testOnlyConfig.topicPartitionPauseProceedCh + } + partitionsToPause := map[string][]int32{p.Topic: {p.Partition}} // PauseFetchPartitions here to not poll partition until records are processed. // This allows parallel processing of records from different partitions, without // keeping records in memory and blocking rebalance. Resume will be called after // records are processed by c.consumers[tp]. c.client.PauseFetchPartitions(partitionsToPause) + defer func() { + // There is a chance that message processor resumed partition processing before + // we called PauseFetchPartitions above. Such a race was observed in a service + // under CPU throttling conditions. In that case Pause is called after Resume, + // and topic is never resumed after that. To avoid we check if the channel current + // len is less than cap, if len < cap => we can be sure that buffer has space now, + // so we can resume partition processing. If it is not, this means that the records + // are still not processed and resume will be called eventually after processing by + // partition consumer. See also TestKafkaConsumer_TestPauseAfterResumeRace test case. + if len(c.consumers[tp].recs) < cap(c.consumers[tp].recs) { + c.client.ResumeFetchPartitions(partitionsToPause) + } + }() + + pausedTopicPartitions[tp] = struct{}{} + // To poll next time since correct offset we need to set it manually to the offset of + // the first record in the batch. Otherwise, next poll will return the next record batch, + // and we will lose the current one. 
+ epochOffset := kgo.EpochOffset{Epoch: -1, Offset: p.Records[0].Offset} + c.client.SetOffsets(map[string]map[int32]kgo.EpochOffset{p.Topic: {p.Partition: epochOffset}}) } }) c.client.AllowRebalance() @@ -325,29 +375,37 @@ func (c *KafkaConsumer) reInitClient(ctx context.Context) error { return nil } -const defaultPartitionBufferSize = 16 +const defaultPartitionBufferSize = 8 func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned map[string][]int32) { bufferSize := c.config.PartitionBufferSize - if bufferSize == -1 { - bufferSize = 0 - } else if bufferSize == 0 { + if bufferSize == 0 { bufferSize = defaultPartitionBufferSize } for topic, partitions := range assigned { for _, partition := range partitions { + quitCh := make(chan struct{}) + partitionCtx, cancel := context.WithCancel(ctx) + go func() { + select { + case <-ctx.Done(): + cancel() + case <-quitCh: + cancel() + } + }() pc := &partitionConsumer{ - clientCtx: ctx, - dispatcher: c.dispatcher, - logger: c.logger, - cl: cl, - topic: topic, - partition: partition, - config: c.config, - name: c.name, - metrics: c.metrics, - - quit: make(chan struct{}), + partitionCtx: partitionCtx, + dispatcher: c.dispatcher, + logger: c.logger, + cl: cl, + topic: topic, + partition: partition, + config: c.config, + name: c.name, + metrics: c.metrics, + + quit: quitCh, done: make(chan struct{}), recs: make(chan kgo.FetchTopicPartition, bufferSize), } @@ -391,15 +449,15 @@ func (c *KafkaConsumer) killConsumers(lost map[string][]int32) { } type partitionConsumer struct { - clientCtx context.Context - dispatcher Dispatcher - logger Logger - cl *kgo.Client - topic string - partition int32 - config KafkaConfig - name string - metrics *commonMetrics + partitionCtx context.Context + dispatcher Dispatcher + logger Logger + cl *kgo.Client + topic string + partition int32 + config KafkaConfig + name string + metrics *commonMetrics quit chan struct{} done chan struct{} @@ -457,9 +515,7 @@ func (pc 
*partitionConsumer) processRecord(ctx context.Context, record *kgo.Reco func (pc *partitionConsumer) processRecords(records []*kgo.Record) { for _, record := range records { select { - case <-pc.clientCtx.Done(): - return - case <-pc.quit: + case <-pc.partitionCtx.Done(): return default: } @@ -467,7 +523,7 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { var backoffDuration time.Duration = 0 retries := 0 for { - err := pc.processRecord(pc.clientCtx, record) + err := pc.processRecord(pc.partitionCtx, record) if err == nil { if retries > 0 { pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing message after errors", map[string]any{"topic": record.Topic, "partition": record.Partition})) @@ -479,12 +535,10 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) pc.metrics.errorsTotal.WithLabelValues(pc.name).Inc() - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed message", map[string]any{"error": err.Error(), "nextAttemptIn": backoffDuration.String(), "topic": record.Topic, "partition": record.Partition})) + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed record", map[string]any{"error": err.Error(), "next_attempt_in": backoffDuration.String(), "topic": record.Topic, "partition": record.Partition})) select { case <-time.After(backoffDuration): - case <-pc.quit: - return - case <-pc.clientCtx.Done(): + case <-pc.partitionCtx.Done(): return } } @@ -500,13 +554,11 @@ func (pc *partitionConsumer) consume() { defer resumeConsuming() for { select { - case <-pc.clientCtx.Done(): - return - case <-pc.quit: + case <-pc.partitionCtx.Done(): return case p := <-pc.recs: pc.processRecords(p.Records) - // At this point we are ready to consume the next batch from partition, thus resume. 
+ // After processing records, we can resume partition processing (if it was paused, no-op otherwise). resumeConsuming() } } diff --git a/internal/consuming/kafka_test.go b/internal/consuming/kafka_test.go index 708808e2ae..7396f33aa2 100644 --- a/internal/consuming/kafka_test.go +++ b/internal/consuming/kafka_test.go @@ -7,14 +7,15 @@ import ( "encoding/json" "errors" "fmt" + "github.com/centrifugal/centrifugo/v5/internal/configtypes" + "strconv" "strings" + "sync/atomic" "testing" "time" - "github.com/centrifugal/centrifugo/v5/internal/apiproto" - "github.com/centrifugal/centrifugo/v5/internal/configtypes" - "github.com/centrifugal/centrifuge" + "github.com/centrifugal/centrifugo/v5/internal/apiproto" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -74,8 +75,20 @@ func produceTestMessage(topic string, message []byte, headers []kgo.RecordHeader return nil } +func produceManyRecords(records ...*kgo.Record) error { + client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL)) + if err != nil { + return fmt.Errorf("failed to create Kafka client: %w", err) + } + defer client.Close() + err = client.ProduceSync(context.Background(), records...).FirstErr() + if err != nil { + return fmt.Errorf("failed to produce message: %w", err) + } + return nil +} + func produceTestMessageToPartition(topic string, message []byte, partition int32) error { - // Create a new client. client, err := kgo.NewClient( kgo.SeedBrokers(testKafkaBrokerURL), kgo.RecordPartitioner(kgo.ManualPartitioner()), @@ -85,7 +98,6 @@ func produceTestMessageToPartition(topic string, message []byte, partition int32 } defer client.Close() - // Produce a message until we hit desired partition. 
res := client.ProduceSync(context.Background(), &kgo.Record{ Topic: topic, Partition: partition, Value: message}) if res.FirstErr() != nil { @@ -371,7 +383,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherTopic(t *testing.T) { // is stuck on it. We want to make sure that the consumer is not blocked and can still process // messages from other topic partitions. func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T) { - partitionBufferSizes := []int{-1, 0} + partitionBufferSizes := []int{0, 1} for _, partitionBufferSize := range partitionBufferSizes { t.Run(fmt.Sprintf("partition_buffer_size_%d", partitionBufferSize), func(t *testing.T) { @@ -441,6 +453,293 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T }) } } +func TestKafkaConsumer_PausePartitions(t *testing.T) { + t.Parallel() + testKafkaTopic := "consumer_test_" + uuid.New().String() + testPayload1 := []byte(`{"key":"value1"}`) + testPayload2 := []byte(`{"key":"value2"}`) + testPayload3 := []byte(`{"key":"value3"}`) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, 1, 1) + require.NoError(t, err) + + event1Received := make(chan struct{}) + event2Received := make(chan struct{}) + event3Received := make(chan struct{}) + consumerClosed := make(chan struct{}) + doneCh := make(chan struct{}) + + fetchSubmittedCh := make(chan kgo.FetchTopicPartition) + beforePauseCh := make(chan topicPartition) + + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + + PartitionBufferSize: 1, + } + + numCalls := 0 + + unblockCh := make(chan struct{}) + + testConfig := testOnlyConfig{ + fetchTopicPartitionSubmittedCh: fetchSubmittedCh, + topicPartitionBeforePauseCh: beforePauseCh, + } + + mockDispatcher := &MockDispatcher{ + onDispatch: func(ctx context.Context, method string, data 
[]byte) error { + numCalls++ + if numCalls == 1 { + close(event1Received) + <-unblockCh + return nil + } else if numCalls == 2 { + close(event2Received) + return nil + } + close(event3Received) + return nil + }, + } + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + require.NoError(t, err) + + consumer.testOnlyConfig = testConfig + + go func() { + err = produceTestMessage(testKafkaTopic, testPayload1, nil) + require.NoError(t, err) + <-fetchSubmittedCh + <-event1Received + + err = produceTestMessage(testKafkaTopic, testPayload2, nil) + require.NoError(t, err) + <-fetchSubmittedCh + + // At this point message 1 is being processed and the next produced message must + // cause a partition pause. + err = produceTestMessage(testKafkaTopic, testPayload3, nil) + require.NoError(t, err) + <-beforePauseCh // Wait for triggering the partition pause. + + // Unblock the message processing. + close(unblockCh) + <-fetchSubmittedCh + }() + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + waitCh(t, event1Received, 30*time.Second, "timeout waiting for event 1") + waitCh(t, event2Received, 30*time.Second, "timeout waiting for event 2") + waitCh(t, event3Received, 30*time.Second, "timeout waiting for event 3") + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + close(doneCh) +} + +func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) { + t.Skip() + t.Parallel() + + testCases := []struct { + numPartitions int32 + numMessages int + partitionBuffer int + }{ + //{numPartitions: 1, numMessages: 1000, partitionBuffer: 1} + {numPartitions: 10, numMessages: 10000, partitionBuffer: 1}, + } + + for _, tc := range testCases { + name := fmt.Sprintf("partitions=%d,messages=%d,buffer=%d", tc.numPartitions, tc.numMessages, tc.partitionBuffer) + t.Run(name, func(t *testing.T) { + 
testKafkaTopic := "consumer_test_" + uuid.New().String() + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, tc.numPartitions, 1) + require.NoError(t, err) + + consumerClosed := make(chan struct{}) + doneCh := make(chan struct{}) + + numMessages := tc.numMessages + messageCh := make(chan struct{}, numMessages) + + mockDispatcher := &MockDispatcher{ + onDispatch: func(ctx context.Context, method string, data []byte) error { + // Emulate delay due to some work. + time.Sleep(20 * time.Millisecond) + messageCh <- struct{}{} + return nil + }, + } + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + PartitionBufferSize: tc.partitionBuffer, + } + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + require.NoError(t, err) + + var records []*kgo.Record + for i := 0; i < numMessages; i++ { + records = append(records, &kgo.Record{Topic: testKafkaTopic, Value: []byte(`{"hello": "` + strconv.Itoa(i) + `"}`)}) + if (i+1)%100 == 0 { + err = produceManyRecords(records...) 
+ if err != nil { + t.Fatal(err) + } + records = nil + t.Logf("produced %d messages", i+1) + } + } + + t.Logf("all messages produced, 3, 2, 1, go!") + time.Sleep(time.Second) + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + var numProcessed int64 + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + t.Logf("processed %d messages", atomic.LoadInt64(&numProcessed)) + } + } + }() + + for i := 0; i < numMessages; i++ { + <-messageCh + atomic.AddInt64(&numProcessed, 1) + } + t.Logf("all messages processed") + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + close(doneCh) + }) + } +} + +// TestKafkaConsumer_TestPauseAfterResumeRace tests a scenario where a partition was +// paused after it was resumed and partition never processed any messages after that. +func TestKafkaConsumer_TestPauseAfterResumeRace(t *testing.T) { + t.Parallel() + testKafkaTopic := "consumer_test_" + uuid.New().String() + testPayload1 := []byte(`{"input":"value1"}`) + testPayload2 := []byte(`{"input":"value2"}`) + testPayload3 := []byte(`{"input":"value3"}`) + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, 1, 1) + require.NoError(t, err) + + consumerClosed := make(chan struct{}) + doneCh := make(chan struct{}) + + messageCh := make(chan struct{}, 128) + + partitionBeforePauseCh := make(chan topicPartition) + partitionPauseProceedCh := make(chan struct{}) + fetchSubmittedCh := make(chan kgo.FetchTopicPartition) + + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + PartitionBufferSize: 1, + } + + count := 0 + proceedCh := make(chan struct{}) + firstMessageReceived := make(chan struct{}) + + mockDispatcher := &MockDispatcher{ + onDispatch: func(ctx 
context.Context, method string, data []byte) error { + if count == 0 { + close(firstMessageReceived) + // Block until we are allowed to proceed + t.Logf("waiting for proceed") + <-proceedCh + t.Logf("proceeding") + } + count++ + messageCh <- struct{}{} + t.Logf("message processed") + return nil + }, + } + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry())) + require.NoError(t, err) + + consumer.testOnlyConfig = testOnlyConfig{ + topicPartitionBeforePauseCh: partitionBeforePauseCh, + topicPartitionPauseProceedCh: partitionPauseProceedCh, + fetchTopicPartitionSubmittedCh: fetchSubmittedCh, + } + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + go func() { + err = produceTestMessage(testKafkaTopic, testPayload1, nil) + require.NoError(t, err) + + <-fetchSubmittedCh + t.Logf("fetch 1 submitted") + + // This one should be buffered. + err = produceTestMessage(testKafkaTopic, testPayload2, nil) + require.NoError(t, err) + + <-fetchSubmittedCh + t.Logf("fetch 2 submitted") + + // This message pauses the partition consumer. + err = produceTestMessage(testKafkaTopic, testPayload3, nil) + require.NoError(t, err) + <-partitionBeforePauseCh + close(proceedCh) + // Give consumer some time to process messages, so we can be sure that resume was called. + time.Sleep(time.Second) + // And now we can proceed so that partition will be paused after resume. + close(partitionPauseProceedCh) + // Wait for the third message submitted for processing. 
+ <-fetchSubmittedCh + }() + + for i := 0; i < 3; i++ { + <-messageCh + } + + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + close(doneCh) +} func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) { t.Parallel() diff --git a/internal/proxy/refresh_handler_test.go b/internal/proxy/refresh_handler_test.go index 4b728699bf..03f09c370a 100644 --- a/internal/proxy/refresh_handler_test.go +++ b/internal/proxy/refresh_handler_test.go @@ -187,14 +187,14 @@ func TestHandleRefreshWithoutProxyServerStart(t *testing.T) { httpTestCase := newRefreshHandlerHTTPTestCase(context.Background(), "/refresh") httpTestCase.Teardown() - expectedReply := centrifuge.RefreshReply{ - ExpireAt: time.Now().Unix() + 60, - } - cases := newRefreshHandlerTestCases(httpTestCase, grpcTestCase) for _, c := range cases { reply, err := c.invokeHandle() require.NoError(t, err, c.protocol) + + expectedReply := centrifuge.RefreshReply{ + ExpireAt: time.Now().Unix() + 60, + } require.Equal(t, expectedReply, reply, c.protocol) } } diff --git a/misc/release/notes.md b/misc/release/notes.md index 9f23516fe9..bcbd0630c1 100644 --- a/misc/release/notes.md +++ b/misc/release/notes.md @@ -10,15 +10,17 @@ For details, go to the [Centrifugo documentation site](https://centrifugal.dev). ### Improvements -* Code transforms for HTTP proxy and unidirectional connect [#903](https://github.com/centrifugal/centrifugo/pull/903). See [the description in docs](https://centrifugal.dev/docs/server/proxy#unexpected-error-handling-and-code-transforms). -* Support Kafka `scram-sha-256`, `scram-sha-512` and `aws-msk-iam` SASL [#912](https://github.com/centrifugal/centrifugo/pull/912). See [updated docs](https://centrifugal.dev/docs/server/consumers#kafka-consumer-options) for Kafka consumer. 
+* Change Dockerfile to run `centrifugo` under non-root user [#922](https://github.com/centrifugal/centrifugo/pull/922) by @dmeremyanin +* Update `alpine` base image from 3.18 to 3.21 in Centrifugo Dockerfile ### Fixes -* Centrifugo now does not log tokens when writing INFO level log entry about client disconnection caused by command processing. Thanks to @Dirk007 for submitting the fix. +* Fix a deadlock during pub/sub and recovery sync when using Redis engine and server-side subscriptions, fixes [#925](https://github.com/centrifugal/centrifugo/issues/925) +* Fix pause/resume race in Kafka async consumer [#927](https://github.com/centrifugal/centrifugo/pull/927) – the race could lead to a partition not being processed; while under normal conditions the chance of the race is minimal, it was observed in a real system under CPU throttling conditions. +* Fix flaky `TestHandleRefreshWithoutProxyServerStart` test [#920](https://github.com/centrifugal/centrifugo/pull/920) by @makhov ### Miscellaneous -* This release is built with Go 1.23.2. +* This release is built with Go 1.23.4. * Check out the [Centrifugo v6 roadmap](https://github.com/centrifugal/centrifugo/issues/832). It outlines important changes planned for the next major release. We have already started working on v6 and are sharing updates in the issue and our community channels. -* See also the corresponding [Centrifugo PRO release](https://github.com/centrifugal/centrifugo-pro/releases/tag/v5.4.8). +* See also the corresponding [Centrifugo PRO release](https://github.com/centrifugal/centrifugo-pro/releases/tag/v5.4.10).