Skip to content
Open
87 changes: 87 additions & 0 deletions docs/pre-dev/datarudder/tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Tasks — datarudder

**Reference plan**: `C:\Users\SHMZ\.claude\plans\giggly-waddling-wind.md`
**Branch**: `feature/datarudder-provider` (from `feature/oauth2-token-endpoint-auth`, PR target `develop`)
**Feature scope**: Add public Provider + Executor `datarudder.create-transaction` to the catalog, consuming the `oauth2_token_endpoint` outbound auth shipped in the parent branch. Integration target: Delorean anti-fraud API (`inference.api-stg.delorean-ai.com`).

---

## Task T-001: DataRudder Provider + create-transaction Executor

**Goal**: Expose the `oauth2_token_endpoint` auth feature to end users via a public catalog Provider that integrates with Delorean's fraud transactions API. Mirrors Midaz architectural pattern (`BuildInput` + `ConnectivityProber`) — not Tracer (which uses static api_key).

**Driver**: The HTTP provider that hosts `oauth2_token_endpoint` is internal-only (not in the catalog). Without a public Provider that uses it, end users can't configure or test the new auth method.

**Acceptance criteria**:
- [ ] `pkg/executors/datarudder/` package exists with `Register()` exposing `ProviderID="datarudder"` and one executor `datarudder.create-transaction`
- [ ] `providerConfigSchema` NESTED: required top-level `base_url`+`auth`; required inside `auth`: `token_url`+`client_id`+`client_secret`; `additionalProperties: false` at both levels
- [ ] `datarudderProvider` struct implements `executor.{Provider, InputBuilder, ConnectivityProber}` (compile-time checks)
- [ ] `BuildInput` normalizes `base_url` via `strings.TrimSuffix(..., "/")` and uses `executorRoutes` map (Midaz pattern)
- [ ] `buildDataRudderAuth` extracts 3 fields explicitly + hardcodes `credentials_location="body"` (not exposed in schema)
- [ ] `ConnectivityProber.Probe` issues `GET {base_url}/api/v1/fraud/transactions/?limit=1&offset=0` (list endpoint returns 200 natively for an authenticated request — no status remapping required); surfaces upstream `detail` in `ProbeOutcome.Details["upstream_detail"]` on error bodies
- [ ] SSRF pre-flight uses `safehttp.Validate` (which preserves the URL hostname so TLS SNI / certificate verification stays correct — the legacy `libSSRF.ResolveAndValidate + PinnedURL` pattern is intentionally NOT used). Dial-time enforcement is handled by the safehttp-wrapped HTTP client passed in by the command layer.
- [ ] `AuthRequired()` returns `true`
- [ ] `pkg/executors/register.go` wires `datarudder.Register(catalog)` in `RegisterDefaults`
- [ ] All existing tests still pass (`make test`)
- [ ] Coverage ≥ 85% on changed files
- [ ] `make lint` clean

---

### Subtask ST-001-01: Provider scaffolding + schema + executor

**Files to create**:
- `pkg/executors/datarudder/provider.go` — `Register`, `ProviderID`, NESTED `providerConfigSchema` with `additionalProperties: false`, `datarudderProvider` struct embedding `*base.Provider`
- `pkg/executors/datarudder/create_transaction.go` — `CreateTransactionID` const + `newCreateTransactionExecutor()` returning `*base.Executor` with loose schema (`{"type": "object"}`)

**Files to modify**:
- `pkg/executors/register.go` — add `datarudder.Register(catalog)` + import

---

### Subtask ST-001-02: BuildInput + auth translation

**Files to create**:
- `pkg/executors/datarudder/input_builder.go` — `executorRoutes` map (1 entry: POST + `/api/v1/fraud/transactions/`), package `BuildInput()` + method on `datarudderProvider` that delegates, `buildDataRudderAuth()` with explicit field extraction + hardcoded `credentials_location="body"`

**Design constraints (from approved plan)**:
- `BuildInput` must `strings.TrimSuffix(baseURL, "/")` before concatenating with the path
- Path hardcoded as const (NOT configurable via ProviderConfiguration — security)
- `buildDataRudderAuth` returns `nil` when `auth` block is missing/empty (not an error — runner short-circuits)

