From 0bde1f41f19f65026d2854c0ce0418241c1eab44 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 1 Dec 2024 17:41:34 +0200 Subject: [PATCH 1/9] Bump the go-packages group with 15 updates (#916) Bumps the go-packages group with 15 updates: | Package | From | To | | --- | --- | --- | | [github.com/quic-go/quic-go](https://github.com/quic-go/quic-go) | `0.48.1` | `0.48.2` | | [github.com/stretchr/testify](https://github.com/stretchr/testify) | `1.9.0` | `1.10.0` | | [go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc](https://github.com/open-telemetry/opentelemetry-go-contrib) | `0.56.0` | `0.57.0` | | [go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp](https://github.com/open-telemetry/opentelemetry-go-contrib) | `0.56.0` | `0.57.0` | | [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go) | `1.31.0` | `1.32.0` | | [go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp](https://github.com/open-telemetry/opentelemetry-go) | `1.31.0` | `1.32.0` | | [go.opentelemetry.io/otel/sdk](https://github.com/open-telemetry/opentelemetry-go) | `1.31.0` | `1.32.0` | | [go.opentelemetry.io/otel/trace](https://github.com/open-telemetry/opentelemetry-go) | `1.31.0` | `1.32.0` | | [golang.org/x/crypto](https://github.com/golang/crypto) | `0.28.0` | `0.29.0` | | [golang.org/x/sync](https://github.com/golang/sync) | `0.8.0` | `0.9.0` | | [golang.org/x/time](https://github.com/golang/time) | `0.7.0` | `0.8.0` | | [google.golang.org/grpc](https://github.com/grpc/grpc-go) | `1.67.1` | `1.68.0` | | google.golang.org/protobuf | `1.35.1` | `1.35.2` | | [go.opentelemetry.io/otel/exporters/otlp/otlptrace](https://github.com/open-telemetry/opentelemetry-go) | `1.31.0` | `1.32.0` | | [go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc](https://github.com/open-telemetry/opentelemetry-go) | `1.31.0` | `1.32.0` | Updates `github.com/quic-go/quic-go` from 0.48.1 to 0.48.2 - [Release notes](https://github.com/quic-go/quic-go/releases) - [Changelog](https://github.com/quic-go/quic-go/blob/master/Changelog.md) - [Commits](https://github.com/quic-go/quic-go/compare/v0.48.1...v0.48.2) Updates `github.com/stretchr/testify` from 1.9.0 to 1.10.0 - [Release notes](https://github.com/stretchr/testify/releases) - [Commits](https://github.com/stretchr/testify/compare/v1.9.0...v1.10.0) Updates `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` from 0.56.0 to 0.57.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go-contrib/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go-contrib/compare/zpages/v0.56.0...zpages/v0.57.0) Updates `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` from 0.56.0 to 0.57.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go-contrib/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go-contrib/compare/zpages/v0.56.0...zpages/v0.57.0) Updates `go.opentelemetry.io/otel` from 1.31.0 to 1.32.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.31.0...v1.32.0) Updates `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` from 1.31.0 to 1.32.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.31.0...v1.32.0) Updates `go.opentelemetry.io/otel/sdk` from 1.31.0 to 1.32.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.31.0...v1.32.0) Updates `go.opentelemetry.io/otel/trace` from 1.31.0 to 1.32.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.31.0...v1.32.0) Updates `golang.org/x/crypto` from 0.28.0 to 0.29.0 - [Commits](https://github.com/golang/crypto/compare/v0.28.0...v0.29.0) Updates `golang.org/x/sync` from 0.8.0 to 0.9.0 - [Commits](https://github.com/golang/sync/compare/v0.8.0...v0.9.0) Updates `golang.org/x/time` from 0.7.0 to 0.8.0 - [Commits](https://github.com/golang/time/compare/v0.7.0...v0.8.0) Updates `google.golang.org/grpc` from 1.67.1 to 1.68.0 - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.67.1...v1.68.0) Updates `google.golang.org/protobuf` from 1.35.1 to 1.35.2 Updates `go.opentelemetry.io/otel/exporters/otlp/otlptrace` from 1.31.0 to 1.32.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.31.0...v1.32.0) Updates `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc` from 1.31.0 to 1.32.0 - [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.31.0...v1.32.0) --- updated-dependencies: - dependency-name: github.com/quic-go/quic-go dependency-type: direct:production update-type: version-update:semver-patch dependency-group: go-packages - dependency-name: github.com/stretchr/testify dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/otel dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/otel/sdk dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/otel/trace dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: golang.org/x/crypto dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: golang.org/x/sync dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: golang.org/x/time dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: google.golang.org/protobuf dependency-type: direct:production update-type: version-update:semver-patch dependency-group: go-packages - dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages - dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc dependency-type: direct:production update-type: version-update:semver-minor dependency-group: go-packages ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 42 ++++++++++++++-------------- go.sum | 86 ++++++++++++++++++++++++++++++---------------------------- 2 files changed, 65 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index d51be83453..cd3cfb3cfa 100644 --- a/go.mod +++ b/go.mod @@ -21,12 +21,12 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/nats-io/nats.go v1.37.0 github.com/prometheus/client_golang v1.20.5 - github.com/quic-go/quic-go v0.48.1 + github.com/quic-go/quic-go v0.48.2 github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 github.com/rakutentech/jwk-go v1.1.3 github.com/rs/zerolog v1.33.0 github.com/spf13/cobra v1.8.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/twmb/franz-go v1.18.0 @@ -34,18 +34,18 @@ require ( github.com/twmb/franz-go/pkg/kmsg v1.9.0 github.com/valyala/fasttemplate v1.2.2 github.com/vmihailenco/msgpack/v5 v5.4.1 - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 - go.opentelemetry.io/otel v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 - go.opentelemetry.io/otel/sdk v1.31.0 - go.opentelemetry.io/otel/trace v1.31.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 + go.opentelemetry.io/otel v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 + go.opentelemetry.io/otel/sdk v1.32.0 + go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.28.0 - golang.org/x/sync v0.8.0 - golang.org/x/time v0.7.0 - google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + golang.org/x/crypto v0.29.0 + golang.org/x/sync v0.9.0 + golang.org/x/time v0.8.0 + google.golang.org/grpc v1.68.0 + google.golang.org/protobuf v1.35.2 ) require ( @@ -75,7 +75,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/igm/sockjs-go/v3 v3.0.3 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -100,18 +100,18 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 - go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 + go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.30.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect golang.org/x/tools v0.23.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index abdf1f80d6..fbf4b464d1 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -69,8 +71,8 @@ github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pw github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdmPSDFPY= github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -154,8 +156,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= -github.com/quic-go/quic-go v0.48.1 h1:y/8xmfWI9qmGTc+lBr4jKRUWLGSlSigv847ULJ4hYXA= -github.com/quic-go/quic-go v0.48.1/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= +github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE= +github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6u3sOT6pLa1kQ50ZVdm8BQFgJNA117cepZxtLIg= github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw= github.com/rakutentech/jwk-go v1.1.3 h1:PiLwepKyUaW+QFG3ki78DIO2+b4IVK3nMhlxM70zrQ4= @@ -188,8 +190,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -215,24 +217,24 @@ github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 h1:yMkBS9yViCc7U7yeLzJPM2XizlfdVvBRSmsQDWu6qc0= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 h1:lUsI2TYsQw2r1IASwoROaCnjdj2cvC2+Jbxvk6nHnWU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0/go.mod h1:2HpZxxQurfGxJlJDblybejHB6RX6pmExPNe517hREw4= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 h1:qtFISDHKolvIxzSs0gIaiPUPR0Cucb0F2coHC7ZLdps= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0/go.mod h1:Y+Pop1Q6hCOnETWTW4NROK/q1hv50hM7yDaUTjG8lp8= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 h1:DheMAlT6POBP+gh8RUH19EOTnQIor5QE0uSRPtzCpSw= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0/go.mod h1:wZcGmeVO9nzP67aYSLDqXNWK87EZWhi7JWj1v7ZXf94= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 h1:9kV11HXBHZAvuPUZxmMWrH8hZn/6UnHX4K0mu36vNsU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0/go.mod h1:JyA0FHXe22E1NeNiHmVp7kFHglnexDQ7uRWDiiJ1hKQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= @@ -245,8 +247,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -262,8 +264,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -277,14 +279,14 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= -golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -294,20 +296,20 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 h1:T6rh4haD3GVYsgEfWExoCZA2o2FmbNyKpTuAxbEFPTg= -google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:wp2WsuBYj6j8wUdo3ToZsdxxixbvQNAHqVJrTgi5E5M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= -google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= +google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From 6e16c38b323a6b2915a460a0eddfe4ac4b4d8001 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Thu, 5 Dec 2024 07:12:53 +0200 Subject: [PATCH 2/9] kafka: fix possibility to loose records under load (#917) --- internal/consuming/kafka.go | 78 ++++++++----- internal/consuming/kafka_test.go | 187 ++++++++++++++++++++++++++++++- 2 files changed, 234 insertions(+), 31 deletions(-) diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index 44cf1cbbd5..7720e43e30 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -42,6 +42,10 @@ type KafkaConfig struct { // will pause fetching records from Kafka. By default, this is 16. // Set to -1 to use non-buffered channel. PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size"` + + // FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request. + // If not set the default 50MB is used. + FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes"` } type topicPartition struct { @@ -142,6 +146,9 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) { kgo.ClientID(kafkaClientID), kgo.InstanceID(c.getInstanceID()), } + if c.config.FetchMaxBytes > 0 { + opts = append(opts, kgo.FetchMaxBytes(c.config.FetchMaxBytes)) + } if c.config.TLS { tlsOptionsMap, err := c.config.TLSOptions.ToMap() if err != nil { @@ -284,12 +291,19 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error { return fmt.Errorf("poll error: %w", errors.Join(errs...)) } + pausedTopicPartitions := map[topicPartition]struct{}{} fetches.EachPartition(func(p kgo.FetchTopicPartition) { if len(p.Records) == 0 { return } tp := topicPartition{p.Topic, p.Partition} + if _, paused := pausedTopicPartitions[tp]; paused { + // We have already paused this partition during this poll, so we should not + // process records from it anymore. We will resume partition processing with the + // correct offset soon, after we have space in recs buffer. + return + } // Since we are using BlockRebalanceOnPoll, we can be // sure this partition consumer exists: @@ -310,6 +324,12 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error { // keeping records in memory and blocking rebalance. Resume will be called after // records are processed by c.consumers[tp]. c.client.PauseFetchPartitions(partitionsToPause) + pausedTopicPartitions[tp] = struct{}{} + // To poll next time since correct offset we need to set it manually to the offset of + // the first record in the batch. Otherwise, next poll will return the next record batch, + // and we will lose the current one. + epochOffset := kgo.EpochOffset{Epoch: -1, Offset: p.Records[0].Offset} + c.client.SetOffsets(map[string]map[int32]kgo.EpochOffset{p.Topic: {p.Partition: epochOffset}}) } }) c.client.AllowRebalance() @@ -355,15 +375,25 @@ func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned m } for topic, partitions := range assigned { for _, partition := range partitions { + quitCh := make(chan struct{}) + partitionCtx, cancel := context.WithCancel(ctx) + go func() { + select { + case <-ctx.Done(): + cancel() + case <-quitCh: + cancel() + } + }() pc := &partitionConsumer{ - clientCtx: ctx, - dispatcher: c.dispatcher, - logger: c.logger, - cl: cl, - topic: topic, - partition: partition, - - quit: make(chan struct{}), + partitionCtx: partitionCtx, + dispatcher: c.dispatcher, + logger: c.logger, + cl: cl, + topic: topic, + partition: partition, + + quit: quitCh, done: make(chan struct{}), recs: make(chan kgo.FetchTopicPartition, bufferSize), } @@ -407,12 +437,12 @@ func (c *KafkaConsumer) killConsumers(lost map[string][]int32) { } type partitionConsumer struct { - clientCtx context.Context - dispatcher Dispatcher - logger Logger - cl *kgo.Client - topic string - partition int32 + partitionCtx context.Context + dispatcher Dispatcher + logger Logger + cl *kgo.Client + topic string + partition int32 quit chan struct{} done chan struct{} @@ -422,9 +452,7 @@ type partitionConsumer struct { func (pc *partitionConsumer) processRecords(records []*kgo.Record) { for _, record := range records { select { - case <-pc.clientCtx.Done(): - return - case <-pc.quit: + case <-pc.partitionCtx.Done(): return default: } @@ -432,7 +460,7 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { var e KafkaJSONEvent err := json.Unmarshal(record.Value, &e) if err != nil { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling record value from Kafka", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition})) pc.cl.MarkCommitRecords(record) continue } @@ -440,22 +468,20 @@ func (pc *partitionConsumer) processRecords(records []*kgo.Record) { var backoffDuration time.Duration = 0 retries := 0 for { - err := pc.dispatcher.Dispatch(pc.clientCtx, e.Method, e.Payload) + err := pc.dispatcher.Dispatch(pc.partitionCtx, e.Method, e.Payload) if err == nil { if retries > 0 { - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing events after errors", map[string]any{})) + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing record after errors", map[string]any{})) } pc.cl.MarkCommitRecords(record) break } retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) - pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed event", map[string]any{"error": err.Error(), "method": e.Method, "nextAttemptIn": backoffDuration.String()})) + pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed record", map[string]any{"error": err.Error(), "method": e.Method, "next_attempt_in": backoffDuration.String()})) select { case <-time.After(backoffDuration): - case <-pc.quit: - return - case <-pc.clientCtx.Done(): + case <-pc.partitionCtx.Done(): return } } @@ -471,9 +497,7 @@ func (pc *partitionConsumer) consume() { defer resumeConsuming() for { select { - case <-pc.clientCtx.Done(): - return - case <-pc.quit: + case <-pc.partitionCtx.Done(): return case p := <-pc.recs: pc.processRecords(p.Records) diff --git a/internal/consuming/kafka_test.go b/internal/consuming/kafka_test.go index 60e782aff6..cbb61dad6f 100644 --- a/internal/consuming/kafka_test.go +++ b/internal/consuming/kafka_test.go @@ -7,7 +7,9 @@ import ( "encoding/json" "errors" "fmt" + "strconv" "strings" + "sync/atomic" "testing" "time" @@ -44,15 +46,26 @@ func (m *MockLogger) Log(_ centrifuge.LogEntry) { // Implement mock logic, e.g., storing log entries for assertions } +func produceManyRecords(records ...*kgo.Record) error { + client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL)) + if err != nil { + return fmt.Errorf("failed to create Kafka client: %w", err) + } + defer client.Close() + err = client.ProduceSync(context.Background(), records...).FirstErr() + if err != nil { + return fmt.Errorf("failed to produce message: %w", err) + } + return nil +} + func produceTestMessage(topic string, message []byte) error { - // Create a new client client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL)) if err != nil { return fmt.Errorf("failed to create Kafka client: %w", err) } defer client.Close() - // Produce a message err = client.ProduceSync(context.Background(), &kgo.Record{Topic: topic, Partition: 0, Value: message}).FirstErr() if err != nil { return fmt.Errorf("failed to produce message: %w", err) @@ -61,7 +74,6 @@ func produceTestMessage(topic string, message []byte) error { } func produceTestMessageToPartition(topic string, message []byte, partition int32) error { - // Create a new client. client, err := kgo.NewClient( kgo.SeedBrokers(testKafkaBrokerURL), kgo.RecordPartitioner(kgo.ManualPartitioner()), @@ -71,7 +83,6 @@ func produceTestMessageToPartition(topic string, message []byte, partition int32 } defer client.Close() - // Produce a message until we hit desired partition. res := client.ProduceSync(context.Background(), &kgo.Record{ Topic: topic, Partition: partition, Value: message}) if res.FirstErr() != nil { @@ -427,3 +438,171 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T }) } } + +func TestKafkaConsumer_PausePartitions(t *testing.T) { + t.Parallel() + testKafkaTopic := "consumer_test_" + uuid.New().String() + testPayload1 := []byte(`{"key":"value1"}`) + testPayload2 := []byte(`{"key":"value2"}`) + testPayload3 := []byte(`{"key":"value3"}`) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, 1, 1) + require.NoError(t, err) + + event1Received := make(chan struct{}) + event2Received := make(chan struct{}) + event3Received := make(chan struct{}) + consumerClosed := make(chan struct{}) + doneCh := make(chan struct{}) + + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + PartitionBufferSize: -1, + } + + numCalls := 0 + + mockDispatcher := &MockDispatcher{ + onDispatch: func(ctx context.Context, method string, data []byte) error { + numCalls++ + if numCalls == 1 { + close(event1Received) + time.Sleep(5 * time.Second) + return nil + } else if numCalls == 2 { + close(event2Received) + return nil + } + close(event3Received) + return nil + }, + } + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config) + require.NoError(t, err) + + go func() { + err = produceTestMessage(testKafkaTopic, testPayload1) + require.NoError(t, err) + <-event1Received + // At this point message 1 is being processed and the next produced message will + // cause a partition pause. + err = produceTestMessage(testKafkaTopic, testPayload2) + require.NoError(t, err) + <-event2Received + err = produceTestMessage(testKafkaTopic, testPayload3) + require.NoError(t, err) + }() + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + waitCh(t, event1Received, 30*time.Second, "timeout waiting for event 1") + waitCh(t, event2Received, 30*time.Second, "timeout waiting for event 2") + waitCh(t, event3Received, 30*time.Second, "timeout waiting for event 3") + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + close(doneCh) +} + +func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) { + t.Skip() + t.Parallel() + + testCases := []struct { + numPartitions int32 + numMessages int + partitionBuffer int + }{ + //{numPartitions: 1, numMessages: 1000, partitionBuffer: -1}, + //{numPartitions: 1, numMessages: 1000, partitionBuffer: 1}, + //{numPartitions: 10, numMessages: 10000, partitionBuffer: -1}, + {numPartitions: 10, numMessages: 10000, partitionBuffer: 1}, + } + + for _, tc := range testCases { + name := fmt.Sprintf("partitions=%d,messages=%d,buffer=%d", tc.numPartitions, tc.numMessages, tc.partitionBuffer) + t.Run(name, func(t *testing.T) { + testKafkaTopic := "consumer_test_" + uuid.New().String() + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, tc.numPartitions, 1) + require.NoError(t, err) + + consumerClosed := make(chan struct{}) + doneCh := make(chan struct{}) + + numMessages := tc.numMessages + messageCh := make(chan struct{}, numMessages) + + mockDispatcher := &MockDispatcher{ + onDispatch: func(ctx context.Context, method string, data []byte) error { + // Emulate delay due to some work. + time.Sleep(20 * time.Millisecond) + messageCh <- struct{}{} + return nil + }, + } + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + PartitionBufferSize: tc.partitionBuffer, + } + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config) + require.NoError(t, err) + + var records []*kgo.Record + for i := 0; i < numMessages; i++ { + records = append(records, &kgo.Record{Topic: testKafkaTopic, Value: []byte(`{"hello": "` + strconv.Itoa(i) + `"}`)}) + if (i+1)%100 == 0 { + err = produceManyRecords(records...) + if err != nil { + t.Fatal(err) + } + records = nil + t.Logf("produced %d messages", i+1) + } + } + + t.Logf("all messages produced, 3, 2, 1, go!") + time.Sleep(time.Second) + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + var numProcessed int64 + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + t.Logf("processed %d messages", atomic.LoadInt64(&numProcessed)) + } + } + }() + + for i := 0; i < numMessages; i++ { + <-messageCh + atomic.AddInt64(&numProcessed, 1) + } + t.Logf("all messages processed") + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + close(doneCh) + }) + } +} From 6e046e58db7c75ccc541f0385bea03a66f785035 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Sun, 8 Dec 2024 09:32:13 +0200 Subject: [PATCH 3/9] prepare 5.4.8 release (#919) --- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 2 +- go.mod | 18 +++++++++--------- go.sum | 36 +++++++++++++++++------------------ misc/release/notes.md | 12 ++++-------- 5 files changed, 33 insertions(+), 37 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c60e526185..48a75fa92f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.23.2] + go-version: [1.23.4] steps: - name: Install Go uses: actions/setup-go@v5 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7a86e1db7b..05d8fccadd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository strategy: matrix: - go-version: [1.23.2] + go-version: [1.23.4] tarantool-version: [2.7.2] steps: - name: Install Go diff --git a/go.mod b/go.mod index cd3cfb3cfa..97c56a6635 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b github.com/FZambia/tarantool v0.3.1 github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e - github.com/centrifugal/centrifuge v0.33.5-0.20241104073442-b695b2eb669d + github.com/centrifugal/centrifuge v0.33.5-0.20241208063408-f3a1d6b00b59 github.com/centrifugal/protocol v0.13.4 github.com/cristalhq/jwt/v5 v5.4.0 github.com/gobwas/glob v0.2.3 @@ -41,10 +41,10 @@ require ( go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.29.0 - golang.org/x/sync v0.9.0 + golang.org/x/crypto v0.30.0 + golang.org/x/sync v0.10.0 golang.org/x/time v0.8.0 - google.golang.org/grpc v1.68.0 + google.golang.org/grpc v1.68.1 google.golang.org/protobuf v1.35.2 ) @@ -54,7 +54,7 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/maypok86/otter v1.2.3 // indirect + github.com/maypok86/otter v1.2.4 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect @@ -92,9 +92,9 @@ require ( github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/qpack v0.5.1 // indirect - github.com/redis/rueidis v1.0.48 // indirect + github.com/redis/rueidis v1.0.51 // indirect github.com/segmentio/asm v1.2.0 // indirect - github.com/segmentio/encoding v0.4.0 // indirect + github.com/segmentio/encoding v0.4.1 // indirect github.com/spf13/cast v1.4.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect @@ -107,8 +107,8 @@ require ( golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.30.0 // indirect - golang.org/x/sys v0.27.0 // indirect - golang.org/x/text v0.20.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.23.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect diff --git a/go.sum b/go.sum index fbf4b464d1..420d148d7c 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/centrifugal/centrifuge v0.33.5-0.20241104073442-b695b2eb669d h1:k53DsmeNfhw7KbM+T8qeIOL23vI3ewLK1vmv+IYb04U= -github.com/centrifugal/centrifuge v0.33.5-0.20241104073442-b695b2eb669d/go.mod h1:yvzNn5hq/bFBpoXQwM8HbU481pAXQkyP2tzvJgFsiN8= +github.com/centrifugal/centrifuge v0.33.5-0.20241208063408-f3a1d6b00b59 h1:JD4cYq6qoJDEAMtYbYeZ2rL9zo8/kc80O5W8C1DcnY0= +github.com/centrifugal/centrifuge v0.33.5-0.20241208063408-f3a1d6b00b59/go.mod h1:qByx68JcOJmF0svRIwcHsq8XVYsjptyTHJr/3P/JmRs= github.com/centrifugal/protocol v0.13.4 h1:I0YxXtFNfn/ndDIZp5RkkqQcSSNH7DNPUbXKYtJXDzs= github.com/centrifugal/protocol v0.13.4/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -108,8 +108,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/maypok86/otter v1.2.3 h1:jxyPD4ofCwtrQM5is5JNrdAs+6+JQkf/PREZd7JCVgg= -github.com/maypok86/otter v1.2.3/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4= +github.com/maypok86/otter v1.2.4 h1:HhW1Pq6VdJkmWwcZZq19BlEQkHtI8xgsQzBVXJU0nfc= +github.com/maypok86/otter v1.2.4/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -162,8 +162,8 @@ github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6 github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw= github.com/rakutentech/jwk-go v1.1.3 h1:PiLwepKyUaW+QFG3ki78DIO2+b4IVK3nMhlxM70zrQ4= github.com/rakutentech/jwk-go v1.1.3/go.mod h1:LtzSv4/+Iti1nnNeVQiP6l5cI74GBStbhyXCYvgPZFk= -github.com/redis/rueidis v1.0.48 h1:ggZHjEtc/echUmPkGTfssRisnc3p/mIUEwrpbNsZ1mQ= -github.com/redis/rueidis v1.0.48/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= +github.com/redis/rueidis v1.0.51 h1:NZ1KIncPIQtjrp+GDLynrLKBiPU106EN5cJHOFSqvDM= +github.com/redis/rueidis v1.0.51/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -172,8 +172,8 @@ github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWR github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= -github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8= -github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= +github.com/segmentio/encoding v0.4.1 h1:KLGaLSW0jrmhB58Nn4+98spfvPvmo4Ci1P/WIQ9wn7w= +github.com/segmentio/encoding v0.4.1/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= github.com/shadowspore/fossil-delta v0.0.0-20241003175239-d3b7ce6bda62 h1:2OBZQ+j6pNlvhcUJqV5WtoyMEv32vpZyKE9+ta2Sr+Y= github.com/shadowspore/fossil-delta v0.0.0-20241003175239-d3b7ce6bda62/go.mod h1:TNFF98m0aYYsGHXAKrlG2T6Tr/hUV2MbNHPdmyA6PFg= github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA= @@ -247,8 +247,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= -golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -264,8 +264,8 @@ golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -279,12 +279,12 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= -golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -300,8 +300,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1: google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= -google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= +google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/misc/release/notes.md b/misc/release/notes.md index 9f23516fe9..fb3ec13f1e 100644 --- a/misc/release/notes.md +++ b/misc/release/notes.md @@ -8,17 +8,13 @@ For details, go to the [Centrifugo documentation site](https://centrifugal.dev). ## What's changed -### Improvements - -* Code transforms for HTTP proxy and unidirectional connect [#903](https://github.com/centrifugal/centrifugo/pull/903). See [the description in docs](https://centrifugal.dev/docs/server/proxy#unexpected-error-handling-and-code-transforms). -* Support Kafka `scram-sha-256`, `scram-sha-512` and `aws-msk-iam` SASL [#912](https://github.com/centrifugal/centrifugo/pull/912). See [updated docs](https://centrifugal.dev/docs/server/consumers#kafka-consumer-options) for Kafka consumer. - ### Fixes -* Centrifugo now does not log tokens when writing INFO level log entry about client disconnection caused by command processing. Thanks to @Dirk007 for submitting the fix. +* Kafka async consumer: fix potential loss of records under load [#917](https://github.com/centrifugal/centrifugo/pull/917) +* Centrifugo now does not log tokens when writing INFO level log entry about client error caused by command processing. ### Miscellaneous -* This release is built with Go 1.23.2. +* This release is built with Go 1.23.4. * Check out the [Centrifugo v6 roadmap](https://github.com/centrifugal/centrifugo/issues/832). It outlines important changes planned for the next major release. We have already started working on v6 and are sharing updates in the issue and our community channels. -* See also the corresponding [Centrifugo PRO release](https://github.com/centrifugal/centrifugo-pro/releases/tag/v5.4.8). +* See also the corresponding [Centrifugo PRO release](https://github.com/centrifugal/centrifugo-pro/releases/tag/v5.4.9). From e046d6e582a73a9d58d883b118cd2a238ba4a91b Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Fri, 20 Dec 2024 23:44:54 +0200 Subject: [PATCH 4/9] Kafka consumer: fix pause/resume race (#927) --- internal/consuming/kafka.go | 55 ++++++++++-- internal/consuming/kafka_test.go | 142 ++++++++++++++++++++++++++++--- 2 files changed, 176 insertions(+), 21 deletions(-) diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index 7720e43e30..e8c24f4c0f 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -39,18 +39,30 @@ type KafkaConfig struct { // PartitionBufferSize is the size of the buffer for each partition consumer. // This is the number of records that can be buffered before the consumer - // will pause fetching records from Kafka. By default, this is 16. - // Set to -1 to use non-buffered channel. + // will pause fetching records from Kafka. By default, this is 8. + // Note, due to the way the consumer works with Kafka partitions, we do not + // allow using unbuffered channels for partition consumers. Specifically + // due to the race condition of the consumer pausing/resuming, covered in + // TestKafkaConsumer_TestPauseAfterResumeRace test case. PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size"` // FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request. // If not set the default 50MB is used. FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes"` + + // testOnlyConfig is an additional options which are used for testing purposes only. + testOnlyConfig testOnlyConfig +} + +type testOnlyConfig struct { + topicPartitionBeforePauseCh chan topicPartition + topicPartitionPauseProceedCh chan struct{} + fetchTopicPartitionSubmittedCh chan kgo.FetchTopicPartition } type topicPartition struct { - t string - p int32 + topic string + partition int32 } type KafkaConsumer struct { @@ -110,6 +122,9 @@ func NewKafkaConsumer(name string, nodeID string, logger Logger, dispatcher Disp if config.MaxPollRecords == 0 { config.MaxPollRecords = 100 } + if config.PartitionBufferSize < 0 { + return nil, errors.New("partition buffer size can't be negative") + } consumer := &KafkaConsumer{ name: name, nodeID: nodeID, @@ -317,13 +332,37 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error { case <-c.consumers[tp].quit: return case c.consumers[tp].recs <- p: + if c.config.testOnlyConfig.fetchTopicPartitionSubmittedCh != nil { // Only set in tests. + c.config.testOnlyConfig.fetchTopicPartitionSubmittedCh <- p + } default: + if c.config.testOnlyConfig.topicPartitionBeforePauseCh != nil { // Only set in tests. + c.config.testOnlyConfig.topicPartitionBeforePauseCh <- tp + } + if c.config.testOnlyConfig.topicPartitionPauseProceedCh != nil { // Only set in tests. + <-c.config.testOnlyConfig.topicPartitionPauseProceedCh + } + partitionsToPause := map[string][]int32{p.Topic: {p.Partition}} // PauseFetchPartitions here to not poll partition until records are processed. // This allows parallel processing of records from different partitions, without // keeping records in memory and blocking rebalance. Resume will be called after // records are processed by c.consumers[tp]. c.client.PauseFetchPartitions(partitionsToPause) + defer func() { + // There is a chance that message processor resumed partition processing before + // we called PauseFetchPartitions above. Such a race was observed in a service + // under CPU throttling conditions. In that case Pause is called after Resume, + // and topic is never resumed after that. To avoid we check if the channel current + // len is less than cap, if len < cap => we can be sure that buffer has space now, + // so we can resume partition processing. If it is not, this means that the records + // are still not processed and resume will be called eventually after processing by + // partition consumer. See also TestKafkaConsumer_TestPauseAfterResumeRace test case. + if len(c.consumers[tp].recs) < cap(c.consumers[tp].recs) { + c.client.ResumeFetchPartitions(partitionsToPause) + } + }() + pausedTopicPartitions[tp] = struct{}{} // To poll next time since correct offset we need to set it manually to the offset of // the first record in the batch. Otherwise, next poll will return the next record batch, @@ -364,13 +403,11 @@ func (c *KafkaConsumer) reInitClient(ctx context.Context) error { return nil } -const defaultPartitionBufferSize = 16 +const defaultPartitionBufferSize = 8 func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned map[string][]int32) { bufferSize := c.config.PartitionBufferSize - if bufferSize == -1 { - bufferSize = 0 - } else if bufferSize == 0 { + if bufferSize == 0 { bufferSize = defaultPartitionBufferSize } for topic, partitions := range assigned { @@ -501,7 +538,7 @@ func (pc *partitionConsumer) consume() { return case p := <-pc.recs: pc.processRecords(p.Records) - // At this point we are ready to consume the next batch from partition, thus resume. + // After processing records, we can resume partition processing (if it was paused, no-op otherwise). resumeConsuming() } } diff --git a/internal/consuming/kafka_test.go b/internal/consuming/kafka_test.go index cbb61dad6f..3ca684ef3b 100644 --- a/internal/consuming/kafka_test.go +++ b/internal/consuming/kafka_test.go @@ -368,7 +368,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherTopic(t *testing.T) { // is stuck on it. We want to make sure that the consumer is not blocked and can still process // messages from other topic partitions. func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T) { - partitionBufferSizes := []int{-1, 0} + partitionBufferSizes := []int{0, 1} for _, partitionBufferSize := range partitionBufferSizes { t.Run(fmt.Sprintf("partition_buffer_size_%d", partitionBufferSize), func(t *testing.T) { @@ -458,21 +458,32 @@ func TestKafkaConsumer_PausePartitions(t *testing.T) { consumerClosed := make(chan struct{}) doneCh := make(chan struct{}) + fetchSubmittedCh := make(chan kgo.FetchTopicPartition) + beforePauseCh := make(chan topicPartition) + config := KafkaConfig{ - Brokers: []string{testKafkaBrokerURL}, - Topics: []string{testKafkaTopic}, - ConsumerGroup: uuid.New().String(), - PartitionBufferSize: -1, + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + + PartitionBufferSize: 1, + + testOnlyConfig: testOnlyConfig{ + fetchTopicPartitionSubmittedCh: fetchSubmittedCh, + topicPartitionBeforePauseCh: beforePauseCh, + }, } numCalls := 0 + unblockCh := make(chan struct{}) + mockDispatcher := &MockDispatcher{ onDispatch: func(ctx context.Context, method string, data []byte) error { numCalls++ if numCalls == 1 { close(event1Received) - time.Sleep(5 * time.Second) + <-unblockCh return nil } else if numCalls == 2 { close(event2Received) @@ -488,14 +499,22 @@ func TestKafkaConsumer_PausePartitions(t *testing.T) { go func() { err = produceTestMessage(testKafkaTopic, testPayload1) require.NoError(t, err) + <-fetchSubmittedCh <-event1Received - // At this point message 1 is being processed and the next produced message will - // cause a partition pause. + err = produceTestMessage(testKafkaTopic, testPayload2) require.NoError(t, err) - <-event2Received + <-fetchSubmittedCh + + // At this point message 1 is being processed and the next produced message must + // cause a partition pause. err = produceTestMessage(testKafkaTopic, testPayload3) require.NoError(t, err) + <-beforePauseCh // Wait for triggering the partition pause. + + // Unblock the message processing. + close(unblockCh) + <-fetchSubmittedCh }() go func() { @@ -521,9 +540,7 @@ func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) { numMessages int partitionBuffer int }{ - //{numPartitions: 1, numMessages: 1000, partitionBuffer: -1}, - //{numPartitions: 1, numMessages: 1000, partitionBuffer: 1}, - //{numPartitions: 10, numMessages: 10000, partitionBuffer: -1}, + //{numPartitions: 1, numMessages: 1000, partitionBuffer: 1} {numPartitions: 10, numMessages: 10000, partitionBuffer: 1}, } @@ -606,3 +623,104 @@ func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) { }) } } + +// TestKafkaConsumer_TestPauseAfterResumeRace tests a scenario where a partition was +// paused after it was resumed and partition never processed any messages after that. +func TestKafkaConsumer_TestPauseAfterResumeRace(t *testing.T) { + t.Parallel() + testKafkaTopic := "consumer_test_" + uuid.New().String() + testPayload1 := []byte(`{"input":"value1"}`) + testPayload2 := []byte(`{"input":"value2"}`) + testPayload3 := []byte(`{"input":"value3"}`) + + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() + + err := createTestTopic(ctx, testKafkaTopic, 1, 1) + require.NoError(t, err) + + consumerClosed := make(chan struct{}) + doneCh := make(chan struct{}) + + messageCh := make(chan struct{}, 128) + + partitionBeforePauseCh := make(chan topicPartition) + partitionPauseProceedCh := make(chan struct{}) + fetchSubmittedCh := make(chan kgo.FetchTopicPartition) + + config := KafkaConfig{ + Brokers: []string{testKafkaBrokerURL}, + Topics: []string{testKafkaTopic}, + ConsumerGroup: uuid.New().String(), + PartitionBufferSize: 1, + + testOnlyConfig: testOnlyConfig{ + topicPartitionBeforePauseCh: partitionBeforePauseCh, + topicPartitionPauseProceedCh: partitionPauseProceedCh, + fetchTopicPartitionSubmittedCh: fetchSubmittedCh, + }, + } + + count := 0 + proceedCh := make(chan struct{}) + firstMessageReceived := make(chan struct{}) + + mockDispatcher := &MockDispatcher{ + onDispatch: func(ctx context.Context, method string, data []byte) error { + if count == 0 { + close(firstMessageReceived) + // Block until we are allowed to proceed + t.Logf("waiting for proceed") + <-proceedCh + t.Logf("proceeding") + } + count++ + messageCh <- struct{}{} + t.Logf("message processed") + return nil + }, + } + consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config) + require.NoError(t, err) + + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() + + go func() { + err = produceTestMessage(testKafkaTopic, testPayload1) + require.NoError(t, err) + + <-fetchSubmittedCh + t.Logf("fetch 1 submitted") + + // This one should be buffered. + err = produceTestMessage(testKafkaTopic, testPayload2) + require.NoError(t, err) + + <-fetchSubmittedCh + t.Logf("fetch 2 submitted") + + // This message pauses the partition consumer. + err = produceTestMessage(testKafkaTopic, testPayload3) + require.NoError(t, err) + <-partitionBeforePauseCh + close(proceedCh) + // Give consumer some time to process messages, so we can be sure that resume was called. + time.Sleep(time.Second) + // And now we can proceed so that partition will be paused after resume. + close(partitionPauseProceedCh) + // Wait for the third message submitted for processing. + <-fetchSubmittedCh + }() + + for i := 0; i < 3; i++ { + <-messageCh + } + + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + close(doneCh) +} From 74b714a30c4aea92a0739a086608d8d9cfdd81af Mon Sep 17 00:00:00 2001 From: DM Date: Sat, 21 Dec 2024 01:22:20 -0800 Subject: [PATCH 5/9] change Dockerfile to run app under non-root user (#922) --- Dockerfile | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1942fe053f..1985cbba61 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,20 @@ FROM alpine:3.18 -RUN apk --no-cache upgrade && apk --no-cache add ca-certificates +ARG USER=centrifugo +ARG UID=1000 +ARG GID=1000 -COPY centrifugo /usr/local/bin/centrifugo +RUN addgroup -S -g $GID $USER && \ + adduser -S -G $USER -u $UID $USER + +RUN apk --no-cache upgrade && \ + apk --no-cache add ca-certificates && \ + update-ca-certificates + +USER $USER WORKDIR /centrifugo +COPY centrifugo /usr/local/bin/centrifugo + CMD ["centrifugo"] From 49100fbdf7463b003714a981c93abfe2fa2ee038 Mon Sep 17 00:00:00 2001 From: Alexey Makhov Date: Sat, 21 Dec 2024 11:27:54 +0200 Subject: [PATCH 6/9] Fix flaky TestHandleRefreshWithoutProxyServerStart test (#920) --- internal/proxy/refresh_handler_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/proxy/refresh_handler_test.go b/internal/proxy/refresh_handler_test.go index 855974ddd9..149a9dc8cd 100644 --- a/internal/proxy/refresh_handler_test.go +++ b/internal/proxy/refresh_handler_test.go @@ -187,14 +187,14 @@ func TestHandleRefreshWithoutProxyServerStart(t *testing.T) { httpTestCase := newRefreshHandlerHTTPTestCase(context.Background(), "/refresh") httpTestCase.Teardown() - expectedReply := centrifuge.RefreshReply{ - ExpireAt: time.Now().Unix() + 60, - } - cases := newRefreshHandlerTestCases(httpTestCase, grpcTestCase) for _, c := range cases { reply, err := c.invokeHandle() require.NoError(t, err, c.protocol) + + expectedReply := centrifuge.RefreshReply{ + ExpireAt: time.Now().Unix() + 60, + } require.Equal(t, expectedReply, reply, c.protocol) } } From eb63d342f8d036fb0883e890bea35e7f4a630e9b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 21 Dec 2024 11:29:48 +0200 Subject: [PATCH 7/9] Bump golang.org/x/crypto from 0.30.0 to 0.31.0 (#921) Bumps [golang.org/x/crypto](https://github.com/golang/crypto) from 0.30.0 to 0.31.0. - [Commits](https://github.com/golang/crypto/compare/v0.30.0...v0.31.0) --- updated-dependencies: - dependency-name: golang.org/x/crypto dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 97c56a6635..736e3db218 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.30.0 + golang.org/x/crypto v0.31.0 golang.org/x/sync v0.10.0 golang.org/x/time v0.8.0 google.golang.org/grpc v1.68.1 diff --git a/go.sum b/go.sum index 420d148d7c..2cb82ddb5e 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= -golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= From 7b403a56bf6411b75ee7457c2630319e0a26dccf Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Sat, 21 Dec 2024 18:21:41 +0200 Subject: [PATCH 8/9] up dependencies (#928) --- go.mod | 8 ++++---- go.sum | 10 ++++++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 736e3db218..693d572386 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b github.com/FZambia/tarantool v0.3.1 github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e - github.com/centrifugal/centrifuge v0.33.5-0.20241208063408-f3a1d6b00b59 + github.com/centrifugal/centrifuge v0.33.5-0.20241221140549-8f88728a2744 github.com/centrifugal/protocol v0.13.4 github.com/cristalhq/jwt/v5 v5.4.0 github.com/gobwas/glob v0.2.3 @@ -19,7 +19,7 @@ require ( github.com/justinas/alice v1.2.0 github.com/mattn/go-isatty v0.0.20 github.com/mitchellh/mapstructure v1.5.0 - github.com/nats-io/nats.go v1.37.0 + github.com/nats-io/nats.go v1.38.0 github.com/prometheus/client_golang v1.20.5 github.com/quic-go/quic-go v0.48.2 github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 @@ -44,7 +44,7 @@ require ( golang.org/x/crypto v0.31.0 golang.org/x/sync v0.10.0 golang.org/x/time v0.8.0 - google.golang.org/grpc v1.68.1 + google.golang.org/grpc v1.69.2 google.golang.org/protobuf v1.35.2 ) @@ -82,7 +82,7 @@ require ( github.com/klauspost/compress v1.17.11 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo/v2 v2.12.1 // indirect diff --git a/go.sum b/go.sum index 2cb82ddb5e..97e1a6521a 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/centrifugal/centrifuge v0.33.5-0.20241208063408-f3a1d6b00b59 h1:JD4cYq6qoJDEAMtYbYeZ2rL9zo8/kc80O5W8C1DcnY0= -github.com/centrifugal/centrifuge v0.33.5-0.20241208063408-f3a1d6b00b59/go.mod h1:qByx68JcOJmF0svRIwcHsq8XVYsjptyTHJr/3P/JmRs= +github.com/centrifugal/centrifuge v0.33.5-0.20241221140549-8f88728a2744 h1:VO3gP46i93SfvDDG2BHd1oXxeO+MEYBtYdwdXjgx2F4= +github.com/centrifugal/centrifuge v0.33.5-0.20241221140549-8f88728a2744/go.mod h1:qByx68JcOJmF0svRIwcHsq8XVYsjptyTHJr/3P/JmRs= github.com/centrifugal/protocol v0.13.4 h1:I0YxXtFNfn/ndDIZp5RkkqQcSSNH7DNPUbXKYtJXDzs= github.com/centrifugal/protocol v0.13.4/go.mod h1:7V5vI30VcoxJe4UD87xi7bOsvI0bmEhvbQuMjrFM2L4= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -116,8 +116,12 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= +github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -302,6 +306,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= From 0bc378a899366c1eb30134310f4e0f8f5107f43b Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Sun, 22 Dec 2024 09:39:35 +0200 Subject: [PATCH 9/9] Prepare v5.4.9 release notes, alpine 3.21 (#929) --- Dockerfile | 2 +- go.sum | 8 ++------ misc/release/notes.md | 12 +++++++++--- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1985cbba61..5d972522f5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM alpine:3.18 +FROM alpine:3.21 ARG USER=centrifugo ARG UID=1000 diff --git a/go.sum b/go.sum index 97e1a6521a..deeb1d8862 100644 --- a/go.sum +++ b/go.sum @@ -114,12 +114,8 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= -github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -237,6 +233,8 @@ go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZk go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= @@ -304,8 +302,6 @@ google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1: google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= -google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/misc/release/notes.md b/misc/release/notes.md index fb3ec13f1e..bcbd0630c1 100644 --- a/misc/release/notes.md +++ b/misc/release/notes.md @@ -8,13 +8,19 @@ For details, go to the [Centrifugo documentation site](https://centrifugal.dev). ## What's changed +### Improvements + +* Change Dockerfile to run `centrifugo` under non-root user [#922](https://github.com/centrifugal/centrifugo/pull/922) by @dmeremyanin +* Update `alpine` base image from 3.18 to 3.21 in Centrifugo Dockerfile + ### Fixes -* Kafka async consumer: fix potential loss of records under load [#917](https://github.com/centrifugal/centrifugo/pull/917) -* Centrifugo now does not log tokens when writing INFO level log entry about client error caused by command processing. +* Fix a deadlock during pub/sub and recovery sync when using Redis engine and server-side subscriptions, fixes [#925](https://github.com/centrifugal/centrifugo/issues/925) +* Fix pause/resume race in Kafka async consumer [#927](https://github.com/centrifugal/centrifugo/pull/927) – the race could lead to a partition non being processed, while in normal condition the chance of the race is minimal, this was observed in a real system under CPU throttling conditions. +* Fix flaky `TestHandleRefreshWithoutProxyServerStart` test [#920](https://github.com/centrifugal/centrifugo/pull/920) by @makhov ### Miscellaneous * This release is built with Go 1.23.4. * Check out the [Centrifugo v6 roadmap](https://github.com/centrifugal/centrifugo/issues/832). It outlines important changes planned for the next major release. We have already started working on v6 and are sharing updates in the issue and our community channels. -* See also the corresponding [Centrifugo PRO release](https://github.com/centrifugal/centrifugo-pro/releases/tag/v5.4.9). +* See also the corresponding [Centrifugo PRO release](https://github.com/centrifugal/centrifugo-pro/releases/tag/v5.4.10).