diff --git a/components/ledger/.env.example b/components/ledger/.env.example index c2e1fcf3d..7bec03831 100644 --- a/components/ledger/.env.example +++ b/components/ledger/.env.example @@ -298,3 +298,32 @@ STREAMING_ENABLED=false # STREAMING_REQUIRED_ACKS=all # franz-go ProducerLinger in ms. Default: 5. # STREAMING_BATCH_LINGER_MS=5 + +# --- Per-emit timeout (IMPORTANT-posture events) --- +# Per-call deadline applied around emitter.Emit() for IMPORTANT-posture +# events (organization.*, ledger.*, account.*, asset.*, etc.). When this +# fires, the emit is logged as a warning and dropped — durability of +# IMPORTANT events belongs to PG and (eventually) the outbox subsystem, +# not to the synchronous broker round-trip. +# +# Value is in milliseconds; must be > 0 or the default is used. Default: 5000 (5s). +# Bump it if your broker is far away (cross-region) or when running +# end-to-end against a cold/loaded test cluster. Lower it if you want to +# fail-fast on slow brokers and rely on async/outbox redelivery. +# STREAMING_IMPORTANT_EMIT_TIMEOUT_MS=5000 + +# --- SASL/TLS auth --- +# When STREAMING_SASL_MECHANISM is empty (default) the producer connects to +# the broker without authentication. Set to PLAIN, SCRAM-SHA-256, or +# SCRAM-SHA-512 to enable SASL; STREAMING_SASL_USERNAME and +# STREAMING_SASL_PASSWORD then become required. +# STREAMING_SASL_MECHANISM= +# STREAMING_SASL_USERNAME= +# STREAMING_SASL_PASSWORD= +# +# SASL without TLS is rejected at bootstrap unless explicitly opted in via +# STREAMING_ALLOW_PLAINTEXT_SASL=true. Use ONLY for local/dev brokers that +# don't terminate TLS (e.g. a Redpanda dev container with a SASL_PLAINTEXT +# listener). NEVER set this in staging or production: SASL credentials +# would cross the network in cleartext. +# STREAMING_ALLOW_PLAINTEXT_SASL=false diff --git a/components/ledger/internal/bootstrap/config.go b/components/ledger/internal/bootstrap/config.go index 39d6b2b9b..0e19135da 100644 --- a/components/ledger/internal/bootstrap/config.go +++ b/components/ledger/internal/bootstrap/config.go @@ -217,6 +217,24 @@ type Config struct { StreamingCompression string `env:"STREAMING_COMPRESSION"` StreamingRequiredAcks string `env:"STREAMING_REQUIRED_ACKS"` StreamingBatchLingerMs int `env:"STREAMING_BATCH_LINGER_MS"` + + // --- Streaming SASL/TLS auth --- + // When STREAMING_SASL_MECHANISM is empty (default) the producer connects + // without authentication, matching the existing behaviour for local/dev + // brokers. When set, the value must be one of PLAIN, SCRAM-SHA-256, + // SCRAM-SHA-512 (case-insensitive); USERNAME and PASSWORD are then + // required and BuildStreamingEmitter wires the matching franz-go + // sasl.Mechanism into the lib-streaming Builder. + // + // SASL without TLS is rejected by lib-streaming with + // ErrPlaintextSASLNotAllowed. STREAMING_ALLOW_PLAINTEXT_SASL=true is the + // explicit unsafe opt-in for local/dev brokers that do not terminate + // TLS. It must NOT be set in production: SASL credentials cross the + // network in cleartext. + StreamingSASLMechanism string `env:"STREAMING_SASL_MECHANISM"` + StreamingSASLUsername string `env:"STREAMING_SASL_USERNAME"` + StreamingSASLPassword string `env:"STREAMING_SASL_PASSWORD"` + StreamingAllowPlaintextSASL bool `env:"STREAMING_ALLOW_PLAINTEXT_SASL"` } // Options contains optional dependencies that can be injected by callers. diff --git a/components/ledger/internal/bootstrap/streaming.go b/components/ledger/internal/bootstrap/streaming.go index 332279d35..e160d2a26 100644 --- a/components/ledger/internal/bootstrap/streaming.go +++ b/components/ledger/internal/bootstrap/streaming.go @@ -13,6 +13,17 @@ import ( libOpentelemetry "github.com/LerianStudio/lib-commons/v5/commons/opentelemetry" libStreaming "github.com/LerianStudio/lib-streaming" "github.com/LerianStudio/midaz/v3/pkg/streaming/events" + "github.com/twmb/franz-go/pkg/sasl" + "github.com/twmb/franz-go/pkg/sasl/plain" + "github.com/twmb/franz-go/pkg/sasl/scram" +) + +// SASL mechanism names accepted by STREAMING_SASL_MECHANISM. Compared +// case-insensitively at parse time so operators can write any casing. +const ( + saslMechanismPlain = "PLAIN" + saslMechanismScram256 = "SCRAM-SHA-256" + saslMechanismScram512 = "SCRAM-SHA-512" ) // streamingPrimaryTargetName is the canonical name for midaz's single @@ -107,7 +118,7 @@ func BuildStreamingEmitter( return nil, noopStreamingCloser, fmt.Errorf("failed to build streaming routes: %w", err) } - emitter, err := libStreaming.NewBuilder(). + builder := libStreaming.NewBuilder(). Source(streamingCfg.CloudEventsSource). Catalog(catalog). Routes(routes...). @@ -115,17 +126,50 @@ func BuildStreamingEmitter( Name: streamingPrimaryTargetName, Kind: libStreaming.TransportKafkaLike, Brokers: streamingCfg.Brokers, - }). - Build(ctx) + }) + + // Apply SASL/TLS auth knobs from cfg. resolveSASLMechanism returns a + // nil mechanism (and an empty mechanism name) when SASL is disabled, + // in which case the Builder is left untouched and the producer dials + // the broker without authentication — matching the historical local/dev + // behaviour. When SASL is enabled but TLS is not, lib-streaming + // rejects construction with ErrPlaintextSASLNotAllowed unless the + // caller also opts into AllowPlaintextSASL — gated behind + // STREAMING_ALLOW_PLAINTEXT_SASL=true for dev brokers. + mechanism, mechanismName, err := resolveSASLMechanism(cfg) + if err != nil { + return nil, noopStreamingCloser, fmt.Errorf("failed to resolve streaming SASL mechanism: %w", err) + } + + if mechanism != nil { + builder = builder.SASL(mechanism) + + if cfg.StreamingAllowPlaintextSASL { + builder = builder.AllowPlaintextSASL() + } + } + + emitter, err := builder.Build(ctx) if err != nil { return nil, noopStreamingCloser, fmt.Errorf("failed to construct streaming emitter: %w", err) } if logger != nil { + // NOTE: only mechanism name is logged. Username and password are + // NEVER logged, even at debug level. + authMode := "none" + if mechanismName != "" { + authMode = mechanismName + if cfg.StreamingAllowPlaintextSASL { + authMode += " (plaintext)" + } + } + logger.Log(ctx, libLog.LevelInfo, "Streaming emitter constructed", libLog.String("brokers", strings.Join(streamingCfg.Brokers, ",")), libLog.String("client_id", streamingCfg.ClientID), libLog.String("ce_source", streamingCfg.CloudEventsSource), + libLog.String("auth", authMode), libLog.Int("catalog_size", catalog.Len()), libLog.Int("routes", len(routes)), ) @@ -134,6 +178,55 @@ func BuildStreamingEmitter( return emitter, emitter.Close, nil } +// resolveSASLMechanism inspects the streaming SASL knobs on cfg and +// returns the matching franz-go sasl.Mechanism plus its canonical name. +// +// Behaviour: +// - StreamingSASLMechanism empty (after trimming) → returns (nil, "", nil). +// The Builder stays unauthenticated, matching the existing local/dev +// default. +// - StreamingSASLMechanism set but USERNAME or PASSWORD empty → returns +// a config error. SASL with empty credentials would either be rejected +// by the broker after I/O (PLAIN) or panic inside franz-go's SCRAM +// handshake; failing closed at bootstrap is the safer contract. +// - StreamingSASLMechanism unrecognised → returns a config error +// enumerating the accepted values. +// +// The mechanism name returned is the canonical upper-case form +// ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512") — used for the bootstrap +// log line. Username and password are NEVER returned to the caller and +// never logged. +func resolveSASLMechanism(cfg *Config) (sasl.Mechanism, string, error) { + raw := strings.TrimSpace(cfg.StreamingSASLMechanism) + if raw == "" { + return nil, "", nil + } + + mechanism := strings.ToUpper(raw) + + user := cfg.StreamingSASLUsername + pass := cfg.StreamingSASLPassword + + if user == "" || pass == "" { + return nil, "", fmt.Errorf( + "STREAMING_SASL_MECHANISM=%q requires STREAMING_SASL_USERNAME and STREAMING_SASL_PASSWORD", + mechanism) + } + + switch mechanism { + case saslMechanismPlain: + return plain.Auth{User: user, Pass: pass}.AsMechanism(), saslMechanismPlain, nil + case saslMechanismScram256: + return scram.Auth{User: user, Pass: pass}.AsSha256Mechanism(), saslMechanismScram256, nil + case saslMechanismScram512: + return scram.Auth{User: user, Pass: pass}.AsSha512Mechanism(), saslMechanismScram512, nil + default: + return nil, "", fmt.Errorf( + "STREAMING_SASL_MECHANISM=%q is not supported (accepted: %s, %s, %s)", + raw, saslMechanismPlain, saslMechanismScram256, saslMechanismScram512) + } +} + // midazEventDefinitions returns the canonical, ordered list of midaz // event Definitions registered into both the Catalog and the Routes. // Kept as a single source of truth so adding a new event is a one-place diff --git a/components/ledger/internal/bootstrap/streaming_test.go b/components/ledger/internal/bootstrap/streaming_test.go new file mode 100644 index 000000000..ef669a29c --- /dev/null +++ b/components/ledger/internal/bootstrap/streaming_test.go @@ -0,0 +1,296 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package bootstrap + +import ( + "context" + "errors" + "strings" + "testing" + + libStreaming "github.com/LerianStudio/lib-streaming" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestResolveSASLMechanism_Disabled covers the default code path: when +// STREAMING_SASL_MECHANISM is empty (or whitespace) the resolver returns a +// nil mechanism and an empty name, signalling that the Builder should be +// left unauthenticated. This is the back-compat contract for local/dev +// brokers that are still running without auth. +func TestResolveSASLMechanism_Disabled(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + cfg *Config + }{ + {name: "empty", cfg: &Config{}}, + {name: "whitespace only", cfg: &Config{StreamingSASLMechanism: " \t "}}, + { + // Ensure that USERNAME/PASSWORD set without a MECHANISM is also + // treated as disabled — operators who half-configure SASL from + // the bottom up shouldn't accidentally activate auth. + name: "credentials without mechanism", + cfg: &Config{ + StreamingSASLUsername: "u", + StreamingSASLPassword: "p", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + mech, name, err := resolveSASLMechanism(tc.cfg) + require.NoError(t, err) + assert.Nil(t, mech) + assert.Empty(t, name) + }) + } +} + +// TestResolveSASLMechanism_Supported covers every accepted mechanism +// string, including case-insensitive matching, and asserts that the +// returned canonical name matches the upper-case form used in the +// bootstrap log line. +func TestResolveSASLMechanism_Supported(t *testing.T) { + t.Parallel() + + cases := []struct { + input string + expectedName string + }{ + {input: "PLAIN", expectedName: saslMechanismPlain}, + {input: "plain", expectedName: saslMechanismPlain}, + {input: " PLAIN ", expectedName: saslMechanismPlain}, + {input: "SCRAM-SHA-256", expectedName: saslMechanismScram256}, + {input: "scram-sha-256", expectedName: saslMechanismScram256}, + {input: "SCRAM-SHA-512", expectedName: saslMechanismScram512}, + {input: "scram-sha-512", expectedName: saslMechanismScram512}, + } + + for _, tc := range cases { + t.Run(tc.input, func(t *testing.T) { + t.Parallel() + + cfg := &Config{ + StreamingSASLMechanism: tc.input, + StreamingSASLUsername: "ledger-prod", + StreamingSASLPassword: "s3cret", + } + + mech, name, err := resolveSASLMechanism(cfg) + require.NoError(t, err) + require.NotNil(t, mech) + assert.Equal(t, tc.expectedName, name) + // franz-go's Mechanism.Name() returns the wire-format mechanism + // string, which should match the canonical name 1:1. + assert.Equal(t, tc.expectedName, mech.Name()) + }) + } +} + +// TestResolveSASLMechanism_MissingCredentials guarantees the resolver +// fails closed when MECHANISM is set but USERNAME or PASSWORD is empty. +// SASL with empty credentials would either be rejected by the broker +// after I/O (PLAIN) or panic deep inside franz-go's SCRAM handshake; +// failing at bootstrap is the safer contract. +func TestResolveSASLMechanism_MissingCredentials(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + cfg *Config + }{ + { + name: "missing both", + cfg: &Config{StreamingSASLMechanism: "PLAIN"}, + }, + { + name: "missing username", + cfg: &Config{ + StreamingSASLMechanism: "SCRAM-SHA-256", + StreamingSASLPassword: "p", + }, + }, + { + name: "missing password", + cfg: &Config{ + StreamingSASLMechanism: "SCRAM-SHA-512", + StreamingSASLUsername: "u", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + mech, name, err := resolveSASLMechanism(tc.cfg) + require.Error(t, err) + assert.Nil(t, mech) + assert.Empty(t, name) + assert.Contains(t, err.Error(), "STREAMING_SASL_USERNAME") + assert.Contains(t, err.Error(), "STREAMING_SASL_PASSWORD") + }) + } +} + +// TestResolveSASLMechanism_Unsupported guards against typos and +// unsupported mechanisms (OAUTHBEARER, GSSAPI, etc). The error message +// must enumerate the accepted values so an operator can fix it without +// reading the source. +func TestResolveSASLMechanism_Unsupported(t *testing.T) { + t.Parallel() + + cases := []string{ + "OAUTHBEARER", + "GSSAPI", + "SCRAM", // missing -SHA-xxx suffix + "SCRAM-SHA-1", // not supported by franz-go + "plain-sha-256", // mangled + "unknown", + } + + for _, input := range cases { + t.Run(input, func(t *testing.T) { + t.Parallel() + + cfg := &Config{ + StreamingSASLMechanism: input, + StreamingSASLUsername: "u", + StreamingSASLPassword: "p", + } + + mech, name, err := resolveSASLMechanism(cfg) + require.Error(t, err) + assert.Nil(t, mech) + assert.Empty(t, name) + assert.Contains(t, err.Error(), saslMechanismPlain) + assert.Contains(t, err.Error(), saslMechanismScram256) + assert.Contains(t, err.Error(), saslMechanismScram512) + }) + } +} + +// TestBuildStreamingEmitter_NilConfig keeps the existing nil-guard +// contract documented as a unit test: BuildStreamingEmitter must never +// panic on a nil config. +func TestBuildStreamingEmitter_NilConfig(t *testing.T) { + t.Parallel() + + emitter, closer, err := BuildStreamingEmitter(context.Background(), nil, nil, nil) + require.Error(t, err) + assert.Nil(t, emitter) + require.NotNil(t, closer) + assert.NoError(t, closer()) +} + +// TestBuildStreamingEmitter_DisabledReturnsNoop covers the default +// pilot path: STREAMING_ENABLED is false, the emitter is the no-op, +// and no broker connection is attempted regardless of the SASL config. +func TestBuildStreamingEmitter_DisabledReturnsNoop(t *testing.T) { + // t.Setenv prevents t.Parallel — we mutate process env. + t.Setenv("STREAMING_ENABLED", "false") + + cfg := &Config{ + StreamingEnabled: false, + StreamingSASLMechanism: "PLAIN", // ignored when disabled + StreamingSASLUsername: "u", + StreamingSASLPassword: "p", + } + + emitter, closer, err := BuildStreamingEmitter(context.Background(), cfg, nil, nil) + require.NoError(t, err) + require.NotNil(t, emitter) + require.NotNil(t, closer) + t.Cleanup(func() { _ = closer() }) +} + +// TestBuildStreamingEmitter_SASLWithoutTLSFailsClosed is the integration +// guard: when SASL is enabled but neither TLS nor AllowPlaintextSASL is +// set, lib-streaming must reject construction with +// ErrPlaintextSASLNotAllowed. This locks the contract that midaz never +// silently downgrades a SASL config to plaintext without explicit opt-in. +// +// The test does NOT dial the broker — Build() validates the option +// combination before any network I/O, so STREAMING_BROKERS can point at +// a non-routable host. +func TestBuildStreamingEmitter_SASLWithoutTLSFailsClosed(t *testing.T) { + t.Setenv("STREAMING_ENABLED", "true") + t.Setenv("STREAMING_BROKERS", "127.0.0.1:0") + t.Setenv("STREAMING_CLOUDEVENTS_SOURCE", "lerian.midaz.ledger.test") + + cfg := &Config{ + StreamingEnabled: true, + StreamingSASLMechanism: "PLAIN", + StreamingSASLUsername: "u", + StreamingSASLPassword: "p", + // StreamingAllowPlaintextSASL intentionally false. + } + + emitter, closer, err := BuildStreamingEmitter(context.Background(), cfg, nil, nil) + require.Error(t, err) + assert.Nil(t, emitter) + require.NotNil(t, closer) + assert.NoError(t, closer()) + assert.True(t, errors.Is(err, libStreaming.ErrPlaintextSASLNotAllowed), + "expected ErrPlaintextSASLNotAllowed, got %v", err) +} + +// TestBuildStreamingEmitter_SASLWithAllowPlaintextSucceeds is the +// dev-broker happy path: SASL enabled + AllowPlaintextSASL=true + no TLS +// builds an emitter without dialling the broker. The emitter is closed +// by t.Cleanup so franz-go's background goroutines do not leak into +// other tests in this package. +func TestBuildStreamingEmitter_SASLWithAllowPlaintextSucceeds(t *testing.T) { + t.Setenv("STREAMING_ENABLED", "true") + t.Setenv("STREAMING_BROKERS", "127.0.0.1:0") + t.Setenv("STREAMING_CLOUDEVENTS_SOURCE", "lerian.midaz.ledger.test") + + cfg := &Config{ + StreamingEnabled: true, + StreamingSASLMechanism: "SCRAM-SHA-256", + StreamingSASLUsername: "u", + StreamingSASLPassword: "p", + StreamingAllowPlaintextSASL: true, + } + + emitter, closer, err := BuildStreamingEmitter(context.Background(), cfg, nil, nil) + require.NoError(t, err) + require.NotNil(t, emitter) + require.NotNil(t, closer) + t.Cleanup(func() { _ = closer() }) +} + +// TestBuildStreamingEmitter_UnsupportedMechanismFailsClosed verifies +// that resolveSASLMechanism's unsupported-mechanism error propagates +// out of BuildStreamingEmitter rather than getting masked by a downstream +// builder error. +func TestBuildStreamingEmitter_UnsupportedMechanismFailsClosed(t *testing.T) { + t.Setenv("STREAMING_ENABLED", "true") + t.Setenv("STREAMING_BROKERS", "127.0.0.1:0") + t.Setenv("STREAMING_CLOUDEVENTS_SOURCE", "lerian.midaz.ledger.test") + + cfg := &Config{ + StreamingEnabled: true, + StreamingSASLMechanism: "OAUTHBEARER", // not on the allow-list + StreamingSASLUsername: "u", + StreamingSASLPassword: "p", + StreamingAllowPlaintextSASL: true, + } + + emitter, closer, err := BuildStreamingEmitter(context.Background(), cfg, nil, nil) + require.Error(t, err) + assert.Nil(t, emitter) + require.NotNil(t, closer) + assert.NoError(t, closer()) + assert.True(t, + strings.Contains(err.Error(), "OAUTHBEARER") || + strings.Contains(err.Error(), "not supported"), + "expected unsupported-mechanism error, got %v", err) +} diff --git a/go.mod b/go.mod index 96b919563..7de86caa3 100644 --- a/go.mod +++ b/go.mod @@ -107,6 +107,7 @@ require ( github.com/moby/moby/api v1.54.1 github.com/moby/moby/client v0.4.0 github.com/testcontainers/testcontainers-go/modules/toxiproxy v0.42.0 + github.com/twmb/franz-go v1.21.0 go.uber.org/goleak v1.3.0 ) @@ -149,7 +150,6 @@ require ( github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/shirou/gopsutil/v4 v4.26.3 // indirect github.com/sirupsen/logrus v1.9.4 // indirect - github.com/twmb/franz-go v1.21.0 // indirect github.com/twmb/franz-go/pkg/kmsg v1.13.1 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.2.0 // indirect