---

### Subtask ST-001-03: ConnectivityProber

**Files to create**:
- `pkg/executors/datarudder/connectivity_prober.go` — `Probe()`, `AuthRequired()`, `extractDataRudderProbeTargets()`, `probeTransactionsListPath` const (`/api/v1/fraud/transactions/?limit=1&offset=0`), SSRF pre-flight via `safehttp.Validate(probeCtx, baseURL, nil)`, response body JSON parse to populate `Details["upstream_detail"]`
- `pkg/executors/datarudder/main_test.go` — `TestMain` flips `safehttp.SetAllowPrivateForTest(true)` for the suite (mirrors `pkg/executors/midaz/main_test.go`). NO local `ssrfOptions()` helper / SSRF cache: the package consumes `safehttp.Validate` for pre-flight and the safehttp-wrapped HTTP client (built by the command layer) for dial-time enforcement.

**SSRF strategy**: the prober uses `safehttp.Validate` which preserves the URL hostname (the godoc explicitly says *"callers must use the original URL so TLS handshake sees the correct SNI"*). The legacy `libSSRF.ResolveAndValidate + PinnedURL + req.Host = Authority + custom withPinnedSNI` pattern is intentionally NOT used — substituting an IP literal into `req.URL.Host` breaks HTTPS handshake against any cert that only has a DNS SAN (the exact bug observed against Delorean staging). No local `ssrfOptions` helper is needed: `safehttp.Options()` is the single source of truth, and per-test policy overrides go through `safehttp.SetAllowPrivateForTest`.

**Outcome mapping** (from approved plan — must cover all in tests):
- 200 (list endpoint natural success) → `Reached=true, AuthApplied=true`
- 401/403 → `Reached=true, AuthApplied=true` (the shared command layer classifies status → Auth=Failed; Reached + AuthApplied describe what happened on the wire, not the semantic outcome)
- 5xx → `Reached=true, AuthApplied=true` (shared command layer classifies status >= 500)
- Token endpoint failure → `Reached=false, AuthApplied=false` (probe URL never reached because auth resolution short-circuited)
- DNS/TCP error against probe URL → `Reached=false, AuthApplied=true` (auth succeeded; only the probe-URL hop failed)
- SSRF blocked → `Reached=false, AuthApplied=false`

---

### Subtask ST-001-04: Tests (TDD)

**Files to create**:
- `pkg/executors/datarudder/datarudder_test.go` — register + schema validity + intra-package integration test (mock `POST /api/v1/auth/login` + mock `POST /api/v1/fraud/transactions/` via `httptest.Server`)
- `pkg/executors/datarudder/input_builder_test.go` — happy path, missing fields, auth block translation, `base_url` with/without trailing slash
- `pkg/executors/datarudder/connectivity_prober_test.go` — cover all 6 outcome scenarios from the table above, plus `AuthRequired()`, missing required fields, `upstream_detail` surfacing

**TDD flow**: RED (failing tests) → GREEN (minimum code) → REFACTOR. Target ≥85% coverage on the new package.

---

## Out-of-cycle (post-implementation)

- **CHANGELOG**: do NOT edit manually. Auto-generated by `.github/workflows/gptchangelog.yml`. Use Conventional Commits (`feature(datarudder): ...`, `test(datarudder): ...`).
- **Smoke test against Delorean staging**: manual, after PR merge. `POST /v1/provider-configurations/<id>/test` against a real `inference.api-stg.delorean-ai.com` provider config — expect Connectivity=Reached, Auth=Applied, E2E StatusCode=200 against the list endpoint (`/api/v1/fraud/transactions/?limit=1&offset=0`). `upstream_detail` is populated only on error bodies.
- **Production URL**: when Delorean prod URL becomes available, swap is zero-code — just update `ProviderConfiguration` via API.
238 changes: 238 additions & 0 deletions pkg/executors/datarudder/connectivity_prober.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Copyright (c) 2026 Lerian Studio. All rights reserved.
// Use of this source code is governed by the Elastic License 2.0
// that can be found in the LICENSE file.

