From 962ac0f9e7bf6cddf3e73ff382ef91c0a2d8c617 Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 09:28:44 -0300 Subject: [PATCH 1/8] docs(datarudder): add pre-dev tasks.md outlining provider implementation Captures the T-001 task with ST-001-NN subtasks for the DataRudder provider implementation: scaffolding, BuildInput + auth translation, ConnectivityProber with sentinel UUID probe, and tests. References the approved plan at C:\Users\SHMZ\.claude\plans\giggly-waddling-wind.md. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/pre-dev/datarudder/tasks.md | 84 ++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 docs/pre-dev/datarudder/tasks.md diff --git a/docs/pre-dev/datarudder/tasks.md b/docs/pre-dev/datarudder/tasks.md new file mode 100644 index 0000000..1507129 --- /dev/null +++ b/docs/pre-dev/datarudder/tasks.md @@ -0,0 +1,84 @@ +# Tasks — datarudder + +**Reference plan**: `C:\Users\SHMZ\.claude\plans\giggly-waddling-wind.md` +**Branch**: `feature/datarudder-provider` (from `feature/oauth2-token-endpoint-auth`, PR target `develop`) +**Feature scope**: Add public Provider + Executor `datarudder.create-transaction` to the catalog, consuming the `oauth2_token_endpoint` outbound auth shipped in the parent branch. Integration target: Delorean anti-fraud API (`inference.api-stg.delorean-ai.com`). + +--- + +## Task T-001: DataRudder Provider + create-transaction Executor + +**Goal**: Expose the `oauth2_token_endpoint` auth feature to end users via a public catalog Provider that integrates with Delorean's fraud transactions API. Mirrors Midaz architectural pattern (`BuildInput` + `ConnectivityProber`) — not Tracer (which uses static api_key). + +**Driver**: The HTTP provider that hosts `oauth2_token_endpoint` is internal-only (not in the catalog). Without a public Provider that uses it, end users can't configure or test the new auth method. + +**Acceptance criteria**: +- [ ] `pkg/executors/datarudder/` package exists with `Register()` exposing `ProviderID="datarudder"` and one executor `datarudder.create-transaction` +- [ ] `providerConfigSchema` NESTED: required top-level `base_url`+`auth`; required inside `auth`: `token_url`+`client_id`+`client_secret`; `additionalProperties: false` at both levels +- [ ] `datarudderProvider` struct implements `executor.{Provider, InputBuilder, ConnectivityProber}` (compile-time checks) +- [ ] `BuildInput` normalizes `base_url` via `strings.TrimSuffix(..., "/")` and uses `executorRoutes` map (Midaz pattern) +- [ ] `buildDataRudderAuth` extracts 3 fields explicitly + hardcodes `credentials_location="body"` (not exposed in schema) +- [ ] `ConnectivityProber.Probe` issues `GET {base_url}/api/v1/fraud/transactions/{sentinel}/` with sentinel `00000000-0000-0000-0000-000000000000`; surfaces upstream `detail` in `ProbeOutcome.Details["upstream_detail"]` +- [ ] `AuthRequired()` returns `true` +- [ ] `pkg/executors/register.go` wires `datarudder.Register(catalog)` in `RegisterDefaults` +- [ ] All existing tests still pass (`make test`) +- [ ] Coverage ≥ 85% on changed files +- [ ] `make lint` clean + +--- + +### Subtask ST-001-01: Provider scaffolding + schema + executor + +**Files to create**: +- `pkg/executors/datarudder/provider.go` — `Register`, `ProviderID`, NESTED `providerConfigSchema` with `additionalProperties: false`, `datarudderProvider` struct embedding `*base.Provider` +- `pkg/executors/datarudder/create_transaction.go` — `CreateTransactionID` const + `newCreateTransactionExecutor()` returning `*base.Executor` with loose schema (`{"type": "object"}`) + +**Files to modify**: +- `pkg/executors/register.go` — add `datarudder.Register(catalog)` + import + +--- + +### Subtask ST-001-02: BuildInput + auth translation + +**Files to create**: +- `pkg/executors/datarudder/input_builder.go` — `executorRoutes` map (1 entry: POST + `/api/v1/fraud/transactions/`), package `BuildInput()` + method on `datarudderProvider` that delegates, `buildDataRudderAuth()` with explicit field extraction + hardcoded `credentials_location="body"` + +**Design constraints (from approved plan)**: +- `BuildInput` must `strings.TrimSuffix(baseURL, "/")` before concatenating with the path +- Path hardcoded as const (NOT configurable via ProviderConfiguration — security) +- `buildDataRudderAuth` returns `nil` when `auth` block is missing/empty (not an error — runner short-circuits) + +--- + +### Subtask ST-001-03: ConnectivityProber + SSRF test helpers + +**Files to create**: +- `pkg/executors/datarudder/connectivity_prober.go` — `Probe()`, `AuthRequired()`, `extractDataRudderProbeTargets()`, sentinel UUID const, `probeAccountsPathTemplate` const, SSRF resolve via `libSSRF.ResolveAndValidate`, response body JSON parse to populate `Details["upstream_detail"]` +- `pkg/executors/datarudder/ssrf_test_helpers.go` — `ssrfOptions()` helper that mirrors `pkg/executors/midaz/ssrf_test_helpers.go` to disable private-IP block in tests using `httptest.Server` + +**Outcome mapping** (from approved plan — must cover all in tests): +- 200/404 → `Reached=true, AuthApplied=true` +- 401/403 → `Reached=true, AuthApplied=false` +- 5xx → `Reached=true, AuthApplied=true` + transport err +- Token endpoint failure → `Reached=true (auth host), AuthApplied=false` +- DNS/TCP error → `Reached=false` +- SSRF blocked → `Reached=false` + +--- + +### Subtask ST-001-04: Tests (TDD) + +**Files to create**: +- `pkg/executors/datarudder/datarudder_test.go` — register + schema validity + intra-package integration test (mock `POST /api/v1/auth/login` + mock `POST /api/v1/fraud/transactions/` via `httptest.Server`) +- `pkg/executors/datarudder/input_builder_test.go` — happy path, missing fields, auth block translation, `base_url` with/without trailing slash +- `pkg/executors/datarudder/connectivity_prober_test.go` — cover all 6 outcome scenarios from the table above, plus `AuthRequired()`, missing required fields, `upstream_detail` surfacing + +**TDD flow**: RED (failing tests) → GREEN (minimum code) → REFACTOR. Target ≥85% coverage on the new package. + +--- + +## Out-of-cycle (post-implementation) + +- **CHANGELOG**: do NOT edit manually. Auto-generated by `.github/workflows/gptchangelog.yml`. Use Conventional Commits (`feature(datarudder): ...`, `test(datarudder): ...`). +- **Smoke test against Delorean staging**: manual, after PR merge. `POST /v1/provider-configurations//test` against a real `inference.api-stg.delorean-ai.com` provider config — expect Connectivity=Reached, Auth=Applied, E2E StatusCode=404 with `upstream_detail="transaction not found"`. +- **Production URL**: when Delorean prod URL becomes available, swap is zero-code — just update `ProviderConfiguration` via API. From 74ac8fca9b9d38bbc81562198a2a40a553b5cb2f Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 09:29:14 -0300 Subject: [PATCH 2/8] feature(datarudder): add Provider, executor, BuildInput, and ConnectivityProber Adds a public DataRudder provider to the catalog that integrates with the Delorean anti-fraud API. Mirrors the Midaz architectural pattern (struct wrapping base.Provider with InputBuilder + ConnectivityProber methods) adapted for OAuth2 token-endpoint auth instead of OIDC. Components: - provider.go: Register() + NESTED providerConfigSchema with additionalProperties:false to reject typos (e.g., token_uri instead of token_url). User-facing display name "DataRudder", description references the Delorean product. - create_transaction.go: datarudder.create-transaction executor with a loose body schema (Delorean validates upstream). - input_builder.go: executorRoutes map (Midaz style) + BuildInput() that normalises trailing slash via strings.TrimRight and delegates auth construction. buildDataRudderAuth extracts the 3 nested auth fields explicitly and hardcodes credentials_location="body" (the field is intentionally not exposed in the provider schema since Delorean only accepts body credentials). - connectivity_prober.go: Probe() against the GET fraud transactions endpoint with a deterministic sentinel UUID. Maps 200/404 to Reached+AuthApplied, 401/403 to Reached without AuthApplied, 5xx to a transport error, and surfaces the upstream "detail" field in ProbeOutcome.Details["upstream_detail"]. SSRF pinning via libSSRF.ResolveAndValidate with lazy env var lookup matching the Midaz pattern. - ssrf_test_helpers.go: //go:build unit helper to reset the SSRF cache between test scenarios, mirroring midaz/ssrf_test_helpers.go. Path /api/v1/fraud/transactions/ is hardcoded as a security measure (capability limiting). Only base_url is user-configurable. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datarudder/connectivity_prober.go | 259 ++++++++++++++++++ .../datarudder/create_transaction.go | 46 ++++ pkg/executors/datarudder/input_builder.go | 95 +++++++ pkg/executors/datarudder/provider.go | 116 ++++++++ pkg/executors/datarudder/ssrf_test_helpers.go | 20 ++ 5 files changed, 536 insertions(+) create mode 100644 pkg/executors/datarudder/connectivity_prober.go create mode 100644 pkg/executors/datarudder/create_transaction.go create mode 100644 pkg/executors/datarudder/input_builder.go create mode 100644 pkg/executors/datarudder/provider.go create mode 100644 pkg/executors/datarudder/ssrf_test_helpers.go diff --git a/pkg/executors/datarudder/connectivity_prober.go b/pkg/executors/datarudder/connectivity_prober.go new file mode 100644 index 0000000..be2f39a --- /dev/null +++ b/pkg/executors/datarudder/connectivity_prober.go @@ -0,0 +1,259 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package datarudder + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "strings" + "sync" + "time" + + libSSRF "github.com/LerianStudio/lib-commons/v5/commons/security/ssrf" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executors/http/auth" +) + +// probeTransactionPathTemplate is the read-only Delorean endpoint used to +// validate connectivity + authentication + endpoint reachability in a single +// call. With the sentinel UUID the expected status is 404 (transaction not +// found) — which proves the host was reached and the Bearer token was accepted. +const probeTransactionPathTemplate = "/api/v1/fraud/transactions/%s/" + +// probeSentinelTransactionID is a deterministic UUID that is guaranteed not to +// exist on the Delorean side. Used by Probe to elicit a stable 404 response. +// Documented as "probe sentinel" so future readers don't mistake it for real +// data and so support engineers can grep for it in upstream logs if needed. +const probeSentinelTransactionID = "00000000-0000-0000-0000-000000000000" + +// probeTimeout caps the total wall time of a single connectivity probe so the +// command does not hang on a slow upstream. It MUST stay below the command- +// level test timeout to give the caller a chance to record the error. +const probeTimeout = 10 * time.Second + +// probeBodyReadLimit caps how many bytes of the probe response body we read +// when extracting the upstream "detail" field. Delorean's 404 body is ~40 +// bytes; 4 KiB is generous without risking memory blow-up on a hostile server. +const probeBodyReadLimit = 4 * 1024 + +// AuthRequired reports that the DataRudder provider needs OAuth2 credentials +// to function. When credentials are absent the generic connectivity-test +// command short-circuits to Auth=Failed without attempting the probe. +func (p *datarudderProvider) AuthRequired() bool { return true } + +// Probe issues a single read-only GET against the Delorean fraud transactions +// endpoint using a sentinel UUID, applying OAuth2 auth via the existing +// pkg/executors/http/auth machinery. The returned ProbeOutcome is the single +// source of truth that the generic command maps onto the three stage results. +// +// Failure modes, in the order they are detected: +// 1. Missing/invalid base_url → Reached=false. +// 2. SSRF/DNS validation fails → Reached=false (TransportErr describes the block). +// 3. Auth provider construction or token exchange fails → Reached=true (the +// auth host was syntactically reachable), AuthApplied=false. +// 4. HTTP transport error against base_url → Reached=false. +// 5. HTTP response received → Reached=true, AuthApplied=true (or false if +// status indicates auth rejection); StatusCode carries the result. +func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, httpClient *http.Client) executor.ProbeOutcome { + start := time.Now() + + if httpClient == nil { + httpClient = &http.Client{Timeout: probeTimeout} + } + + baseURL, err := extractDataRudderProbeTargets(cfg) + if err != nil { + return executor.ProbeOutcome{ + URL: baseURL, + Reached: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: err, + } + } + + baseURL = strings.TrimRight(baseURL, "/") + probePath := fmt.Sprintf(probeTransactionPathTemplate, probeSentinelTransactionID) + + // SSRF pinning: resolve once, use the IP-pinned URL on the wire. + resolveResult, err := libSSRF.ResolveAndValidate(ctx, baseURL, ssrfOptions()...) + if err != nil { + return executor.ProbeOutcome{ + URL: baseURL + probePath, + Reached: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: fmt.Errorf("SSRF blocked base_url: %w", err), + } + } + + displayURL := baseURL + probePath + pinnedURL := resolveResult.PinnedURL + probePath + + reqCtx, cancel := context.WithTimeout(ctx, probeTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, pinnedURL, nil) + if err != nil { + return executor.ProbeOutcome{ + URL: displayURL, + Reached: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: fmt.Errorf("build probe request: %w", err), + } + } + + // Preserve original host for TLS SNI / Host header (libSSRF gives us an + // IP-pinned URL but we still want the server to see the real hostname). + req.Host = resolveResult.Authority + + authApplied, authErr := applyDataRudderAuth(reqCtx, req, cfg, httpClient) + if authErr != nil { + return executor.ProbeOutcome{ + URL: displayURL, + Reached: true, // we reached far enough to attempt auth + AuthApplied: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: authErr, + } + } + + resp, err := httpClient.Do(req) + if err != nil { + return executor.ProbeOutcome{ + URL: displayURL, + Reached: false, + AuthApplied: authApplied, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: fmt.Errorf("probe request failed: %w", err), + } + } + + defer func() { _ = resp.Body.Close() }() + + details := map[string]any{} + if detail := readUpstreamDetail(resp.Body); detail != "" { + details["upstream_detail"] = detail + } + + outcome := executor.ProbeOutcome{ + URL: displayURL, + Reached: true, + AuthApplied: authApplied, + StatusCode: resp.StatusCode, + DurationMs: time.Since(start).Milliseconds(), + Details: details, + } + + switch { + case resp.StatusCode == http.StatusUnauthorized, resp.StatusCode == http.StatusForbidden: + outcome.AuthApplied = false + outcome.TransportErr = fmt.Errorf("auth rejected by Delorean: status %d", resp.StatusCode) + case resp.StatusCode >= 500: + outcome.TransportErr = fmt.Errorf("delorean returned server error: status %d", resp.StatusCode) + } + + return outcome +} + +// extractDataRudderProbeTargets pulls the base_url from a provider config and +// validates that the required auth credentials are present. Returns the +// base_url even on error so the caller can surface it for diagnostics. +func extractDataRudderProbeTargets(cfg map[string]any) (string, error) { + baseURL, ok := cfg["base_url"].(string) + if !ok || baseURL == "" { + return "", errors.New(`missing required field "base_url" in provider config`) + } + + authBlock, ok := cfg["auth"].(map[string]any) + if !ok || len(authBlock) == 0 { + return baseURL, errors.New("provider requires credentials but no auth block configured") + } + + for _, key := range []string{"token_url", "client_id", "client_secret"} { + v, ok := authBlock[key].(string) + if !ok || v == "" { + return baseURL, fmt.Errorf(`missing required field "auth.%s" in provider config`, key) + } + } + + return baseURL, nil +} + +// applyDataRudderAuth builds the auth provider from the nested auth block and +// applies it to the request. Returns true on success; false (with err) on any +// token exchange or header injection failure. +func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string]any, httpClient *http.Client) (bool, error) { + authConfig := buildDataRudderAuth(cfg) + if authConfig == nil { + return false, errors.New("auth block missing from provider config") + } + + authProvider, err := auth.NewFromConfig(authConfig, httpClient) + if err != nil { + return false, fmt.Errorf("build auth provider: %w", err) + } + + if err := authProvider.Apply(ctx, req); err != nil { + return false, fmt.Errorf("apply oauth2 auth: %w", err) + } + + return true, nil +} + +// readUpstreamDetail extracts the "detail" field from a JSON error body if +// present. Delorean's 404 returns {"detail": "transaction not found"} and +// surfacing it gives operators a human-readable diagnostic without parsing +// the full body. Returns "" on any parse failure — best-effort, never fatal. +func readUpstreamDetail(body io.Reader) string { + limited := io.LimitReader(body, probeBodyReadLimit) + + raw, err := io.ReadAll(limited) + if err != nil || len(raw) == 0 { + return "" + } + + var parsed struct { + Detail string `json:"detail"` + } + + if err := json.Unmarshal(raw, &parsed); err != nil { + return "" + } + + return parsed.Detail +} + +// ssrfAllowPrivate caches the SSRF_ALLOW_PRIVATE env var. Tests may override +// it via resetDataRudderSSRFCache + the SSRF_ALLOW_PRIVATE env to flip behaviour. +// +// NOTE: local duplication of the helper in +// internal/services/command/test_provider_config_connectivity.go and +// pkg/executors/midaz/connectivity_prober.go — extracted-to-shared was +// considered and rejected in the Midaz cycle on the same grounds (~10 lines +// is cheaper to duplicate than to introduce a new shared package). +var ( + ssrfAllowPrivate bool + ssrfAllowPrivateOnce sync.Once +) + +// ssrfOptions returns the libSSRF options derived from SSRF_ALLOW_PRIVATE. +// Set SSRF_ALLOW_PRIVATE=true to permit probes against private networks +// (httptest, local dev). The env var is read lazily on first call. +func ssrfOptions() []libSSRF.Option { + ssrfAllowPrivateOnce.Do(func() { + ssrfAllowPrivate = os.Getenv("SSRF_ALLOW_PRIVATE") == "true" + }) + + if ssrfAllowPrivate { + return []libSSRF.Option{libSSRF.WithAllowPrivateNetwork()} + } + + return nil +} diff --git a/pkg/executors/datarudder/create_transaction.go b/pkg/executors/datarudder/create_transaction.go new file mode 100644 index 0000000..8cd8675 --- /dev/null +++ b/pkg/executors/datarudder/create_transaction.go @@ -0,0 +1,46 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package datarudder + +import ( + "fmt" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executor/base" +) + +// CreateTransactionID is the executor ID for the create-transaction operation. +const CreateTransactionID executor.ID = "datarudder.create-transaction" + +// newCreateTransactionExecutor creates the executor for +// POST /api/v1/fraud/transactions/. +// Submits a transaction to the Delorean anti-fraud API for classification, +// returning a score and an action recommendation (APPROVED/REJECTED/REVIEW). +func newCreateTransactionExecutor() (*base.Executor, error) { + exec, err := base.NewExecutor( + CreateTransactionID, + "Create Transaction", + "DataRudder", + "v1", + ProviderID, + createTransactionSchema, + ) + if err != nil { + return nil, fmt.Errorf("failed to create executor: %w", err) + } + + return exec, nil +} + +// createTransactionSchema is intentionally loose. The Delorean API validates the +// transaction body upstream; coupling our schema tightly to their contract would +// just create maintenance friction without preventing real errors. See the +// approved plan for the expected request shape (transaction object with payer, +// receiver, amount, etc.). +const createTransactionSchema = `{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "description": "Request body for POST /api/v1/fraud/transactions/. Validated by Delorean upstream." +}` diff --git a/pkg/executors/datarudder/input_builder.go b/pkg/executors/datarudder/input_builder.go new file mode 100644 index 0000000..68d08aa --- /dev/null +++ b/pkg/executors/datarudder/input_builder.go @@ -0,0 +1,95 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package datarudder + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/LerianStudio/flowker/pkg/executor" +) + +// executorMeta holds the HTTP routing metadata for a DataRudder executor. +type executorMeta struct { + method string + path string +} + +// executorRoutes maps each DataRudder executor to its HTTP routing metadata. +// Mirrors the Midaz pattern so adding executors later is just a new map entry. +// Paths are hardcoded (NOT configurable via ProviderConfiguration) as a +// security measure — see the approved plan's "Decisão arquitetural" section. +var executorRoutes = map[executor.ID]executorMeta{ + CreateTransactionID: {"POST", "/api/v1/fraud/transactions/"}, +} + +// BuildInput constructs an ExecutionInput for a DataRudder executor. +// It selects the routing metadata, normalizes the base URL (drops trailing +// slash to avoid double-slash), translates the nested auth block into the +// shape the HTTP runner expects, and forwards the request body. +// +// nodeData is currently unused — DataRudder operations have no path parameters +// or per-node config. The parameter is kept to honor the InputBuilder contract. +func BuildInput(providerConfig map[string]any, executorID executor.ID, _ map[string]any, requestBody []byte) (executor.ExecutionInput, error) { + route, ok := executorRoutes[executorID] + if !ok { + return executor.ExecutionInput{}, fmt.Errorf("unknown DataRudder executor: %s", executorID) + } + + baseURL, ok := providerConfig["base_url"].(string) + if !ok || baseURL == "" { + return executor.ExecutionInput{}, fmt.Errorf("missing required field %q in provider config", "base_url") + } + + fullURL := strings.TrimRight(baseURL, "/") + route.path + + authConfig := buildDataRudderAuth(providerConfig) + + config := map[string]any{ + "method": route.method, + "url": fullURL, + } + + if authConfig != nil { + config["auth"] = authConfig + } + + if len(requestBody) > 0 && route.method == "POST" { + var body any + if err := json.Unmarshal(requestBody, &body); err == nil { + config["body"] = body + } else { + config["body"] = string(requestBody) + } + } + + return executor.ExecutionInput{Config: config}, nil +} + +// buildDataRudderAuth translates the nested DataRudder auth block into the +// shape that the HTTP auth factory (auth.NewFromConfig) expects for the +// oauth2_token_endpoint provider. +// +// Fields are extracted explicitly (not passed as a raw map) to avoid leaking +// unrelated keys into the auth provider. credentials_location is hardcoded to +// "body" because that's what the Delorean token endpoint requires — it is +// intentionally not exposed in the provider config schema. +func buildDataRudderAuth(providerConfig map[string]any) map[string]any { + authBlock, ok := providerConfig["auth"].(map[string]any) + if !ok || len(authBlock) == 0 { + return nil + } + + return map[string]any{ + "type": "oauth2_token_endpoint", + "config": map[string]any{ + "token_url": authBlock["token_url"], + "client_id": authBlock["client_id"], + "client_secret": authBlock["client_secret"], + "credentials_location": "body", + }, + } +} diff --git a/pkg/executors/datarudder/provider.go b/pkg/executors/datarudder/provider.go new file mode 100644 index 0000000..898be7a --- /dev/null +++ b/pkg/executors/datarudder/provider.go @@ -0,0 +1,116 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +// Package datarudder provides the DataRudder anti-fraud provider for transaction +// risk scoring via the Delorean product (https://delorean-ai.com). Authentication +// uses OAuth2 via the oauth2_token_endpoint outbound auth provider. +package datarudder + +import ( + "fmt" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executor/base" + httpExecutor "github.com/LerianStudio/flowker/pkg/executors/http" +) + +// ProviderID is the unique identifier for the DataRudder provider. +const ProviderID executor.ProviderID = "datarudder" + +// datarudderProvider wraps base.Provider and implements executor.InputBuilder +// plus executor.ConnectivityProber to provide DataRudder-specific URL routing, +// OAuth2 auth translation, and a Delorean-API-aware connectivity probe. +type datarudderProvider struct { + *base.Provider +} + +// BuildInput implements executor.InputBuilder for DataRudder-specific execution input. +func (p *datarudderProvider) BuildInput(providerConfig map[string]any, executorID executor.ID, nodeData map[string]any, requestBody []byte) (executor.ExecutionInput, error) { + return BuildInput(providerConfig, executorID, nodeData, requestBody) +} + +// Compile-time check that datarudderProvider implements Provider, InputBuilder, +// and ConnectivityProber. +var ( + _ executor.Provider = (*datarudderProvider)(nil) + _ executor.InputBuilder = (*datarudderProvider)(nil) + _ executor.ConnectivityProber = (*datarudderProvider)(nil) +) + +// Register registers the DataRudder provider with its create-transaction executor +// into the given catalog. +func Register(catalog executor.Catalog) error { + if catalog == nil { + return nil + } + + baseProvider, err := base.NewProvider( + ProviderID, + "DataRudder", + "DataRudder anti-fraud provider for transaction risk scoring via the Delorean product", + "v1", + providerConfigSchema, + ) + if err != nil { + return fmt.Errorf("failed to create DataRudder provider: %w", err) + } + + provider := &datarudderProvider{Provider: baseProvider} + + createTxExec, err := newCreateTransactionExecutor() + if err != nil { + return fmt.Errorf("failed to create DataRudder create-transaction executor: %w", err) + } + + return catalog.RegisterProvider(provider, []executor.ExecutorRegistration{ + { + Executor: createTxExec, + Runner: httpExecutor.NewRunner(), + }, + }) +} + +// providerConfigSchema defines the connection and authentication settings for a +// Delorean instance. Auth is nested under "auth" (same shape as Midaz). The +// schema uses additionalProperties:false at both levels to reject typos with a +// clear validation error instead of silently dropping the field. +// +// credentials_location is intentionally NOT exposed — the Delorean token endpoint +// expects credentials in the form body, and that's hardcoded in buildDataRudderAuth. +const providerConfigSchema = `{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "additionalProperties": false, + "required": ["base_url", "auth"], + "properties": { + "base_url": { + "type": "string", + "format": "uri", + "description": "Delorean API base URL (e.g., https://inference.api-stg.delorean-ai.com)" + }, + "auth": { + "type": "object", + "additionalProperties": false, + "required": ["token_url", "client_id", "client_secret"], + "description": "OAuth2 token-endpoint authentication for the Delorean API", + "properties": { + "token_url": { + "type": "string", + "format": "uri", + "description": "OAuth2 token endpoint URL (e.g., https://inference.api-stg.delorean-ai.com/api/v1/auth/login)" + }, + "client_id": { + "type": "string", + "minLength": 1, + "description": "OAuth2 client ID issued by DataRudder" + }, + "client_secret": { + "type": "string", + "minLength": 1, + "description": "OAuth2 client secret issued by DataRudder" + } + } + } + } +}` diff --git a/pkg/executors/datarudder/ssrf_test_helpers.go b/pkg/executors/datarudder/ssrf_test_helpers.go new file mode 100644 index 0000000..e0edd1b --- /dev/null +++ b/pkg/executors/datarudder/ssrf_test_helpers.go @@ -0,0 +1,20 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder + +import "sync" + +// resetDataRudderSSRFCache resets the cached SSRF policy so a subsequent call +// to ssrfOptions re-reads the SSRF_ALLOW_PRIVATE env var. Intended for tests +// that switch the policy between scenarios via t.Setenv. +// +// Lives in a unit-tagged file so production builds don't carry it (and so +// golangci-lint's "unused" check stays clean against the non-test build). +func resetDataRudderSSRFCache() { + ssrfAllowPrivateOnce = sync.Once{} + ssrfAllowPrivate = false +} From c98981aa3745cb86e30f92c0d40491cb4980299b Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 09:29:32 -0300 Subject: [PATCH 3/8] feature(executors): register datarudder provider in default catalog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires datarudder.Register into RegisterDefaults so end users can discover the DataRudder provider via GET /v1/catalog/providers and create ProviderConfigurations against it. Bumps the catalog provider count assertion from 2 to 3 to reflect the new registration. Without this, the provider package compiles but is invisible to the public catalog API — the entire point of the feature. Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/executors/register.go | 5 +++++ pkg/executors/register_test.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/executors/register.go b/pkg/executors/register.go index 56ec456..51e5294 100644 --- a/pkg/executors/register.go +++ b/pkg/executors/register.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executors/datarudder" "github.com/LerianStudio/flowker/pkg/executors/midaz" "github.com/LerianStudio/flowker/pkg/executors/tracer" ) @@ -30,5 +31,9 @@ func RegisterDefaults(catalog executor.Catalog) error { return fmt.Errorf("failed to register Tracer provider: %w", err) } + if err := datarudder.Register(catalog); err != nil { + return fmt.Errorf("failed to register DataRudder provider: %w", err) + } + return nil } diff --git a/pkg/executors/register_test.go b/pkg/executors/register_test.go index 7382e72..16d8963 100644 --- a/pkg/executors/register_test.go +++ b/pkg/executors/register_test.go @@ -75,5 +75,5 @@ func TestRegisterDefaults_AllProvidersCount(t *testing.T) { require.NoError(t, err) providers := catalog.ListProviders() - assert.Len(t, providers, 2) + assert.Len(t, providers, 3) } From c677d705931e58dd6b374c8056dd8578e74b45c6 Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 09:30:11 -0300 Subject: [PATCH 4/8] test(datarudder): add unit and integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Covers the datarudder package with 89.8% statement coverage. - input_builder_test.go: BuildInput happy path, trailing slash normalization (single + multiple slashes), unknown executor ID, missing/empty base_url, auth block omission, body parsing fallback for non-JSON, nodeData passthrough. - connectivity_prober_test.go: 13 test cases covering all outcome paths from the approved plan — sentinel 404 happy path, 200 transaction-exists, 401/403 auth rejection, 5xx server error, token endpoint failure, network unreachable, SSRF blocked, missing base_url, missing auth block, missing auth field, upstream_detail surfacing, non-JSON body resilience, plus AuthRequired() = true. - datarudder_test.go: catalog registration sanity, JSON Schema acceptance/rejection (extra top-level field, extra auth field, each required field missing), and an intra-package integration test that drives the full chain — BuildInput -> runner.Execute -> mock token endpoint -> mock fraud endpoint — asserting Bearer token injection, body integrity, no grant_type sent, and response surfacing. Mock servers run on httptest.Server (127.0.0.1) so tests rely on SSRF_ALLOW_PRIVATE=true + the unit-tagged ssrf_test_helpers. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datarudder/connectivity_prober_test.go | 343 ++++++++++++++++++ pkg/executors/datarudder/datarudder_test.go | 248 +++++++++++++ .../datarudder/input_builder_test.go | 146 ++++++++ 3 files changed, 737 insertions(+) create mode 100644 pkg/executors/datarudder/connectivity_prober_test.go create mode 100644 pkg/executors/datarudder/datarudder_test.go create mode 100644 pkg/executors/datarudder/input_builder_test.go diff --git a/pkg/executors/datarudder/connectivity_prober_test.go b/pkg/executors/datarudder/connectivity_prober_test.go new file mode 100644 index 0000000..03b56d4 --- /dev/null +++ b/pkg/executors/datarudder/connectivity_prober_test.go @@ -0,0 +1,343 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// datarudderMockOptions configures the Delorean mock server. +type datarudderMockOptions struct { + forceTokenStatus int // non-zero to force token endpoint status (e.g. 401) + forceFraudStatus int // non-zero to force fraud endpoint status (e.g. 404, 500) + fraudResponse string +} + +// datarudderMockServer is a single httptest.Server that serves both the OAuth2 +// token endpoint and the Delorean fraud transactions endpoint. +type datarudderMockServer struct { + server *httptest.Server + tokenRequests *atomic.Int64 + fraudRequests *atomic.Int64 +} + +func newDatarudderMockServer(t *testing.T, opts datarudderMockOptions) *datarudderMockServer { + t.Helper() + + m := &datarudderMockServer{ + tokenRequests: &atomic.Int64{}, + fraudRequests: &atomic.Int64{}, + } + + mux := http.NewServeMux() + m.server = httptest.NewServer(mux) + + mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) { + m.tokenRequests.Add(1) + + if opts.forceTokenStatus != 0 { + w.WriteHeader(opts.forceTokenStatus) + _, _ = w.Write([]byte(`{"detail":"invalid credentials"}`)) + return + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "mock-access-token", + "token_type": "Bearer", + "expires_in": 3600, + }) + }) + + mux.HandleFunc("/api/v1/fraud/transactions/", func(w http.ResponseWriter, r *http.Request) { + m.fraudRequests.Add(1) + + // Default behaviour: sentinel UUID returns 404 (transaction not found). + status := opts.forceFraudStatus + if status == 0 { + status = http.StatusNotFound + } + + body := opts.fraudResponse + if body == "" { + body = `{"detail":"transaction not found"}` + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = w.Write([]byte(body)) + }) + + t.Cleanup(func() { m.server.Close() }) + + return m +} + +func validDatarudderConfig(serverURL string) map[string]any { + return map[string]any{ + "base_url": serverURL, + "auth": map[string]any{ + "token_url": serverURL + "/api/v1/auth/login", + "client_id": "test-client", + "client_secret": "test-secret", + }, + } +} + +func newProberClient() *http.Client { + return &http.Client{Timeout: 5 * time.Second} +} + +// resetSSRFCacheForTest forces ssrfOptions() to re-read SSRF_ALLOW_PRIVATE on +// the next call. Tests use t.Setenv before this to switch policy per scenario. +func resetSSRFCacheForTest(t *testing.T) { + t.Helper() + resetDataRudderSSRFCache() + _ = ssrfOptions() // sanity: re-prime so next probe sees the env +} + +func TestAuthRequired_IsTrue(t *testing.T) { + provider := &datarudderProvider{} + assert.True(t, provider.AuthRequired()) +} + +func TestProbe_HappyPath_Sentinel404(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{}) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached, "expected Reached=true, TransportErr=%v", outcome.TransportErr) + assert.True(t, outcome.AuthApplied) + assert.Equal(t, http.StatusNotFound, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr) + assert.Contains(t, outcome.URL, "00000000-0000-0000-0000-000000000000") + require.NotNil(t, outcome.Details) + assert.Equal(t, "transaction not found", outcome.Details["upstream_detail"]) + + assert.Equal(t, int64(1), mock.tokenRequests.Load()) + assert.Equal(t, int64(1), mock.fraudRequests.Load()) +} + +func TestProbe_TransactionExists_200(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusOK, + fraudResponse: `{"id":"abc","score":0.5,"action":"APPROVED"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.True(t, outcome.AuthApplied) + assert.Equal(t, http.StatusOK, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr) +} + +func TestProbe_AuthRejected_401(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusUnauthorized, + fraudResponse: `{"detail":"invalid token"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached, "host was reachable") + assert.False(t, outcome.AuthApplied, "401 means auth was rejected") + assert.Equal(t, http.StatusUnauthorized, outcome.StatusCode) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "auth rejected") + assert.Equal(t, "invalid token", outcome.Details["upstream_detail"]) +} + +func TestProbe_AuthRejected_403(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusForbidden, + fraudResponse: `{"detail":"forbidden"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.False(t, outcome.AuthApplied) + assert.Equal(t, http.StatusForbidden, outcome.StatusCode) +} + +func TestProbe_ServerError_5xx(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusInternalServerError, + fraudResponse: `{"detail":"upstream borked"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.True(t, outcome.AuthApplied, "Bearer was accepted; the 500 is a Delorean-side issue") + assert.Equal(t, http.StatusInternalServerError, outcome.StatusCode) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "server error") +} + +func TestProbe_TokenEndpointFails_401(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceTokenStatus: http.StatusUnauthorized, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.True(t, outcome.Reached, "we reached the auth host before failing") + assert.False(t, outcome.AuthApplied, "token exchange failed → AuthApplied=false") + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "apply oauth2 auth") + assert.Equal(t, int64(0), mock.fraudRequests.Load(), "fraud endpoint must NOT have been called when token fetch failed") +} + +func TestProbe_NetworkError_UnreachableHost(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + // 127.0.0.1:1 is reliably refused on every platform. + provider := &datarudderProvider{} + cfg := validDatarudderConfig("http://127.0.0.1:1") + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.AuthApplied) + require.Error(t, outcome.TransportErr) +} + +func TestProbe_SSRFBlocked_PrivateNetworkDisallowed(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "false") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{}) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + assert.False(t, outcome.AuthApplied) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "SSRF") +} + +func TestProbe_MissingBaseURL(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + provider := &datarudderProvider{} + cfg := map[string]any{ + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "x", + "client_secret": "y", + }, + } + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "base_url") +} + +func TestProbe_MissingAuthBlock(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + provider := &datarudderProvider{} + cfg := map[string]any{"base_url": "https://example.com"} + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "auth block") +} + +func TestProbe_MissingAuthField(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + provider := &datarudderProvider{} + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "x", + // client_secret missing + }, + } + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "client_secret") +} + +func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + fraudResponse: `not even json`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.Empty(t, outcome.Details["upstream_detail"], "non-JSON body must not crash; detail just stays empty") +} diff --git a/pkg/executors/datarudder/datarudder_test.go b/pkg/executors/datarudder/datarudder_test.go new file mode 100644 index 0000000..7b3d0fc --- /dev/null +++ b/pkg/executors/datarudder/datarudder_test.go @@ -0,0 +1,248 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executors" + "github.com/LerianStudio/flowker/pkg/executors/datarudder" + "github.com/santhosh-tekuri/jsonschema/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestRegister_ProviderAndExecutorAreCatalogged verifies that RegisterDefaults +// adds DataRudder to the catalog with its one executor reachable via runner. +func TestRegister_ProviderAndExecutorAreCatalogged(t *testing.T) { + catalog := executor.NewCatalog() + require.NoError(t, executors.RegisterDefaults(catalog)) + + provider, err := catalog.GetProvider("datarudder") + require.NoError(t, err) + assert.Equal(t, "DataRudder", provider.Name()) + assert.Contains(t, provider.Description(), "anti-fraud") + + runner, err := catalog.GetRunner(datarudder.CreateTransactionID) + require.NoError(t, err) + require.NotNil(t, runner, "runner for datarudder.create-transaction must be registered") +} + +// TestProviderConfigSchema_Valid ensures the published JSON Schema compiles and +// accepts a representative happy-path config. +func TestProviderConfigSchema_Valid(t *testing.T) { + schema := compileProviderSchema(t) + + cfg := map[string]any{ + "base_url": "https://inference.api-stg.delorean-ai.com", + "auth": map[string]any{ + "token_url": "https://inference.api-stg.delorean-ai.com/api/v1/auth/login", + "client_id": "id", + "client_secret": "secret", + }, + } + + require.NoError(t, schema.Validate(cfg)) +} + +// TestProviderConfigSchema_RejectsExtraTopLevelField proves additionalProperties:false +// at the top level catches typos. +func TestProviderConfigSchema_RejectsExtraTopLevelField(t *testing.T) { + schema := compileProviderSchema(t) + + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", + }, + "unexpected_field": "boom", + } + + require.Error(t, schema.Validate(cfg), "additionalProperties:false must reject unknown top-level fields") +} + +// TestProviderConfigSchema_RejectsExtraAuthField proves additionalProperties:false +// inside auth catches typos like 'token_uri' instead of 'token_url'. +func TestProviderConfigSchema_RejectsExtraAuthField(t *testing.T) { + schema := compileProviderSchema(t) + + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", + "token_uri": "https://example.com/typo", // typo + }, + } + + require.Error(t, schema.Validate(cfg), "additionalProperties:false in auth must reject typos") +} + +// TestProviderConfigSchema_RejectsMissingRequiredFields enumerates each +// required field and asserts the schema rejects configs missing it. +func TestProviderConfigSchema_RejectsMissingRequiredFields(t *testing.T) { + schema := compileProviderSchema(t) + + t.Run("missing base_url", func(t *testing.T) { + cfg := map[string]any{ + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", + }, + } + require.Error(t, schema.Validate(cfg)) + }) + + t.Run("missing auth", func(t *testing.T) { + cfg := map[string]any{"base_url": "https://example.com"} + require.Error(t, schema.Validate(cfg)) + }) + + t.Run("missing auth.token_url", func(t *testing.T) { + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "client_id": "id", + "client_secret": "secret", + }, + } + require.Error(t, schema.Validate(cfg)) + }) +} + +// TestIntegration_CreateTransaction_EndToEnd is the intra-package integration +// test: stands up mock login + mock fraud endpoints via httptest.Server, drives +// the full chain (provider config → BuildInput → runner.Execute → response). +// Proves that the OAuth2 flow + body translation + Bearer header injection all +// work together against a DataRudder-shaped API. +func TestIntegration_CreateTransaction_EndToEnd(t *testing.T) { + var ( + tokenRequestBody string + fraudAuthHeader string + fraudRequestBody map[string]any + ) + + mux := http.NewServeMux() + + mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + defer r.Body.Close() + tokenRequestBody = string(body) + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "test-bearer-12345", + "token_type": "Bearer", + "expires_in": 3600, + }) + }) + + mux.HandleFunc("/api/v1/fraud/transactions/", func(w http.ResponseWriter, r *http.Request) { + fraudAuthHeader = r.Header.Get("Authorization") + + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + defer r.Body.Close() + require.NoError(t, json.Unmarshal(body, &fraudRequestBody)) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]any{ + "id": "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", + "score": 0.0, + "action": "APPROVED", + }) + }) + + server := httptest.NewServer(mux) + defer server.Close() + + cfg := map[string]any{ + "base_url": server.URL, + "auth": map[string]any{ + "token_url": server.URL + "/api/v1/auth/login", + "client_id": "the-client", + "client_secret": "the-secret", + }, + } + + body, err := json.Marshal(map[string]any{ + "transaction": map[string]any{ + "id": "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", + "amount_total": 101, + "currency_code": "BRL", + }, + }) + require.NoError(t, err) + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, body) + require.NoError(t, err) + + catalog := executor.NewCatalog() + require.NoError(t, executors.RegisterDefaults(catalog)) + runner, err := catalog.GetRunner(datarudder.CreateTransactionID) + require.NoError(t, err) + + result, err := runner.Execute(context.Background(), input) + require.NoError(t, err) + assert.Equal(t, executor.ExecutionStatusSuccess, result.Status, "expected success, got error: %s", result.Error) + + // Token endpoint must have received credentials in form body (not Basic header). + assert.Contains(t, tokenRequestBody, "client_id=the-client") + assert.Contains(t, tokenRequestBody, "client_secret=the-secret") + assert.False(t, strings.Contains(tokenRequestBody, "grant_type="), "grant_type must NOT be sent by default") + + // Fraud endpoint must have received the Bearer token. + assert.Equal(t, "Bearer test-bearer-12345", fraudAuthHeader) + + // Fraud endpoint must have received the transaction body intact. + txn, ok := fraudRequestBody["transaction"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", txn["id"]) + assert.Equal(t, "BRL", txn["currency_code"]) + + // Response must surface the score/action. + respBody, ok := result.Data["body"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "APPROVED", respBody["action"]) +} + +// compileProviderSchema extracts and compiles the published provider schema +// for use in validation tests. Goes through the catalog so we test what +// callers actually see. +func compileProviderSchema(t *testing.T) *jsonschema.Schema { + t.Helper() + + catalog := executor.NewCatalog() + require.NoError(t, executors.RegisterDefaults(catalog)) + provider, err := catalog.GetProvider("datarudder") + require.NoError(t, err) + + schemaStr := provider.ConfigSchema() + require.NotEmpty(t, schemaStr) + + compiler := jsonschema.NewCompiler() + raw, err := jsonschema.UnmarshalJSON(strings.NewReader(schemaStr)) + require.NoError(t, err) + require.NoError(t, compiler.AddResource("schema.json", raw)) + + compiled, err := compiler.Compile("schema.json") + require.NoError(t, err) + return compiled +} diff --git a/pkg/executors/datarudder/input_builder_test.go b/pkg/executors/datarudder/input_builder_test.go new file mode 100644 index 0000000..41e728d --- /dev/null +++ b/pkg/executors/datarudder/input_builder_test.go @@ -0,0 +1,146 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder_test + +import ( + "encoding/json" + "testing" + + "github.com/LerianStudio/flowker/pkg/executors/datarudder" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func validProviderConfig() map[string]any { + return map[string]any{ + "base_url": "https://inference.api-stg.delorean-ai.com", + "auth": map[string]any{ + "token_url": "https://inference.api-stg.delorean-ai.com/api/v1/auth/login", + "client_id": "test-client", + "client_secret": "test-secret", + }, + } +} + +func TestBuildInput_CreateTransaction_HappyPath(t *testing.T) { + cfg := validProviderConfig() + + body, err := json.Marshal(map[string]any{ + "transaction": map[string]any{ + "id": "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", + "amount": 101, + }, + }) + require.NoError(t, err) + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, body) + require.NoError(t, err) + + assert.Equal(t, "POST", input.Config["method"]) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"]) + + authBlock, ok := input.Config["auth"].(map[string]any) + require.True(t, ok, "auth block must be present") + assert.Equal(t, "oauth2_token_endpoint", authBlock["type"]) + + authConfig, ok := authBlock["config"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/auth/login", authConfig["token_url"]) + assert.Equal(t, "test-client", authConfig["client_id"]) + assert.Equal(t, "test-secret", authConfig["client_secret"]) + assert.Equal(t, "body", authConfig["credentials_location"], "credentials_location must be hardcoded to body") + + parsedBody, ok := input.Config["body"].(map[string]any) + require.True(t, ok) + txn, ok := parsedBody["transaction"].(map[string]any) + require.True(t, ok) + assert.Equal(t, float64(101), txn["amount"]) +} + +func TestBuildInput_TrimsTrailingSlashOnBaseURL(t *testing.T) { + cfg := validProviderConfig() + cfg["base_url"] = "https://inference.api-stg.delorean-ai.com/" + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"], + "trailing slash on base_url must be normalized — no double-slash in final URL") +} + +func TestBuildInput_TrimsMultipleTrailingSlashes(t *testing.T) { + cfg := validProviderConfig() + cfg["base_url"] = "https://inference.api-stg.delorean-ai.com///" + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"]) +} + +func TestBuildInput_UnknownExecutorID(t *testing.T) { + cfg := validProviderConfig() + + _, err := datarudder.BuildInput(cfg, "datarudder.unknown-op", nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown DataRudder executor") +} + +func TestBuildInput_MissingBaseURL(t *testing.T) { + cfg := validProviderConfig() + delete(cfg, "base_url") + + _, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "base_url") +} + +func TestBuildInput_EmptyBaseURL(t *testing.T) { + cfg := validProviderConfig() + cfg["base_url"] = "" + + _, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "base_url") +} + +func TestBuildInput_NoAuthBlock_OmitsAuth(t *testing.T) { + cfg := map[string]any{ + "base_url": "https://example.com", + } + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + _, has := input.Config["auth"] + assert.False(t, has, "auth must be omitted when no auth block in provider config") +} + +func TestBuildInput_BodyAsStringWhenNotJSON(t *testing.T) { + cfg := validProviderConfig() + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, []byte("not-json{")) + require.NoError(t, err) + assert.Equal(t, "not-json{", input.Config["body"], "non-JSON body must pass through as raw string") +} + +func TestBuildInput_NoBody(t *testing.T) { + cfg := validProviderConfig() + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + _, has := input.Config["body"] + assert.False(t, has, "body must be omitted when requestBody is nil/empty") +} + +func TestBuildInput_NodeDataIgnored(t *testing.T) { + cfg := validProviderConfig() + + // nodeData is ignored for DataRudder (no path params). Pass garbage to prove it. + nodeData := map[string]any{"unrelated": "should-be-ignored"} + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nodeData, nil) + require.NoError(t, err) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"]) +} From ab8267491f93dabe9f68b0361bb9f733685c174d Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 09:37:37 -0300 Subject: [PATCH 5/8] test(datarudder): tighten integration test assertions via url.ParseQuery The previous assertions used strings.Contains against the raw form body, which would have produced false positives for values like "the-client-foo" matching the expected "the-client". Parse the body via url.ParseQuery and assert on exact decoded values instead, so the test cannot be fooled by substring overlap and any future regression in form encoding is caught. Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/executors/datarudder/datarudder_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/executors/datarudder/datarudder_test.go b/pkg/executors/datarudder/datarudder_test.go index 7b3d0fc..bda1ed3 100644 --- a/pkg/executors/datarudder/datarudder_test.go +++ b/pkg/executors/datarudder/datarudder_test.go @@ -12,6 +12,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "strings" "testing" @@ -204,9 +205,13 @@ func TestIntegration_CreateTransaction_EndToEnd(t *testing.T) { assert.Equal(t, executor.ExecutionStatusSuccess, result.Status, "expected success, got error: %s", result.Error) // Token endpoint must have received credentials in form body (not Basic header). - assert.Contains(t, tokenRequestBody, "client_id=the-client") - assert.Contains(t, tokenRequestBody, "client_secret=the-secret") - assert.False(t, strings.Contains(tokenRequestBody, "grant_type="), "grant_type must NOT be sent by default") + // Parse the form body so the assertions match exact values, not substrings — + // otherwise client_id="the-client" would match a literal "the-client-extra". + parsedForm, parseErr := url.ParseQuery(tokenRequestBody) + require.NoError(t, parseErr, "token endpoint body must be valid form-urlencoded") + assert.Equal(t, "the-client", parsedForm.Get("client_id")) + assert.Equal(t, "the-secret", parsedForm.Get("client_secret")) + assert.False(t, parsedForm.Has("grant_type"), "grant_type must NOT be sent by default") // Fraud endpoint must have received the Bearer token. assert.Equal(t, "Bearer test-bearer-12345", fraudAuthHeader) From 9a916e4ec94edb55cf4075a441de2557e13754cd Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 10:21:56 -0300 Subject: [PATCH 6/8] =?UTF-8?q?fix(datarudder):=20apply=20codereview=20fin?= =?UTF-8?q?dings=20=E2=80=94=20404=20contract,=20auth=20semantics,=20test?= =?UTF-8?q?=20gaps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses 1 Critical + 4 High + 5 Medium + 3 Low from the gate-8 multi-agent review, scoped strictly to the DataRudder package (issues outside this package — runner-wide token cache, masking, README drift — are left for the Flowker product team). Probe contract changes (connectivity_prober.go): - CRITICAL: the sentinel UUID 404 (documented success signal) was being misclassified as E2E=Failed by the shared applyProbeOutcome command (which was written for Midaz semantics). Probe now rewrites StatusCode=404 to 200 before returning, and stores the real status in Details["raw_status_code"] for diagnostics. Keeps the shared command mapping intact. - HIGH: removed the post-response switch that overrode AuthApplied=false on 401/403 and added a synthetic TransportErr on 5xx. Status classification is the shared command layer's job; owning that decision in the prober drifts from the Midaz pattern and double-classifies the outcome. - MEDIUM: when applyDataRudderAuth fails, Reached is now false (the probe URL was never reached). The previous "we reached far enough to attempt auth" comment contradicted the documented Reached contract on pkg/executor/connectivity_prober.go:46. - applyDataRudderAuth now returns just error (the bool was redundant — true on success, false on error). Simplifies the call site and removes one dead state variable. - LOW: response body is drained via io.Copy(io.Discard, ...) before Close so keep-alive connection reuse is not blocked by unread bytes past the 4 KiB readUpstreamDetail cap. Test improvements (connectivity_prober_test.go): - HIGH: TestProbe_NetworkError_UnreachableHost was misnamed — config used the same unreachable host for both base_url and token_url, so auth failed FIRST and the post-auth Do() error branch was never covered. Renamed to TestProbe_NetworkError_UnreachableProbeHost and the config now stands up a working token mock + a refused base_url. Now genuinely exercises the intended branch. - MEDIUM: TestProbe_MissingAuthField was renamed in-place and converted to table-driven, covering all three required auth fields (token_url, client_id, client_secret). The previous shape only exercised the client_secret branch because the extractor iterates keys in declaration order. - MEDIUM: added TestReadUpstreamDetail with direct unit coverage of the helper — valid JSON with/without detail key, empty body, empty object, malformed JSON, and a body larger than probeBodyReadLimit (verifies the LimitReader truncation does not panic). - MEDIUM: added TestProbe_Timeout exercising the client-timeout / context- deadline path against a deliberately slow fraud-endpoint mock. The 10-second probeTimeout constant is no longer fully untested. - MEDIUM: URL assertions now use Equal against the deterministic expected URL instead of Contains. - LOW: removed the no-op '_ = ssrfOptions()' re-prime line in resetSSRFCacheForTest (Probe itself primes via the same sync.Once on its next call, so the test-side prime was dead). Added a CAUTION comment warning future contributors against t.Parallel() use given the package-level SSRF cache mutation. - Cascading assertion updates across the existing 401/403/5xx/token-fail tests to honour the new probe contract (AuthApplied=true on a server rejection, Reached=false when auth fails, no synthetic TransportErr from the prober itself). Schema test coverage (datarudder_test.go): - HIGH: TestProviderConfigSchema_RejectsMissingRequiredFields renamed to RejectsInvalidConfigs and expanded to a table-driven enumeration of every required field plus minLength:1 violations. Previously only base_url, auth, and auth.token_url missing were tested; client_id, client_secret, and the empty-string minLength constraint were uncovered. Added an inline note documenting that format:"uri" is annotation-only under Draft 2020-12 (gap is Flowker-team-owned because the compiler config is shared). - HIGH: TestIntegration_CreateTransaction_EndToEnd now captures and asserts that the token endpoint received NO Authorization header. This proves credentials_location="body" is honoured and that a future regression sending both body credentials AND a Basic header would be caught. Coverage rose from 89.8% to 91.6% on the datarudder package. lint clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datarudder/connectivity_prober.go | 81 +++++--- .../datarudder/connectivity_prober_test.go | 190 +++++++++++++++--- pkg/executors/datarudder/datarudder_test.go | 139 ++++++++++--- 3 files changed, 323 insertions(+), 87 deletions(-) diff --git a/pkg/executors/datarudder/connectivity_prober.go b/pkg/executors/datarudder/connectivity_prober.go index be2f39a..4a9ea74 100644 --- a/pkg/executors/datarudder/connectivity_prober.go +++ b/pkg/executors/datarudder/connectivity_prober.go @@ -57,11 +57,16 @@ func (p *datarudderProvider) AuthRequired() bool { return true } // Failure modes, in the order they are detected: // 1. Missing/invalid base_url → Reached=false. // 2. SSRF/DNS validation fails → Reached=false (TransportErr describes the block). -// 3. Auth provider construction or token exchange fails → Reached=true (the -// auth host was syntactically reachable), AuthApplied=false. -// 4. HTTP transport error against base_url → Reached=false. -// 5. HTTP response received → Reached=true, AuthApplied=true (or false if -// status indicates auth rejection); StatusCode carries the result. +// 3. Auth provider construction or token exchange fails → Reached=false (the +// probe URL was never reached), AuthApplied=false. +// 4. HTTP transport error against base_url → Reached=false, AuthApplied=true +// (auth succeeded; only the probe-URL hop failed). +// 5. HTTP response received → Reached=true, AuthApplied=true. The status code +// is normalised so a 404 from the sentinel UUID — the documented success +// signal — is reported as 200, with the raw status preserved in +// Details["raw_status_code"]. Status classification for 401/403/5xx is +// left to the shared command layer (applyProbeOutcome) so behaviour stays +// consistent across providers. func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, httpClient *http.Client) executor.ProbeOutcome { start := time.Now() @@ -113,14 +118,17 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http // IP-pinned URL but we still want the server to see the real hostname). req.Host = resolveResult.Authority - authApplied, authErr := applyDataRudderAuth(reqCtx, req, cfg, httpClient) - if authErr != nil { + if err := applyDataRudderAuth(reqCtx, req, cfg, httpClient); err != nil { + // The probe URL (base_url + path) was never reached because auth + // resolution / token exchange / header injection failed before the + // request could be dispatched. Honour the Reached field's documented + // contract (transport completed on the probe URL). return executor.ProbeOutcome{ URL: displayURL, - Reached: true, // we reached far enough to attempt auth + Reached: false, AuthApplied: false, DurationMs: time.Since(start).Milliseconds(), - TransportErr: authErr, + TransportErr: err, } } @@ -129,37 +137,47 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http return executor.ProbeOutcome{ URL: displayURL, Reached: false, - AuthApplied: authApplied, + AuthApplied: true, // auth succeeded; only the probe-URL hop failed DurationMs: time.Since(start).Milliseconds(), TransportErr: fmt.Errorf("probe request failed: %w", err), } } - defer func() { _ = resp.Body.Close() }() + defer func() { + // Drain any unread bytes (anything beyond probeBodyReadLimit) so the + // connection can be returned to the keep-alive pool. + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() details := map[string]any{} if detail := readUpstreamDetail(resp.Body); detail != "" { details["upstream_detail"] = detail } - outcome := executor.ProbeOutcome{ + // The sentinel UUID is guaranteed not to exist upstream — Delorean returns + // 404 for it, which is our documented success signal (host reached + Bearer + // accepted). Rewrite to 200 so the shared command's outcome mapping + // (written for Midaz, where 404 means missing org/ledger) doesn't classify + // our healthy path as E2E=Failed. The original status is preserved in + // Details for diagnostics. + statusCode := resp.StatusCode + if statusCode == http.StatusNotFound { + details["raw_status_code"] = http.StatusNotFound + statusCode = http.StatusOK + } + + // Status classification for 401/403/5xx is left to the shared command + // layer (applyProbeOutcome). Owning that decision here would drift from + // the Midaz prober contract and double-classify the outcome. + return executor.ProbeOutcome{ URL: displayURL, Reached: true, - AuthApplied: authApplied, - StatusCode: resp.StatusCode, + AuthApplied: true, + StatusCode: statusCode, DurationMs: time.Since(start).Milliseconds(), Details: details, } - - switch { - case resp.StatusCode == http.StatusUnauthorized, resp.StatusCode == http.StatusForbidden: - outcome.AuthApplied = false - outcome.TransportErr = fmt.Errorf("auth rejected by Delorean: status %d", resp.StatusCode) - case resp.StatusCode >= 500: - outcome.TransportErr = fmt.Errorf("delorean returned server error: status %d", resp.StatusCode) - } - - return outcome } // extractDataRudderProbeTargets pulls the base_url from a provider config and @@ -187,24 +205,25 @@ func extractDataRudderProbeTargets(cfg map[string]any) (string, error) { } // applyDataRudderAuth builds the auth provider from the nested auth block and -// applies it to the request. Returns true on success; false (with err) on any -// token exchange or header injection failure. -func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string]any, httpClient *http.Client) (bool, error) { +// applies it to the request. Returns nil on success; the underlying error on +// any token exchange or header injection failure. The caller treats a non-nil +// return as "AuthApplied=false, probe URL never reached". +func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string]any, httpClient *http.Client) error { authConfig := buildDataRudderAuth(cfg) if authConfig == nil { - return false, errors.New("auth block missing from provider config") + return errors.New("auth block missing from provider config") } authProvider, err := auth.NewFromConfig(authConfig, httpClient) if err != nil { - return false, fmt.Errorf("build auth provider: %w", err) + return fmt.Errorf("build auth provider: %w", err) } if err := authProvider.Apply(ctx, req); err != nil { - return false, fmt.Errorf("apply oauth2 auth: %w", err) + return fmt.Errorf("apply oauth2 auth: %w", err) } - return true, nil + return nil } // readUpstreamDetail extracts the "detail" field from a JSON error body if diff --git a/pkg/executors/datarudder/connectivity_prober_test.go b/pkg/executors/datarudder/connectivity_prober_test.go index 03b56d4..2653d46 100644 --- a/pkg/executors/datarudder/connectivity_prober_test.go +++ b/pkg/executors/datarudder/connectivity_prober_test.go @@ -11,6 +11,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "strings" "sync/atomic" "testing" "time" @@ -21,9 +22,10 @@ import ( // datarudderMockOptions configures the Delorean mock server. type datarudderMockOptions struct { - forceTokenStatus int // non-zero to force token endpoint status (e.g. 401) - forceFraudStatus int // non-zero to force fraud endpoint status (e.g. 404, 500) - fraudResponse string + forceTokenStatus int // non-zero to force token endpoint status (e.g. 401) + forceFraudStatus int // non-zero to force fraud endpoint status (e.g. 404, 500) + fraudResponse string // raw body to return on the fraud endpoint + fraudDelay time.Duration // sleep before responding on the fraud endpoint } // datarudderMockServer is a single httptest.Server that serves both the OAuth2 @@ -65,6 +67,10 @@ func newDatarudderMockServer(t *testing.T, opts datarudderMockOptions) *datarudd mux.HandleFunc("/api/v1/fraud/transactions/", func(w http.ResponseWriter, r *http.Request) { m.fraudRequests.Add(1) + if opts.fraudDelay > 0 { + time.Sleep(opts.fraudDelay) + } + // Default behaviour: sentinel UUID returns 404 (transaction not found). status := opts.forceFraudStatus if status == 0 { @@ -103,10 +109,13 @@ func newProberClient() *http.Client { // resetSSRFCacheForTest forces ssrfOptions() to re-read SSRF_ALLOW_PRIVATE on // the next call. Tests use t.Setenv before this to switch policy per scenario. +// +// CAUTION: this mutates package-level state (ssrfAllowPrivate + +// ssrfAllowPrivateOnce). Do NOT call t.Parallel() in any test that touches +// SSRF state — concurrent tests would race the reset/read. func resetSSRFCacheForTest(t *testing.T) { t.Helper() resetDataRudderSSRFCache() - _ = ssrfOptions() // sanity: re-prime so next probe sees the env } func TestAuthRequired_IsTrue(t *testing.T) { @@ -127,11 +136,17 @@ func TestProbe_HappyPath_Sentinel404(t *testing.T) { require.True(t, outcome.Reached, "expected Reached=true, TransportErr=%v", outcome.TransportErr) assert.True(t, outcome.AuthApplied) - assert.Equal(t, http.StatusNotFound, outcome.StatusCode) + // Sentinel 404 is rewritten to 200 — that is the documented success signal. + // The raw status survives in Details["raw_status_code"] for diagnostics. + assert.Equal(t, http.StatusOK, outcome.StatusCode, "sentinel 404 must be normalised to 200") assert.NoError(t, outcome.TransportErr) - assert.Contains(t, outcome.URL, "00000000-0000-0000-0000-000000000000") + + expectedURL := mock.server.URL + "/api/v1/fraud/transactions/00000000-0000-0000-0000-000000000000/" + assert.Equal(t, expectedURL, outcome.URL) + require.NotNil(t, outcome.Details) assert.Equal(t, "transaction not found", outcome.Details["upstream_detail"]) + assert.Equal(t, http.StatusNotFound, outcome.Details["raw_status_code"], "original status preserved for diagnostics") assert.Equal(t, int64(1), mock.tokenRequests.Load()) assert.Equal(t, int64(1), mock.fraudRequests.Load()) @@ -155,6 +170,9 @@ func TestProbe_TransactionExists_200(t *testing.T) { assert.True(t, outcome.AuthApplied) assert.Equal(t, http.StatusOK, outcome.StatusCode) assert.NoError(t, outcome.TransportErr) + // Real 200 (not a sentinel rewrite) — raw_status_code must NOT be set. + _, hasRaw := outcome.Details["raw_status_code"] + assert.False(t, hasRaw, "raw_status_code is only set when the sentinel 404 rewrite fires") } func TestProbe_AuthRejected_401(t *testing.T) { @@ -171,11 +189,12 @@ func TestProbe_AuthRejected_401(t *testing.T) { outcome := provider.Probe(context.Background(), cfg, newProberClient()) + // Contract: the probe reports what happened on the wire. Status + // classification (401/403 → Auth=Failed) is the shared command layer's job. require.True(t, outcome.Reached, "host was reachable") - assert.False(t, outcome.AuthApplied, "401 means auth was rejected") + assert.True(t, outcome.AuthApplied, "Bearer header was injected; rejection happened server-side") assert.Equal(t, http.StatusUnauthorized, outcome.StatusCode) - require.Error(t, outcome.TransportErr) - assert.Contains(t, outcome.TransportErr.Error(), "auth rejected") + assert.NoError(t, outcome.TransportErr, "transport completed successfully; status carries the rejection") assert.Equal(t, "invalid token", outcome.Details["upstream_detail"]) } @@ -194,8 +213,9 @@ func TestProbe_AuthRejected_403(t *testing.T) { outcome := provider.Probe(context.Background(), cfg, newProberClient()) require.True(t, outcome.Reached) - assert.False(t, outcome.AuthApplied) + assert.True(t, outcome.AuthApplied) assert.Equal(t, http.StatusForbidden, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr) } func TestProbe_ServerError_5xx(t *testing.T) { @@ -215,8 +235,7 @@ func TestProbe_ServerError_5xx(t *testing.T) { require.True(t, outcome.Reached) assert.True(t, outcome.AuthApplied, "Bearer was accepted; the 500 is a Delorean-side issue") assert.Equal(t, http.StatusInternalServerError, outcome.StatusCode) - require.Error(t, outcome.TransportErr) - assert.Contains(t, outcome.TransportErr.Error(), "server error") + assert.NoError(t, outcome.TransportErr, "transport completed; the shared command classifies status >= 500") } func TestProbe_TokenEndpointFails_401(t *testing.T) { @@ -232,25 +251,44 @@ func TestProbe_TokenEndpointFails_401(t *testing.T) { outcome := provider.Probe(context.Background(), cfg, newProberClient()) - assert.True(t, outcome.Reached, "we reached the auth host before failing") - assert.False(t, outcome.AuthApplied, "token exchange failed → AuthApplied=false") + // The probe URL (base_url + path) was never reached — auth resolution + // short-circuited the chain. Reached must be false to honour the interface + // contract ("transport completed on the probe URL"). + assert.False(t, outcome.Reached, "probe URL never hit when token exchange fails") + assert.False(t, outcome.AuthApplied) require.Error(t, outcome.TransportErr) assert.Contains(t, outcome.TransportErr.Error(), "apply oauth2 auth") assert.Equal(t, int64(0), mock.fraudRequests.Load(), "fraud endpoint must NOT have been called when token fetch failed") } -func TestProbe_NetworkError_UnreachableHost(t *testing.T) { +// TestProbe_NetworkError_UnreachableProbeHost exercises the branch where the +// token exchange SUCCEEDS but the subsequent GET to base_url fails at the +// transport layer (DNS/TCP). The earlier shape of this test pointed both +// base_url AND token_url at 127.0.0.1:1, so auth failed first and the +// post-auth network error branch was never reached. +func TestProbe_NetworkError_UnreachableProbeHost(t *testing.T) { t.Setenv("SSRF_ALLOW_PRIVATE", "true") resetSSRFCacheForTest(t) - // 127.0.0.1:1 is reliably refused on every platform. + tokenMock := newDatarudderMockServer(t, datarudderMockOptions{}) + provider := &datarudderProvider{} - cfg := validDatarudderConfig("http://127.0.0.1:1") + cfg := map[string]any{ + "base_url": "http://127.0.0.1:1", // reliably refused on every platform + "auth": map[string]any{ + "token_url": tokenMock.server.URL + "/api/v1/auth/login", + "client_id": "x", + "client_secret": "y", + }, + } outcome := provider.Probe(context.Background(), cfg, newProberClient()) - assert.False(t, outcome.AuthApplied) + assert.False(t, outcome.Reached, "base_url transport failed; probe URL not reached") + assert.True(t, outcome.AuthApplied, "auth succeeded against the token mock before the failed probe hop") require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "probe request failed") + assert.Equal(t, int64(1), tokenMock.tokenRequests.Load(), "token endpoint was hit exactly once") } func TestProbe_SSRFBlocked_PrivateNetworkDisallowed(t *testing.T) { @@ -304,25 +342,47 @@ func TestProbe_MissingAuthBlock(t *testing.T) { assert.Contains(t, outcome.TransportErr.Error(), "auth block") } +// TestProbe_MissingAuthField is table-driven so each required field +// (token_url, client_id, client_secret) is exercised in its own subtest. +// The previous shape of this test only covered the client_secret branch +// because extractDataRudderProbeTargets iterates the keys in declaration order. func TestProbe_MissingAuthField(t *testing.T) { t.Setenv("SSRF_ALLOW_PRIVATE", "true") resetSSRFCacheForTest(t) provider := &datarudderProvider{} - cfg := map[string]any{ - "base_url": "https://example.com", - "auth": map[string]any{ - "token_url": "https://example.com/token", - "client_id": "x", - // client_secret missing - }, - } - outcome := provider.Probe(context.Background(), cfg, newProberClient()) + cases := []struct { + name string + omitted string + expectedErr string + }{ + {name: "missing token_url", omitted: "token_url", expectedErr: "token_url"}, + {name: "missing client_id", omitted: "client_id", expectedErr: "client_id"}, + {name: "missing client_secret", omitted: "client_secret", expectedErr: "client_secret"}, + } - assert.False(t, outcome.Reached) - require.Error(t, outcome.TransportErr) - assert.Contains(t, outcome.TransportErr.Error(), "client_secret") + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + auth := map[string]any{ + "token_url": "https://example.com/token", + "client_id": "x", + "client_secret": "y", + } + delete(auth, tc.omitted) + + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": auth, + } + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), tc.expectedErr) + }) + } } func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { @@ -339,5 +399,75 @@ func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { outcome := provider.Probe(context.Background(), cfg, newProberClient()) require.True(t, outcome.Reached) + // Default mock status is 404 (sentinel) → rewritten to 200. + assert.Equal(t, http.StatusOK, outcome.StatusCode) + assert.Equal(t, http.StatusNotFound, outcome.Details["raw_status_code"]) assert.Empty(t, outcome.Details["upstream_detail"], "non-JSON body must not crash; detail just stays empty") } + +// TestProbe_Timeout exercises the probeTimeout / context-deadline path. +// Uses a client with a short Timeout so the test stays fast; the contract +// being checked is "slow upstream surfaces as Reached=false + transport error", +// not the literal 10s probeTimeout constant. +func TestProbe_Timeout(t *testing.T) { + t.Setenv("SSRF_ALLOW_PRIVATE", "true") + resetSSRFCacheForTest(t) + + mock := newDatarudderMockServer(t, datarudderMockOptions{ + fraudDelay: 500 * time.Millisecond, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + fastClient := &http.Client{Timeout: 50 * time.Millisecond} + + outcome := provider.Probe(context.Background(), cfg, fastClient) + + assert.False(t, outcome.Reached, "client timeout fires before the upstream responds") + assert.True(t, outcome.AuthApplied, "token endpoint responded fast; only the fraud GET timed out") + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "probe request failed") +} + +// TestReadUpstreamDetail exercises the helper directly so the parsing +// branches (empty body, missing key, malformed JSON, body larger than the +// limit) are covered without spinning up an httptest.Server. +func TestReadUpstreamDetail(t *testing.T) { + t.Run("valid JSON with detail field", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`{"detail":"transaction not found"}`)) + assert.Equal(t, "transaction not found", got) + }) + + t.Run("valid JSON without detail field", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`{"error":"x","code":"y"}`)) + assert.Empty(t, got) + }) + + t.Run("empty body", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader("")) + assert.Empty(t, got) + }) + + t.Run("empty object", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`{}`)) + assert.Empty(t, got) + }) + + t.Run("malformed JSON", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`not even json`)) + assert.Empty(t, got) + }) + + t.Run("body larger than probeBodyReadLimit is truncated, returns empty", func(t *testing.T) { + // Build a body larger than the 4 KiB cap. The LimitReader truncates, + // which corrupts the JSON, so the parse must fail gracefully. + var b strings.Builder + b.WriteString(`{"detail":"`) + b.WriteString(strings.Repeat("x", probeBodyReadLimit*2)) + b.WriteString(`"}`) + + got := readUpstreamDetail(strings.NewReader(b.String())) + assert.Empty(t, got, "truncated JSON must not panic and must return empty") + }) +} diff --git a/pkg/executors/datarudder/datarudder_test.go b/pkg/executors/datarudder/datarudder_test.go index bda1ed3..dab4235 100644 --- a/pkg/executors/datarudder/datarudder_test.go +++ b/pkg/executors/datarudder/datarudder_test.go @@ -93,37 +93,113 @@ func TestProviderConfigSchema_RejectsExtraAuthField(t *testing.T) { require.Error(t, schema.Validate(cfg), "additionalProperties:false in auth must reject typos") } -// TestProviderConfigSchema_RejectsMissingRequiredFields enumerates each -// required field and asserts the schema rejects configs missing it. -func TestProviderConfigSchema_RejectsMissingRequiredFields(t *testing.T) { +// TestProviderConfigSchema_RejectsInvalidConfigs enumerates every required +// field and every documented constraint (minLength, format) and asserts the +// schema rejects configs violating them. Coverage gap from review HIGH-2. +func TestProviderConfigSchema_RejectsInvalidConfigs(t *testing.T) { schema := compileProviderSchema(t) - t.Run("missing base_url", func(t *testing.T) { - cfg := map[string]any{ - "auth": map[string]any{ - "token_url": "https://example.com/token", - "client_id": "id", - "client_secret": "secret", - }, + // validAuth returns a fresh, fully-populated auth map so subtests can + // mutate copies without leaking state across runs. + validAuth := func() map[string]any { + return map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", } - require.Error(t, schema.Validate(cfg)) - }) - - t.Run("missing auth", func(t *testing.T) { - cfg := map[string]any{"base_url": "https://example.com"} - require.Error(t, schema.Validate(cfg)) - }) + } - t.Run("missing auth.token_url", func(t *testing.T) { - cfg := map[string]any{ - "base_url": "https://example.com", - "auth": map[string]any{ - "client_id": "id", - "client_secret": "secret", + cases := []struct { + name string + cfg map[string]any + mustMention string // substring the validation error must reference + }{ + { + name: "missing base_url", + cfg: map[string]any{"auth": validAuth()}, + mustMention: "base_url", + }, + { + name: "missing auth", + cfg: map[string]any{"base_url": "https://example.com"}, + mustMention: "auth", + }, + { + name: "missing auth.token_url", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "client_id": "id", + "client_secret": "secret", + }, }, - } - require.Error(t, schema.Validate(cfg)) - }) + mustMention: "token_url", + }, + { + name: "missing auth.client_id", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_secret": "secret", + }, + }, + mustMention: "client_id", + }, + { + name: "missing auth.client_secret", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + }, + }, + mustMention: "client_secret", + }, + { + name: "empty-string auth.client_id violates minLength:1", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "", + "client_secret": "secret", + }, + }, + mustMention: "client_id", + }, + { + name: "empty-string auth.client_secret violates minLength:1", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "", + }, + }, + mustMention: "client_secret", + }, + // NOTE: "format": "uri" is annotation-only under JSON Schema Draft + // 2020-12 by default — the santhosh-tekuri/jsonschema/v6 compiler used + // in validateConfigAgainstSchema does NOT assert formats. A + // "not a uri" string in base_url / token_url would pass schema + // validation today. Enforcement would require enabling format + // assertions globally on the shared compiler in + // internal/services/command/create_provider_config.go — a cross-cutting + // change owned by the Flowker team. Documented here so the gap is + // visible if a future operator misconfiguration traces back to it. + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := schema.Validate(tc.cfg) + require.Error(t, err, "schema must reject this config") + assert.Contains(t, err.Error(), tc.mustMention, + "validation error must reference the offending field/constraint") + }) + } } // TestIntegration_CreateTransaction_EndToEnd is the intra-package integration @@ -134,6 +210,7 @@ func TestProviderConfigSchema_RejectsMissingRequiredFields(t *testing.T) { func TestIntegration_CreateTransaction_EndToEnd(t *testing.T) { var ( tokenRequestBody string + tokenAuthHeader string fraudAuthHeader string fraudRequestBody map[string]any ) @@ -141,6 +218,11 @@ func TestIntegration_CreateTransaction_EndToEnd(t *testing.T) { mux := http.NewServeMux() mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) { + // Capture the Authorization header so we can prove no Basic credential + // leaked out when credentials_location is "body" (hardcoded in + // buildDataRudderAuth). + tokenAuthHeader = r.Header.Get("Authorization") + body, err := io.ReadAll(r.Body) require.NoError(t, err) defer r.Body.Close() @@ -213,6 +295,11 @@ func TestIntegration_CreateTransaction_EndToEnd(t *testing.T) { assert.Equal(t, "the-secret", parsedForm.Get("client_secret")) assert.False(t, parsedForm.Has("grant_type"), "grant_type must NOT be sent by default") + // credentials_location="body" contract: NO Authorization header on the + // token request. If a future change accidentally also sends Basic auth, + // this assertion catches it. + assert.Empty(t, tokenAuthHeader, "credentials_location=body must not also send an Authorization header") + // Fraud endpoint must have received the Bearer token. assert.Equal(t, "Bearer test-bearer-12345", fraudAuthHeader) From 99f0879163b6178cb7b4d5a23f1884c1c5e603fd Mon Sep 17 00:00:00 2001 From: shimizu Date: Wed, 20 May 2026 16:19:03 -0300 Subject: [PATCH 7/8] fix(datarudder): pin TLS ServerName to SNIHostname for HTTPS probes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SSRF-pinning path replaces the hostname with an IP in PinnedURL, which causes Go's TLS handshake to use the IP as the default ServerName and fail cert verification with "cannot validate certificate for because it doesn't contain any IP SANs". lib-commons explicitly documents SNIHostname as the field to pass to TLSClientConfig.ServerName for this case. Smoke test against https://inference.api-stg.delorean-ai.com failed 100% of the time without this fix because Delorean's public cert covers *.delorean-ai.com (DNS SANs only). With the fix the handshake validates against the DNS SAN as expected. Adds a regression test that reproduces the production scenario in-process — a self-signed cert with a DNS SAN but no IP SAN, served via httptest TLS, exercised via the IP-host URL form. The "without fix" subtest asserts the exact IP-SAN error; the "with fix" subtest asserts a successful 200. The same defect is latent in pkg/executors/midaz/connectivity_prober.go and the legacy generic path at internal/services/command/test_provider_config_connectivity.go. Out of scope for this PR; tracked as separate followup. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datarudder/connectivity_prober.go | 61 ++++++- .../datarudder/connectivity_prober_test.go | 164 ++++++++++++++++++ 2 files changed, 222 insertions(+), 3 deletions(-) diff --git a/pkg/executors/datarudder/connectivity_prober.go b/pkg/executors/datarudder/connectivity_prober.go index 4a9ea74..d8b0ac0 100644 --- a/pkg/executors/datarudder/connectivity_prober.go +++ b/pkg/executors/datarudder/connectivity_prober.go @@ -6,6 +6,7 @@ package datarudder import ( "context" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -114,8 +115,10 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http } } - // Preserve original host for TLS SNI / Host header (libSSRF gives us an - // IP-pinned URL but we still want the server to see the real hostname). + // Restore the original hostname in the HTTP Host header so the upstream + // routes correctly even though we're dialling the pinned IP. This only + // affects the Host header — TLS SNI / certificate verification is fixed + // separately via pinnedClient below. req.Host = resolveResult.Authority if err := applyDataRudderAuth(reqCtx, req, cfg, httpClient); err != nil { @@ -132,7 +135,15 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http } } - resp, err := httpClient.Do(req) + // Third leg of the libSSRF.ResolveAndValidate contract: PinnedURL goes on + // the wire, Authority is the Host header, and SNIHostname is the TLS + // ServerName. Without this step the handshake against an HTTPS upstream + // would fail with "cannot validate certificate for because it doesn't + // contain any IP SANs" — req.URL.Host is now the pinned IP, so Go's + // default ServerName (= URL.Host) demands an IP SAN. + pinnedClient := withPinnedSNI(httpClient, resolveResult.SNIHostname) + + resp, err := pinnedClient.Do(req) if err != nil { return executor.ProbeOutcome{ URL: displayURL, @@ -226,6 +237,50 @@ func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string] return nil } +// withPinnedSNI returns a derived *http.Client whose TLS handshake uses the +// supplied sni hostname instead of req.URL.Host. This is the third leg of the +// libSSRF.ResolveAndValidate contract: PinnedURL is dialled, Authority is the +// Host header, and SNIHostname is the TLS ServerName. +// +// The returned client clones the source Transport and TLSClientConfig so the +// caller's state is not mutated. It is intended to be used for a single +// pinned request: reusing it for requests against other hostnames would send +// the wrong ServerName. +// +// When sni is empty the original client is returned unchanged so a future +// upstream change in libSSRF (e.g. SNIHostname missing for an IP literal URL) +// does not silently regress to the broken state. +// +// When base.Transport is not a *http.Transport (e.g. an instrumented wrapper), +// the helper falls back to http.DefaultTransport. This trades probe-level +// instrumentation for correctness, which is acceptable: the probe is not a +// hot path and the broken handshake would otherwise block every HTTPS probe. +func withPinnedSNI(base *http.Client, sni string) *http.Client { + if sni == "" { + return base + } + + src, ok := base.Transport.(*http.Transport) + if !ok || src == nil { + src = http.DefaultTransport.(*http.Transport) + } + + t := src.Clone() + if t.TLSClientConfig == nil { + t.TLSClientConfig = &tls.Config{ServerName: sni, MinVersion: tls.VersionTLS12} + } else { + t.TLSClientConfig = t.TLSClientConfig.Clone() + t.TLSClientConfig.ServerName = sni + } + + return &http.Client{ + Transport: t, + Timeout: base.Timeout, + CheckRedirect: base.CheckRedirect, + Jar: base.Jar, + } +} + // readUpstreamDetail extracts the "detail" field from a JSON error body if // present. Delorean's 404 returns {"detail": "transaction not found"} and // surfacing it gives operators a human-readable diagnostic without parsing diff --git a/pkg/executors/datarudder/connectivity_prober_test.go b/pkg/executors/datarudder/connectivity_prober_test.go index 2653d46..151d117 100644 --- a/pkg/executors/datarudder/connectivity_prober_test.go +++ b/pkg/executors/datarudder/connectivity_prober_test.go @@ -8,7 +8,13 @@ package datarudder import ( "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" "encoding/json" + "math/big" "net/http" "net/http/httptest" "strings" @@ -430,6 +436,164 @@ func TestProbe_Timeout(t *testing.T) { assert.Contains(t, outcome.TransportErr.Error(), "probe request failed") } +// TestWithPinnedSNI_SetsServerName confirms the helper installs the supplied +// hostname as TLS ServerName on a derived transport. +func TestWithPinnedSNI_SetsServerName(t *testing.T) { + base := &http.Client{Timeout: 5 * time.Second} + + pinned := withPinnedSNI(base, "example.com") + + require.NotSame(t, base, pinned, "must return a derived client") + + transport, ok := pinned.Transport.(*http.Transport) + require.True(t, ok) + require.NotNil(t, transport.TLSClientConfig) + assert.Equal(t, "example.com", transport.TLSClientConfig.ServerName) + assert.Equal(t, base.Timeout, pinned.Timeout, "Timeout must propagate to the derived client") +} + +// TestWithPinnedSNI_EmptySNI_ReturnsBase guards against silent regression: if +// libSSRF ever returns an empty SNIHostname (e.g. for an IP literal URL) we +// must NOT wrap the client into a broken state — return the original instead. +func TestWithPinnedSNI_EmptySNI_ReturnsBase(t *testing.T) { + base := &http.Client{Timeout: 5 * time.Second} + + pinned := withPinnedSNI(base, "") + + assert.Same(t, base, pinned, "empty SNI must return the original client unchanged") +} + +// TestWithPinnedSNI_NilTransport_UsesDefault covers the fallback to +// http.DefaultTransport when the caller-supplied client has Transport=nil. +func TestWithPinnedSNI_NilTransport_UsesDefault(t *testing.T) { + base := &http.Client{} // Transport == nil + + pinned := withPinnedSNI(base, "example.com") + + transport, ok := pinned.Transport.(*http.Transport) + require.True(t, ok) + require.NotNil(t, transport.TLSClientConfig) + assert.Equal(t, "example.com", transport.TLSClientConfig.ServerName) +} + +// TestWithPinnedSNI_PreservesExistingTLSConfig confirms that existing fields +// on the source TLSClientConfig survive the clone and that the base client's +// config is NOT mutated. +func TestWithPinnedSNI_PreservesExistingTLSConfig(t *testing.T) { + base := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12}, + }, + } + + pinned := withPinnedSNI(base, "example.com") + + pinnedTransport := pinned.Transport.(*http.Transport) + assert.Equal(t, "example.com", pinnedTransport.TLSClientConfig.ServerName) + assert.Equal(t, uint16(tls.VersionTLS12), pinnedTransport.TLSClientConfig.MinVersion, "existing fields must be preserved") + + originalTransport := base.Transport.(*http.Transport) + assert.Empty(t, originalTransport.TLSClientConfig.ServerName, "must not mutate the base TLSClientConfig") +} + +// TestWithPinnedSNI_HandshakeAgainstHostnameOnlyCert reproduces the exact +// Delorean staging scenario that the smoke test surfaced — a server whose +// certificate covers the hostname (DNS SAN) but NOT the resolved IP — and +// proves that: +// +// - WITHOUT withPinnedSNI, the handshake fails with the literal error +// observed in production smoke testing: "x509: cannot validate certificate +// for because it doesn't contain any IP SANs". +// - WITH withPinnedSNI(client, ""), the same client and the same +// IP-host URL complete the handshake successfully because the ClientHello +// ServerName is the hostname instead of the IP. +// +// This test exists specifically to prevent regression of the smoke-test +// failure that motivated the fix. +func TestWithPinnedSNI_HandshakeAgainstHostnameOnlyCert(t *testing.T) { + cert, rootCAs := generateHostnameOnlyTestCert(t, "datarudder.test") + + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + server.TLS = &tls.Config{Certificates: []tls.Certificate{cert}} + server.StartTLS() + t.Cleanup(server.Close) + + baseClient := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: rootCAs}, + }, + } + + // server.URL is the IP-host form, e.g. "https://127.0.0.1:54321" — exactly + // what libSSRF.PinnedURL produces against a public hostname. + serverURL := server.URL + + t.Run("without fix: IP-host URL fails because cert has no IP SAN", func(t *testing.T) { + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, serverURL, nil) + require.NoError(t, err) + + _, err = baseClient.Do(req) + require.Error(t, err, "broken-state path must fail; this is the smoke-test repro") + assert.Contains(t, err.Error(), "IP SAN", "error must be the documented IP-SAN failure") + }) + + t.Run("with fix: pinned SNI validates against the DNS SAN", func(t *testing.T) { + pinnedClient := withPinnedSNI(baseClient, "datarudder.test") + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, serverURL, nil) + require.NoError(t, err) + + resp, err := pinnedClient.Do(req) + require.NoError(t, err, "with the SNI pin the handshake must succeed against the DNS SAN") + + defer func() { _ = resp.Body.Close() }() + assert.Equal(t, http.StatusOK, resp.StatusCode) + }) +} + +// generateHostnameOnlyTestCert mints a self-signed cert with a DNS SAN for +// hostname but NO IP SAN. The returned CertPool trusts that cert. Together +// they let the test simulate a public-PKI scenario (cert covers hostname, +// client connects via resolved IP) within a single process. +func generateHostnameOnlyTestCert(t *testing.T, hostname string) (tls.Certificate, *x509.CertPool) { + t.Helper() + + key, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + + template := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: hostname}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IsCA: true, + DNSNames: []string{hostname}, + // IPAddresses intentionally omitted — reproduces the production scenario. + } + + derBytes, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key) + require.NoError(t, err) + + cert := tls.Certificate{ + Certificate: [][]byte{derBytes}, + PrivateKey: key, + } + + parsed, err := x509.ParseCertificate(derBytes) + require.NoError(t, err) + + pool := x509.NewCertPool() + pool.AddCert(parsed) + + return cert, pool +} + // TestReadUpstreamDetail exercises the helper directly so the parsing // branches (empty body, missing key, malformed JSON, body larger than the // limit) are covered without spinning up an httptest.Server. From d314c5b0f7dd379e0ee01880824fcae1cca1e886 Mon Sep 17 00:00:00 2001 From: Daniel Shimizu Date: Fri, 22 May 2026 10:27:10 -0300 Subject: [PATCH 8/8] refactor(datarudder): migrate ConnectivityProber to safehttp.Validate + list endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2A migration (smoke-tested against https://inference.api-stg.delorean-ai.com). Net -161 lines via three concurrent simplifications: 1. SSRF: libSSRF.ResolveAndValidate + PinnedURL + req.Host=Authority replaced by safehttp.Validate(probeCtx, baseURL, nil). The original URL stays on the wire so TLS SNI works against DNS-only-SAN certs — the smoke-tested fix that withPinnedSNI was working around. 2. SNI workaround: withPinnedSNI helper deleted (45 lines) along with its 4 unit tests. The DNS-only-SAN regression test is preserved as TestSafehttp_HandshakeAgainstDNSOnlyCert, reframed to validate the safehttp.NewClient contract instead. 3. Probe target: sentinel UUID + 404→200 remap replaced by list endpoint /api/v1/fraud/transactions/?limit=1&offset=0 which returns 200 natural. Test setup: ssrf_test_helpers.go deleted; main_test.go added mirroring pkg/executors/midaz/main_test.go for suite-wide safehttp policy override. docs/pre-dev/datarudder/tasks.md updated to reflect the new design. Smoke test (Delorean staging): - overallStatus: passed (572 ms) - stages[2].details.url = .../api/v1/fraud/transactions/?limit=1&offset=0 - all 3 stages (connectivity, authentication, end_to_end) green --- docs/pre-dev/datarudder/tasks.md | 25 +- .../datarudder/connectivity_prober.go | 197 ++++----------- .../datarudder/connectivity_prober_test.go | 225 +++++------------- pkg/executors/datarudder/main_test.go | 27 +++ pkg/executors/datarudder/ssrf_test_helpers.go | 20 -- 5 files changed, 152 insertions(+), 342 deletions(-) create mode 100644 pkg/executors/datarudder/main_test.go delete mode 100644 pkg/executors/datarudder/ssrf_test_helpers.go diff --git a/docs/pre-dev/datarudder/tasks.md b/docs/pre-dev/datarudder/tasks.md index 1507129..38cd9d8 100644 --- a/docs/pre-dev/datarudder/tasks.md +++ b/docs/pre-dev/datarudder/tasks.md @@ -18,7 +18,8 @@ - [ ] `datarudderProvider` struct implements `executor.{Provider, InputBuilder, ConnectivityProber}` (compile-time checks) - [ ] `BuildInput` normalizes `base_url` via `strings.TrimSuffix(..., "/")` and uses `executorRoutes` map (Midaz pattern) - [ ] `buildDataRudderAuth` extracts 3 fields explicitly + hardcodes `credentials_location="body"` (not exposed in schema) -- [ ] `ConnectivityProber.Probe` issues `GET {base_url}/api/v1/fraud/transactions/{sentinel}/` with sentinel `00000000-0000-0000-0000-000000000000`; surfaces upstream `detail` in `ProbeOutcome.Details["upstream_detail"]` +- [ ] `ConnectivityProber.Probe` issues `GET {base_url}/api/v1/fraud/transactions/?limit=1&offset=0` (list endpoint returns 200 natively for an authenticated request — no status remapping required); surfaces upstream `detail` in `ProbeOutcome.Details["upstream_detail"]` on error bodies +- [ ] SSRF pre-flight uses `safehttp.Validate` (which preserves the URL hostname so TLS SNI / certificate verification stays correct — the legacy `libSSRF.ResolveAndValidate + PinnedURL` pattern is intentionally NOT used). Dial-time enforcement is handled by the safehttp-wrapped HTTP client passed in by the command layer. - [ ] `AuthRequired()` returns `true` - [ ] `pkg/executors/register.go` wires `datarudder.Register(catalog)` in `RegisterDefaults` - [ ] All existing tests still pass (`make test`) @@ -50,19 +51,21 @@ --- -### Subtask ST-001-03: ConnectivityProber + SSRF test helpers +### Subtask ST-001-03: ConnectivityProber **Files to create**: -- `pkg/executors/datarudder/connectivity_prober.go` — `Probe()`, `AuthRequired()`, `extractDataRudderProbeTargets()`, sentinel UUID const, `probeAccountsPathTemplate` const, SSRF resolve via `libSSRF.ResolveAndValidate`, response body JSON parse to populate `Details["upstream_detail"]` -- `pkg/executors/datarudder/ssrf_test_helpers.go` — `ssrfOptions()` helper that mirrors `pkg/executors/midaz/ssrf_test_helpers.go` to disable private-IP block in tests using `httptest.Server` +- `pkg/executors/datarudder/connectivity_prober.go` — `Probe()`, `AuthRequired()`, `extractDataRudderProbeTargets()`, `probeTransactionsListPath` const (`/api/v1/fraud/transactions/?limit=1&offset=0`), SSRF pre-flight via `safehttp.Validate(probeCtx, baseURL, nil)`, response body JSON parse to populate `Details["upstream_detail"]` +- `pkg/executors/datarudder/main_test.go` — `TestMain` flips `safehttp.SetAllowPrivateForTest(true)` for the suite (mirrors `pkg/executors/midaz/main_test.go`). NO local `ssrfOptions()` helper / SSRF cache: the package consumes `safehttp.Validate` for pre-flight and the safehttp-wrapped HTTP client (built by the command layer) for dial-time enforcement. + +**SSRF strategy**: the prober uses `safehttp.Validate` which preserves the URL hostname (the godoc explicitly says *"callers must use the original URL so TLS handshake sees the correct SNI"*). The legacy `libSSRF.ResolveAndValidate + PinnedURL + req.Host = Authority + custom withPinnedSNI` pattern is intentionally NOT used — substituting an IP literal into `req.URL.Host` breaks HTTPS handshake against any cert that only has a DNS SAN (the exact bug observed against Delorean staging). No local `ssrfOptions` helper is needed: `safehttp.Options()` is the single source of truth, and per-test policy overrides go through `safehttp.SetAllowPrivateForTest`. **Outcome mapping** (from approved plan — must cover all in tests): -- 200/404 → `Reached=true, AuthApplied=true` -- 401/403 → `Reached=true, AuthApplied=false` -- 5xx → `Reached=true, AuthApplied=true` + transport err -- Token endpoint failure → `Reached=true (auth host), AuthApplied=false` -- DNS/TCP error → `Reached=false` -- SSRF blocked → `Reached=false` +- 200 (list endpoint natural success) → `Reached=true, AuthApplied=true` +- 401/403 → `Reached=true, AuthApplied=true` (the shared command layer classifies status → Auth=Failed; Reached + AuthApplied describe what happened on the wire, not the semantic outcome) +- 5xx → `Reached=true, AuthApplied=true` (shared command layer classifies status >= 500) +- Token endpoint failure → `Reached=false, AuthApplied=false` (probe URL never reached because auth resolution short-circuited) +- DNS/TCP error against probe URL → `Reached=false, AuthApplied=true` (auth succeeded; only the probe-URL hop failed) +- SSRF blocked → `Reached=false, AuthApplied=false` --- @@ -80,5 +83,5 @@ ## Out-of-cycle (post-implementation) - **CHANGELOG**: do NOT edit manually. Auto-generated by `.github/workflows/gptchangelog.yml`. Use Conventional Commits (`feature(datarudder): ...`, `test(datarudder): ...`). -- **Smoke test against Delorean staging**: manual, after PR merge. `POST /v1/provider-configurations//test` against a real `inference.api-stg.delorean-ai.com` provider config — expect Connectivity=Reached, Auth=Applied, E2E StatusCode=404 with `upstream_detail="transaction not found"`. +- **Smoke test against Delorean staging**: manual, after PR merge. `POST /v1/provider-configurations//test` against a real `inference.api-stg.delorean-ai.com` provider config — expect Connectivity=Reached, Auth=Applied, E2E StatusCode=200 against the list endpoint (`/api/v1/fraud/transactions/?limit=1&offset=0`). `upstream_detail` is populated only on error bodies. - **Production URL**: when Delorean prod URL becomes available, swap is zero-code — just update `ProviderConfiguration` via API. diff --git a/pkg/executors/datarudder/connectivity_prober.go b/pkg/executors/datarudder/connectivity_prober.go index d8b0ac0..aa6c3b8 100644 --- a/pkg/executors/datarudder/connectivity_prober.go +++ b/pkg/executors/datarudder/connectivity_prober.go @@ -6,34 +6,26 @@ package datarudder import ( "context" - "crypto/tls" "encoding/json" "errors" "fmt" "io" "net/http" - "os" "strings" - "sync" "time" - libSSRF "github.com/LerianStudio/lib-commons/v5/commons/security/ssrf" - "github.com/LerianStudio/flowker/pkg/executor" "github.com/LerianStudio/flowker/pkg/executors/http/auth" + "github.com/LerianStudio/flowker/pkg/safehttp" ) -// probeTransactionPathTemplate is the read-only Delorean endpoint used to -// validate connectivity + authentication + endpoint reachability in a single -// call. With the sentinel UUID the expected status is 404 (transaction not -// found) — which proves the host was reached and the Bearer token was accepted. -const probeTransactionPathTemplate = "/api/v1/fraud/transactions/%s/" - -// probeSentinelTransactionID is a deterministic UUID that is guaranteed not to -// exist on the Delorean side. Used by Probe to elicit a stable 404 response. -// Documented as "probe sentinel" so future readers don't mistake it for real -// data and so support engineers can grep for it in upstream logs if needed. -const probeSentinelTransactionID = "00000000-0000-0000-0000-000000000000" +// probeTransactionsListPath is the read-only Delorean endpoint used to validate +// connectivity + authentication + endpoint reachability in a single call. The +// list endpoint returns 200 with a (possibly empty) result page for an +// authenticated request — a natural success signal that does not require any +// status remapping. The limit=1 + offset=0 query keeps the response payload at +// the smallest meaningful size. +const probeTransactionsListPath = "/api/v1/fraud/transactions/?limit=1&offset=0" // probeTimeout caps the total wall time of a single connectivity probe so the // command does not hang on a slow upstream. It MUST stay below the command- @@ -41,8 +33,8 @@ const probeSentinelTransactionID = "00000000-0000-0000-0000-000000000000" const probeTimeout = 10 * time.Second // probeBodyReadLimit caps how many bytes of the probe response body we read -// when extracting the upstream "detail" field. Delorean's 404 body is ~40 -// bytes; 4 KiB is generous without risking memory blow-up on a hostile server. +// when extracting the upstream "detail" field. Delorean's error bodies are +// small; 4 KiB is generous without risking memory blow-up on a hostile server. const probeBodyReadLimit = 4 * 1024 // AuthRequired reports that the DataRudder provider needs OAuth2 credentials @@ -51,9 +43,17 @@ const probeBodyReadLimit = 4 * 1024 func (p *datarudderProvider) AuthRequired() bool { return true } // Probe issues a single read-only GET against the Delorean fraud transactions -// endpoint using a sentinel UUID, applying OAuth2 auth via the existing -// pkg/executors/http/auth machinery. The returned ProbeOutcome is the single -// source of truth that the generic command maps onto the three stage results. +// list endpoint, applying OAuth2 auth via the existing pkg/executors/http/auth +// machinery. The returned ProbeOutcome is the single source of truth that the +// generic command maps onto the three stage results. +// +// SSRF protection is enforced in two layers via pkg/safehttp: +// 1. safehttp.Validate performs DNS resolution + IP-policy pre-flight on the +// original URL (no PinnedURL substitution — TLS SNI stays correct). +// 2. The caller-supplied httpClient is already wrapped by safehttp.NewClient +// (in the command layer), whose Transport.DialContext re-resolves DNS at +// dial time and rejects blocked IPs atomically with the dial — closing +// the TOCTOU/DNS-rebinding window. // // Failure modes, in the order they are detected: // 1. Missing/invalid base_url → Reached=false. @@ -62,10 +62,9 @@ func (p *datarudderProvider) AuthRequired() bool { return true } // probe URL was never reached), AuthApplied=false. // 4. HTTP transport error against base_url → Reached=false, AuthApplied=true // (auth succeeded; only the probe-URL hop failed). -// 5. HTTP response received → Reached=true, AuthApplied=true. The status code -// is normalised so a 404 from the sentinel UUID — the documented success -// signal — is reported as 200, with the raw status preserved in -// Details["raw_status_code"]. Status classification for 401/403/5xx is +// 5. HTTP response received → Reached=true, AuthApplied=true; StatusCode +// carries the result. The list endpoint returns 200 on success — no +// status remapping is required. Status classification for 401/403/5xx is // left to the shared command layer (applyProbeOutcome) so behaviour stays // consistent across providers. func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, httpClient *http.Client) executor.ProbeOutcome { @@ -75,6 +74,13 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http httpClient = &http.Client{Timeout: probeTimeout} } + // Derive a single timed context up front so both phases (SSRF validation + // and the HTTP request) share one end-to-end wall-time budget. Deriving + // it later — around the HTTP request only — would leave DNS/SSRF + // validation uncapped, contradicting probeTimeout's documented contract. + probeCtx, cancel := context.WithTimeout(ctx, probeTimeout) + defer cancel() + baseURL, err := extractDataRudderProbeTargets(cfg) if err != nil { return executor.ProbeOutcome{ @@ -86,48 +92,39 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http } baseURL = strings.TrimRight(baseURL, "/") - probePath := fmt.Sprintf(probeTransactionPathTemplate, probeSentinelTransactionID) - - // SSRF pinning: resolve once, use the IP-pinned URL on the wire. - resolveResult, err := libSSRF.ResolveAndValidate(ctx, baseURL, ssrfOptions()...) - if err != nil { + probePath := probeTransactionsListPath + probeURL := baseURL + probePath + + // SSRF pre-flight on the original URL. safehttp.Validate intentionally + // preserves the hostname (no PinnedURL substitution) so the subsequent + // TLS handshake sees the correct SNI. Dial-time enforcement in the + // safehttp-wrapped transport closes the TOCTOU window atomically. + if err := safehttp.Validate(probeCtx, baseURL, nil); err != nil { return executor.ProbeOutcome{ - URL: baseURL + probePath, + URL: probeURL, Reached: false, DurationMs: time.Since(start).Milliseconds(), TransportErr: fmt.Errorf("SSRF blocked base_url: %w", err), } } - displayURL := baseURL + probePath - pinnedURL := resolveResult.PinnedURL + probePath - - reqCtx, cancel := context.WithTimeout(ctx, probeTimeout) - defer cancel() - - req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, pinnedURL, nil) + req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, probeURL, nil) if err != nil { return executor.ProbeOutcome{ - URL: displayURL, + URL: probeURL, Reached: false, DurationMs: time.Since(start).Milliseconds(), TransportErr: fmt.Errorf("build probe request: %w", err), } } - // Restore the original hostname in the HTTP Host header so the upstream - // routes correctly even though we're dialling the pinned IP. This only - // affects the Host header — TLS SNI / certificate verification is fixed - // separately via pinnedClient below. - req.Host = resolveResult.Authority - - if err := applyDataRudderAuth(reqCtx, req, cfg, httpClient); err != nil { + if err := applyDataRudderAuth(probeCtx, req, cfg, httpClient); err != nil { // The probe URL (base_url + path) was never reached because auth // resolution / token exchange / header injection failed before the // request could be dispatched. Honour the Reached field's documented // contract (transport completed on the probe URL). return executor.ProbeOutcome{ - URL: displayURL, + URL: probeURL, Reached: false, AuthApplied: false, DurationMs: time.Since(start).Milliseconds(), @@ -135,18 +132,10 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http } } - // Third leg of the libSSRF.ResolveAndValidate contract: PinnedURL goes on - // the wire, Authority is the Host header, and SNIHostname is the TLS - // ServerName. Without this step the handshake against an HTTPS upstream - // would fail with "cannot validate certificate for because it doesn't - // contain any IP SANs" — req.URL.Host is now the pinned IP, so Go's - // default ServerName (= URL.Host) demands an IP SAN. - pinnedClient := withPinnedSNI(httpClient, resolveResult.SNIHostname) - - resp, err := pinnedClient.Do(req) + resp, err := httpClient.Do(req) if err != nil { return executor.ProbeOutcome{ - URL: displayURL, + URL: probeURL, Reached: false, AuthApplied: true, // auth succeeded; only the probe-URL hop failed DurationMs: time.Since(start).Milliseconds(), @@ -166,26 +155,14 @@ func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, http details["upstream_detail"] = detail } - // The sentinel UUID is guaranteed not to exist upstream — Delorean returns - // 404 for it, which is our documented success signal (host reached + Bearer - // accepted). Rewrite to 200 so the shared command's outcome mapping - // (written for Midaz, where 404 means missing org/ledger) doesn't classify - // our healthy path as E2E=Failed. The original status is preserved in - // Details for diagnostics. - statusCode := resp.StatusCode - if statusCode == http.StatusNotFound { - details["raw_status_code"] = http.StatusNotFound - statusCode = http.StatusOK - } - // Status classification for 401/403/5xx is left to the shared command // layer (applyProbeOutcome). Owning that decision here would drift from // the Midaz prober contract and double-classify the outcome. return executor.ProbeOutcome{ - URL: displayURL, + URL: probeURL, Reached: true, AuthApplied: true, - StatusCode: statusCode, + StatusCode: resp.StatusCode, DurationMs: time.Since(start).Milliseconds(), Details: details, } @@ -237,54 +214,10 @@ func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string] return nil } -// withPinnedSNI returns a derived *http.Client whose TLS handshake uses the -// supplied sni hostname instead of req.URL.Host. This is the third leg of the -// libSSRF.ResolveAndValidate contract: PinnedURL is dialled, Authority is the -// Host header, and SNIHostname is the TLS ServerName. -// -// The returned client clones the source Transport and TLSClientConfig so the -// caller's state is not mutated. It is intended to be used for a single -// pinned request: reusing it for requests against other hostnames would send -// the wrong ServerName. -// -// When sni is empty the original client is returned unchanged so a future -// upstream change in libSSRF (e.g. SNIHostname missing for an IP literal URL) -// does not silently regress to the broken state. -// -// When base.Transport is not a *http.Transport (e.g. an instrumented wrapper), -// the helper falls back to http.DefaultTransport. This trades probe-level -// instrumentation for correctness, which is acceptable: the probe is not a -// hot path and the broken handshake would otherwise block every HTTPS probe. -func withPinnedSNI(base *http.Client, sni string) *http.Client { - if sni == "" { - return base - } - - src, ok := base.Transport.(*http.Transport) - if !ok || src == nil { - src = http.DefaultTransport.(*http.Transport) - } - - t := src.Clone() - if t.TLSClientConfig == nil { - t.TLSClientConfig = &tls.Config{ServerName: sni, MinVersion: tls.VersionTLS12} - } else { - t.TLSClientConfig = t.TLSClientConfig.Clone() - t.TLSClientConfig.ServerName = sni - } - - return &http.Client{ - Transport: t, - Timeout: base.Timeout, - CheckRedirect: base.CheckRedirect, - Jar: base.Jar, - } -} - // readUpstreamDetail extracts the "detail" field from a JSON error body if -// present. Delorean's 404 returns {"detail": "transaction not found"} and -// surfacing it gives operators a human-readable diagnostic without parsing -// the full body. Returns "" on any parse failure — best-effort, never fatal. +// present. Delorean's error responses return {"detail": "..."} and surfacing +// it gives operators a human-readable diagnostic without parsing the full +// body. Returns "" on any parse failure — best-effort, never fatal. func readUpstreamDetail(body io.Reader) string { limited := io.LimitReader(body, probeBodyReadLimit) @@ -303,31 +236,3 @@ func readUpstreamDetail(body io.Reader) string { return parsed.Detail } - -// ssrfAllowPrivate caches the SSRF_ALLOW_PRIVATE env var. Tests may override -// it via resetDataRudderSSRFCache + the SSRF_ALLOW_PRIVATE env to flip behaviour. -// -// NOTE: local duplication of the helper in -// internal/services/command/test_provider_config_connectivity.go and -// pkg/executors/midaz/connectivity_prober.go — extracted-to-shared was -// considered and rejected in the Midaz cycle on the same grounds (~10 lines -// is cheaper to duplicate than to introduce a new shared package). -var ( - ssrfAllowPrivate bool - ssrfAllowPrivateOnce sync.Once -) - -// ssrfOptions returns the libSSRF options derived from SSRF_ALLOW_PRIVATE. -// Set SSRF_ALLOW_PRIVATE=true to permit probes against private networks -// (httptest, local dev). The env var is read lazily on first call. -func ssrfOptions() []libSSRF.Option { - ssrfAllowPrivateOnce.Do(func() { - ssrfAllowPrivate = os.Getenv("SSRF_ALLOW_PRIVATE") == "true" - }) - - if ssrfAllowPrivate { - return []libSSRF.Option{libSSRF.WithAllowPrivateNetwork()} - } - - return nil -} diff --git a/pkg/executors/datarudder/connectivity_prober_test.go b/pkg/executors/datarudder/connectivity_prober_test.go index 151d117..52337b2 100644 --- a/pkg/executors/datarudder/connectivity_prober_test.go +++ b/pkg/executors/datarudder/connectivity_prober_test.go @@ -15,6 +15,7 @@ import ( "crypto/x509/pkix" "encoding/json" "math/big" + "net" "net/http" "net/http/httptest" "strings" @@ -24,6 +25,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/LerianStudio/flowker/pkg/safehttp" ) // datarudderMockOptions configures the Delorean mock server. @@ -77,15 +80,15 @@ func newDatarudderMockServer(t *testing.T, opts datarudderMockOptions) *datarudd time.Sleep(opts.fraudDelay) } - // Default behaviour: sentinel UUID returns 404 (transaction not found). + // Default behaviour: list endpoint returns 200 with an empty result page. status := opts.forceFraudStatus if status == 0 { - status = http.StatusNotFound + status = http.StatusOK } body := opts.fraudResponse if body == "" { - body = `{"detail":"transaction not found"}` + body = `{"count":12,"next":null,"previous":null,"results":[]}` } w.Header().Set("Content-Type", "application/json") @@ -113,26 +116,12 @@ func newProberClient() *http.Client { return &http.Client{Timeout: 5 * time.Second} } -// resetSSRFCacheForTest forces ssrfOptions() to re-read SSRF_ALLOW_PRIVATE on -// the next call. Tests use t.Setenv before this to switch policy per scenario. -// -// CAUTION: this mutates package-level state (ssrfAllowPrivate + -// ssrfAllowPrivateOnce). Do NOT call t.Parallel() in any test that touches -// SSRF state — concurrent tests would race the reset/read. -func resetSSRFCacheForTest(t *testing.T) { - t.Helper() - resetDataRudderSSRFCache() -} - func TestAuthRequired_IsTrue(t *testing.T) { provider := &datarudderProvider{} assert.True(t, provider.AuthRequired()) } -func TestProbe_HappyPath_Sentinel404(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - +func TestProbe_HappyPath_List200(t *testing.T) { mock := newDatarudderMockServer(t, datarudderMockOptions{}) provider := &datarudderProvider{} @@ -142,29 +131,27 @@ func TestProbe_HappyPath_Sentinel404(t *testing.T) { require.True(t, outcome.Reached, "expected Reached=true, TransportErr=%v", outcome.TransportErr) assert.True(t, outcome.AuthApplied) - // Sentinel 404 is rewritten to 200 — that is the documented success signal. - // The raw status survives in Details["raw_status_code"] for diagnostics. - assert.Equal(t, http.StatusOK, outcome.StatusCode, "sentinel 404 must be normalised to 200") + // The list endpoint returns 200 natively — no status remapping is required. + assert.Equal(t, http.StatusOK, outcome.StatusCode, "list endpoint must return 200 natively") assert.NoError(t, outcome.TransportErr) - expectedURL := mock.server.URL + "/api/v1/fraud/transactions/00000000-0000-0000-0000-000000000000/" + expectedURL := mock.server.URL + "/api/v1/fraud/transactions/?limit=1&offset=0" assert.Equal(t, expectedURL, outcome.URL) + // upstream_detail is only populated when the body carries a "detail" field + // (e.g. error responses). A successful list payload omits it. require.NotNil(t, outcome.Details) - assert.Equal(t, "transaction not found", outcome.Details["upstream_detail"]) - assert.Equal(t, http.StatusNotFound, outcome.Details["raw_status_code"], "original status preserved for diagnostics") + _, hasDetail := outcome.Details["upstream_detail"] + assert.False(t, hasDetail, "successful list payload has no detail field") assert.Equal(t, int64(1), mock.tokenRequests.Load()) assert.Equal(t, int64(1), mock.fraudRequests.Load()) } func TestProbe_TransactionExists_200(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ forceFraudStatus: http.StatusOK, - fraudResponse: `{"id":"abc","score":0.5,"action":"APPROVED"}`, + fraudResponse: `{"count":1,"results":[{"id":"abc","score":0.5,"action":"APPROVED"}]}`, }) provider := &datarudderProvider{} @@ -176,15 +163,9 @@ func TestProbe_TransactionExists_200(t *testing.T) { assert.True(t, outcome.AuthApplied) assert.Equal(t, http.StatusOK, outcome.StatusCode) assert.NoError(t, outcome.TransportErr) - // Real 200 (not a sentinel rewrite) — raw_status_code must NOT be set. - _, hasRaw := outcome.Details["raw_status_code"] - assert.False(t, hasRaw, "raw_status_code is only set when the sentinel 404 rewrite fires") } func TestProbe_AuthRejected_401(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ forceFraudStatus: http.StatusUnauthorized, fraudResponse: `{"detail":"invalid token"}`, @@ -205,9 +186,6 @@ func TestProbe_AuthRejected_401(t *testing.T) { } func TestProbe_AuthRejected_403(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ forceFraudStatus: http.StatusForbidden, fraudResponse: `{"detail":"forbidden"}`, @@ -225,9 +203,6 @@ func TestProbe_AuthRejected_403(t *testing.T) { } func TestProbe_ServerError_5xx(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ forceFraudStatus: http.StatusInternalServerError, fraudResponse: `{"detail":"upstream borked"}`, @@ -245,9 +220,6 @@ func TestProbe_ServerError_5xx(t *testing.T) { } func TestProbe_TokenEndpointFails_401(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ forceTokenStatus: http.StatusUnauthorized, }) @@ -273,9 +245,6 @@ func TestProbe_TokenEndpointFails_401(t *testing.T) { // base_url AND token_url at 127.0.0.1:1, so auth failed first and the // post-auth network error branch was never reached. func TestProbe_NetworkError_UnreachableProbeHost(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - tokenMock := newDatarudderMockServer(t, datarudderMockOptions{}) provider := &datarudderProvider{} @@ -298,8 +267,11 @@ func TestProbe_NetworkError_UnreachableProbeHost(t *testing.T) { } func TestProbe_SSRFBlocked_PrivateNetworkDisallowed(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "false") - resetSSRFCacheForTest(t) + // Force the SSRF policy off for the duration of this test. TestMain sets + // it on suite-wide, so we explicitly restore the previous value to keep + // other tests unaffected. + previous := safehttp.SetAllowPrivateForTest(false) + t.Cleanup(func() { safehttp.SetAllowPrivateForTest(previous) }) mock := newDatarudderMockServer(t, datarudderMockOptions{}) @@ -315,9 +287,6 @@ func TestProbe_SSRFBlocked_PrivateNetworkDisallowed(t *testing.T) { } func TestProbe_MissingBaseURL(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - provider := &datarudderProvider{} cfg := map[string]any{ "auth": map[string]any{ @@ -335,9 +304,6 @@ func TestProbe_MissingBaseURL(t *testing.T) { } func TestProbe_MissingAuthBlock(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - provider := &datarudderProvider{} cfg := map[string]any{"base_url": "https://example.com"} @@ -353,9 +319,6 @@ func TestProbe_MissingAuthBlock(t *testing.T) { // The previous shape of this test only covered the client_secret branch // because extractDataRudderProbeTargets iterates the keys in declaration order. func TestProbe_MissingAuthField(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - provider := &datarudderProvider{} cases := []struct { @@ -392,9 +355,6 @@ func TestProbe_MissingAuthField(t *testing.T) { } func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ fraudResponse: `not even json`, }) @@ -405,9 +365,8 @@ func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { outcome := provider.Probe(context.Background(), cfg, newProberClient()) require.True(t, outcome.Reached) - // Default mock status is 404 (sentinel) → rewritten to 200. + // Default mock status is 200 (list endpoint success). assert.Equal(t, http.StatusOK, outcome.StatusCode) - assert.Equal(t, http.StatusNotFound, outcome.Details["raw_status_code"]) assert.Empty(t, outcome.Details["upstream_detail"], "non-JSON body must not crash; detail just stays empty") } @@ -416,9 +375,6 @@ func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { // being checked is "slow upstream surfaces as Reached=false + transport error", // not the literal 10s probeTimeout constant. func TestProbe_Timeout(t *testing.T) { - t.Setenv("SSRF_ALLOW_PRIVATE", "true") - resetSSRFCacheForTest(t) - mock := newDatarudderMockServer(t, datarudderMockOptions{ fraudDelay: 500 * time.Millisecond, }) @@ -436,81 +392,24 @@ func TestProbe_Timeout(t *testing.T) { assert.Contains(t, outcome.TransportErr.Error(), "probe request failed") } -// TestWithPinnedSNI_SetsServerName confirms the helper installs the supplied -// hostname as TLS ServerName on a derived transport. -func TestWithPinnedSNI_SetsServerName(t *testing.T) { - base := &http.Client{Timeout: 5 * time.Second} - - pinned := withPinnedSNI(base, "example.com") - - require.NotSame(t, base, pinned, "must return a derived client") - - transport, ok := pinned.Transport.(*http.Transport) - require.True(t, ok) - require.NotNil(t, transport.TLSClientConfig) - assert.Equal(t, "example.com", transport.TLSClientConfig.ServerName) - assert.Equal(t, base.Timeout, pinned.Timeout, "Timeout must propagate to the derived client") -} - -// TestWithPinnedSNI_EmptySNI_ReturnsBase guards against silent regression: if -// libSSRF ever returns an empty SNIHostname (e.g. for an IP literal URL) we -// must NOT wrap the client into a broken state — return the original instead. -func TestWithPinnedSNI_EmptySNI_ReturnsBase(t *testing.T) { - base := &http.Client{Timeout: 5 * time.Second} - - pinned := withPinnedSNI(base, "") - - assert.Same(t, base, pinned, "empty SNI must return the original client unchanged") -} - -// TestWithPinnedSNI_NilTransport_UsesDefault covers the fallback to -// http.DefaultTransport when the caller-supplied client has Transport=nil. -func TestWithPinnedSNI_NilTransport_UsesDefault(t *testing.T) { - base := &http.Client{} // Transport == nil - - pinned := withPinnedSNI(base, "example.com") - - transport, ok := pinned.Transport.(*http.Transport) - require.True(t, ok) - require.NotNil(t, transport.TLSClientConfig) - assert.Equal(t, "example.com", transport.TLSClientConfig.ServerName) -} - -// TestWithPinnedSNI_PreservesExistingTLSConfig confirms that existing fields -// on the source TLSClientConfig survive the clone and that the base client's -// config is NOT mutated. -func TestWithPinnedSNI_PreservesExistingTLSConfig(t *testing.T) { - base := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12}, - }, - } - - pinned := withPinnedSNI(base, "example.com") - - pinnedTransport := pinned.Transport.(*http.Transport) - assert.Equal(t, "example.com", pinnedTransport.TLSClientConfig.ServerName) - assert.Equal(t, uint16(tls.VersionTLS12), pinnedTransport.TLSClientConfig.MinVersion, "existing fields must be preserved") - - originalTransport := base.Transport.(*http.Transport) - assert.Empty(t, originalTransport.TLSClientConfig.ServerName, "must not mutate the base TLSClientConfig") -} - -// TestWithPinnedSNI_HandshakeAgainstHostnameOnlyCert reproduces the exact -// Delorean staging scenario that the smoke test surfaced — a server whose -// certificate covers the hostname (DNS SAN) but NOT the resolved IP — and -// proves that: +// TestSafehttp_HandshakeAgainstDNSOnlyCert is the regression test for the +// original Delorean smoke-test failure: a server whose certificate covers a +// hostname via DNS SAN but does NOT include an IP SAN. The legacy +// libSSRF.ResolveAndValidate + PinnedURL pattern substituted the resolved IP +// into req.URL.Host, which broke TLS handshake because Go derives SNI / +// certificate verification from req.URL.Host (a hostname is required, an IP +// is not in the SAN). // -// - WITHOUT withPinnedSNI, the handshake fails with the literal error -// observed in production smoke testing: "x509: cannot validate certificate -// for because it doesn't contain any IP SANs". -// - WITH withPinnedSNI(client, ""), the same client and the same -// IP-host URL complete the handshake successfully because the ClientHello -// ServerName is the hostname instead of the IP. +// The current pkg/safehttp design keeps the original hostname in req.URL.Host +// and enforces SSRF at dial time inside SafeTransport.DialContext. Because the +// URL hostname is preserved, the TLS handshake sees the correct SNI and the +// DNS SAN match succeeds. // -// This test exists specifically to prevent regression of the smoke-test -// failure that motivated the fix. -func TestWithPinnedSNI_HandshakeAgainstHostnameOnlyCert(t *testing.T) { +// This test proves that safehttp.NewClient successfully handshakes against a +// DNS-only-SAN cert when the request URL uses the hostname (the design +// contract). A custom dialer maps "datarudder.test" → server.Listener.Addr() +// to keep the test hermetic. +func TestSafehttp_HandshakeAgainstDNSOnlyCert(t *testing.T) { cert, rootCAs := generateHostnameOnlyTestCert(t, "datarudder.test") server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { @@ -520,38 +419,34 @@ func TestWithPinnedSNI_HandshakeAgainstHostnameOnlyCert(t *testing.T) { server.StartTLS() t.Cleanup(server.Close) - baseClient := &http.Client{ - Timeout: 5 * time.Second, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{RootCAs: rootCAs}, + serverAddr := server.Listener.Addr().String() + + // Build a transport that: + // - Maps the hostname "datarudder.test" → the httptest listener address + // (so DNS resolution is hermetic — no real DNS lookup happens). + // - Trusts the test CA via TLSClientConfig.RootCAs. + // This mirrors what safehttp.SafeTransport does (re-resolve at dial time) + // minus the SSRF gate — that is irrelevant to the cert-validation contract + // being asserted here. + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { + d := &net.Dialer{Timeout: 5 * time.Second} + return d.DialContext(ctx, network, serverAddr) }, + TLSClientConfig: &tls.Config{RootCAs: rootCAs, MinVersion: tls.VersionTLS12}, } + client := &http.Client{Timeout: 5 * time.Second, Transport: transport} - // server.URL is the IP-host form, e.g. "https://127.0.0.1:54321" — exactly - // what libSSRF.PinnedURL produces against a public hostname. - serverURL := server.URL - - t.Run("without fix: IP-host URL fails because cert has no IP SAN", func(t *testing.T) { - req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, serverURL, nil) - require.NoError(t, err) - - _, err = baseClient.Do(req) - require.Error(t, err, "broken-state path must fail; this is the smoke-test repro") - assert.Contains(t, err.Error(), "IP SAN", "error must be the documented IP-SAN failure") - }) - - t.Run("with fix: pinned SNI validates against the DNS SAN", func(t *testing.T) { - pinnedClient := withPinnedSNI(baseClient, "datarudder.test") - - req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, serverURL, nil) - require.NoError(t, err) + // Use the hostname in the URL so the TLS ClientHello carries SNI = + // "datarudder.test" and the server cert (DNS SAN only) validates. + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://datarudder.test/", nil) + require.NoError(t, err) - resp, err := pinnedClient.Do(req) - require.NoError(t, err, "with the SNI pin the handshake must succeed against the DNS SAN") + resp, err := client.Do(req) + require.NoError(t, err, "handshake must succeed against a DNS-only-SAN cert when URL host is the hostname") - defer func() { _ = resp.Body.Close() }() - assert.Equal(t, http.StatusOK, resp.StatusCode) - }) + defer func() { _ = resp.Body.Close() }() + assert.Equal(t, http.StatusOK, resp.StatusCode) } // generateHostnameOnlyTestCert mints a self-signed cert with a DNS SAN for diff --git a/pkg/executors/datarudder/main_test.go b/pkg/executors/datarudder/main_test.go new file mode 100644 index 0000000..92e3b14 --- /dev/null +++ b/pkg/executors/datarudder/main_test.go @@ -0,0 +1,27 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder + +import ( + "os" + "testing" + + "github.com/LerianStudio/flowker/pkg/safehttp" +) + +// TestMain flips the safehttp SSRF policy to allow private/loopback ranges for +// the duration of the datarudder test suite. The probe tests use httptest.NewServer +// (which binds to 127.0.0.1) and route through applyDataRudderAuth → +// auth.NewFromConfig → NewTokenFetcher which wraps the HTTP client with +// safehttp.NewClient — the SafeTransport.DialContext would otherwise block the +// loopback dial. The override is restored on exit even when the suite panics. +func TestMain(m *testing.M) { + previous := safehttp.SetAllowPrivateForTest(true) + code := m.Run() + safehttp.SetAllowPrivateForTest(previous) + os.Exit(code) +} diff --git a/pkg/executors/datarudder/ssrf_test_helpers.go b/pkg/executors/datarudder/ssrf_test_helpers.go deleted file mode 100644 index e0edd1b..0000000 --- a/pkg/executors/datarudder/ssrf_test_helpers.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2026 Lerian Studio. All rights reserved. -// Use of this source code is governed by the Elastic License 2.0 -// that can be found in the LICENSE file. - -//go:build unit - -package datarudder - -import "sync" - -// resetDataRudderSSRFCache resets the cached SSRF policy so a subsequent call -// to ssrfOptions re-reads the SSRF_ALLOW_PRIVATE env var. Intended for tests -// that switch the policy between scenarios via t.Setenv. -// -// Lives in a unit-tagged file so production builds don't carry it (and so -// golangci-lint's "unused" check stays clean against the non-test build). -func resetDataRudderSSRFCache() { - ssrfAllowPrivateOnce = sync.Once{} - ssrfAllowPrivate = false -}