diff --git a/docs/pre-dev/datarudder/tasks.md b/docs/pre-dev/datarudder/tasks.md new file mode 100644 index 0000000..38cd9d8 --- /dev/null +++ b/docs/pre-dev/datarudder/tasks.md @@ -0,0 +1,87 @@ +# Tasks — datarudder + +**Reference plan**: `C:\Users\SHMZ\.claude\plans\giggly-waddling-wind.md` +**Branch**: `feature/datarudder-provider` (from `feature/oauth2-token-endpoint-auth`, PR target `develop`) +**Feature scope**: Add public Provider + Executor `datarudder.create-transaction` to the catalog, consuming the `oauth2_token_endpoint` outbound auth shipped in the parent branch. Integration target: Delorean anti-fraud API (`inference.api-stg.delorean-ai.com`). + +--- + +## Task T-001: DataRudder Provider + create-transaction Executor + +**Goal**: Expose the `oauth2_token_endpoint` auth feature to end users via a public catalog Provider that integrates with Delorean's fraud transactions API. Mirrors Midaz architectural pattern (`BuildInput` + `ConnectivityProber`) — not Tracer (which uses static api_key). + +**Driver**: The HTTP provider that hosts `oauth2_token_endpoint` is internal-only (not in the catalog). Without a public Provider that uses it, end users can't configure or test the new auth method. 
+ +**Acceptance criteria**: +- [ ] `pkg/executors/datarudder/` package exists with `Register()` exposing `ProviderID="datarudder"` and one executor `datarudder.create-transaction` +- [ ] `providerConfigSchema` NESTED: required top-level `base_url`+`auth`; required inside `auth`: `token_url`+`client_id`+`client_secret`; `additionalProperties: false` at both levels +- [ ] `datarudderProvider` struct implements `executor.{Provider, InputBuilder, ConnectivityProber}` (compile-time checks) +- [ ] `BuildInput` normalizes `base_url` via `strings.TrimSuffix(..., "/")` and uses `executorRoutes` map (Midaz pattern) +- [ ] `buildDataRudderAuth` extracts 3 fields explicitly + hardcodes `credentials_location="body"` (not exposed in schema) +- [ ] `ConnectivityProber.Probe` issues `GET {base_url}/api/v1/fraud/transactions/?limit=1&offset=0` (list endpoint returns 200 natively for an authenticated request — no status remapping required); surfaces upstream `detail` in `ProbeOutcome.Details["upstream_detail"]` on error bodies +- [ ] SSRF pre-flight uses `safehttp.Validate` (which preserves the URL hostname so TLS SNI / certificate verification stays correct — the legacy `libSSRF.ResolveAndValidate + PinnedURL` pattern is intentionally NOT used). Dial-time enforcement is handled by the safehttp-wrapped HTTP client passed in by the command layer. 
+- [ ] `AuthRequired()` returns `true` +- [ ] `pkg/executors/register.go` wires `datarudder.Register(catalog)` in `RegisterDefaults` +- [ ] All existing tests still pass (`make test`) +- [ ] Coverage ≥ 85% on changed files +- [ ] `make lint` clean + +--- + +### Subtask ST-001-01: Provider scaffolding + schema + executor + +**Files to create**: +- `pkg/executors/datarudder/provider.go` — `Register`, `ProviderID`, NESTED `providerConfigSchema` with `additionalProperties: false`, `datarudderProvider` struct embedding `*base.Provider` +- `pkg/executors/datarudder/create_transaction.go` — `CreateTransactionID` const + `newCreateTransactionExecutor()` returning `*base.Executor` with loose schema (`{"type": "object"}`) + +**Files to modify**: +- `pkg/executors/register.go` — add `datarudder.Register(catalog)` + import + +--- + +### Subtask ST-001-02: BuildInput + auth translation + +**Files to create**: +- `pkg/executors/datarudder/input_builder.go` — `executorRoutes` map (1 entry: POST + `/api/v1/fraud/transactions/`), package `BuildInput()` + method on `datarudderProvider` that delegates, `buildDataRudderAuth()` with explicit field extraction + hardcoded `credentials_location="body"` + +**Design constraints (from approved plan)**: +- `BuildInput` must `strings.TrimSuffix(baseURL, "/")` before concatenating with the path +- Path hardcoded as const (NOT configurable via ProviderConfiguration — security) +- `buildDataRudderAuth` returns `nil` when `auth` block is missing/empty (not an error — runner short-circuits) + +--- + +### Subtask ST-001-03: ConnectivityProber + +**Files to create**: +- `pkg/executors/datarudder/connectivity_prober.go` — `Probe()`, `AuthRequired()`, `extractDataRudderProbeTargets()`, `probeTransactionsListPath` const (`/api/v1/fraud/transactions/?limit=1&offset=0`), SSRF pre-flight via `safehttp.Validate(probeCtx, baseURL, nil)`, response body JSON parse to populate `Details["upstream_detail"]` +- `pkg/executors/datarudder/main_test.go` — `TestMain` 
flips `safehttp.SetAllowPrivateForTest(true)` for the suite (mirrors `pkg/executors/midaz/main_test.go`). NO local `ssrfOptions()` helper / SSRF cache: the package consumes `safehttp.Validate` for pre-flight and the safehttp-wrapped HTTP client (built by the command layer) for dial-time enforcement. + +**SSRF strategy**: the prober uses `safehttp.Validate` which preserves the URL hostname (the godoc explicitly says *"callers must use the original URL so TLS handshake sees the correct SNI"*). The legacy `libSSRF.ResolveAndValidate + PinnedURL + req.Host = Authority + custom withPinnedSNI` pattern is intentionally NOT used — substituting an IP literal into `req.URL.Host` breaks HTTPS handshake against any cert that only has a DNS SAN (the exact bug observed against Delorean staging). No local `ssrfOptions` helper is needed: `safehttp.Options()` is the single source of truth, and per-test policy overrides go through `safehttp.SetAllowPrivateForTest`. + +**Outcome mapping** (from approved plan — must cover all in tests): +- 200 (list endpoint natural success) → `Reached=true, AuthApplied=true` +- 401/403 → `Reached=true, AuthApplied=true` (the shared command layer classifies status → Auth=Failed; Reached + AuthApplied describe what happened on the wire, not the semantic outcome) +- 5xx → `Reached=true, AuthApplied=true` (shared command layer classifies status >= 500) +- Token endpoint failure → `Reached=false, AuthApplied=false` (probe URL never reached because auth resolution short-circuited) +- DNS/TCP error against probe URL → `Reached=false, AuthApplied=true` (auth succeeded; only the probe-URL hop failed) +- SSRF blocked → `Reached=false, AuthApplied=false` + +--- + +### Subtask ST-001-04: Tests (TDD) + +**Files to create**: +- `pkg/executors/datarudder/datarudder_test.go` — register + schema validity + intra-package integration test (mock `POST /api/v1/auth/login` + mock `POST /api/v1/fraud/transactions/` via `httptest.Server`) +- 
`pkg/executors/datarudder/input_builder_test.go` — happy path, missing fields, auth block translation, `base_url` with/without trailing slash +- `pkg/executors/datarudder/connectivity_prober_test.go` — cover all 6 outcome scenarios from the outcome mapping in ST-001-03, plus `AuthRequired()`, missing required fields, `upstream_detail` surfacing + +**TDD flow**: RED (failing tests) → GREEN (minimum code) → REFACTOR. Target ≥85% coverage on the new package. + +--- + +## Out-of-cycle (post-implementation) + +- **CHANGELOG**: do NOT edit manually. Auto-generated by `.github/workflows/gptchangelog.yml`. Use Conventional Commits (`feat(datarudder): ...`, `test(datarudder): ...`). +- **Smoke test against Delorean staging**: manual, after PR merge. `POST /v1/provider-configurations//test` against a real `inference.api-stg.delorean-ai.com` provider config — expect Connectivity=Reached, Auth=Applied, E2E StatusCode=200 against the list endpoint (`/api/v1/fraud/transactions/?limit=1&offset=0`). `upstream_detail` is populated only on error bodies. +- **Production URL**: once the Delorean production URL is available, the swap needs no code change: update `ProviderConfiguration` via the API. diff --git a/pkg/executors/datarudder/connectivity_prober.go b/pkg/executors/datarudder/connectivity_prober.go new file mode 100644 index 0000000..aa6c3b8 --- /dev/null +++ b/pkg/executors/datarudder/connectivity_prober.go @@ -0,0 +1,238 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. 
+ +package datarudder + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executors/http/auth" + "github.com/LerianStudio/flowker/pkg/safehttp" +) + +// probeTransactionsListPath is the read-only Delorean endpoint used to validate +// connectivity + authentication + endpoint reachability in a single call. The +// list endpoint returns 200 with a (possibly empty) result page for an +// authenticated request — a natural success signal that does not require any +// status remapping. The limit=1 + offset=0 query keeps the response payload at +// the smallest meaningful size. +const probeTransactionsListPath = "/api/v1/fraud/transactions/?limit=1&offset=0" + +// probeTimeout caps the total wall time of a single connectivity probe so the +// command does not hang on a slow upstream. It MUST stay below the command- +// level test timeout to give the caller a chance to record the error. +const probeTimeout = 10 * time.Second + +// probeBodyReadLimit caps how many bytes of the probe response body we read +// when extracting the upstream "detail" field. Delorean's error bodies are +// small; 4 KiB is generous without risking memory blow-up on a hostile server. +const probeBodyReadLimit = 4 * 1024 + +// AuthRequired reports that the DataRudder provider needs OAuth2 credentials +// to function. When credentials are absent the generic connectivity-test +// command short-circuits to Auth=Failed without attempting the probe. +func (p *datarudderProvider) AuthRequired() bool { return true } + +// Probe issues a single read-only GET against the Delorean fraud transactions +// list endpoint, applying OAuth2 auth via the existing pkg/executors/http/auth +// machinery. The returned ProbeOutcome is the single source of truth that the +// generic command maps onto the three stage results. 
+// +// SSRF protection is enforced in two layers via pkg/safehttp: +// 1. safehttp.Validate performs DNS resolution + IP-policy pre-flight on the +// original URL (no PinnedURL substitution — TLS SNI stays correct). +// 2. The caller-supplied httpClient is already wrapped by safehttp.NewClient +// (in the command layer), whose Transport.DialContext re-resolves DNS at +// dial time and rejects blocked IPs atomically with the dial — closing +// the TOCTOU/DNS-rebinding window. +// +// Failure modes, in the order they are detected: +// 1. Missing/invalid base_url → Reached=false. +// 2. SSRF/DNS validation fails → Reached=false (TransportErr describes the block). +// 3. Auth provider construction or token exchange fails → Reached=false (the +// probe URL was never reached), AuthApplied=false. +// 4. HTTP transport error against base_url → Reached=false, AuthApplied=true +// (auth succeeded; only the probe-URL hop failed). +// 5. HTTP response received → Reached=true, AuthApplied=true; StatusCode +// carries the result. The list endpoint returns 200 on success — no +// status remapping is required. Status classification for 401/403/5xx is +// left to the shared command layer (applyProbeOutcome) so behaviour stays +// consistent across providers. +func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, httpClient *http.Client) executor.ProbeOutcome { + start := time.Now() + + if httpClient == nil { + httpClient = &http.Client{Timeout: probeTimeout} + } + + // Derive a single timed context up front so both phases (SSRF validation + // and the HTTP request) share one end-to-end wall-time budget. Deriving + // it later — around the HTTP request only — would leave DNS/SSRF + // validation uncapped, contradicting probeTimeout's documented contract. 
+ probeCtx, cancel := context.WithTimeout(ctx, probeTimeout) + defer cancel() + + baseURL, err := extractDataRudderProbeTargets(cfg) + if err != nil { + return executor.ProbeOutcome{ + URL: baseURL, + Reached: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: err, + } + } + + baseURL = strings.TrimRight(baseURL, "/") + probePath := probeTransactionsListPath + probeURL := baseURL + probePath + + // SSRF pre-flight on the original URL. safehttp.Validate intentionally + // preserves the hostname (no PinnedURL substitution) so the subsequent + // TLS handshake sees the correct SNI. Dial-time enforcement in the + // safehttp-wrapped transport closes the TOCTOU window atomically. + if err := safehttp.Validate(probeCtx, baseURL, nil); err != nil { + return executor.ProbeOutcome{ + URL: probeURL, + Reached: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: fmt.Errorf("SSRF blocked base_url: %w", err), + } + } + + req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, probeURL, nil) + if err != nil { + return executor.ProbeOutcome{ + URL: probeURL, + Reached: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: fmt.Errorf("build probe request: %w", err), + } + } + + if err := applyDataRudderAuth(probeCtx, req, cfg, httpClient); err != nil { + // The probe URL (base_url + path) was never reached because auth + // resolution / token exchange / header injection failed before the + // request could be dispatched. Honour the Reached field's documented + // contract (transport completed on the probe URL). 
+ return executor.ProbeOutcome{ + URL: probeURL, + Reached: false, + AuthApplied: false, + DurationMs: time.Since(start).Milliseconds(), + TransportErr: err, + } + } + + resp, err := httpClient.Do(req) + if err != nil { + return executor.ProbeOutcome{ + URL: probeURL, + Reached: false, + AuthApplied: true, // auth succeeded; only the probe-URL hop failed + DurationMs: time.Since(start).Milliseconds(), + TransportErr: fmt.Errorf("probe request failed: %w", err), + } + } + + defer func() { + // Drain any unread bytes (anything beyond probeBodyReadLimit) so the + // connection can be returned to the keep-alive pool. + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + details := map[string]any{} + if detail := readUpstreamDetail(resp.Body); detail != "" { + details["upstream_detail"] = detail + } + + // Status classification for 401/403/5xx is left to the shared command + // layer (applyProbeOutcome). Owning that decision here would drift from + // the Midaz prober contract and double-classify the outcome. + return executor.ProbeOutcome{ + URL: probeURL, + Reached: true, + AuthApplied: true, + StatusCode: resp.StatusCode, + DurationMs: time.Since(start).Milliseconds(), + Details: details, + } +} + +// extractDataRudderProbeTargets pulls the base_url from a provider config and +// validates that the required auth credentials are present. If base_url is set, +// it is returned even on error so the caller can surface it for diagnostics. 
+func extractDataRudderProbeTargets(cfg map[string]any) (string, error) { + baseURL, ok := cfg["base_url"].(string) + if !ok || baseURL == "" { + return "", errors.New(`missing required field "base_url" in provider config`) + } + + authBlock, ok := cfg["auth"].(map[string]any) + if !ok || len(authBlock) == 0 { + return baseURL, errors.New("provider requires credentials but no auth block configured") + } + + for _, key := range []string{"token_url", "client_id", "client_secret"} { + v, ok := authBlock[key].(string) + if !ok || v == "" { + return baseURL, fmt.Errorf(`missing required field "auth.%s" in provider config`, key) + } + } + + return baseURL, nil +} + +// applyDataRudderAuth builds the auth provider from the nested auth block and +// applies it to the request. Returns nil on success; the underlying error on +// any token exchange or header injection failure. The caller treats a non-nil +// return as "AuthApplied=false, probe URL never reached". +func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string]any, httpClient *http.Client) error { + authConfig := buildDataRudderAuth(cfg) + if authConfig == nil { + return errors.New("auth block missing from provider config") + } + + authProvider, err := auth.NewFromConfig(authConfig, httpClient) + if err != nil { + return fmt.Errorf("build auth provider: %w", err) + } + + if err := authProvider.Apply(ctx, req); err != nil { + return fmt.Errorf("apply oauth2 auth: %w", err) + } + + return nil +} + +// readUpstreamDetail extracts the "detail" field from a JSON error body if +// present. Delorean's error responses return {"detail": "..."} and surfacing +// it gives operators a human-readable diagnostic without parsing the full +// body. Returns "" on any parse failure — best-effort, never fatal. 
+func readUpstreamDetail(body io.Reader) string { + limited := io.LimitReader(body, probeBodyReadLimit) + + raw, err := io.ReadAll(limited) + if err != nil || len(raw) == 0 { + return "" + } + + var parsed struct { + Detail string `json:"detail"` + } + + if err := json.Unmarshal(raw, &parsed); err != nil { + return "" + } + + return parsed.Detail +} diff --git a/pkg/executors/datarudder/connectivity_prober_test.go b/pkg/executors/datarudder/connectivity_prober_test.go new file mode 100644 index 0000000..52337b2 --- /dev/null +++ b/pkg/executors/datarudder/connectivity_prober_test.go @@ -0,0 +1,532 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/json" + "math/big" + "net" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/LerianStudio/flowker/pkg/safehttp" +) + +// datarudderMockOptions configures the Delorean mock server. +type datarudderMockOptions struct { + forceTokenStatus int // non-zero to force token endpoint status (e.g. 401) + forceFraudStatus int // non-zero to force fraud endpoint status (e.g. 404, 500) + fraudResponse string // raw body to return on the fraud endpoint + fraudDelay time.Duration // sleep before responding on the fraud endpoint +} + +// datarudderMockServer is a single httptest.Server that serves both the OAuth2 +// token endpoint and the Delorean fraud transactions endpoint. 
+type datarudderMockServer struct { + server *httptest.Server + tokenRequests *atomic.Int64 + fraudRequests *atomic.Int64 +} + +func newDatarudderMockServer(t *testing.T, opts datarudderMockOptions) *datarudderMockServer { + t.Helper() + + m := &datarudderMockServer{ + tokenRequests: &atomic.Int64{}, + fraudRequests: &atomic.Int64{}, + } + + mux := http.NewServeMux() + m.server = httptest.NewServer(mux) + + mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) { + m.tokenRequests.Add(1) + + if opts.forceTokenStatus != 0 { + w.WriteHeader(opts.forceTokenStatus) + _, _ = w.Write([]byte(`{"detail":"invalid credentials"}`)) + return + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "mock-access-token", + "token_type": "Bearer", + "expires_in": 3600, + }) + }) + + mux.HandleFunc("/api/v1/fraud/transactions/", func(w http.ResponseWriter, r *http.Request) { + m.fraudRequests.Add(1) + + if opts.fraudDelay > 0 { + time.Sleep(opts.fraudDelay) + } + + // Default behaviour: list endpoint returns 200 with an empty result page. 
+ status := opts.forceFraudStatus + if status == 0 { + status = http.StatusOK + } + + body := opts.fraudResponse + if body == "" { + body = `{"count":12,"next":null,"previous":null,"results":[]}` + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = w.Write([]byte(body)) + }) + + t.Cleanup(func() { m.server.Close() }) + + return m +} + +func validDatarudderConfig(serverURL string) map[string]any { + return map[string]any{ + "base_url": serverURL, + "auth": map[string]any{ + "token_url": serverURL + "/api/v1/auth/login", + "client_id": "test-client", + "client_secret": "test-secret", + }, + } +} + +func newProberClient() *http.Client { + return &http.Client{Timeout: 5 * time.Second} +} + +func TestAuthRequired_IsTrue(t *testing.T) { + provider := &datarudderProvider{} + assert.True(t, provider.AuthRequired()) +} + +func TestProbe_HappyPath_List200(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{}) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached, "expected Reached=true, TransportErr=%v", outcome.TransportErr) + assert.True(t, outcome.AuthApplied) + // The list endpoint returns 200 natively — no status remapping is required. + assert.Equal(t, http.StatusOK, outcome.StatusCode, "list endpoint must return 200 natively") + assert.NoError(t, outcome.TransportErr) + + expectedURL := mock.server.URL + "/api/v1/fraud/transactions/?limit=1&offset=0" + assert.Equal(t, expectedURL, outcome.URL) + + // upstream_detail is only populated when the body carries a "detail" field + // (e.g. error responses). A successful list payload omits it. 
+ require.NotNil(t, outcome.Details) + _, hasDetail := outcome.Details["upstream_detail"] + assert.False(t, hasDetail, "successful list payload has no detail field") + + assert.Equal(t, int64(1), mock.tokenRequests.Load()) + assert.Equal(t, int64(1), mock.fraudRequests.Load()) +} + +func TestProbe_TransactionExists_200(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusOK, + fraudResponse: `{"count":1,"results":[{"id":"abc","score":0.5,"action":"APPROVED"}]}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.True(t, outcome.AuthApplied) + assert.Equal(t, http.StatusOK, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr) +} + +func TestProbe_AuthRejected_401(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusUnauthorized, + fraudResponse: `{"detail":"invalid token"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + // Contract: the probe reports what happened on the wire. Status + // classification (401/403 → Auth=Failed) is the shared command layer's job. 
+ require.True(t, outcome.Reached, "host was reachable") + assert.True(t, outcome.AuthApplied, "Bearer header was injected; rejection happened server-side") + assert.Equal(t, http.StatusUnauthorized, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr, "transport completed successfully; status carries the rejection") + assert.Equal(t, "invalid token", outcome.Details["upstream_detail"]) +} + +func TestProbe_AuthRejected_403(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusForbidden, + fraudResponse: `{"detail":"forbidden"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.True(t, outcome.AuthApplied) + assert.Equal(t, http.StatusForbidden, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr) +} + +func TestProbe_ServerError_5xx(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceFraudStatus: http.StatusInternalServerError, + fraudResponse: `{"detail":"upstream borked"}`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + assert.True(t, outcome.AuthApplied, "Bearer was accepted; the 500 is a Delorean-side issue") + assert.Equal(t, http.StatusInternalServerError, outcome.StatusCode) + assert.NoError(t, outcome.TransportErr, "transport completed; the shared command classifies status >= 500") +} + +func TestProbe_TokenEndpointFails_401(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + forceTokenStatus: http.StatusUnauthorized, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + // The probe URL (base_url + 
path) was never reached — auth resolution + // short-circuited the chain. Reached must be false to honour the interface + // contract ("transport completed on the probe URL"). + assert.False(t, outcome.Reached, "probe URL never hit when token exchange fails") + assert.False(t, outcome.AuthApplied) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "apply oauth2 auth") + assert.Equal(t, int64(0), mock.fraudRequests.Load(), "fraud endpoint must NOT have been called when token fetch failed") +} + +// TestProbe_NetworkError_UnreachableProbeHost exercises the branch where the +// token exchange SUCCEEDS but the subsequent GET to base_url fails at the +// transport layer (DNS/TCP). The earlier shape of this test pointed both +// base_url AND token_url at 127.0.0.1:1, so auth failed first and the +// post-auth network error branch was never reached. +func TestProbe_NetworkError_UnreachableProbeHost(t *testing.T) { + tokenMock := newDatarudderMockServer(t, datarudderMockOptions{}) + + provider := &datarudderProvider{} + cfg := map[string]any{ + "base_url": "http://127.0.0.1:1", // reliably refused on every platform + "auth": map[string]any{ + "token_url": tokenMock.server.URL + "/api/v1/auth/login", + "client_id": "x", + "client_secret": "y", + }, + } + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached, "base_url transport failed; probe URL not reached") + assert.True(t, outcome.AuthApplied, "auth succeeded against the token mock before the failed probe hop") + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "probe request failed") + assert.Equal(t, int64(1), tokenMock.tokenRequests.Load(), "token endpoint was hit exactly once") +} + +func TestProbe_SSRFBlocked_PrivateNetworkDisallowed(t *testing.T) { + // Turn the allow-private override off for the duration of this test. 
TestMain sets + // it on suite-wide, so we explicitly restore the previous value to keep + // other tests unaffected. + previous := safehttp.SetAllowPrivateForTest(false) + t.Cleanup(func() { safehttp.SetAllowPrivateForTest(previous) }) + + mock := newDatarudderMockServer(t, datarudderMockOptions{}) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + assert.False(t, outcome.AuthApplied) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "SSRF") +} + +func TestProbe_MissingBaseURL(t *testing.T) { + provider := &datarudderProvider{} + cfg := map[string]any{ + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "x", + "client_secret": "y", + }, + } + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "base_url") +} + +func TestProbe_MissingAuthBlock(t *testing.T) { + provider := &datarudderProvider{} + cfg := map[string]any{"base_url": "https://example.com"} + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "auth block") +} + +// TestProbe_MissingAuthField is table-driven so each required field +// (token_url, client_id, client_secret) is exercised in its own subtest. +// The previous shape of this test only covered the client_secret branch +// because extractDataRudderProbeTargets iterates the keys in declaration order. 
+func TestProbe_MissingAuthField(t *testing.T) { + provider := &datarudderProvider{} + + cases := []struct { + name string + omitted string + expectedErr string + }{ + {name: "missing token_url", omitted: "token_url", expectedErr: "token_url"}, + {name: "missing client_id", omitted: "client_id", expectedErr: "client_id"}, + {name: "missing client_secret", omitted: "client_secret", expectedErr: "client_secret"}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + auth := map[string]any{ + "token_url": "https://example.com/token", + "client_id": "x", + "client_secret": "y", + } + delete(auth, tc.omitted) + + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": auth, + } + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + assert.False(t, outcome.Reached) + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), tc.expectedErr) + }) + } +} + +func TestProbe_UpstreamDetailMissing_DoesNotFail(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + fraudResponse: `not even json`, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + outcome := provider.Probe(context.Background(), cfg, newProberClient()) + + require.True(t, outcome.Reached) + // Default mock status is 200 (list endpoint success). + assert.Equal(t, http.StatusOK, outcome.StatusCode) + assert.Empty(t, outcome.Details["upstream_detail"], "non-JSON body must not crash; detail just stays empty") +} + +// TestProbe_Timeout exercises the probeTimeout / context-deadline path. +// Uses a client with a short Timeout so the test stays fast; the contract +// being checked is "slow upstream surfaces as Reached=false + transport error", +// not the literal 10s probeTimeout constant. 
+func TestProbe_Timeout(t *testing.T) { + mock := newDatarudderMockServer(t, datarudderMockOptions{ + fraudDelay: 500 * time.Millisecond, + }) + + provider := &datarudderProvider{} + cfg := validDatarudderConfig(mock.server.URL) + + fastClient := &http.Client{Timeout: 50 * time.Millisecond} + + outcome := provider.Probe(context.Background(), cfg, fastClient) + + assert.False(t, outcome.Reached, "client timeout fires before the upstream responds") + assert.True(t, outcome.AuthApplied, "token endpoint responded fast; only the fraud GET timed out") + require.Error(t, outcome.TransportErr) + assert.Contains(t, outcome.TransportErr.Error(), "probe request failed") +} + +// TestSafehttp_HandshakeAgainstDNSOnlyCert is the regression test for the +// original Delorean smoke-test failure: a server whose certificate covers a +// hostname via DNS SAN but does NOT include an IP SAN. The legacy +// libSSRF.ResolveAndValidate + PinnedURL pattern substituted the resolved IP +// into req.URL.Host, which broke the TLS handshake because Go derives SNI / +// certificate verification from req.URL.Host (a hostname is required, an IP +// is not in the SAN). +// +// The current pkg/safehttp design keeps the original hostname in req.URL.Host +// and enforces SSRF at dial time inside SafeTransport.DialContext. Because the +// URL hostname is preserved, the TLS handshake sees the correct SNI and the +// DNS SAN match succeeds. +// +// This test proves that a client keeping the hostname in the request URL (the +// safehttp design contract) handshakes successfully against a DNS-only-SAN +// cert. It uses a plain http.Transport, not safehttp.NewClient: a custom dialer +// maps "datarudder.test" → server.Listener.Addr() to keep the test hermetic. 
+func TestSafehttp_HandshakeAgainstDNSOnlyCert(t *testing.T) { + cert, rootCAs := generateHostnameOnlyTestCert(t, "datarudder.test") + + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + server.TLS = &tls.Config{Certificates: []tls.Certificate{cert}} + server.StartTLS() + t.Cleanup(server.Close) + + serverAddr := server.Listener.Addr().String() + + // Build a transport that: + // - Maps the hostname "datarudder.test" → the httptest listener address + // (so DNS resolution is hermetic — no real DNS lookup happens). + // - Trusts the test CA via TLSClientConfig.RootCAs. + // This mirrors what safehttp.SafeTransport does (re-resolve at dial time) + // minus the SSRF gate — that is irrelevant to the cert-validation contract + // being asserted here. + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { + d := &net.Dialer{Timeout: 5 * time.Second} + return d.DialContext(ctx, network, serverAddr) + }, + TLSClientConfig: &tls.Config{RootCAs: rootCAs, MinVersion: tls.VersionTLS12}, + } + client := &http.Client{Timeout: 5 * time.Second, Transport: transport} + + // Use the hostname in the URL so the TLS ClientHello carries SNI = + // "datarudder.test" and the server cert (DNS SAN only) validates. + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "https://datarudder.test/", nil) + require.NoError(t, err) + + resp, err := client.Do(req) + require.NoError(t, err, "handshake must succeed against a DNS-only-SAN cert when URL host is the hostname") + + defer func() { _ = resp.Body.Close() }() + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +// generateHostnameOnlyTestCert mints a self-signed cert with a DNS SAN for +// hostname but NO IP SAN. The returned CertPool trusts that cert. 
Together +// they let the test simulate a public-PKI scenario (cert covers hostname, +// client connects via resolved IP) within a single process. +func generateHostnameOnlyTestCert(t *testing.T, hostname string) (tls.Certificate, *x509.CertPool) { + t.Helper() + + key, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + + template := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: hostname}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IsCA: true, + DNSNames: []string{hostname}, + // IPAddresses intentionally omitted — reproduces the production scenario. + } + + derBytes, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key) + require.NoError(t, err) + + cert := tls.Certificate{ + Certificate: [][]byte{derBytes}, + PrivateKey: key, + } + + parsed, err := x509.ParseCertificate(derBytes) + require.NoError(t, err) + + pool := x509.NewCertPool() + pool.AddCert(parsed) + + return cert, pool +} + +// TestReadUpstreamDetail exercises the helper directly so the parsing +// branches (empty body, missing key, malformed JSON, body larger than the +// limit) are covered without spinning up an httptest.Server. 
+func TestReadUpstreamDetail(t *testing.T) { + t.Run("valid JSON with detail field", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`{"detail":"transaction not found"}`)) + assert.Equal(t, "transaction not found", got) + }) + + t.Run("valid JSON without detail field", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`{"error":"x","code":"y"}`)) + assert.Empty(t, got) + }) + + t.Run("empty body", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader("")) + assert.Empty(t, got) + }) + + t.Run("empty object", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`{}`)) + assert.Empty(t, got) + }) + + t.Run("malformed JSON", func(t *testing.T) { + got := readUpstreamDetail(strings.NewReader(`not even json`)) + assert.Empty(t, got) + }) + + t.Run("body larger than probeBodyReadLimit is truncated, returns empty", func(t *testing.T) { + // Build a body larger than the 4 KiB cap. The LimitReader truncates, + // which corrupts the JSON, so the parse must fail gracefully. + var b strings.Builder + b.WriteString(`{"detail":"`) + b.WriteString(strings.Repeat("x", probeBodyReadLimit*2)) + b.WriteString(`"}`) + + got := readUpstreamDetail(strings.NewReader(b.String())) + assert.Empty(t, got, "truncated JSON must not panic and must return empty") + }) +} diff --git a/pkg/executors/datarudder/create_transaction.go b/pkg/executors/datarudder/create_transaction.go new file mode 100644 index 0000000..8cd8675 --- /dev/null +++ b/pkg/executors/datarudder/create_transaction.go @@ -0,0 +1,46 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package datarudder + +import ( + "fmt" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executor/base" +) + +// CreateTransactionID is the executor ID for the create-transaction operation. 
+const CreateTransactionID executor.ID = "datarudder.create-transaction" + +// newCreateTransactionExecutor creates the executor for +// POST /api/v1/fraud/transactions/. +// Submits a transaction to the Delorean anti-fraud API for classification, +// returning a score and an action recommendation (APPROVED/REJECTED/REVIEW). +func newCreateTransactionExecutor() (*base.Executor, error) { + exec, err := base.NewExecutor( + CreateTransactionID, + "Create Transaction", + "DataRudder", + "v1", + ProviderID, + createTransactionSchema, + ) + if err != nil { + return nil, fmt.Errorf("failed to create executor: %w", err) + } + + return exec, nil +} + +// createTransactionSchema is intentionally loose. The Delorean API validates the +// transaction body upstream; coupling our schema tightly to their contract would +// just create maintenance friction without preventing real errors. See the +// approved plan for the expected request shape (transaction object with payer, +// receiver, amount, etc.). +const createTransactionSchema = `{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "description": "Request body for POST /api/v1/fraud/transactions/. Validated by Delorean upstream." +}` diff --git a/pkg/executors/datarudder/datarudder_test.go b/pkg/executors/datarudder/datarudder_test.go new file mode 100644 index 0000000..dab4235 --- /dev/null +++ b/pkg/executors/datarudder/datarudder_test.go @@ -0,0 +1,340 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. 
+ +//go:build unit + +package datarudder_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executors" + "github.com/LerianStudio/flowker/pkg/executors/datarudder" + "github.com/santhosh-tekuri/jsonschema/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestRegister_ProviderAndExecutorAreCatalogged verifies that RegisterDefaults +// adds DataRudder to the catalog with its one executor reachable via runner. +func TestRegister_ProviderAndExecutorAreCatalogged(t *testing.T) { + catalog := executor.NewCatalog() + require.NoError(t, executors.RegisterDefaults(catalog)) + + provider, err := catalog.GetProvider("datarudder") + require.NoError(t, err) + assert.Equal(t, "DataRudder", provider.Name()) + assert.Contains(t, provider.Description(), "anti-fraud") + + runner, err := catalog.GetRunner(datarudder.CreateTransactionID) + require.NoError(t, err) + require.NotNil(t, runner, "runner for datarudder.create-transaction must be registered") +} + +// TestProviderConfigSchema_Valid ensures the published JSON Schema compiles and +// accepts a representative happy-path config. +func TestProviderConfigSchema_Valid(t *testing.T) { + schema := compileProviderSchema(t) + + cfg := map[string]any{ + "base_url": "https://inference.api-stg.delorean-ai.com", + "auth": map[string]any{ + "token_url": "https://inference.api-stg.delorean-ai.com/api/v1/auth/login", + "client_id": "id", + "client_secret": "secret", + }, + } + + require.NoError(t, schema.Validate(cfg)) +} + +// TestProviderConfigSchema_RejectsExtraTopLevelField proves additionalProperties:false +// at the top level catches typos. 
+func TestProviderConfigSchema_RejectsExtraTopLevelField(t *testing.T) { + schema := compileProviderSchema(t) + + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", + }, + "unexpected_field": "boom", + } + + require.Error(t, schema.Validate(cfg), "additionalProperties:false must reject unknown top-level fields") +} + +// TestProviderConfigSchema_RejectsExtraAuthField proves additionalProperties:false +// inside auth catches typos like 'token_uri' instead of 'token_url'. +func TestProviderConfigSchema_RejectsExtraAuthField(t *testing.T) { + schema := compileProviderSchema(t) + + cfg := map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", + "token_uri": "https://example.com/typo", // typo + }, + } + + require.Error(t, schema.Validate(cfg), "additionalProperties:false in auth must reject typos") +} + +// TestProviderConfigSchema_RejectsInvalidConfigs enumerates every required +// field and every documented constraint (minLength, format) and asserts the +// schema rejects configs violating them. Coverage gap from review HIGH-2. +func TestProviderConfigSchema_RejectsInvalidConfigs(t *testing.T) { + schema := compileProviderSchema(t) + + // validAuth returns a fresh, fully-populated auth map so subtests can + // mutate copies without leaking state across runs. 
+ validAuth := func() map[string]any { + return map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "secret", + } + } + + cases := []struct { + name string + cfg map[string]any + mustMention string // substring the validation error must reference + }{ + { + name: "missing base_url", + cfg: map[string]any{"auth": validAuth()}, + mustMention: "base_url", + }, + { + name: "missing auth", + cfg: map[string]any{"base_url": "https://example.com"}, + mustMention: "auth", + }, + { + name: "missing auth.token_url", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "client_id": "id", + "client_secret": "secret", + }, + }, + mustMention: "token_url", + }, + { + name: "missing auth.client_id", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_secret": "secret", + }, + }, + mustMention: "client_id", + }, + { + name: "missing auth.client_secret", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + }, + }, + mustMention: "client_secret", + }, + { + name: "empty-string auth.client_id violates minLength:1", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "", + "client_secret": "secret", + }, + }, + mustMention: "client_id", + }, + { + name: "empty-string auth.client_secret violates minLength:1", + cfg: map[string]any{ + "base_url": "https://example.com", + "auth": map[string]any{ + "token_url": "https://example.com/token", + "client_id": "id", + "client_secret": "", + }, + }, + mustMention: "client_secret", + }, + // NOTE: "format": "uri" is annotation-only under JSON Schema Draft + // 2020-12 by default — the santhosh-tekuri/jsonschema/v6 compiler used + // in validateConfigAgainstSchema does NOT assert formats. 
A + // "not a uri" string in base_url / token_url would pass schema + // validation today. Enforcement would require enabling format + // assertions globally on the shared compiler in + // internal/services/command/create_provider_config.go — a cross-cutting + // change owned by the Flowker team. Documented here so the gap is + // visible if a future operator misconfiguration traces back to it. + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := schema.Validate(tc.cfg) + require.Error(t, err, "schema must reject this config") + assert.Contains(t, err.Error(), tc.mustMention, + "validation error must reference the offending field/constraint") + }) + } +} + +// TestIntegration_CreateTransaction_EndToEnd is the intra-package integration +// test: stands up mock login + mock fraud endpoints via httptest.Server, drives +// the full chain (provider config → BuildInput → runner.Execute → response). +// Proves that the OAuth2 flow + body translation + Bearer header injection all +// work together against a DataRudder-shaped API. +func TestIntegration_CreateTransaction_EndToEnd(t *testing.T) { + var ( + tokenRequestBody string + tokenAuthHeader string + fraudAuthHeader string + fraudRequestBody map[string]any + ) + + mux := http.NewServeMux() + + mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) { + // Capture the Authorization header so we can prove no Basic credential + // leaked out when credentials_location is "body" (hardcoded in + // buildDataRudderAuth). 
+ tokenAuthHeader = r.Header.Get("Authorization") + + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) // assert, not require: FailNow must only run on the test goroutine + defer r.Body.Close() + tokenRequestBody = string(body) + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "test-bearer-12345", + "token_type": "Bearer", + "expires_in": 3600, + }) + }) + + mux.HandleFunc("/api/v1/fraud/transactions/", func(w http.ResponseWriter, r *http.Request) { + fraudAuthHeader = r.Header.Get("Authorization") + + body, err := io.ReadAll(r.Body) + assert.NoError(t, err) + defer r.Body.Close() + assert.NoError(t, json.Unmarshal(body, &fraudRequestBody)) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]any{ + "id": "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", + "score": 0.0, + "action": "APPROVED", + }) + }) + + server := httptest.NewServer(mux) + defer server.Close() + + cfg := map[string]any{ + "base_url": server.URL, + "auth": map[string]any{ + "token_url": server.URL + "/api/v1/auth/login", + "client_id": "the-client", + "client_secret": "the-secret", + }, + } + + body, err := json.Marshal(map[string]any{ + "transaction": map[string]any{ + "id": "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", + "amount_total": 101, + "currency_code": "BRL", + }, + }) + require.NoError(t, err) + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, body) + require.NoError(t, err) + + catalog := executor.NewCatalog() + require.NoError(t, executors.RegisterDefaults(catalog)) + runner, err := catalog.GetRunner(datarudder.CreateTransactionID) + require.NoError(t, err) + + result, err := runner.Execute(context.Background(), input) + require.NoError(t, err) + assert.Equal(t, executor.ExecutionStatusSuccess, result.Status, "expected success, got error: %s", result.Error) + + // Token endpoint must have received credentials in form body (not Basic header). 
+ // Parse the form body so the assertions match exact values, not substrings — + // otherwise client_id="the-client" would match a literal "the-client-extra". + parsedForm, parseErr := url.ParseQuery(tokenRequestBody) + require.NoError(t, parseErr, "token endpoint body must be valid form-urlencoded") + assert.Equal(t, "the-client", parsedForm.Get("client_id")) + assert.Equal(t, "the-secret", parsedForm.Get("client_secret")) + assert.False(t, parsedForm.Has("grant_type"), "grant_type must NOT be sent by default") + + // credentials_location="body" contract: NO Authorization header on the + // token request. If a future change accidentally also sends Basic auth, + // this assertion catches it. + assert.Empty(t, tokenAuthHeader, "credentials_location=body must not also send an Authorization header") + + // Fraud endpoint must have received the Bearer token. + assert.Equal(t, "Bearer test-bearer-12345", fraudAuthHeader) + + // Fraud endpoint must have received the transaction body intact. + txn, ok := fraudRequestBody["transaction"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", txn["id"]) + assert.Equal(t, "BRL", txn["currency_code"]) + + // Response must surface the score/action. + respBody, ok := result.Data["body"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "APPROVED", respBody["action"]) +} + +// compileProviderSchema extracts and compiles the published provider schema +// for use in validation tests. Goes through the catalog so we test what +// callers actually see. 
+func compileProviderSchema(t *testing.T) *jsonschema.Schema { + t.Helper() + + catalog := executor.NewCatalog() + require.NoError(t, executors.RegisterDefaults(catalog)) + provider, err := catalog.GetProvider("datarudder") + require.NoError(t, err) + + schemaStr := provider.ConfigSchema() + require.NotEmpty(t, schemaStr) + + compiler := jsonschema.NewCompiler() + raw, err := jsonschema.UnmarshalJSON(strings.NewReader(schemaStr)) + require.NoError(t, err) + require.NoError(t, compiler.AddResource("schema.json", raw)) + + compiled, err := compiler.Compile("schema.json") + require.NoError(t, err) + return compiled +} diff --git a/pkg/executors/datarudder/input_builder.go b/pkg/executors/datarudder/input_builder.go new file mode 100644 index 0000000..68d08aa --- /dev/null +++ b/pkg/executors/datarudder/input_builder.go @@ -0,0 +1,95 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +package datarudder + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/LerianStudio/flowker/pkg/executor" +) + +// executorMeta holds the HTTP routing metadata for a DataRudder executor. +type executorMeta struct { + method string + path string +} + +// executorRoutes maps each DataRudder executor to its HTTP routing metadata. +// Mirrors the Midaz pattern so adding executors later is just a new map entry. +// Paths are hardcoded (NOT configurable via ProviderConfiguration) as a +// security measure — see the approved plan's "Decisão arquitetural" section. +var executorRoutes = map[executor.ID]executorMeta{ + CreateTransactionID: {"POST", "/api/v1/fraud/transactions/"}, +} + +// BuildInput constructs an ExecutionInput for a DataRudder executor. 
+// It selects the routing metadata, normalizes the base URL (drops trailing +// slash to avoid double-slash), translates the nested auth block into the +// shape the HTTP runner expects, and forwards the request body. +// +// nodeData is currently unused — DataRudder operations have no path parameters +// or per-node config. The parameter is kept to honor the InputBuilder contract. +func BuildInput(providerConfig map[string]any, executorID executor.ID, _ map[string]any, requestBody []byte) (executor.ExecutionInput, error) { + route, ok := executorRoutes[executorID] + if !ok { + return executor.ExecutionInput{}, fmt.Errorf("unknown DataRudder executor: %s", executorID) + } + + baseURL, ok := providerConfig["base_url"].(string) + if !ok || baseURL == "" { + return executor.ExecutionInput{}, fmt.Errorf("missing required field %q in provider config", "base_url") + } + + fullURL := strings.TrimRight(baseURL, "/") + route.path + + authConfig := buildDataRudderAuth(providerConfig) + + config := map[string]any{ + "method": route.method, + "url": fullURL, + } + + if authConfig != nil { + config["auth"] = authConfig + } + + if len(requestBody) > 0 && route.method == "POST" { + var body any + if err := json.Unmarshal(requestBody, &body); err == nil { + config["body"] = body + } else { + config["body"] = string(requestBody) + } + } + + return executor.ExecutionInput{Config: config}, nil +} + +// buildDataRudderAuth translates the nested DataRudder auth block into the +// shape that the HTTP auth factory (auth.NewFromConfig) expects for the +// oauth2_token_endpoint provider. +// +// Fields are extracted explicitly (not passed as a raw map) to avoid leaking +// unrelated keys into the auth provider. credentials_location is hardcoded to +// "body" because that's what the Delorean token endpoint requires — it is +// intentionally not exposed in the provider config schema. 
+func buildDataRudderAuth(providerConfig map[string]any) map[string]any { + authBlock, ok := providerConfig["auth"].(map[string]any) + if !ok || len(authBlock) == 0 { + return nil + } + + return map[string]any{ + "type": "oauth2_token_endpoint", + "config": map[string]any{ + "token_url": authBlock["token_url"], + "client_id": authBlock["client_id"], + "client_secret": authBlock["client_secret"], + "credentials_location": "body", + }, + } +} diff --git a/pkg/executors/datarudder/input_builder_test.go b/pkg/executors/datarudder/input_builder_test.go new file mode 100644 index 0000000..41e728d --- /dev/null +++ b/pkg/executors/datarudder/input_builder_test.go @@ -0,0 +1,146 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder_test + +import ( + "encoding/json" + "testing" + + "github.com/LerianStudio/flowker/pkg/executors/datarudder" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func validProviderConfig() map[string]any { + return map[string]any{ + "base_url": "https://inference.api-stg.delorean-ai.com", + "auth": map[string]any{ + "token_url": "https://inference.api-stg.delorean-ai.com/api/v1/auth/login", + "client_id": "test-client", + "client_secret": "test-secret", + }, + } +} + +func TestBuildInput_CreateTransaction_HappyPath(t *testing.T) { + cfg := validProviderConfig() + + body, err := json.Marshal(map[string]any{ + "transaction": map[string]any{ + "id": "4b425c1c-a9e8-4f7e-b7e7-4cc201c3f798", + "amount": 101, + }, + }) + require.NoError(t, err) + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, body) + require.NoError(t, err) + + assert.Equal(t, "POST", input.Config["method"]) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"]) + + authBlock, ok := 
input.Config["auth"].(map[string]any) + require.True(t, ok, "auth block must be present") + assert.Equal(t, "oauth2_token_endpoint", authBlock["type"]) + + authConfig, ok := authBlock["config"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/auth/login", authConfig["token_url"]) + assert.Equal(t, "test-client", authConfig["client_id"]) + assert.Equal(t, "test-secret", authConfig["client_secret"]) + assert.Equal(t, "body", authConfig["credentials_location"], "credentials_location must be hardcoded to body") + + parsedBody, ok := input.Config["body"].(map[string]any) + require.True(t, ok) + txn, ok := parsedBody["transaction"].(map[string]any) + require.True(t, ok) + assert.Equal(t, float64(101), txn["amount"]) +} + +func TestBuildInput_TrimsTrailingSlashOnBaseURL(t *testing.T) { + cfg := validProviderConfig() + cfg["base_url"] = "https://inference.api-stg.delorean-ai.com/" + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"], + "trailing slash on base_url must be normalized — no double-slash in final URL") +} + +func TestBuildInput_TrimsMultipleTrailingSlashes(t *testing.T) { + cfg := validProviderConfig() + cfg["base_url"] = "https://inference.api-stg.delorean-ai.com///" + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"]) +} + +func TestBuildInput_UnknownExecutorID(t *testing.T) { + cfg := validProviderConfig() + + _, err := datarudder.BuildInput(cfg, "datarudder.unknown-op", nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown DataRudder executor") +} + +func TestBuildInput_MissingBaseURL(t *testing.T) { + cfg := validProviderConfig() + 
delete(cfg, "base_url") + + _, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "base_url") +} + +func TestBuildInput_EmptyBaseURL(t *testing.T) { + cfg := validProviderConfig() + cfg["base_url"] = "" + + _, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "base_url") +} + +func TestBuildInput_NoAuthBlock_OmitsAuth(t *testing.T) { + cfg := map[string]any{ + "base_url": "https://example.com", + } + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + _, has := input.Config["auth"] + assert.False(t, has, "auth must be omitted when no auth block in provider config") +} + +func TestBuildInput_BodyAsStringWhenNotJSON(t *testing.T) { + cfg := validProviderConfig() + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, []byte("not-json{")) + require.NoError(t, err) + assert.Equal(t, "not-json{", input.Config["body"], "non-JSON body must pass through as raw string") +} + +func TestBuildInput_NoBody(t *testing.T) { + cfg := validProviderConfig() + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nil, nil) + require.NoError(t, err) + _, has := input.Config["body"] + assert.False(t, has, "body must be omitted when requestBody is nil/empty") +} + +func TestBuildInput_NodeDataIgnored(t *testing.T) { + cfg := validProviderConfig() + + // nodeData is ignored for DataRudder (no path params). Pass garbage to prove it. 
+ nodeData := map[string]any{"unrelated": "should-be-ignored"} + + input, err := datarudder.BuildInput(cfg, datarudder.CreateTransactionID, nodeData, nil) + require.NoError(t, err) + assert.Equal(t, "https://inference.api-stg.delorean-ai.com/api/v1/fraud/transactions/", input.Config["url"]) +} diff --git a/pkg/executors/datarudder/main_test.go b/pkg/executors/datarudder/main_test.go new file mode 100644 index 0000000..92e3b14 --- /dev/null +++ b/pkg/executors/datarudder/main_test.go @@ -0,0 +1,27 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +//go:build unit + +package datarudder + +import ( + "os" + "testing" + + "github.com/LerianStudio/flowker/pkg/safehttp" +) + +// TestMain flips the safehttp SSRF policy to allow private/loopback ranges for +// the duration of the datarudder test suite. The probe tests use httptest.NewServer +// (which binds to 127.0.0.1) and route through applyDataRudderAuth → +// auth.NewFromConfig → NewTokenFetcher which wraps the HTTP client with +// safehttp.NewClient — the SafeTransport.DialContext would otherwise block the +// loopback dial. The previous policy is restored once m.Run returns, before exit. +func TestMain(m *testing.M) { + previous := safehttp.SetAllowPrivateForTest(true) + code := m.Run() + safehttp.SetAllowPrivateForTest(previous) + os.Exit(code) +} diff --git a/pkg/executors/datarudder/provider.go b/pkg/executors/datarudder/provider.go new file mode 100644 index 0000000..898be7a --- /dev/null +++ b/pkg/executors/datarudder/provider.go @@ -0,0 +1,116 @@ +// Copyright (c) 2026 Lerian Studio. All rights reserved. +// Use of this source code is governed by the Elastic License 2.0 +// that can be found in the LICENSE file. + +// Package datarudder provides the DataRudder anti-fraud provider for transaction +// risk scoring via the Delorean product (https://delorean-ai.com). 
Authentication +// uses OAuth2 via the oauth2_token_endpoint outbound auth provider. +package datarudder + +import ( + "fmt" + + "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executor/base" + httpExecutor "github.com/LerianStudio/flowker/pkg/executors/http" +) + +// ProviderID is the unique identifier for the DataRudder provider. +const ProviderID executor.ProviderID = "datarudder" + +// datarudderProvider wraps base.Provider and implements executor.InputBuilder +// plus executor.ConnectivityProber to provide DataRudder-specific URL routing, +// OAuth2 auth translation, and a Delorean-API-aware connectivity probe. +type datarudderProvider struct { + *base.Provider +} + +// BuildInput implements executor.InputBuilder for DataRudder-specific execution input. +func (p *datarudderProvider) BuildInput(providerConfig map[string]any, executorID executor.ID, nodeData map[string]any, requestBody []byte) (executor.ExecutionInput, error) { + return BuildInput(providerConfig, executorID, nodeData, requestBody) +} + +// Compile-time check that datarudderProvider implements Provider, InputBuilder, +// and ConnectivityProber. +var ( + _ executor.Provider = (*datarudderProvider)(nil) + _ executor.InputBuilder = (*datarudderProvider)(nil) + _ executor.ConnectivityProber = (*datarudderProvider)(nil) +) + +// Register registers the DataRudder provider with its create-transaction executor +// into the given catalog. 
+func Register(catalog executor.Catalog) error { + if catalog == nil { + return nil + } + + baseProvider, err := base.NewProvider( + ProviderID, + "DataRudder", + "DataRudder anti-fraud provider for transaction risk scoring via the Delorean product", + "v1", + providerConfigSchema, + ) + if err != nil { + return fmt.Errorf("failed to create DataRudder provider: %w", err) + } + + provider := &datarudderProvider{Provider: baseProvider} + + createTxExec, err := newCreateTransactionExecutor() + if err != nil { + return fmt.Errorf("failed to create DataRudder create-transaction executor: %w", err) + } + + return catalog.RegisterProvider(provider, []executor.ExecutorRegistration{ + { + Executor: createTxExec, + Runner: httpExecutor.NewRunner(), + }, + }) +} + +// providerConfigSchema defines the connection and authentication settings for a +// Delorean instance. Auth is nested under "auth" (same shape as Midaz). The +// schema uses additionalProperties:false at both levels to reject typos with a +// clear validation error instead of silently dropping the field. +// +// credentials_location is intentionally NOT exposed — the Delorean token endpoint +// expects credentials in the form body, and that's hardcoded in buildDataRudderAuth. 
+const providerConfigSchema = `{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "additionalProperties": false, + "required": ["base_url", "auth"], + "properties": { + "base_url": { + "type": "string", + "format": "uri", + "description": "Delorean API base URL (e.g., https://inference.api-stg.delorean-ai.com)" + }, + "auth": { + "type": "object", + "additionalProperties": false, + "required": ["token_url", "client_id", "client_secret"], + "description": "OAuth2 token-endpoint authentication for the Delorean API", + "properties": { + "token_url": { + "type": "string", + "format": "uri", + "description": "OAuth2 token endpoint URL (e.g., https://inference.api-stg.delorean-ai.com/api/v1/auth/login)" + }, + "client_id": { + "type": "string", + "minLength": 1, + "description": "OAuth2 client ID issued by DataRudder" + }, + "client_secret": { + "type": "string", + "minLength": 1, + "description": "OAuth2 client secret issued by DataRudder" + } + } + } + } +}` diff --git a/pkg/executors/register.go b/pkg/executors/register.go index 56ec456..51e5294 100644 --- a/pkg/executors/register.go +++ b/pkg/executors/register.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/LerianStudio/flowker/pkg/executor" + "github.com/LerianStudio/flowker/pkg/executors/datarudder" "github.com/LerianStudio/flowker/pkg/executors/midaz" "github.com/LerianStudio/flowker/pkg/executors/tracer" ) @@ -30,5 +31,9 @@ func RegisterDefaults(catalog executor.Catalog) error { return fmt.Errorf("failed to register Tracer provider: %w", err) } + if err := datarudder.Register(catalog); err != nil { + return fmt.Errorf("failed to register DataRudder provider: %w", err) + } + return nil } diff --git a/pkg/executors/register_test.go b/pkg/executors/register_test.go index 7382e72..16d8963 100644 --- a/pkg/executors/register_test.go +++ b/pkg/executors/register_test.go @@ -75,5 +75,5 @@ func TestRegisterDefaults_AllProvidersCount(t *testing.T) { require.NoError(t, err) providers := 
catalog.ListProviders() - assert.Len(t, providers, 2) + assert.Len(t, providers, 3) }
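
Note on the base-URL normalization in `BuildInput` above: the code calls `strings.TrimRight(baseURL, "/")`, which strips every trailing slash, and that is what `TestBuildInput_TrimsMultipleTrailingSlashes` relies on. `strings.TrimSuffix` would remove at most one. The following is a minimal standalone sketch of that difference, not code from this change; `joinURL` is a hypothetical helper introduced only for illustration, and `example.com` is a placeholder host.

```go
package main

import (
	"fmt"
	"strings"
)

// joinURL is a hypothetical helper (not part of the diff). It mirrors the
// normalization BuildInput performs: strip all trailing slashes from the
// base URL, then append a path that already starts with "/".
func joinURL(baseURL, path string) string {
	return strings.TrimRight(baseURL, "/") + path
}

func main() {
	const path = "/api/v1/fraud/transactions/"

	// All three inputs normalize to the same final URL.
	for _, base := range []string{
		"https://example.com",
		"https://example.com/",
		"https://example.com///",
	} {
		fmt.Println(joinURL(base, path))
	}

	// For contrast: TrimSuffix removes only a single trailing "/", so a
	// base URL ending in several slashes still yields repeated slashes.
	fmt.Println(strings.TrimSuffix("https://example.com///", "/") + path)
}
```

If the single-slash behavior were ever wanted instead, the multiple-trailing-slash test in this diff would be the one to fail.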