Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions components/ledger/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,32 @@ STREAMING_ENABLED=false
# STREAMING_REQUIRED_ACKS=all
# franz-go ProducerLinger in ms. Default: 5.
# STREAMING_BATCH_LINGER_MS=5

# --- Per-emit timeout (IMPORTANT-posture events) ---
# Per-call deadline applied around emitter.Emit() for IMPORTANT-posture
# events (organization.*, ledger.*, account.*, asset.*, etc.). When this
# fires, the emit is logged as a warning and dropped — durability of
# IMPORTANT events belongs to PG and (eventually) the outbox subsystem,
# not to the synchronous broker round-trip.
#
# Value is in milliseconds; must be > 0 or the default is used. Default: 5000 (5s).
# Bump it if your broker is far away (cross-region) or when running
# end-to-end against a cold/loaded test cluster. Lower it if you want to
# fail-fast on slow brokers and rely on async/outbox redelivery.
# STREAMING_IMPORTANT_EMIT_TIMEOUT_MS=5000

# --- SASL/TLS auth ---
# When STREAMING_SASL_MECHANISM is empty (default) the producer connects to
# the broker without authentication. Set to PLAIN, SCRAM-SHA-256, or
# SCRAM-SHA-512 to enable SASL; STREAMING_SASL_USERNAME and
# STREAMING_SASL_PASSWORD then become required.
# STREAMING_SASL_MECHANISM=
# STREAMING_SASL_USERNAME=
# STREAMING_SASL_PASSWORD=
#
# SASL without TLS is rejected at bootstrap unless explicitly opted in via
# STREAMING_ALLOW_PLAINTEXT_SASL=true. Use ONLY for local/dev brokers that
# don't terminate TLS (e.g. a Redpanda dev container with a SASL_PLAINTEXT
# listener). NEVER set this in staging or production: SASL credentials
# would cross the network in cleartext.
# STREAMING_ALLOW_PLAINTEXT_SASL=false
18 changes: 18 additions & 0 deletions components/ledger/internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,24 @@ type Config struct {
StreamingCompression string `env:"STREAMING_COMPRESSION"`
StreamingRequiredAcks string `env:"STREAMING_REQUIRED_ACKS"`
StreamingBatchLingerMs int `env:"STREAMING_BATCH_LINGER_MS"`

// --- Streaming SASL/TLS auth ---
// When STREAMING_SASL_MECHANISM is empty (default) the producer connects
// without authentication, matching the existing behaviour for local/dev
// brokers. When set, the value must be one of PLAIN, SCRAM-SHA-256,
// SCRAM-SHA-512 (case-insensitive); USERNAME and PASSWORD are then
// required and BuildStreamingEmitter wires the matching franz-go
// sasl.Mechanism into the lib-streaming Builder.
//
// SASL without TLS is rejected by lib-streaming with
// ErrPlaintextSASLNotAllowed. STREAMING_ALLOW_PLAINTEXT_SASL=true is the
// explicit unsafe opt-in for local/dev brokers that do not terminate
// TLS. It must NOT be set in production: SASL credentials cross the
// network in cleartext.
StreamingSASLMechanism string `env:"STREAMING_SASL_MECHANISM"`
StreamingSASLUsername string `env:"STREAMING_SASL_USERNAME"`
StreamingSASLPassword string `env:"STREAMING_SASL_PASSWORD"`
StreamingAllowPlaintextSASL bool `env:"STREAMING_ALLOW_PLAINTEXT_SASL"`
}

// Options contains optional dependencies that can be injected by callers.
Expand Down
99 changes: 96 additions & 3 deletions components/ledger/internal/bootstrap/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import (
libOpentelemetry "github.com/LerianStudio/lib-commons/v5/commons/opentelemetry"
libStreaming "github.com/LerianStudio/lib-streaming"
"github.com/LerianStudio/midaz/v3/pkg/streaming/events"
"github.com/twmb/franz-go/pkg/sasl"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
)

// SASL mechanism names accepted by STREAMING_SASL_MECHANISM. Compared
// case-insensitively at parse time so operators can write any casing.
const (
saslMechanismPlain = "PLAIN"
saslMechanismScram256 = "SCRAM-SHA-256"
saslMechanismScram512 = "SCRAM-SHA-512"
)

// streamingPrimaryTargetName is the canonical name for midaz's single
Expand Down Expand Up @@ -107,25 +118,58 @@ func BuildStreamingEmitter(
return nil, noopStreamingCloser, fmt.Errorf("failed to build streaming routes: %w", err)
}

