diff --git a/cmd/resin/app_runtime.go b/cmd/resin/app_runtime.go index e18dc802..3b01ed2e 100644 --- a/cmd/resin/app_runtime.go +++ b/cmd/resin/app_runtime.go @@ -397,6 +397,7 @@ func (a *resinApp) buildNetworkServers(engine *state.StateEngine) error { ProxyToken: a.envCfg.ProxyToken, Router: a.topoRuntime.router, Pool: a.topoRuntime.pool, + PlatformLookup: a.topoRuntime.pool, Health: a.topoRuntime.pool, Events: proxyEvents, MetricsSink: a.metricsManager, diff --git a/internal/api/handler_requestlog.go b/internal/api/handler_requestlog.go index d15332bf..957f72a0 100644 --- a/internal/api/handler_requestlog.go +++ b/internal/api/handler_requestlog.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/Resinat/Resin/internal/proxy" "github.com/Resinat/Resin/internal/requestlog" ) @@ -309,8 +310,10 @@ type logListItem struct { RespBodyLen int `json:"resp_body_len"` ReqHeadersTruncated bool `json:"req_headers_truncated"` ReqBodyTruncated bool `json:"req_body_truncated"` - RespHeadersTruncated bool `json:"resp_headers_truncated"` - RespBodyTruncated bool `json:"resp_body_truncated"` + RespHeadersTruncated bool `json:"resp_headers_truncated"` + RespBodyTruncated bool `json:"resp_body_truncated"` + RetryAttempts int `json:"retry_attempts"` + RetryDetails []proxy.RetryDetail `json:"retry_details"` } func toLogListItem(s requestlog.LogSummary) logListItem { @@ -347,6 +350,8 @@ func toLogListItem(s requestlog.LogSummary) logListItem { ReqBodyTruncated: s.ReqBodyTruncated, RespHeadersTruncated: s.RespHeadersTruncated, RespBodyTruncated: s.RespBodyTruncated, + RetryAttempts: s.RetryAttempts, + RetryDetails: s.RetryDetails, } } diff --git a/internal/config/env.go b/internal/config/env.go index 1877dad1..749b5418 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -39,6 +39,7 @@ type EnvConfig struct { DefaultPlatformReverseProxyEmptyAccountBehavior string DefaultPlatformReverseProxyFixedAccountHeader string DefaultPlatformAllocationPolicy string + DefaultPlatformMaxRetries int ProbeTimeout time.Duration ResourceFetchTimeout time.Duration ProxyTransportMaxIdleConns int @@ -107,6 +108,7 @@ func LoadEnvConfig() (*EnvConfig, error) { "RESIN_DEFAULT_PLATFORM_ALLOCATION_POLICY", string(platform.AllocationPolicyBalanced), ) + cfg.DefaultPlatformMaxRetries = envInt("RESIN_DEFAULT_PLATFORM_MAX_RETRIES", 0, &errs) cfg.ProbeTimeout = envDuration("RESIN_PROBE_TIMEOUT", 15*time.Second, &errs) cfg.ResourceFetchTimeout = envDuration("RESIN_RESOURCE_FETCH_TIMEOUT", 30*time.Second, &errs) cfg.ProxyTransportMaxIdleConns = envInt("RESIN_PROXY_TRANSPORT_MAX_IDLE_CONNS", 1024, &errs) @@ -225,6 +227,9 @@ func LoadEnvConfig() (*EnvConfig, error) { platform.AllocationPolicyPreferIdleIP, )) } + if cfg.DefaultPlatformMaxRetries < 0 { + errs = append(errs, "RESIN_DEFAULT_PLATFORM_MAX_RETRIES must be >= 0") + } if cfg.ProbeTimeout <= 0 { errs = append(errs, "RESIN_PROBE_TIMEOUT must be positive") } diff --git a/internal/model/models.go b/internal/model/models.go index 9e6a2055..c6972f6f 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -14,6 +14,7 @@ type Platform struct { ReverseProxyEmptyAccountBehavior string `json:"reverse_proxy_empty_account_behavior"` ReverseProxyFixedAccountHeader string `json:"reverse_proxy_fixed_account_header"` AllocationPolicy string `json:"allocation_policy"` + MaxRetries int `json:"max_retries"` UpdatedAtNs int64 `json:"updated_at_ns"` } diff --git a/internal/platform/model_codec.go b/internal/platform/model_codec.go index 35b098e8..a5ab1906 100644 --- a/internal/platform/model_codec.go +++ b/internal/platform/model_codec.go @@ -48,6 +48,7 @@ func NewConfiguredPlatform( emptyAccountBehavior string, fixedAccountHeader string, allocationPolicy string, + maxRetries int, ) *Platform { normalizedFixedHeaders, fixedHeaders, err := NormalizeFixedAccountHeaders(fixedAccountHeader) if err != nil { @@ -61,6 +62,7 @@ func NewConfiguredPlatform( plat.ReverseProxyFixedAccountHeader = normalizedFixedHeaders plat.ReverseProxyFixedAccountHeaders = append([]string(nil), fixedHeaders...) plat.AllocationPolicy = ParseAllocationPolicy(allocationPolicy) + plat.MaxRetries = maxRetries return plat } @@ -116,5 +118,6 @@ func BuildFromModel(mp model.Platform) (*Platform, error) { emptyAccountBehavior, fixedHeader, mp.AllocationPolicy, + mp.MaxRetries, ), nil } diff --git a/internal/platform/platform.go b/internal/platform/platform.go index 2eae0649..40c916a7 100644 --- a/internal/platform/platform.go +++ b/internal/platform/platform.go @@ -40,6 +40,7 @@ type Platform struct { ReverseProxyFixedAccountHeader string ReverseProxyFixedAccountHeaders []string AllocationPolicy AllocationPolicy + MaxRetries int // Routable view & its lock. // viewMu serializes both FullRebuild and NotifyDirty. diff --git a/internal/proxy/events.go b/internal/proxy/events.go index f3710ca0..5e0652ae 100644 --- a/internal/proxy/events.go +++ b/internal/proxy/events.go @@ -39,6 +39,14 @@ type RequestFinishedEvent struct { DurationNs int64 } +// RetryDetail records the outcome of a single failed retry attempt. +type RetryDetail struct { + NodeHash string `json:"node_hash"` + NodeTag string `json:"node_tag"` + ErrKind string `json:"err_kind"` + ErrMsg string `json:"err_msg"` +} + // RequestLogEntry captures per-request details for the structured request log. // Used by the requestlog subsystem (Phase 8). type RequestLogEntry struct { @@ -63,7 +71,9 @@ type RequestLogEntry struct { UpstreamErrKind string // normalized error family UpstreamErrno string // normalized errno, when available UpstreamErrMsg string // sanitized upstream error message - IngressBytes int64 // bytes from upstream to client (header + body) + RetryAttempts int // number of upstream retry attempts (0 = no retry) + RetryDetails []RetryDetail // per-attempt failure details (len == RetryAttempts) + IngressBytes int64 // bytes from upstream to client (header + body) EgressBytes int64 // bytes from client to upstream (header + body) // Optional detail payload (mainly for reverse proxy request logging). diff --git a/internal/proxy/forward.go b/internal/proxy/forward.go index cba96874..a269d444 100644 --- a/internal/proxy/forward.go +++ b/internal/proxy/forward.go @@ -24,6 +24,7 @@ type ForwardProxyConfig struct { ProxyToken string Router *routing.Router Pool outbound.PoolAccessor + PlatformLookup PlatformLookup Health HealthRecorder Events EventEmitter MetricsSink MetricsEventSink @@ -37,6 +38,7 @@ type ForwardProxy struct { token string router *routing.Router pool outbound.PoolAccessor + platLook PlatformLookup health HealthRecorder events EventEmitter metricsSink MetricsEventSink @@ -60,6 +62,7 @@ func NewForwardProxy(cfg ForwardProxyConfig) *ForwardProxy { token: cfg.ProxyToken, router: cfg.Router, pool: cfg.Pool, + platLook: cfg.PlatformLookup, health: cfg.Health, events: ev, metricsSink: cfg.MetricsSink, @@ -240,71 +243,105 @@ func (p *ForwardProxy) handleHTTP(w http.ResponseWriter, r *http.Request) { lifecycle := newRequestLifecycle(p.events, r, ProxyTypeForward, false) lifecycle.setTarget(r.Host, r.URL.String()) - defer lifecycle.finish() lifecycle.setAccount(account) - routed, routeErr := resolveRoutedOutbound(p.router, p.pool, platName, account, r.Host) - if routeErr != nil { - lifecycle.setProxyError(routeErr) - lifecycle.setHTTPStatus(routeErr.HTTPCode) - writeProxyError(w, routeErr) - return - } - lifecycle.setRouteResult(routed.Route) - go p.health.RecordLatency(routed.Route.NodeHash, netutil.ExtractDomain(r.Host), nil) + maxRetries := platformMaxRetries(p.platLook, platName) + var finalAttempt int + defer func() { + lifecycle.setRetryAttempts(finalAttempt) + lifecycle.finish() + }() - transport := p.outboundHTTPTransport(routed) - outReq := prepareForwardOutboundRequest(r) - lifecycle.addEgressBytes(headerWireLen(outReq.Header)) - var egressBodyCounter *countingReadCloser - if outReq.Body != nil && outReq.Body != http.NoBody { - egressBodyCounter = newCountingReadCloser(outReq.Body) - outReq.Body = egressBodyCounter + // Buffer request body upfront so it can be replayed on retry. + var bodyBytes []byte + if maxRetries > 0 && r.Body != nil && r.Body != http.NoBody { + var err error + bodyBytes, err = io.ReadAll(r.Body) + r.Body.Close() + if err != nil { + lifecycle.setProxyError(ErrInternalError) + lifecycle.setHTTPStatus(ErrInternalError.HTTPCode) + writeProxyError(w, ErrInternalError) + return + } + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + r.ContentLength = int64(len(bodyBytes)) } - // Forward the request. - resp, err := transport.RoundTrip(outReq) - if egressBodyCounter != nil { - lifecycle.addEgressBytes(egressBodyCounter.Total()) - } - if err != nil { - proxyErr := classifyUpstreamError(err) - if proxyErr == nil { - // context.Canceled — skip health recording, close silently. - // Request ended due to client-side cancellation before upstream - // response; treat as net-ok in request log semantics. - lifecycle.setNetOK(true) + for attempt := 0; ; attempt++ { + finalAttempt = attempt + routed, routeErr := resolveRoutedOutbound(p.router, p.pool, platName, account, r.Host) + if routeErr != nil { + lifecycle.setProxyError(routeErr) + lifecycle.setHTTPStatus(routeErr.HTTPCode) + writeProxyError(w, routeErr) return } - lifecycle.setProxyError(proxyErr) - lifecycle.setUpstreamError("forward_roundtrip", err) - lifecycle.setHTTPStatus(proxyErr.HTTPCode) - go p.health.RecordResult(routed.Route.NodeHash, false) - writeProxyError(w, proxyErr) - return - } - defer resp.Body.Close() - - lifecycle.setHTTPStatus(resp.StatusCode) - lifecycle.setNetOK(true) - - // Copy end-to-end response headers and body. - lifecycle.addIngressBytes(copyEndToEndHeaders(w.Header(), resp.Header)) - w.WriteHeader(resp.StatusCode) - copiedBytes, copyErr := io.Copy(w, resp.Body) - lifecycle.addIngressBytes(copiedBytes) - if copyErr != nil { - if shouldRecordForwardCopyFailure(r, copyErr) { - lifecycle.setProxyError(ErrUpstreamRequestFailed) - lifecycle.setUpstreamError("forward_upstream_to_client_copy", copyErr) - lifecycle.setNetOK(false) + lifecycle.setRouteResult(routed.Route) + go p.health.RecordLatency(routed.Route.NodeHash, netutil.ExtractDomain(r.Host), nil) + + transport := p.outboundHTTPTransport(routed) + outReq := prepareForwardOutboundRequest(r) + if attempt > 0 && bodyBytes != nil { + outReq.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + outReq.ContentLength = int64(len(bodyBytes)) + } + lifecycle.addEgressBytes(headerWireLen(outReq.Header)) + var egressBodyCounter *countingReadCloser + if outReq.Body != nil && outReq.Body != http.NoBody { + egressBodyCounter = newCountingReadCloser(outReq.Body) + outReq.Body = egressBodyCounter + } + + resp, err := transport.RoundTrip(outReq) + if egressBodyCounter != nil { + lifecycle.addEgressBytes(egressBodyCounter.Total()) + } + if err != nil { + proxyErr := classifyUpstreamError(err) + if proxyErr == nil { + lifecycle.setNetOK(true) + return + } go p.health.RecordResult(routed.Route.NodeHash, false) + + if attempt < maxRetries && isRetryableUpstreamError(proxyErr) { + detail := summarizeUpstreamError(err) + lifecycle.addRetryDetail(routed.Route.NodeHash.Hex(), routed.Route.NodeTag, detail.Kind, detail.Message) + if bodyBytes != nil { + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + } + continue + } + + lifecycle.setProxyError(proxyErr) + lifecycle.setUpstreamError("forward_roundtrip", err) + lifecycle.setHTTPStatus(proxyErr.HTTPCode) + writeProxyError(w, proxyErr) + return } + defer resp.Body.Close() + + lifecycle.setHTTPStatus(resp.StatusCode) + lifecycle.setNetOK(true) + + lifecycle.addIngressBytes(copyEndToEndHeaders(w.Header(), resp.Header)) + w.WriteHeader(resp.StatusCode) + copiedBytes, copyErr := io.Copy(w, resp.Body) + lifecycle.addIngressBytes(copiedBytes) + if copyErr != nil { + if shouldRecordForwardCopyFailure(r, copyErr) { + lifecycle.setProxyError(ErrUpstreamRequestFailed) + lifecycle.setUpstreamError("forward_upstream_to_client_copy", copyErr) + lifecycle.setNetOK(false) + go p.health.RecordResult(routed.Route.NodeHash, false) + } + return + } + + go p.health.RecordResult(routed.Route.NodeHash, true) return } - - // Full body transfer succeeded — count as network success even for 5xx HTTP. - go p.health.RecordResult(routed.Route.NodeHash, true) } func (p *ForwardProxy) handleCONNECT(w http.ResponseWriter, r *http.Request) { @@ -317,39 +354,57 @@ func (p *ForwardProxy) handleCONNECT(w http.ResponseWriter, r *http.Request) { lifecycle := newRequestLifecycle(p.events, r, ProxyTypeForward, true) lifecycle.setTarget(target, "") - defer lifecycle.finish() lifecycle.setAccount(account) - routed, routeErr := resolveRoutedOutbound(p.router, p.pool, platName, account, target) - if routeErr != nil { - lifecycle.setProxyError(routeErr) - lifecycle.setHTTPStatus(routeErr.HTTPCode) - writeProxyError(w, routeErr) - return - } - lifecycle.setRouteResult(routed.Route) - - // Wrap the dialed connection with tlsLatencyConn for passive TLS latency. + maxRetries := platformMaxRetries(p.platLook, platName) domain := netutil.ExtractDomain(target) - nodeHashRaw := routed.Route.NodeHash - go p.health.RecordLatency(nodeHashRaw, domain, nil) + var finalAttempt int + defer func() { + lifecycle.setRetryAttempts(finalAttempt) + lifecycle.finish() + }() - rawConn, err := routed.Outbound.DialContext(r.Context(), "tcp", M.ParseSocksaddr(target)) - if err != nil { - proxyErr := classifyConnectError(err) - if proxyErr == nil { - // context.Canceled before CONNECT response — no health penalty, - // but mark log as net-ok. - lifecycle.setNetOK(true) + var routed routedOutbound + var rawConn net.Conn + for attempt := 0; ; attempt++ { + finalAttempt = attempt + var routeErr *ProxyError + routed, routeErr = resolveRoutedOutbound(p.router, p.pool, platName, account, target) + if routeErr != nil { + lifecycle.setProxyError(routeErr) + lifecycle.setHTTPStatus(routeErr.HTTPCode) + writeProxyError(w, routeErr) return } - lifecycle.setProxyError(proxyErr) - lifecycle.setUpstreamError("connect_dial", err) - lifecycle.setHTTPStatus(proxyErr.HTTPCode) - go p.health.RecordResult(nodeHashRaw, false) - writeProxyError(w, proxyErr) - return + lifecycle.setRouteResult(routed.Route) + go p.health.RecordLatency(routed.Route.NodeHash, domain, nil) + + var err error + rawConn, err = routed.Outbound.DialContext(r.Context(), "tcp", M.ParseSocksaddr(target)) + if err != nil { + proxyErr := classifyConnectError(err) + if proxyErr == nil { + lifecycle.setNetOK(true) + return + } + go p.health.RecordResult(routed.Route.NodeHash, false) + + if attempt < maxRetries && isRetryableUpstreamError(proxyErr) { + detail := summarizeUpstreamError(err) + lifecycle.addRetryDetail(routed.Route.NodeHash.Hex(), routed.Route.NodeTag, detail.Kind, detail.Message) + continue + } + + lifecycle.setProxyError(proxyErr) + lifecycle.setUpstreamError("connect_dial", err) + lifecycle.setHTTPStatus(proxyErr.HTTPCode) + writeProxyError(w, proxyErr) + return + } + break } + + nodeHashRaw := routed.Route.NodeHash recordConnectResult := func(ok bool) { lifecycle.setNetOK(ok) go p.health.RecordResult(nodeHashRaw, ok) diff --git a/internal/proxy/request_lifecycle.go b/internal/proxy/request_lifecycle.go index 3e3ea6fa..3f23de48 100644 --- a/internal/proxy/request_lifecycle.go +++ b/internal/proxy/request_lifecycle.go @@ -111,6 +111,19 @@ func (l *requestLifecycle) addEgressBytes(n int64) { } } +func (l *requestLifecycle) setRetryAttempts(n int) { + l.log.RetryAttempts = n +} + +func (l *requestLifecycle) addRetryDetail(nodeHash, nodeTag, errKind, errMsg string) { + l.log.RetryDetails = append(l.log.RetryDetails, RetryDetail{ + NodeHash: nodeHash, + NodeTag: nodeTag, + ErrKind: errKind, + ErrMsg: errMsg, + }) +} + func (l *requestLifecycle) setNetOK(ok bool) { l.finished.NetOK = ok l.log.NetOK = ok diff --git a/internal/proxy/retry.go b/internal/proxy/retry.go new file mode 100644 index 00000000..c5732e81 --- /dev/null +++ b/internal/proxy/retry.go @@ -0,0 +1,28 @@ +package proxy + +// isRetryableUpstreamError returns true if the proxy error is a transient +// upstream failure that may succeed on a different node. +func isRetryableUpstreamError(pe *ProxyError) bool { + if pe == nil { + return false + } + switch pe { + case ErrUpstreamConnectFailed, ErrUpstreamTimeout, ErrUpstreamRequestFailed: + return true + default: + return false + } +} + +// platformMaxRetries returns MaxRetries for the named platform, or 0 if +// the platform is unknown or lookup is nil. +func platformMaxRetries(look PlatformLookup, platName string) int { + if look == nil || platName == "" { + return 0 + } + plat, ok := look.GetPlatformByName(platName) + if !ok || plat == nil { + return 0 + } + return plat.MaxRetries +} diff --git a/internal/proxy/reverse.go b/internal/proxy/reverse.go index 71b3aec9..bc5f76d7 100644 --- a/internal/proxy/reverse.go +++ b/internal/proxy/reverse.go @@ -1,6 +1,7 @@ package proxy import ( + "bytes" "io" "net/http" "net/http/httptrace" @@ -246,6 +247,7 @@ func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { lifecycle := newRequestLifecycle(p.events, r, ProxyTypeReverse, false) lifecycle.setTarget(parsed.Host, "") + var finalAttempt int var egressBodyCounter *countingReadCloser var ingressBodyCounter *countingReadCloser var upgradedStreamCounter *countingReadWriteCloser @@ -263,7 +265,10 @@ func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { egressBodyCounter = newCountingReadCloser(body) r.Body = egressBodyCounter } - defer lifecycle.finish() + defer func() { + lifecycle.setRetryAttempts(finalAttempt) + lifecycle.finish() + }() // Resolve account in three phases: // 1) Use path account directly when present. @@ -280,19 +285,6 @@ func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - routed, routeErr := resolveRoutedOutbound(p.router, p.pool, parsed.PlatformName, account, parsed.Host) - if routeErr != nil { - lifecycle.setProxyError(routeErr) - lifecycle.setHTTPStatus(routeErr.HTTPCode) - writeProxyError(w, routeErr) - return - } - lifecycle.setRouteResult(routed.Route) - - nodeHashRaw := routed.Route.NodeHash - domain := netutil.ExtractDomain(parsed.Host) - go p.health.RecordLatency(nodeHashRaw, domain, nil) - target, targetErr := buildReverseTargetURL(parsed, r.URL.RawQuery) if targetErr != nil { lifecycle.setProxyError(targetErr) @@ -302,89 +294,126 @@ func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } lifecycle.setTarget(parsed.Host, target.String()) - transport := p.outboundHTTPTransport(routed) - - proxy := &httputil.ReverseProxy{ - Director: func(req *http.Request) { - req.URL = target - req.Host = parsed.Host - stripForwardingIdentityHeaders(req.Header) - lifecycle.addEgressBytes(headerWireLen(req.Header)) - - // Add httptrace for TLS latency measurement on HTTPS. - if parsed.Protocol == "https" { - reporter := newReverseLatencyReporter(p.health, nodeHashRaw, domain) - reqCtx := httptrace.WithClientTrace(req.Context(), reporter.clientTrace()) - *req = *req.WithContext(reqCtx) - } - }, - Transport: transport, - ErrorHandler: func(rw http.ResponseWriter, req *http.Request, err error) { - proxyErr := classifyUpstreamError(err) - if proxyErr == nil { - // context.Canceled — no health recording, silently close. - // Treat as net-ok for request-log semantics when canceled - // before upstream response. - lifecycle.setNetOK(true) - return - } - lifecycle.setProxyError(proxyErr) - lifecycle.setUpstreamError("reverse_roundtrip", err) - lifecycle.setNetOK(false) - lifecycle.setHTTPStatus(proxyErr.HTTPCode) - go p.health.RecordResult(nodeHashRaw, false) - writeProxyError(rw, proxyErr) - }, - ModifyResponse: func(resp *http.Response) error { - lifecycle.setHTTPStatus(resp.StatusCode) - lifecycle.addIngressBytes(headerWireLen(resp.Header)) - if resp.StatusCode == http.StatusSwitchingProtocols { - // 101 upgrade responses require a writable backend body - // (io.ReadWriteCloser). Do not wrap resp.Body here; wrapping with a - // read-only wrapper breaks websocket/h2c upgrade tunneling in - // net/http/httputil.ReverseProxy. - // - // We can still account upgrade-session traffic by wrapping with a - // read-write counter that preserves io.ReadWriteCloser semantics. - if rwc, ok := resp.Body.(io.ReadWriteCloser); ok { - upgradedStreamCounter = newCountingReadWriteCloser(rwc) - resp.Body = upgradedStreamCounter + maxRetries := platformMaxRetries(p.platLook, parsed.PlatformName) + + // Buffer request body for potential retry. + var bodyBytes []byte + if maxRetries > 0 && r.Body != nil && r.Body != http.NoBody { + raw, readErr := io.ReadAll(r.Body) + r.Body.Close() + if readErr != nil { + lifecycle.setProxyError(ErrInternalError) + lifecycle.setHTTPStatus(ErrInternalError.HTTPCode) + writeProxyError(w, ErrInternalError) + return + } + bodyBytes = raw + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + r.ContentLength = int64(len(bodyBytes)) + } + + domain := netutil.ExtractDomain(parsed.Host) + + for attempt := 0; ; attempt++ { + finalAttempt = attempt + routed, routeErr := resolveRoutedOutbound(p.router, p.pool, parsed.PlatformName, account, parsed.Host) + if routeErr != nil { + lifecycle.setProxyError(routeErr) + lifecycle.setHTTPStatus(routeErr.HTTPCode) + writeProxyError(w, routeErr) + return + } + lifecycle.setRouteResult(routed.Route) + + nodeHashRaw := routed.Route.NodeHash + go p.health.RecordLatency(nodeHashRaw, domain, nil) + + transport := p.outboundHTTPTransport(routed) + + var retryNeeded bool + proxy := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL = target + req.Host = parsed.Host + stripForwardingIdentityHeaders(req.Header) + lifecycle.addEgressBytes(headerWireLen(req.Header)) + + if parsed.Protocol == "https" { + reporter := newReverseLatencyReporter(p.health, nodeHashRaw, domain) + reqCtx := httptrace.WithClientTrace(req.Context(), reporter.clientTrace()) + *req = *req.WithContext(reqCtx) + } + }, + Transport: transport, + ErrorHandler: func(rw http.ResponseWriter, req *http.Request, err error) { + proxyErr := classifyUpstreamError(err) + if proxyErr == nil { + lifecycle.setNetOK(true) + return + } + go p.health.RecordResult(nodeHashRaw, false) + + if attempt < maxRetries && isRetryableUpstreamError(proxyErr) { + detail := summarizeUpstreamError(err) + lifecycle.addRetryDetail(routed.Route.NodeHash.Hex(), routed.Route.NodeTag, detail.Kind, detail.Message) + retryNeeded = true + return } - if detailCfg.Enabled { + + lifecycle.setProxyError(proxyErr) + lifecycle.setUpstreamError("reverse_roundtrip", err) + lifecycle.setNetOK(false) + lifecycle.setHTTPStatus(proxyErr.HTTPCode) + writeProxyError(rw, proxyErr) + }, + ModifyResponse: func(resp *http.Response) error { + lifecycle.setHTTPStatus(resp.StatusCode) + lifecycle.addIngressBytes(headerWireLen(resp.Header)) + if resp.StatusCode == http.StatusSwitchingProtocols { + if rwc, ok := resp.Body.(io.ReadWriteCloser); ok { + upgradedStreamCounter = newCountingReadWriteCloser(rwc) + resp.Body = upgradedStreamCounter + } + if detailCfg.Enabled { + respHeaders, respHeadersLen, respHeadersTruncated := captureHeadersWithLimit(resp.Header, detailCfg.RespHeadersMaxBytes) + lifecycle.setRespHeadersCaptured(respHeaders, respHeadersLen, respHeadersTruncated) + } + lifecycle.setNetOK(true) + go p.health.RecordResult(nodeHashRaw, true) + return nil + } + if resp.Body != nil && resp.Body != http.NoBody { + body := resp.Body + if detailCfg.Enabled { + respHeaders, respHeadersLen, respHeadersTruncated := captureHeadersWithLimit(resp.Header, detailCfg.RespHeadersMaxBytes) + lifecycle.setRespHeadersCaptured(respHeaders, respHeadersLen, respHeadersTruncated) + respBodyCapture := newPayloadCaptureReadCloser(body, detailCfg.RespBodyMaxBytes) + body = respBodyCapture + lifecycle.setRespBodyCapture(respBodyCapture) + } + ingressBodyCounter = newCountingReadCloser(body) + resp.Body = ingressBodyCounter + } else if detailCfg.Enabled { respHeaders, respHeadersLen, respHeadersTruncated := captureHeadersWithLimit(resp.Header, detailCfg.RespHeadersMaxBytes) lifecycle.setRespHeadersCaptured(respHeaders, respHeadersLen, respHeadersTruncated) } lifecycle.setNetOK(true) go p.health.RecordResult(nodeHashRaw, true) return nil - } - if resp.Body != nil && resp.Body != http.NoBody { - body := resp.Body - if detailCfg.Enabled { - respHeaders, respHeadersLen, respHeadersTruncated := captureHeadersWithLimit(resp.Header, detailCfg.RespHeadersMaxBytes) - lifecycle.setRespHeadersCaptured(respHeaders, respHeadersLen, respHeadersTruncated) - respBodyCapture := newPayloadCaptureReadCloser(body, detailCfg.RespBodyMaxBytes) - body = respBodyCapture - lifecycle.setRespBodyCapture(respBodyCapture) - } - ingressBodyCounter = newCountingReadCloser(body) - resp.Body = ingressBodyCounter - } else if detailCfg.Enabled { - respHeaders, respHeadersLen, respHeadersTruncated := captureHeadersWithLimit(resp.Header, detailCfg.RespHeadersMaxBytes) - lifecycle.setRespHeadersCaptured(respHeaders, respHeadersLen, respHeadersTruncated) - } - // Intentional coarse-grained policy: - // mark node success once upstream response headers arrive. - // Further attribution for mid-body stream failures is expensive and noisy - // (client abort vs upstream reset vs network blip), and the added - // complexity is not worth it for the current phase. - lifecycle.setNetOK(true) - go p.health.RecordResult(nodeHashRaw, true) - return nil - }, - } - - proxy.ServeHTTP(w, r) + }, + } + + proxy.ServeHTTP(w, r) + + if !retryNeeded { + break + } + // Reset request body for next attempt. + if bodyBytes != nil { + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + r.ContentLength = int64(len(bodyBytes)) + } + } if egressBodyCounter != nil { lifecycle.addEgressBytes(egressBodyCounter.Total()) } diff --git a/internal/requestlog/repo.go b/internal/requestlog/repo.go index d3e973f6..902837f1 100644 --- a/internal/requestlog/repo.go +++ b/internal/requestlog/repo.go @@ -2,6 +2,7 @@ package requestlog import ( "database/sql" + "encoding/json" "errors" "fmt" "log" @@ -18,7 +19,7 @@ import ( "github.com/Resinat/Resin/internal/state" ) -const logSummarySelectColumns = "id, ts_ns, proxy_type, client_ip, platform_id, platform_name, account, target_host, target_url, node_hash, node_tag, egress_ip, duration_ns, net_ok, http_method, http_status, resin_error, upstream_stage, upstream_err_kind, upstream_errno, upstream_err_msg, ingress_bytes, egress_bytes, payload_present, req_headers_len, req_body_len, resp_headers_len, resp_body_len, req_headers_truncated, req_body_truncated, resp_headers_truncated, resp_body_truncated" +const logSummarySelectColumns = "id, ts_ns, proxy_type, client_ip, platform_id, platform_name, account, target_host, target_url, node_hash, node_tag, egress_ip, duration_ns, net_ok, http_method, http_status, resin_error, upstream_stage, upstream_err_kind, upstream_errno, upstream_err_msg, ingress_bytes, egress_bytes, payload_present, req_headers_len, req_body_len, resp_headers_len, resp_body_len, req_headers_truncated, req_body_truncated, resp_headers_truncated, resp_body_truncated, retry_attempts, retry_details" // Repo manages rolling SQLite databases for request logs. // Each DB is named request_logs-.db and lives in logDir. @@ -118,8 +119,9 @@ func (r *Repo) InsertBatch(entries []proxy.RequestLogEntry) (int, error) { ingress_bytes, egress_bytes, payload_present, req_headers_len, req_body_len, resp_headers_len, resp_body_len, - req_headers_truncated, req_body_truncated, resp_headers_truncated, resp_body_truncated - ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`) + req_headers_truncated, req_body_truncated, resp_headers_truncated, resp_body_truncated, + retry_attempts, retry_details + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`) if err != nil { return 0, fmt.Errorf("requestlog repo prepare log: %w", err) } @@ -149,6 +151,13 @@ func (r *Repo) InsertBatch(entries []proxy.RequestLogEntry) (int, error) { hasPayload = 1 } + retryDetailsJSON := "" + if len(e.RetryDetails) > 0 { + if b, jerr := json.Marshal(e.RetryDetails); jerr == nil { + retryDetailsJSON = string(b) + } + } + _, err := insertLog.Exec( id, e.StartedAtNs, int(e.ProxyType), e.ClientIP, e.PlatformID, e.PlatformName, e.Account, @@ -160,6 +169,7 @@ func (r *Repo) InsertBatch(entries []proxy.RequestLogEntry) (int, error) { e.ReqHeadersLen, e.ReqBodyLen, e.RespHeadersLen, e.RespBodyLen, boolToInt(e.ReqHeadersTruncated), boolToInt(e.ReqBodyTruncated), boolToInt(e.RespHeadersTruncated), boolToInt(e.RespBodyTruncated), + e.RetryAttempts, retryDetailsJSON, ) if err != nil { log.Printf("[requestlog] warning: skip log row id=%q insert failed: %v", id, err) @@ -230,8 +240,10 @@ type LogSummary struct { RespBodyLen int `json:"resp_body_len"` ReqHeadersTruncated bool `json:"req_headers_truncated"` ReqBodyTruncated bool `json:"req_body_truncated"` - RespHeadersTruncated bool `json:"resp_headers_truncated"` - RespBodyTruncated bool `json:"resp_body_truncated"` + RespHeadersTruncated bool `json:"resp_headers_truncated"` + RespBodyTruncated bool `json:"resp_body_truncated"` + RetryAttempts int `json:"retry_attempts"` + RetryDetails []proxy.RetryDetail `json:"retry_details"` } // PayloadRow holds the payload data for a single log entry. @@ -420,6 +432,13 @@ func (r *Repo) runReadBarrier() { // --- internal helpers --- +// migrateSchema adds columns introduced after the initial schema. +// It is safe to call on databases that already have the columns. +func migrateSchema(db *sql.DB) { + db.Exec("ALTER TABLE request_logs ADD COLUMN retry_attempts INTEGER NOT NULL DEFAULT 0") //nolint:errcheck + db.Exec("ALTER TABLE request_logs ADD COLUMN retry_details TEXT NOT NULL DEFAULT ''") //nolint:errcheck +} + func (r *Repo) openDB(path string) error { db, err := state.OpenDB(path) if err != nil { @@ -429,6 +448,7 @@ func (r *Repo) openDB(path string) error { db.Close() return err } + migrateSchema(db) r.activeDB = db r.activePath = path return nil @@ -501,12 +521,12 @@ func (r *Repo) listDBFiles() ([]string, error) { } func (r *Repo) openReadOnly(path string) (*sql.DB, error) { - dsn := path + "?mode=ro" - db, err := sql.Open("sqlite", dsn) + db, err := sql.Open("sqlite", path) if err != nil { return nil, err } db.SetMaxOpenConns(1) + migrateSchema(db) return db, nil } @@ -636,6 +656,7 @@ type rowScanner interface { func scanLogSummary(s rowScanner) (LogSummary, error) { var row LogSummary var netOK, payloadPresent, rht, rbt, rsht, rsbt int + var retryDetailsRaw string err := s.Scan( &row.ID, &row.TsNs, &row.ProxyType, &row.ClientIP, &row.PlatformID, &row.PlatformName, &row.Account, @@ -646,10 +667,14 @@ func scanLogSummary(s rowScanner) (LogSummary, error) { &payloadPresent, &row.ReqHeadersLen, &row.ReqBodyLen, &row.RespHeadersLen, &row.RespBodyLen, &rht, &rbt, &rsht, &rsbt, + &row.RetryAttempts, &retryDetailsRaw, ) if err != nil { return LogSummary{}, err } + if retryDetailsRaw != "" { + json.Unmarshal([]byte(retryDetailsRaw), &row.RetryDetails) //nolint:errcheck + } row.NetOK = netOK != 0 row.PayloadPresent = payloadPresent != 0 row.ReqHeadersTruncated = rht != 0 diff --git a/internal/requestlog/schema.go b/internal/requestlog/schema.go index 2f34bc40..c9e166f5 100644 --- a/internal/requestlog/schema.go +++ b/internal/requestlog/schema.go @@ -37,7 +37,9 @@ CREATE TABLE IF NOT EXISTS request_logs ( req_headers_truncated INTEGER NOT NULL DEFAULT 0, req_body_truncated INTEGER NOT NULL DEFAULT 0, resp_headers_truncated INTEGER NOT NULL DEFAULT 0, - resp_body_truncated INTEGER NOT NULL DEFAULT 0 + resp_body_truncated INTEGER NOT NULL DEFAULT 0, + retry_attempts INTEGER NOT NULL DEFAULT 0, + retry_details TEXT NOT NULL DEFAULT '' ); CREATE TABLE IF NOT EXISTS request_log_payloads ( diff --git a/internal/service/control_plane_platform.go b/internal/service/control_plane_platform.go index 1f6aa50f..e83b2e79 100644 --- a/internal/service/control_plane_platform.go +++ b/internal/service/control_plane_platform.go @@ -32,6 +32,7 @@ type PlatformResponse struct { ReverseProxyEmptyAccountBehavior string `json:"reverse_proxy_empty_account_behavior"` ReverseProxyFixedAccountHeader string `json:"reverse_proxy_fixed_account_header"` AllocationPolicy string `json:"allocation_policy"` + MaxRetries int `json:"max_retries"` UpdatedAt string `json:"updated_at"` } @@ -49,6 +50,7 @@ func platformToResponse(p model.Platform) PlatformResponse { ReverseProxyEmptyAccountBehavior: behavior, ReverseProxyFixedAccountHeader: fixedHeader, AllocationPolicy: p.AllocationPolicy, + MaxRetries: p.MaxRetries, UpdatedAt: time.Unix(0, p.UpdatedAtNs).UTC().Format(time.RFC3339Nano), } } @@ -74,6 +76,7 @@ type platformConfig struct { ReverseProxyEmptyAccountBehavior string ReverseProxyFixedAccountHeader string AllocationPolicy string + MaxRetries int } func normalizePlatformMissAction(raw string) string { @@ -105,6 +108,7 @@ func (s *ControlPlaneService) defaultPlatformConfig(name string) platformConfig s.EnvCfg.DefaultPlatformReverseProxyFixedAccountHeader, ), AllocationPolicy: s.EnvCfg.DefaultPlatformAllocationPolicy, + MaxRetries: s.EnvCfg.DefaultPlatformMaxRetries, } } @@ -118,6 +122,7 @@ func platformConfigFromModel(mp model.Platform) platformConfig { ReverseProxyEmptyAccountBehavior: normalizePlatformEmptyAccountBehavior(mp.ReverseProxyEmptyAccountBehavior), ReverseProxyFixedAccountHeader: normalizeHeaderFieldName(mp.ReverseProxyFixedAccountHeader), AllocationPolicy: mp.AllocationPolicy, + MaxRetries: mp.MaxRetries, } } @@ -132,6 +137,7 @@ func (cfg platformConfig) toModel(id string, updatedAtNs int64) model.Platform { ReverseProxyEmptyAccountBehavior: cfg.ReverseProxyEmptyAccountBehavior, ReverseProxyFixedAccountHeader: cfg.ReverseProxyFixedAccountHeader, AllocationPolicy: cfg.AllocationPolicy, + MaxRetries: cfg.MaxRetries, UpdatedAtNs: updatedAtNs, } } @@ -151,6 +157,7 @@ func (cfg platformConfig) toRuntime(id string) (*platform.Platform, error) { cfg.ReverseProxyEmptyAccountBehavior, cfg.ReverseProxyFixedAccountHeader, cfg.AllocationPolicy, + cfg.MaxRetries, ), nil } @@ -321,6 +328,7 @@ type CreatePlatformRequest struct { ReverseProxyEmptyAccountBehavior *string `json:"reverse_proxy_empty_account_behavior"` ReverseProxyFixedAccountHeader *string `json:"reverse_proxy_fixed_account_header"` AllocationPolicy *string `json:"allocation_policy"` + MaxRetries *int `json:"max_retries"` } // CreatePlatform creates a new platform. @@ -369,6 +377,12 @@ func (s *ControlPlaneService) CreatePlatform(req CreatePlatformRequest) (*Platfo return nil, err } } + if req.MaxRetries != nil { + if *req.MaxRetries < 0 { + return nil, invalidArg("max_retries: must be >= 0") + } + cfg.MaxRetries = *req.MaxRetries + } if err := validatePlatformConfig(&cfg, true); err != nil { return nil, err } @@ -478,6 +492,14 @@ func (s *ControlPlaneService) UpdatePlatform(id string, patchJSON json.RawMessag return nil, err } } + if retries, ok, err := patch.optionalInt("max_retries"); err != nil { + return nil, err + } else if ok { + if retries < 0 { + return nil, invalidArg("max_retries: must be >= 0") + } + cfg.MaxRetries = retries + } if err := validatePlatformConfig(&cfg, regionFiltersPatched); err != nil { return nil, err } diff --git a/internal/service/control_plane_system.go b/internal/service/control_plane_system.go index bed3ddf8..c170dff5 100644 --- a/internal/service/control_plane_system.go +++ b/internal/service/control_plane_system.go @@ -99,6 +99,7 @@ var platformPatchAllowedFields = map[string]bool{ "reverse_proxy_empty_account_behavior": true, "reverse_proxy_fixed_account_header": true, "allocation_policy": true, + "max_retries": true, } var subscriptionPatchAllowedFields = map[string]bool{ diff --git a/internal/service/control_plane_test.go b/internal/service/control_plane_test.go index 66849f04..df98133f 100644 --- a/internal/service/control_plane_test.go +++ b/internal/service/control_plane_test.go @@ -787,6 +787,7 @@ func TestDeletePlatform_DoesNotDecodeCorruptPersistedFiltersJSON(t *testing.T) { string(platform.ReverseProxyEmptyAccountBehaviorAccountHeaderRule), "", platformRow.AllocationPolicy, + 0, )) cp := &ControlPlaneService{ @@ -849,6 +850,7 @@ func TestResetPlatformToDefault_SupportsBuiltInDefaultPlatform(t *testing.T) { string(platform.ReverseProxyEmptyAccountBehaviorAccountHeaderRule), "", defaultRow.AllocationPolicy, + 0, )) cp := &ControlPlaneService{ @@ -990,6 +992,7 @@ func TestResetPlatformToDefault_DoesNotDecodeCorruptPersistedFiltersJSON(t *test string(platform.ReverseProxyEmptyAccountBehaviorAccountHeaderRule), "", platformRow.AllocationPolicy, + 0, )) cp := &ControlPlaneService{ diff --git a/internal/service/patch_helpers.go b/internal/service/patch_helpers.go index d38f2fba..ad3aa18c 100644 --- a/internal/service/patch_helpers.go +++ b/internal/service/patch_helpers.go @@ -76,6 +76,28 @@ func (p mergePatch) optionalBool(field string) (bool, bool, *ServiceError) { return value, true, nil } +func (p mergePatch) optionalInt(field string) (int, bool, *ServiceError) { + raw, ok := p[field] + if !ok { + return 0, false, nil + } + switch v := raw.(type) { + case float64: + if v != float64(int(v)) { + return 0, true, invalidArg(fmt.Sprintf("%s: must be an integer", field)) + } + return int(v), true, nil + case json.Number: + n, err := v.Int64() + if err != nil { + return 0, true, invalidArg(fmt.Sprintf("%s: must be an integer", field)) + } + return int(n), true, nil + default: + return 0, true, invalidArg(fmt.Sprintf("%s: must be an integer", field)) + } +} + func (p mergePatch) optionalStringSlice(field string) ([]string, bool, *ServiceError) { raw, ok := p[field] if !ok { diff --git a/internal/state/migrate.go b/internal/state/migrate.go index 55cc9537..58747a55 100644 --- a/internal/state/migrate.go +++ b/internal/state/migrate.go @@ -23,6 +23,7 @@ const ( stateVersionAddEmptyAccountBehavior = 2 stateVersionAddFixedAccountHeader = 3 stateVersionNormalizeMissAction = 4 + stateVersionAddMaxRetries = 5 stateLegacyBaselineVersion = stateVersionAddFixedAccountHeader ) diff --git a/internal/state/migrations/state/000005_platforms_add_max_retries.down.sql b/internal/state/migrations/state/000005_platforms_add_max_retries.down.sql new file mode 100644 index 00000000..2b2720d3 --- /dev/null +++ b/internal/state/migrations/state/000005_platforms_add_max_retries.down.sql @@ -0,0 +1,3 @@ +-- SQLite does not support DROP COLUMN before 3.35.0; this is a best-effort rollback. +-- For older SQLite versions, a table recreation would be needed. +ALTER TABLE platforms DROP COLUMN max_retries; diff --git a/internal/state/migrations/state/000005_platforms_add_max_retries.up.sql b/internal/state/migrations/state/000005_platforms_add_max_retries.up.sql new file mode 100644 index 00000000..466bd63f --- /dev/null +++ b/internal/state/migrations/state/000005_platforms_add_max_retries.up.sql @@ -0,0 +1 @@ +ALTER TABLE platforms ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 0; diff --git a/internal/state/repo_state.go b/internal/state/repo_state.go index 399ee6af..bb582f3e 100644 --- a/internal/state/repo_state.go +++ b/internal/state/repo_state.go @@ -146,8 +146,8 @@ func (r *StateRepo) UpsertPlatform(p model.Platform) error { _, err = r.db.Exec(` INSERT INTO platforms (id, name, sticky_ttl_ns, regex_filters_json, region_filters_json, reverse_proxy_miss_action, reverse_proxy_empty_account_behavior, - reverse_proxy_fixed_account_header, allocation_policy, updated_at_ns) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + reverse_proxy_fixed_account_header, allocation_policy, max_retries, updated_at_ns) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = excluded.name, sticky_ttl_ns = excluded.sticky_ttl_ns, @@ -157,10 +157,11 @@ func (r *StateRepo) UpsertPlatform(p model.Platform) error { reverse_proxy_empty_account_behavior = excluded.reverse_proxy_empty_account_behavior, reverse_proxy_fixed_account_header = excluded.reverse_proxy_fixed_account_header, allocation_policy = excluded.allocation_policy, + max_retries = excluded.max_retries, updated_at_ns = excluded.updated_at_ns `, p.ID, p.Name, p.StickyTTLNs, regexFiltersJSON, regionFiltersJSON, p.ReverseProxyMissAction, p.ReverseProxyEmptyAccountBehavior, p.ReverseProxyFixedAccountHeader, - p.AllocationPolicy, p.UpdatedAtNs) + p.AllocationPolicy, p.MaxRetries, p.UpdatedAtNs) if err != nil { if isSQLiteUniqueConstraint(err) { return fmt.Errorf("%w: platform name already exists", ErrConflict) @@ -215,14 +216,14 @@ func (r *StateRepo) GetPlatformName(id string) (string, error) { func (r *StateRepo) GetPlatform(id string) (*model.Platform, error) { row := r.db.QueryRow(`SELECT id, name, sticky_ttl_ns, regex_filters_json, region_filters_json, reverse_proxy_miss_action, reverse_proxy_empty_account_behavior, - reverse_proxy_fixed_account_header, allocation_policy, updated_at_ns + reverse_proxy_fixed_account_header, allocation_policy, max_retries, updated_at_ns FROM platforms WHERE id = ?`, id) var p model.Platform var regexFiltersJSON, regionFiltersJSON string if err := row.Scan(&p.ID, &p.Name, &p.StickyTTLNs, ®exFiltersJSON, ®ionFiltersJSON, &p.ReverseProxyMissAction, &p.ReverseProxyEmptyAccountBehavior, - &p.ReverseProxyFixedAccountHeader, &p.AllocationPolicy, &p.UpdatedAtNs); err != nil { + &p.ReverseProxyFixedAccountHeader, &p.AllocationPolicy, &p.MaxRetries, &p.UpdatedAtNs); err != nil { if err == sql.ErrNoRows { return nil, ErrNotFound } @@ -243,7 +244,7 @@ func (r *StateRepo) GetPlatform(id string) (*model.Platform, error) { // ListPlatforms returns all platforms. func (r *StateRepo) ListPlatforms() ([]model.Platform, error) { - rows, err := r.db.Query("SELECT id, name, sticky_ttl_ns, regex_filters_json, region_filters_json, reverse_proxy_miss_action, reverse_proxy_empty_account_behavior, reverse_proxy_fixed_account_header, allocation_policy, updated_at_ns FROM platforms") + rows, err := r.db.Query("SELECT id, name, sticky_ttl_ns, regex_filters_json, region_filters_json, reverse_proxy_miss_action, reverse_proxy_empty_account_behavior, reverse_proxy_fixed_account_header, allocation_policy, max_retries, updated_at_ns FROM platforms") if err != nil { return nil, err } @@ -255,7 +256,7 @@ func (r *StateRepo) ListPlatforms() ([]model.Platform, error) { var regexFiltersJSON, regionFiltersJSON string if err := rows.Scan(&p.ID, &p.Name, &p.StickyTTLNs, ®exFiltersJSON, ®ionFiltersJSON, &p.ReverseProxyMissAction, &p.ReverseProxyEmptyAccountBehavior, - &p.ReverseProxyFixedAccountHeader, &p.AllocationPolicy, &p.UpdatedAtNs); err != nil { + &p.ReverseProxyFixedAccountHeader, &p.AllocationPolicy, &p.MaxRetries, &p.UpdatedAtNs); err != nil { return nil, err } regexFilters, err := decodeStringSliceJSON(regexFiltersJSON) diff --git a/internal/state/repo_state_test.go b/internal/state/repo_state_test.go index e5f7536f..b10fcfc7 100644 --- a/internal/state/repo_state_test.go +++ b/internal/state/repo_state_test.go @@ -102,8 +102,8 @@ func TestMigrateStateDB_LegacyBaselineAdvancesToLatest(t *testing.T) { if dirty { t.Fatalf("schema_migrations dirty=true") } - if version != stateVersionNormalizeMissAction { - t.Fatalf("schema_migrations version: got %d, want %d", version, stateVersionNormalizeMissAction) + if version != stateVersionAddMaxRetries { + t.Fatalf("schema_migrations version: got %d, want %d", version, stateVersionAddMaxRetries) } } @@ -174,8 +174,8 @@ func TestMigrateStateDB_NormalizesLegacyRandomMissAction(t *testing.T) { if dirty { t.Fatalf("schema_migrations dirty=true") } - if version != stateVersionNormalizeMissAction { - t.Fatalf("schema_migrations version: got %d, want %d", version, stateVersionNormalizeMissAction) + if version != stateVersionAddMaxRetries { + t.Fatalf("schema_migrations version: got %d, want %d", version, stateVersionAddMaxRetries) } } diff --git a/webui/package-lock.json b/webui/package-lock.json index 88505cff..96b10159 100644 --- a/webui/package-lock.json +++ b/webui/package-lock.json @@ -73,7 +73,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1666,7 +1665,6 @@ "integrity": "sha512-oH72nZRfDv9lADUBSo104Aq7gPHpQZc4BTx38r9xf9pg5LfP6EzSyH2n7qFmmxRQXh7YlUXODcYsg6PuTDSxGg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -1677,7 +1675,6 @@ "integrity": "sha512-ilcTH/UniCkMdtexkoCN0bI7pMcJDvmQFPvuPvmEaYA/NSfFTAgdUSLAoVjaRJm7+6PvcM+q1zYOwS4wTYMF9w==", "devOptional": true, "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -1743,7 +1740,6 @@ "integrity": "sha512-IgSWvLobTDOjnaxAfDTIHaECbkNlAlKv2j5SjpB2v7QHKv1FIfjwMy8FsDbVfDX/KjmCmYICcw7uGaXLhtsLNg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.56.0", "@typescript-eslint/types": "8.56.0", @@ -2008,7 +2004,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2117,7 +2112,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -2491,7 +2485,6 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -2911,7 +2904,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "@babel/runtime": "^7.28.4" }, @@ -3307,7 +3299,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -3369,7 +3360,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.4.tgz", "integrity": "sha512-9nfp2hYpCwOjAN+8TZFGhtWEwgvWHXqESH8qT89AT/lWklpLON22Lc8pEtnpsZz7VmawabSU0gCjnj8aC0euHQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -3379,7 +3369,6 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.2.4.tgz", "integrity": "sha512-AXJdLo8kgMbimY95O2aKQqsz2iWi9jMgKJhRBAxECE4IFxfcazB2LmzloIoibJI3C12IlY20+KFaLv+71bUJeQ==", "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -3392,7 +3381,6 @@ "resolved": "https://registry.npmjs.org/react-hook-form/-/react-hook-form-7.71.1.tgz", "integrity": "sha512-9SUJKCGKo8HUSsCO+y0CtqkqI5nNuaDqTxyqPsZPqIwudpj4rCrAz/jZV+jn57bx5gtZKOh3neQu94DXMc+w5w==", "license": "MIT", - "peer": true, "engines": { "node": ">=18.0.0" }, @@ -3443,7 +3431,6 @@ "resolved": "https://registry.npmjs.org/react-redux/-/react-redux-9.2.0.tgz", "integrity": "sha512-ROY9fvHhwOD9ySfrF0wmvu//bKCQ6AeZZq1nJNtbDC+kk5DuSuNX/n6YWYF/SYy7bSba4D4FSz8DJeKY/S/r+g==", "license": "MIT", - "peer": true, "dependencies": { "@types/use-sync-external-store": "^0.0.6", "use-sync-external-store": "^1.4.0" @@ -3538,8 +3525,7 @@ "version": "5.0.1", "resolved": "https://registry.npmjs.org/redux/-/redux-5.0.1.tgz", "integrity": "sha512-M9/ELqF6fy8FwmkpnF0S3YKOqMyoWJ4+CS5Efg2ct3oY9daQvd/Pc71FpGZsVsbl3Cpb+IIcjBDUnnyBdQbq4w==", - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/redux-thunk": { "version": "3.1.0", @@ -3747,7 +3733,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "devOptional": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -3865,7 +3850,6 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", @@ -3995,7 +3979,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz", "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/webui/src/features/platforms/PlatformDetailPage.tsx b/webui/src/features/platforms/PlatformDetailPage.tsx index 0672aa42..0ad245d4 100644 --- a/webui/src/features/platforms/PlatformDetailPage.tsx +++ b/webui/src/features/platforms/PlatformDetailPage.tsx @@ -368,6 +368,23 @@ export function PlatformDetailPage() { +
+ + + {editForm.formState.errors.max_retries?.message ? ( +

{t(editForm.formState.errors.max_retries.message)}

+ ) : null} +
+
+
+ + + {createForm.formState.errors.max_retries?.message ? ( +

{t(createForm.formState.errors.max_retries.message)}

+ ) : null} +
+
+
+ {t("重试次数")} +

{detailLog.retry_attempts || "-"}

+
@@ -934,59 +948,120 @@ export function RequestLogsPage() {

{t("诊断")}

{t("异常排查与连接状态分析。")}

-
- {(detailLog.resin_error || detailLog.upstream_stage || detailLog.upstream_err_kind || detailLog.upstream_errno || detailLog.upstream_err_msg) ? ( - - - {detailLog.resin_error ? ( - - - - - ) : null} - {detailLog.upstream_stage ? ( - - - - - ) : null} - {detailLog.upstream_err_kind ? ( - - - - - ) : null} - {detailLog.upstream_errno ? ( - - - - - ) : null} - {detailLog.upstream_err_msg ? ( - - - - - ) : null} - -
{t("Resin 错误:")}{detailLog.resin_error}
{t("失败阶段:")}{detailLog.upstream_stage}
{t("错误类型:")}{detailLog.upstream_err_kind}
Errno:{detailLog.upstream_errno}
{t("错误详情:")}{detailLog.upstream_err_msg}
- ) : null} - {!detailLog.resin_error && !detailLog.upstream_stage && !detailLog.upstream_err_kind && !detailLog.upstream_err_msg ? ( -
- - {t("当前请求未产生异常诊断信息")} + {(() => { + const hasRetryDetails = detailLog.retry_details && detailLog.retry_details.length > 0; + const hasFinalError = detailLog.resin_error || detailLog.upstream_stage || detailLog.upstream_err_kind || detailLog.upstream_errno || detailLog.upstream_err_msg; + + if (!hasFinalError && !hasRetryDetails) { + return ( +
+
+ + {t("当前请求未产生异常诊断信息")} +
+
+ ); + } + + return ( +
+ {hasRetryDetails && detailLog.retry_details!.map((rd, idx) => ( +
+
+ {t(`第 ${idx + 1} 次尝试`)} + {t("失败")} +
+ + + + + + + {rd.err_kind && ( + + + + + )} + {rd.err_msg && ( + + + + + )} + +
{t("节点:")}{rd.node_tag || rd.node_hash || "-"}
{t("错误类型:")}{rd.err_kind}
{t("错误详情:")}{rd.err_msg}
+
+ ))} + + {hasFinalError && ( +
+ {hasRetryDetails && ( +
+ {t("最终结果")} +
+ )} + + + {detailLog.resin_error ? ( + + + + + ) : null} + {detailLog.upstream_stage ? ( + + + + + ) : null} + {detailLog.upstream_err_kind ? ( + + + + + ) : null} + {detailLog.upstream_errno ? ( + + + + + ) : null} + {detailLog.upstream_err_msg ? ( + + + + + ) : null} + +
{t("Resin 错误:")}{detailLog.resin_error}
{t("失败阶段:")}{detailLog.upstream_stage}
{t("错误类型:")}{detailLog.upstream_err_kind}
Errno:{detailLog.upstream_errno}
{t("错误详情:")}{detailLog.upstream_err_msg}
+
+ )}
- ) : null} -
+ ); + })()}
diff --git a/webui/src/features/requestLogs/types.ts b/webui/src/features/requestLogs/types.ts index c49bde55..fbe178ea 100644 --- a/webui/src/features/requestLogs/types.ts +++ b/webui/src/features/requestLogs/types.ts @@ -1,3 +1,10 @@ +export type RetryDetail = { + node_hash: string; + node_tag: string; + err_kind: string; + err_msg: string; +}; + export type RequestLogItem = { id: string; ts: string; @@ -31,6 +38,8 @@ export type RequestLogItem = { req_body_truncated: boolean; resp_headers_truncated: boolean; resp_body_truncated: boolean; + retry_attempts: number; + retry_details: RetryDetail[] | null; }; export type RequestLogListResponse = { diff --git a/webui/src/i18n/translations.ts b/webui/src/i18n/translations.ts index 033e153d..6a71381c 100644 --- a/webui/src/i18n/translations.ts +++ b/webui/src/i18n/translations.ts @@ -371,6 +371,14 @@ const EXACT_ZH_TO_EN: Record = { "节点的网络出口、探测状态以及失败历史。": "Node network egress, probe status, and failure history.", "节点分配策略": "Node allocation policy", + "失败自动重试次数": "Max retries on failure", + "不能为负数": "Must not be negative", + "重试 {n} 次": "Retried {n}x", + "请求经过 {n} 次重试后成功": "Request succeeded after {n} retry attempt(s)", + "请求经过 {n} 次重试后仍然失败": "Request failed after {n} retry attempt(s)", + "第 {n} 次尝试": "Attempt #{n}", + "最终结果": "Final result", + "节点:": "Node:", "节点延迟分布": "Node latency distribution", "节点延迟分布(实时快照)": "Node latency distribution (realtime snapshot)", "节点延迟最大测试间隔": "Max node latency probe interval",