package datarudder

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/LerianStudio/flowker/pkg/executor"
"github.com/LerianStudio/flowker/pkg/executors/http/auth"
"github.com/LerianStudio/flowker/pkg/safehttp"
)

// probeTransactionsListPath is the read-only Delorean endpoint used to validate
// connectivity + authentication + endpoint reachability in a single call. The
// list endpoint returns 200 with a (possibly empty) result page for an
// authenticated request — a natural success signal that does not require any
// status remapping. The limit=1 + offset=0 query keeps the response payload at
// the smallest meaningful size.
const probeTransactionsListPath = "/api/v1/fraud/transactions/?limit=1&offset=0"

// probeTimeout caps the total wall time of a single connectivity probe so the
// command does not hang on a slow upstream. It MUST stay below the command-
// level test timeout to give the caller a chance to record the error.
const probeTimeout = 10 * time.Second

// probeBodyReadLimit caps how many bytes of the probe response body we read
// when extracting the upstream "detail" field. Delorean's error bodies are
// small; 4 KiB is generous without risking memory blow-up on a hostile server.
const probeBodyReadLimit = 4 * 1024

// AuthRequired reports that the DataRudder provider needs OAuth2 credentials
// to function. When credentials are absent the generic connectivity-test
// command short-circuits to Auth=Failed without attempting the probe.
func (p *datarudderProvider) AuthRequired() bool { return true }

// Probe issues a single read-only GET against the Delorean fraud transactions
// list endpoint, applying OAuth2 auth via the existing pkg/executors/http/auth
// machinery. The returned ProbeOutcome is the single source of truth that the
// generic command maps onto the three stage results.
//
// SSRF protection is enforced in two layers via pkg/safehttp:
// 1. safehttp.Validate performs DNS resolution + IP-policy pre-flight on the
// original URL (no PinnedURL substitution — TLS SNI stays correct).
// 2. The caller-supplied httpClient is already wrapped by safehttp.NewClient
// (in the command layer), whose Transport.DialContext re-resolves DNS at
// dial time and rejects blocked IPs atomically with the dial — closing
// the TOCTOU/DNS-rebinding window.
//
// Failure modes, in the order they are detected:
// 1. Missing/invalid base_url → Reached=false.
// 2. SSRF/DNS validation fails → Reached=false (TransportErr describes the block).
// 3. Auth provider construction or token exchange fails → Reached=false (the
// probe URL was never reached), AuthApplied=false.
// 4. HTTP transport error against base_url → Reached=false, AuthApplied=true
// (auth succeeded; only the probe-URL hop failed).
// 5. HTTP response received → Reached=true, AuthApplied=true; StatusCode
// carries the result. The list endpoint returns 200 on success — no
// status remapping is required. Status classification for 401/403/5xx is
// left to the shared command layer (applyProbeOutcome) so behaviour stays
// consistent across providers.
func (p *datarudderProvider) Probe(ctx context.Context, cfg map[string]any, httpClient *http.Client) executor.ProbeOutcome {
start := time.Now()

if httpClient == nil {
httpClient = &http.Client{Timeout: probeTimeout}
}

// Derive a single timed context up front so both phases (SSRF validation
// and the HTTP request) share one end-to-end wall-time budget. Deriving
// it later — around the HTTP request only — would leave DNS/SSRF
// validation uncapped, contradicting probeTimeout's documented contract.
probeCtx, cancel := context.WithTimeout(ctx, probeTimeout)
defer cancel()

baseURL, err := extractDataRudderProbeTargets(cfg)
if err != nil {
return executor.ProbeOutcome{
URL: baseURL,
Reached: false,
DurationMs: time.Since(start).Milliseconds(),
TransportErr: err,
}
}

baseURL = strings.TrimRight(baseURL, "/")
probePath := probeTransactionsListPath
probeURL := baseURL + probePath

// SSRF pre-flight on the original URL. safehttp.Validate intentionally
// preserves the hostname (no PinnedURL substitution) so the subsequent
// TLS handshake sees the correct SNI. Dial-time enforcement in the
// safehttp-wrapped transport closes the TOCTOU window atomically.
if err := safehttp.Validate(probeCtx, baseURL, nil); err != nil {
return executor.ProbeOutcome{
URL: probeURL,
Reached: false,
DurationMs: time.Since(start).Milliseconds(),
TransportErr: fmt.Errorf("SSRF blocked base_url: %w", err),
}
}

req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, probeURL, nil)
if err != nil {
return executor.ProbeOutcome{
URL: probeURL,
Reached: false,
DurationMs: time.Since(start).Milliseconds(),
TransportErr: fmt.Errorf("build probe request: %w", err),
}
}