emitter, err := libStreaming.NewBuilder().
builder := libStreaming.NewBuilder().
Source(streamingCfg.CloudEventsSource).
Catalog(catalog).
Routes(routes...).
Target(libStreaming.TargetConfig{
Name: streamingPrimaryTargetName,
Kind: libStreaming.TransportKafkaLike,
Brokers: streamingCfg.Brokers,
}).
Build(ctx)
})

// Apply SASL/TLS auth knobs from cfg. resolveSASLMechanism returns a
// nil mechanism (and an empty mechanism name) when SASL is disabled,
// in which case the Builder is left untouched and the producer dials
// the broker without authentication — matching the historical local/dev
// behaviour. When SASL is enabled but TLS is not, lib-streaming
// rejects construction with ErrPlaintextSASLNotAllowed unless the
// caller also opts into AllowPlaintextSASL — gated behind
// STREAMING_ALLOW_PLAINTEXT_SASL=true for dev brokers.
mechanism, mechanismName, err := resolveSASLMechanism(cfg)
if err != nil {
return nil, noopStreamingCloser, fmt.Errorf("failed to resolve streaming SASL mechanism: %w", err)
}

if mechanism != nil {
builder = builder.SASL(mechanism)

if cfg.StreamingAllowPlaintextSASL {
builder = builder.AllowPlaintextSASL()
}
}

emitter, err := builder.Build(ctx)
Comment thread
ClaraTersi marked this conversation as resolved.
if err != nil {
return nil, noopStreamingCloser, fmt.Errorf("failed to construct streaming emitter: %w", err)
}

if logger != nil {
// NOTE: only mechanism name is logged. Username and password are
// NEVER logged, even at debug level.
authMode := "none"
if mechanismName != "" {
authMode = mechanismName
if cfg.StreamingAllowPlaintextSASL {
authMode += " (plaintext)"
}
}

logger.Log(ctx, libLog.LevelInfo, "Streaming emitter constructed",
libLog.String("brokers", strings.Join(streamingCfg.Brokers, ",")),
libLog.String("client_id", streamingCfg.ClientID),
libLog.String("ce_source", streamingCfg.CloudEventsSource),
libLog.String("auth", authMode),
libLog.Int("catalog_size", catalog.Len()),
libLog.Int("routes", len(routes)),
)
Expand All @@ -134,6 +178,55 @@ func BuildStreamingEmitter(
return emitter, emitter.Close, nil
}

// resolveSASLMechanism inspects the streaming SASL knobs on cfg and
// returns the matching franz-go sasl.Mechanism plus its canonical name.
//
// Behaviour:
// - StreamingSASLMechanism empty (after trimming) → returns (nil, "", nil).
// The Builder stays unauthenticated, matching the existing local/dev
// default.
// - StreamingSASLMechanism set but USERNAME or PASSWORD empty → returns
// a config error. SASL with empty credentials would either be rejected
// by the broker after I/O (PLAIN) or panic inside franz-go's SCRAM
// handshake; failing closed at bootstrap is the safer contract.
// - StreamingSASLMechanism unrecognised → returns a config error
// enumerating the accepted values.
//
// The mechanism name returned is the canonical upper-case form
// ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512") — used for the bootstrap
// log line. Username and password are NEVER returned to the caller and
// never logged.
func resolveSASLMechanism(cfg *Config) (sasl.Mechanism, string, error) {
raw := strings.TrimSpace(cfg.StreamingSASLMechanism)
if raw == "" {
return nil, "", nil
}

mechanism := strings.ToUpper(raw)

user := cfg.StreamingSASLUsername
pass := cfg.StreamingSASLPassword

if user == "" || pass == "" {
return nil, "", fmt.Errorf(
"STREAMING_SASL_MECHANISM=%q requires STREAMING_SASL_USERNAME and STREAMING_SASL_PASSWORD",
mechanism)
}

switch mechanism {
case saslMechanismPlain:
return plain.Auth{User: user, Pass: pass}.AsMechanism(), saslMechanismPlain, nil
case saslMechanismScram256:
return scram.Auth{User: user, Pass: pass}.AsSha256Mechanism(), saslMechanismScram256, nil
case saslMechanismScram512:
return scram.Auth{User: user, Pass: pass}.AsSha512Mechanism(), saslMechanismScram512, nil
default:
return nil, "", fmt.Errorf(
"STREAMING_SASL_MECHANISM=%q is not supported (accepted: %s, %s, %s)",
raw, saslMechanismPlain, saslMechanismScram256, saslMechanismScram512)
}
}

// midazEventDefinitions returns the canonical, ordered list of midaz
// event Definitions registered into both the Catalog and the Routes.
// Kept as a single source of truth so adding a new event is a one-place
Expand Down
Loading
Loading