if err := applyDataRudderAuth(probeCtx, req, cfg, httpClient); err != nil {
// The probe URL (base_url + path) was never reached because auth
// resolution / token exchange / header injection failed before the
// request could be dispatched. Honour the Reached field's documented
// contract (transport completed on the probe URL).
return executor.ProbeOutcome{
URL: probeURL,
Reached: false,
AuthApplied: false,
DurationMs: time.Since(start).Milliseconds(),
TransportErr: err,
}
}

resp, err := httpClient.Do(req)
if err != nil {
return executor.ProbeOutcome{
URL: probeURL,
Reached: false,
AuthApplied: true, // auth succeeded; only the probe-URL hop failed
DurationMs: time.Since(start).Milliseconds(),
TransportErr: fmt.Errorf("probe request failed: %w", err),
}
}

defer func() {
// Drain any unread bytes (anything beyond probeBodyReadLimit) so the
// connection can be returned to the keep-alive pool.
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()

details := map[string]any{}
if detail := readUpstreamDetail(resp.Body); detail != "" {
details["upstream_detail"] = detail
}

// Status classification for 401/403/5xx is left to the shared command
// layer (applyProbeOutcome). Owning that decision here would drift from
// the Midaz prober contract and double-classify the outcome.
return executor.ProbeOutcome{
URL: probeURL,
Reached: true,
AuthApplied: true,
StatusCode: resp.StatusCode,
DurationMs: time.Since(start).Milliseconds(),
Details: details,
}
}

// extractDataRudderProbeTargets pulls the base_url from a provider config and
// validates that the required auth credentials are present. Returns the
// base_url even on error so the caller can surface it for diagnostics.
func extractDataRudderProbeTargets(cfg map[string]any) (string, error) {
baseURL, ok := cfg["base_url"].(string)
if !ok || baseURL == "" {
return "", errors.New(`missing required field "base_url" in provider config`)
}

authBlock, ok := cfg["auth"].(map[string]any)
if !ok || len(authBlock) == 0 {
return baseURL, errors.New("provider requires credentials but no auth block configured")
}

for _, key := range []string{"token_url", "client_id", "client_secret"} {
v, ok := authBlock[key].(string)
if !ok || v == "" {
return baseURL, fmt.Errorf(`missing required field "auth.%s" in provider config`, key)
}
}

return baseURL, nil
}

// applyDataRudderAuth builds the auth provider from the nested auth block and
// applies it to the request. Returns nil on success; the underlying error on
// any token exchange or header injection failure. The caller treats a non-nil
// return as "AuthApplied=false, probe URL never reached".
func applyDataRudderAuth(ctx context.Context, req *http.Request, cfg map[string]any, httpClient *http.Client) error {
authConfig := buildDataRudderAuth(cfg)
if authConfig == nil {
return errors.New("auth block missing from provider config")
}

authProvider, err := auth.NewFromConfig(authConfig, httpClient)
if err != nil {
return fmt.Errorf("build auth provider: %w", err)
}

if err := authProvider.Apply(ctx, req); err != nil {
return fmt.Errorf("apply oauth2 auth: %w", err)
}

return nil
}

// readUpstreamDetail extracts the "detail" field from a JSON error body if
// present. Delorean's error responses return {"detail": "..."} and surfacing
// it gives operators a human-readable diagnostic without parsing the full
// body. Returns "" on any parse failure — best-effort, never fatal.
func readUpstreamDetail(body io.Reader) string {
limited := io.LimitReader(body, probeBodyReadLimit)

raw, err := io.ReadAll(limited)
if err != nil || len(raw) == 0 {
return ""
}

var parsed struct {
Detail string `json:"detail"`
}

if err := json.Unmarshal(raw, &parsed); err != nil {
return ""
}

return parsed.Detail
}
Loading