diff --git a/.github/workflows/dev-build.yml b/.github/workflows/dev-build.yml new file mode 100644 index 00000000..55e3feda --- /dev/null +++ b/.github/workflows/dev-build.yml @@ -0,0 +1,193 @@ +name: Dev Build + +on: + workflow_dispatch: + inputs: + version_suffix: + description: 'Version suffix (leave empty for auto: dev-YYYYMMDD-sha8)' + required: false + type: string + +concurrency: + group: dev-build-${{ github.ref }} + cancel-in-progress: true + +jobs: + build-binaries: + name: Build Binaries + runs-on: ubuntu-latest + permissions: + contents: read + strategy: + matrix: + include: + - goos: linux + goarch: amd64 + - goos: linux + goarch: arm64 + - goos: darwin + goarch: amd64 + - goos: darwin + goarch: arm64 + - goos: windows + goarch: amd64 + steps: + - name: Checkout Code + uses: actions/checkout@v4 + + - name: Compute Version + id: version + run: | + SUFFIX="${{ inputs.version_suffix }}" + if [ -z "$SUFFIX" ]; then + # Auto-generate: dev-YYYYMMDD-sha8 + VERSION="dev-$(date -u +'%Y%m%d')-${GITHUB_SHA::8}" + elif [[ "$SUFFIX" == dev-* ]]; then + # Already has dev- prefix + VERSION="$SUFFIX" + else + VERSION="dev-$SUFFIX" + fi + echo "version=$VERSION" >> $GITHUB_OUTPUT + echo "Version: $VERSION" + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: 22 + + - name: Build WebUI + working-directory: ./webui + run: | + npm ci + npm run build + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: '1.25.x' + + - name: Build Go Binary + env: + GOOS: ${{ matrix.goos }} + GOARCH: ${{ matrix.goarch }} + CGO_ENABLED: 0 + run: | + VERSION=${{ steps.version.outputs.version }} + GIT_COMMIT=${GITHUB_SHA::8} + BUILD_TIME=$(date -u +'%Y-%m-%dT%H:%M:%SZ') + + OUTPUT_NAME=resin-${GOOS}-${GOARCH} + if [ "$GOOS" = "windows" ]; then + OUTPUT_NAME="${OUTPUT_NAME}.exe" + fi + echo "OUTPUT_NAME=${OUTPUT_NAME}" >> $GITHUB_ENV + + mkdir -p build + + go build -trimpath -tags "with_quic with_wireguard with_grpc with_utls with_embedded_tor with_naive_outbound" \ + -ldflags="-s -w \ + -X github.com/Resinat/Resin/internal/buildinfo.Version=${VERSION} \ + -X github.com/Resinat/Resin/internal/buildinfo.GitCommit=${GIT_COMMIT} \ + -X github.com/Resinat/Resin/internal/buildinfo.BuildTime=${BUILD_TIME}" \ + -o build/${OUTPUT_NAME} ./cmd/resin + + cd build + + SIMPLE_NAME="resin" + if [ "$GOOS" = "windows" ]; then + SIMPLE_NAME="resin.exe" + fi + cp ${OUTPUT_NAME} ${SIMPLE_NAME} + + if [ "$GOOS" = "windows" ]; then + zip resin-${GOOS}-${GOARCH}.zip ${SIMPLE_NAME} + PACKAGE_NAME="resin-${GOOS}-${GOARCH}.zip" + else + tar -czvf resin-${GOOS}-${GOARCH}.tar.gz ${SIMPLE_NAME} + PACKAGE_NAME="resin-${GOOS}-${GOARCH}.tar.gz" + fi + + rm ${SIMPLE_NAME} + echo "PACKAGE_NAME=${PACKAGE_NAME}" >> $GITHUB_ENV + + - name: Upload Release Package Artifact + uses: actions/upload-artifact@v4 + with: + name: dev-release-${{ matrix.goos }}-${{ matrix.goarch }} + path: build/${{ env.PACKAGE_NAME }} + retention-days: 7 + + - name: Upload Linux bin for Docker + if: matrix.goos == 'linux' + uses: actions/upload-artifact@v4 + with: + name: dev-binary-${{ matrix.goos }}-${{ matrix.goarch }} + path: build/${{ env.OUTPUT_NAME }} + retention-days: 1 + + docker: + name: Build & Push Docker Image + runs-on: ubuntu-latest + needs: build-binaries + permissions: + contents: read + packages: write + steps: + - name: Checkout Code + uses: actions/checkout@v4 + + - name: Compute Version + id: version + run: | + SUFFIX="${{ inputs.version_suffix }}" + if [ -z "$SUFFIX" ]; then + VERSION="dev-$(date -u +'%Y%m%d')-${GITHUB_SHA::8}" + elif [[ "$SUFFIX" == dev-* ]]; then + VERSION="$SUFFIX" + else + VERSION="dev-$SUFFIX" + fi + echo "version=$VERSION" >> $GITHUB_OUTPUT + + - name: Download Linux amd64 binary + uses: actions/download-artifact@v4 + with: + name: dev-binary-linux-amd64 + path: release-bin/linux/amd64/ + + - name: Download Linux arm64 binary + uses: actions/download-artifact@v4 + with: + name: dev-binary-linux-arm64 + path: release-bin/linux/arm64/ + + - name: Give binaries execute permission + run: chmod +x release-bin/linux/amd64/resin-linux-amd64 release-bin/linux/arm64/resin-linux-arm64 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Lowercase repository name + run: echo "REPO_LC=${GITHUB_REPOSITORY,,}" >> $GITHUB_ENV + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./.github/Dockerfile.release + push: true + platforms: linux/amd64,linux/arm64 + tags: | + ghcr.io/${{ env.REPO_LC }}:${{ steps.version.outputs.version }} + ghcr.io/${{ env.REPO_LC }}:dev-latest diff --git a/.gitignore b/.gitignore index 62dc1ff5..8d5fe5ad 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,12 @@ /resin sing-box-reference .agents +.ace-tool/ +.claude/ +.cursor/ +.trellis/ +AGENTS.md +data/ +start.sh .devcontainer -start-instance.sh \ No newline at end of file +start-instance.sh diff --git a/DESIGN.md b/DESIGN.md index dd51b623..47da2beb 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -281,12 +281,13 @@ No available proxy nodes * 职责:维护当前 Platform *此刻* 可用的节点列表。 * 特质:支持 O(1) 的随机选取与 O(1) 的增删查。 * 过滤条件: - 1. 节点状态正常(非 Circuit Break)。 - 2. 调用 `NodeEntry.MatchRegexs(Platform.RegexFilters)` 判断 Tag 是否匹配。 - 3. 节点必须有出口 IP(无论 Platform 是否配置 `RegionFilters`)。 - 4. 若 `RegionFilters` 非空,则节点出口 IP 地区必须符合 `RegionFilters`。 - 5. 有至少一条延迟信息。 - 6. Outbound 不为空 + 1. 节点未被管理员手动禁用(`NodeEntry.ManuallyDisabled == false`)。命中即整体短路,后续条件不再评估。 + 2. 节点状态正常(非 Circuit Break)。 + 3. 调用 `NodeEntry.MatchRegexs(Platform.RegexFilters)` 判断 Tag 是否匹配。 + 4. 节点必须有出口 IP(无论 Platform 是否配置 `RegionFilters`)。 + 5. 若 `RegionFilters` 非空,则节点出口 IP 地区必须符合 `RegionFilters`。 + 6. 有至少一条延迟信息。 + 7. Outbound 不为空 * 过滤源:遍历全局节点池中的所有 `NodeEntry`。 ##### Platform 节点视图动态更新 @@ -300,6 +301,7 @@ Platform 应该向外提供一个脏更新的接口,用来通知脏节点。 * 出口 IP 变更:当 `ProbeManager` 探测到节点出口 IP 发生变化(或从无到有)。属于脏更新。 * 节点引用变更:当节点的 SubscriptionIDs 发生变化,可能会影响 MatchRegexs 的结果(因为 Tag 集合变了)。属于脏更新。 * 熔断触发 / 恢复:属于脏更新。 + * 管理员手动禁用 / 启用:属于脏更新。`Pool.SetNodeManualDisable` 翻位后立即触发 `notifyAllPlatformsDirty`。禁用时随后调用 `Router.DeleteLeasesByNode` 解绑该节点所有 sticky lease。 * Platform 过滤器配置变更:全量重建。 ### 订阅 @@ -593,7 +595,7 @@ Resin 项目中所有的数据库都设计为单写,不会有多进程写入 #### cache.db * nodes_static(hash PK, raw_options_json, created_at_ns) -* nodes_dynamic(hash PK, failure_count, circuit_open_since, egress_ip, egress_updated_at_ns, last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns) +* nodes_dynamic(hash PK, failure_count, circuit_open_since, egress_ip, egress_updated_at_ns, last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns, manually_disabled) * node_latency(node_hash, domain, ewma_ns, last_updated_ns, PK(node_hash,domain))。 * leases(platform_id, account, node_hash, egress_ip, expiry_ns, last_accessed_ns, PK(platform_id,account))。 * subscription_nodes(subscription_id, node_hash, tags_json, PK(subscription_id,node_hash)) @@ -1630,6 +1632,8 @@ Query: * `region`:hk/us/...(可选) * `circuit_open`:true|false(可选) * `has_outbound`:true|false(可选) +* `enabled`:true|false(可选),按"订阅维度"启用状态过滤 +* `manually_disabled`:true|false(可选),按"管理员手动禁用"标记过滤 * `egress_ip`:IP 地址(可选) * `probed_since`:RFC3339Nano(可选),按节点 `LastLatencyProbeAttempt` 过滤 * `sort_by`:排序字段(可选) @@ -1650,6 +1654,8 @@ Response: "node_hash": "9f2c0b1a6d3e4f5c8a9b0c1d2e3f4a5b", "created_at": "2026-02-10T12:00:00Z", + "enabled": true, + "manually_disabled": false, "has_outbound": true, "last_error": "...", "circuit_open_since": null, @@ -1724,6 +1730,60 @@ Response: 返回 LatencyTestURL 的站点在 TD-EWMA 后的延迟 `latency_ewma_ms`。 +#### 手动禁用节点(Action) + +**POST** `/nodes/{node_hash}/actions/disable` + +请求体:无。 + +效果: + +* 在节点上置位"管理员手动禁用"标记,节点立即从所有 Platform 的可路由视图中移除;后续 `RouteRequest` 不会再分配到该节点(即使存在 sticky lease 的旧绑定也会因下一次评估失败被替换)。 +* 同步遍历所有 Platform,将该节点上的全部 sticky leases 解绑(发出 `LeaseRemove` 事件以驱动 `leases` 表删除),并把解绑数量回传给调用方。 +* 标记位通过 `nodes_dynamic.manually_disabled` 列持久化,重启后恢复。 + +校验规则: + +* `node_hash` 必须为 32 位十六进制字符串(大小写均可)。 + +错误码映射(最小集): + +* `400 INVALID_ARGUMENT`:`node_hash` 格式非法。 +* `404 NOT_FOUND`:节点不存在。 + +返回: + +```json +{ "released_lease_count": 3 } +``` + +#### 启用节点(Action) + +**POST** `/nodes/{node_hash}/actions/enable` + +请求体:无。 + +效果: + +* 清除"管理员手动禁用"标记,节点会在下一次 Platform 视图评估中重新被纳入候选集合。不影响任何已有 lease,也不会主动创建新 lease。 + +校验规则: + +* `node_hash` 必须为 32 位十六进制字符串(大小写均可)。 + +错误码映射(最小集): + +* `400 INVALID_ARGUMENT`:`node_hash` 格式非法。 +* `404 NOT_FOUND`:节点不存在。 + +返回: + +```json +{ "released_lease_count": 0 } +``` + +`released_lease_count` 始终为 0,仅保持与 `disable` 同形以方便前端复用。 + ### Leases #### Lease 模型 diff --git a/cmd/resin/main.go b/cmd/resin/main.go index 11263158..5d40902b 100644 --- a/cmd/resin/main.go +++ b/cmd/resin/main.go @@ -541,6 +541,7 @@ func newFlushReaders( LastLatencyProbeAttemptNs: entry.LastLatencyProbeAttempt.Load(), LastAuthorityLatencyProbeAttemptNs: entry.LastAuthorityLatencyProbeAttempt.Load(), LastEgressUpdateAttemptNs: entry.LastEgressUpdateAttempt.Load(), + ManuallyDisabled: entry.IsManuallyDisabled(), } }, ReadNodeLatency: func(key model.NodeLatencyKey) *model.NodeLatency { @@ -871,6 +872,7 @@ func restoreBootstrapNodeDynamics( entry.LastLatencyProbeAttempt.Store(nd.LastLatencyProbeAttemptNs) entry.LastAuthorityLatencyProbeAttempt.Store(nd.LastAuthorityLatencyProbeAttemptNs) entry.LastEgressUpdateAttempt.Store(nd.LastEgressUpdateAttemptNs) + entry.ManuallyDisabled.Store(nd.ManuallyDisabled) if nd.EgressIP != "" { if ip, err := netip.ParseAddr(nd.EgressIP); err == nil { entry.SetEgressIP(ip) diff --git a/internal/api/handler_data.go b/internal/api/handler_data.go new file mode 100644 index 00000000..4de0f2ae --- /dev/null +++ b/internal/api/handler_data.go @@ -0,0 +1,42 @@ +package api + +import ( + "net/http" + "time" + + "github.com/Resinat/Resin/internal/service" +) + +// HandleExportData returns a handler for GET /api/v1/data/export. +func HandleExportData(cp *service.ControlPlaneService) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + payload, err := cp.ExportData() + if err != nil { + writeServiceError(w, err) + return + } + + filename := "resin-export-" + time.Now().UTC().Format("20060102-150405") + ".json" + w.Header().Set("Content-Disposition", "attachment; filename=\""+filename+"\"") + WriteJSON(w, http.StatusOK, payload) + } +} + +// HandleImportData returns a handler for POST /api/v1/data/import. +func HandleImportData(cp *service.ControlPlaneService) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var payload service.ExportPayload + if err := DecodeBody(r, &payload); err != nil { + writeDecodeBodyError(w, err) + return + } + + strategy := r.URL.Query().Get("strategy") + result, err := cp.ImportData(payload, strategy) + if err != nil { + writeServiceError(w, err) + return + } + WriteJSON(w, http.StatusOK, result) + } +} diff --git a/internal/api/handler_lease.go b/internal/api/handler_lease.go index 15757a99..161e4ca2 100644 --- a/internal/api/handler_lease.go +++ b/internal/api/handler_lease.go @@ -19,6 +19,12 @@ func validateAccountPath(r *http.Request) (string, error) { func leaseSortKey(sortBy string, l service.LeaseResponse) string { switch sortBy { + case "node_tag": + return l.NodeTag + case "egress_ip": + return l.EgressIP + case "created_at": + return l.CreatedAt case "expiry": return l.Expiry case "last_accessed": @@ -92,7 +98,7 @@ func HandleListLeases(cp *service.ControlPlaneService) http.HandlerFunc { leases = filtered } - sorting, ok := parseSortingOrWriteInvalid(w, r, []string{"account", "expiry", "last_accessed"}, "expiry", "asc") + sorting, ok := parseSortingOrWriteInvalid(w, r, []string{"account", "node_tag", "egress_ip", "created_at", "expiry", "last_accessed"}, "expiry", "asc") if !ok { return } @@ -164,6 +170,35 @@ func HandleDeleteAllLeases(cp *service.ControlPlaneService) http.HandlerFunc { } } +// HandleBindLease returns a handler for PUT /api/v1/platforms/{id}/leases/{account}. +func HandleBindLease(cp *service.ControlPlaneService) http.HandlerFunc { + type bindRequest struct { + NodeHash string `json:"node_hash"` + } + return func(w http.ResponseWriter, r *http.Request) { + platformID, ok := requireUUIDPathParam(w, r, "id", "platform_id") + if !ok { + return + } + account, err := validateAccountPath(r) + if err != nil { + writeServiceError(w, err) + return + } + var req bindRequest + if err := DecodeBody(r, &req); err != nil { + writeDecodeBodyError(w, err) + return + } + lease, err := cp.BindLease(platformID, account, req.NodeHash) + if err != nil { + writeServiceError(w, err) + return + } + WriteJSON(w, http.StatusOK, lease) + } +} + // HandleIPLoad returns a handler for GET /api/v1/platforms/{id}/ip-load. func HandleIPLoad(cp *service.ControlPlaneService) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/handler_node.go b/internal/api/handler_node.go index 53809b3c..dfec9a5f 100644 --- a/internal/api/handler_node.go +++ b/internal/api/handler_node.go @@ -42,6 +42,8 @@ func compareNodeSummaries(sortBy string, a, b service.NodeSummary) int { order = cmp.Compare(a.FailureCount, b.FailureCount) case "region": order = strings.Compare(a.Region, b.Region) + case "lease_count": + order = cmp.Compare(a.LeaseCount, b.LeaseCount) default: order = strings.Compare(nodeTagSortKey(a), nodeTagSortKey(b)) } @@ -137,6 +139,12 @@ func HandleListNodes(cp *service.ControlPlaneService) http.HandlerFunc { } filters.Enabled = enabled + manuallyDisabled, ok := parseBoolQueryOrWriteInvalid(w, r, "manually_disabled") + if !ok { + return + } + filters.ManuallyDisabled = manuallyDisabled + if v := q.Get("probed_since"); v != "" { t, err := time.Parse(time.RFC3339Nano, v) if err != nil { @@ -152,7 +160,7 @@ func HandleListNodes(cp *service.ControlPlaneService) http.HandlerFunc { return } - sorting, ok := parseSortingOrWriteInvalid(w, r, []string{"tag", "created_at", "failure_count", "region"}, "tag", "asc") + sorting, ok := parseSortingOrWriteInvalid(w, r, []string{"tag", "created_at", "failure_count", "region", "lease_count"}, "tag", "asc") if !ok { return } @@ -211,3 +219,56 @@ func HandleProbeLatency(cp *service.ControlPlaneService) http.HandlerFunc { WriteJSON(w, http.StatusOK, result) } } + +// HandleListNodeLeases returns a handler for GET /api/v1/nodes/{hash}/leases. +// It returns every lease currently bound to the node; pass platform_id=... to +// scope the result to a single platform. +func HandleListNodeLeases(cp *service.ControlPlaneService) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + hash := PathParam(r, "hash") + platformID, ok := parseOptionalUUIDQuery(w, r, "platform_id", "platform_id") + if !ok { + return + } + pid := "" + if platformID != nil { + pid = *platformID + } + leases, err := cp.ListLeasesByNode(hash, pid) + if err != nil { + writeServiceError(w, err) + return + } + WriteJSON(w, http.StatusOK, leases) + } +} + +// HandleDisableNode returns a handler for POST /api/v1/nodes/{hash}/actions/disable. +// Disabling a node removes it from all platform views and releases every lease +// currently bound to it; the released count is returned in the response. +func HandleDisableNode(cp *service.ControlPlaneService) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + hash := PathParam(r, "hash") + result, err := cp.SetNodeManualDisable(hash, true) + if err != nil { + writeServiceError(w, err) + return + } + WriteJSON(w, http.StatusOK, result) + } +} + +// HandleEnableNode returns a handler for POST /api/v1/nodes/{hash}/actions/enable. +// Enabling a node clears the admin disable flag so it can be selected again on +// the next routing decision. No leases are touched on enable. +func HandleEnableNode(cp *service.ControlPlaneService) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + hash := PathParam(r, "hash") + result, err := cp.SetNodeManualDisable(hash, false) + if err != nil { + writeServiceError(w, err) + return + } + WriteJSON(w, http.StatusOK, result) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index fa070244..4b3f4aac 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -89,6 +89,7 @@ func NewServerWithAddress( authed.Handle("GET /api/v1/platforms/{id}/leases", HandleListLeases(cp)) authed.Handle("DELETE /api/v1/platforms/{id}/leases", HandleDeleteAllLeases(cp)) authed.Handle("GET /api/v1/platforms/{id}/leases/{account}", HandleGetLease(cp)) + authed.Handle("PUT /api/v1/platforms/{id}/leases/{account}", HandleBindLease(cp)) authed.Handle("DELETE /api/v1/platforms/{id}/leases/{account}", HandleDeleteLease(cp)) authed.Handle("GET /api/v1/platforms/{id}/ip-load", HandleIPLoad(cp)) @@ -111,14 +112,21 @@ func NewServerWithAddress( // Nodes. authed.Handle("GET /api/v1/nodes", HandleListNodes(cp)) authed.Handle("GET /api/v1/nodes/{hash}", HandleGetNode(cp)) + authed.Handle("GET /api/v1/nodes/{hash}/leases", HandleListNodeLeases(cp)) authed.Handle("POST /api/v1/nodes/{hash}/actions/probe-egress", HandleProbeEgress(cp)) authed.Handle("POST /api/v1/nodes/{hash}/actions/probe-latency", HandleProbeLatency(cp)) + authed.Handle("POST /api/v1/nodes/{hash}/actions/disable", HandleDisableNode(cp)) + authed.Handle("POST /api/v1/nodes/{hash}/actions/enable", HandleEnableNode(cp)) // GeoIP. authed.Handle("GET /api/v1/geoip/status", HandleGeoIPStatus(cp)) authed.Handle("GET /api/v1/geoip/lookup", HandleGeoIPLookup(cp)) authed.Handle("POST /api/v1/geoip/lookup", HandleGeoIPLookupPost(cp)) authed.Handle("POST /api/v1/geoip/actions/update-now", HandleGeoIPUpdate(cp)) + + // Data export / import. + authed.Handle("GET /api/v1/data/export", HandleExportData(cp)) + authed.Handle("POST /api/v1/data/import", HandleImportData(cp)) } // Request log endpoints (always registered if repo is available). diff --git a/internal/model/models.go b/internal/model/models.go index 04afa608..8e1396b9 100644 --- a/internal/model/models.go +++ b/internal/model/models.go @@ -58,6 +58,7 @@ type NodeDynamic struct { LastLatencyProbeAttemptNs int64 `json:"last_latency_probe_attempt_ns"` LastAuthorityLatencyProbeAttemptNs int64 `json:"last_authority_latency_probe_attempt_ns"` LastEgressUpdateAttemptNs int64 `json:"last_egress_update_attempt_ns"` + ManuallyDisabled bool `json:"manually_disabled"` } // NodeLatency holds per-domain latency statistics for a node. diff --git a/internal/node/entry.go b/internal/node/entry.go index 692fdd7f..befb4c39 100644 --- a/internal/node/entry.go +++ b/internal/node/entry.go @@ -43,6 +43,10 @@ type NodeEntry struct { LastEgressUpdateAttempt atomic.Int64 LatencyTable *LatencyTable // per-domain latency stats; nil if not initialized + // Admin-controlled disable flag. When true, the node is excluded from all + // platform views regardless of subscription state. Toggled via the API. + ManuallyDisabled atomic.Bool + // Outbound instance for this node. Outbound atomic.Pointer[adapter.Outbound] } @@ -201,6 +205,14 @@ func (e *NodeEntry) IsCircuitOpen() bool { return e.CircuitOpenSince.Load() != 0 } +// IsManuallyDisabled reports whether an admin has flagged this node as disabled. +func (e *NodeEntry) IsManuallyDisabled() bool { + if e == nil { + return false + } + return e.ManuallyDisabled.Load() +} + // HasLatency returns true if the node has at least one latency record. func (e *NodeEntry) HasLatency() bool { return e.LatencyTable != nil && e.LatencyTable.Size() > 0 diff --git a/internal/platform/platform.go b/internal/platform/platform.go index 657aacce..45227238 100644 --- a/internal/platform/platform.go +++ b/internal/platform/platform.go @@ -113,6 +113,11 @@ func (p *Platform) evaluateNode( subLookup node.SubLookupFunc, geoLookup GeoLookupFunc, ) bool { + // -1. Admin manually disabled — short-circuit before any other check. + if entry.IsManuallyDisabled() { + return false + } + // 0. Disabled nodes are never routable. if entry.IsDisabledBySubscriptions(subLookup) { return false diff --git a/internal/routing/router.go b/internal/routing/router.go index 76cb70fd..21fda080 100644 --- a/internal/routing/router.go +++ b/internal/routing/router.go @@ -261,6 +261,16 @@ func (r *Router) tryLeaseHit( newLease := current newLease.LastAccessedNs = nowNs + + // Auto-renew: extend lease when within 1 minute of expiry. + if newLease.ExpiryNs-nowNs < int64(time.Minute) { + ttl := plat.StickyTTLNs + if ttl <= 0 { + ttl = int64(24 * time.Hour) + } + newLease.ExpiryNs = nowNs + ttl + } + r.emitLeaseEvent(LeaseEvent{ Type: LeaseTouch, PlatformID: plat.ID, @@ -297,6 +307,16 @@ func (r *Router) tryLeaseSameIPRotation( newLease := current newLease.NodeHash = bestHash newLease.LastAccessedNs = nowNs + + // Auto-renew: extend lease when within 1 minute of expiry. + if newLease.ExpiryNs-nowNs < int64(time.Minute) { + ttl := plat.StickyTTLNs + if ttl <= 0 { + ttl = int64(24 * time.Hour) + } + newLease.ExpiryNs = nowNs + ttl + } + r.emitLeaseEvent(LeaseEvent{ Type: LeaseReplace, PlatformID: plat.ID, @@ -586,6 +606,52 @@ func (r *Router) RangeLeases(platformID string, fn func(account string, lease Le return true } +// RangeAllLeases iterates over all leases across every platform. +// fn receives the owning platform ID alongside the account/lease pair. +// Returning false from fn stops the iteration early. +func (r *Router) RangeAllLeases(fn func(platformID, account string, lease Lease) bool) { + r.states.Range(func(platformID string, state *PlatformRoutingState) bool { + keepGoing := true + state.Leases.Range(func(account string, lease Lease) bool { + if !fn(platformID, account, lease) { + keepGoing = false + return false + } + return true + }) + return keepGoing + }) +} + +// SnapshotNodeLoad returns a best-effort point-in-time count of leases per +// node hash for a single platform. Empty map if the platform has no state. +func (r *Router) SnapshotNodeLoad(platformID string) map[node.Hash]int64 { + state, ok := r.states.Load(platformID) + if !ok { + return map[node.Hash]int64{} + } + out := make(map[node.Hash]int64) + state.Leases.Range(func(_ string, lease Lease) bool { + out[lease.NodeHash]++ + return true + }) + return out +} + +// SnapshotNodeLoadAll returns a best-effort point-in-time count of leases per +// node hash, aggregated across every platform. +func (r *Router) SnapshotNodeLoadAll() map[node.Hash]int64 { + out := make(map[node.Hash]int64) + r.states.Range(func(_ string, state *PlatformRoutingState) bool { + state.Leases.Range(func(_ string, lease Lease) bool { + out[lease.NodeHash]++ + return true + }) + return true + }) + return out +} + // DeleteLease removes a single lease by platform and account. // Returns true if a lease was deleted. Emits a LeaseRemove event. func (r *Router) DeleteLease(platformID, account string) bool { @@ -633,3 +699,33 @@ func (r *Router) DeleteAllLeases(platformID string) int { }) return count } + +// DeleteLeasesByNode removes every lease whose node_hash matches the target, +// across all platforms. Returns the total number of leases deleted. Emits a +// LeaseRemove event per deletion so persistence and metrics stay in sync. +func (r *Router) DeleteLeasesByNode(target node.Hash) int { + count := 0 + r.states.Range(func(platformID string, state *PlatformRoutingState) bool { + state.Leases.Range(func(account string, lease Lease) bool { + if lease.NodeHash != target { + return true + } + removed, deleted := state.Leases.DeleteLease(account) + if !deleted { + return true + } + r.emitLeaseEvent(LeaseEvent{ + Type: LeaseRemove, + PlatformID: platformID, + Account: account, + NodeHash: removed.NodeHash, + EgressIP: removed.EgressIP, + CreatedAtNs: removed.CreatedAtNs, + }) + count++ + return true + }) + return true + }) + return count +} diff --git a/internal/service/control_plane_data.go b/internal/service/control_plane_data.go new file mode 100644 index 00000000..dc295299 --- /dev/null +++ b/internal/service/control_plane_data.go @@ -0,0 +1,360 @@ +package service + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/Resinat/Resin/internal/platform" +) + +// ------------------------------------------------------------------ +// Export / Import data types +// ------------------------------------------------------------------ + +const exportVersion = 1 + +// ExportPlatformEntry is the portable representation of a platform. +type ExportPlatformEntry struct { + Name string `json:"name"` + StickyTTL string `json:"sticky_ttl"` + RegexFilters []string `json:"regex_filters"` + RegionFilters []string `json:"region_filters"` + AllocationPolicy string `json:"allocation_policy"` + ReverseProxyMissAction string `json:"reverse_proxy_miss_action"` + ReverseProxyEmptyAccountBehavior string `json:"reverse_proxy_empty_account_behavior"` + ReverseProxyFixedAccountHeader string `json:"reverse_proxy_fixed_account_header"` +} + +// ExportSubscriptionEntry is the portable representation of a subscription. +type ExportSubscriptionEntry struct { + Name string `json:"name"` + SourceType string `json:"source_type"` + URL string `json:"url"` + Content string `json:"content"` + UpdateInterval string `json:"update_interval"` + Enabled bool `json:"enabled"` + Ephemeral bool `json:"ephemeral"` + EphemeralNodeEvictDelay string `json:"ephemeral_node_evict_delay"` +} + +// ExportPayload is the top-level JSON structure for data export/import. +type ExportPayload struct { + Version int `json:"version"` + ExportedAt string `json:"exported_at"` + Platforms []ExportPlatformEntry `json:"platforms"` + Subscriptions []ExportSubscriptionEntry `json:"subscriptions"` +} + +// ImportResult summarises what happened during an import. +type ImportResult struct { + PlatformsCreated int `json:"platforms_created"` + PlatformsSkipped int `json:"platforms_skipped"` + PlatformsOverwritten int `json:"platforms_overwritten"` + SubscriptionsCreated int `json:"subscriptions_created"` + SubscriptionsSkipped int `json:"subscriptions_skipped"` + SubscriptionsOverwritten int `json:"subscriptions_overwritten"` + Errors []string `json:"errors"` +} + +// ------------------------------------------------------------------ +// Export +// ------------------------------------------------------------------ + +// ExportData builds an ExportPayload containing all user-created platforms +// and all subscriptions. +func (s *ControlPlaneService) ExportData() (*ExportPayload, error) { + // --- platforms (exclude Default) --- + platforms, err := s.Engine.ListPlatforms() + if err != nil { + return nil, internal("list platforms for export", err) + } + + exportPlatforms := make([]ExportPlatformEntry, 0, len(platforms)) + for _, p := range platforms { + if p.ID == platform.DefaultPlatformID { + continue + } + resp := platformToResponse(p) + exportPlatforms = append(exportPlatforms, ExportPlatformEntry{ + Name: resp.Name, + StickyTTL: resp.StickyTTL, + RegexFilters: resp.RegexFilters, + RegionFilters: resp.RegionFilters, + AllocationPolicy: resp.AllocationPolicy, + ReverseProxyMissAction: resp.ReverseProxyMissAction, + ReverseProxyEmptyAccountBehavior: resp.ReverseProxyEmptyAccountBehavior, + ReverseProxyFixedAccountHeader: resp.ReverseProxyFixedAccountHeader, + }) + } + + // --- subscriptions --- + subs, err := s.ListSubscriptions(nil) + if err != nil { + return nil, internal("list subscriptions for export", err) + } + + exportSubs := make([]ExportSubscriptionEntry, 0, len(subs)) + for _, sub := range subs { + exportSubs = append(exportSubs, ExportSubscriptionEntry{ + Name: sub.Name, + SourceType: sub.SourceType, + URL: sub.URL, + Content: sub.Content, + UpdateInterval: sub.UpdateInterval, + Enabled: sub.Enabled, + Ephemeral: sub.Ephemeral, + EphemeralNodeEvictDelay: sub.EphemeralNodeEvictDelay, + }) + } + + return &ExportPayload{ + Version: exportVersion, + ExportedAt: time.Now().UTC().Format(time.RFC3339), + Platforms: exportPlatforms, + Subscriptions: exportSubs, + }, nil +} + +// ------------------------------------------------------------------ +// Import +// ------------------------------------------------------------------ + +// ImportData imports platforms and subscriptions from the given payload. +// strategy must be "skip" (default) or "overwrite". +func (s *ControlPlaneService) ImportData(payload ExportPayload, strategy string) (*ImportResult, error) { + if strategy == "" { + strategy = "skip" + } + if strategy != "skip" && strategy != "overwrite" { + return nil, invalidArg("strategy must be 'skip' or 'overwrite'") + } + + result := &ImportResult{Errors: []string{}} + + // ----- import platforms ----- + s.importPlatforms(payload.Platforms, strategy, result) + + // ----- import subscriptions ----- + s.importSubscriptions(payload.Subscriptions, strategy, result) + + return result, nil +} + +func (s *ControlPlaneService) importPlatforms(entries []ExportPlatformEntry, strategy string, result *ImportResult) { + // Build existing name→id lookup. + existing, err := s.Engine.ListPlatforms() + if err != nil { + result.Errors = append(result.Errors, "failed to list existing platforms: "+err.Error()) + return + } + nameToID := make(map[string]string, len(existing)) + for _, p := range existing { + nameToID[p.Name] = p.ID + } + + // Detect duplicates inside the import payload itself. + seen := make(map[string]bool, len(entries)) + + for i, entry := range entries { + name := strings.TrimSpace(entry.Name) + if name == "" { + result.Errors = append(result.Errors, fmt.Sprintf("platforms[%d]: name is empty, skipped", i)) + continue + } + if seen[name] { + result.Errors = append(result.Errors, fmt.Sprintf("platforms[%d]: duplicate name %q in import payload, skipped", i, name)) + continue + } + seen[name] = true + + existingID, exists := nameToID[name] + if exists && strategy == "skip" { + result.PlatformsSkipped++ + continue + } + + if exists && strategy == "overwrite" { + // Overwrite: build a patch JSON and call UpdatePlatform. + patch := buildPlatformPatch(entry) + patchJSON, _ := json.Marshal(patch) + if _, err := s.UpdatePlatform(existingID, patchJSON); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("platform %q: overwrite failed: %v", name, err)) + continue + } + result.PlatformsOverwritten++ + continue + } + + // Create new platform. + req := buildCreatePlatformRequest(entry) + if _, err := s.CreatePlatform(req); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("platform %q: create failed: %v", name, err)) + continue + } + result.PlatformsCreated++ + } +} + +func (s *ControlPlaneService) importSubscriptions(entries []ExportSubscriptionEntry, strategy string, result *ImportResult) { + // Build existing name→id and url→id lookup tables. + existingSubs, err := s.ListSubscriptions(nil) + if err != nil { + result.Errors = append(result.Errors, "failed to list existing subscriptions: "+err.Error()) + return + } + nameToID := make(map[string]string, len(existingSubs)) + urlToID := make(map[string]string, len(existingSubs)) + for _, sub := range existingSubs { + nameToID[sub.Name] = sub.ID + if sub.URL != "" { + urlToID[sub.URL] = sub.ID + } + } + + // Detect duplicates inside the import payload itself. + seenName := make(map[string]bool, len(entries)) + seenURL := make(map[string]bool, len(entries)) + + for i, entry := range entries { + name := strings.TrimSpace(entry.Name) + if name == "" { + result.Errors = append(result.Errors, fmt.Sprintf("subscriptions[%d]: name is empty, skipped", i)) + continue + } + + // Internal dedup by name. + if seenName[name] { + result.Errors = append(result.Errors, fmt.Sprintf("subscriptions[%d]: duplicate name %q in import payload, skipped", i, name)) + continue + } + seenName[name] = true + + // Internal dedup by URL for remote subs. + url := strings.TrimSpace(entry.URL) + if entry.SourceType == "remote" && url != "" { + if seenURL[url] { + result.Errors = append(result.Errors, fmt.Sprintf("subscriptions[%d]: duplicate url %q in import payload, skipped", i, url)) + continue + } + seenURL[url] = true + } + + // Match against existing: first by name, then by URL. + existingID := "" + if id, ok := nameToID[name]; ok { + existingID = id + } else if entry.SourceType == "remote" && url != "" { + if id, ok := urlToID[url]; ok { + existingID = id + } + } + + if existingID != "" && strategy == "skip" { + result.SubscriptionsSkipped++ + continue + } + + if existingID != "" && strategy == "overwrite" { + patch := buildSubscriptionPatch(entry) + patchJSON, _ := json.Marshal(patch) + if _, err := s.UpdateSubscription(existingID, patchJSON); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("subscription %q: overwrite failed: %v", name, err)) + continue + } + result.SubscriptionsOverwritten++ + continue + } + + // Create new subscription. + req := buildCreateSubscriptionRequest(entry) + if _, err := s.CreateSubscription(req); err != nil { + result.Errors = append(result.Errors, fmt.Sprintf("subscription %q: create failed: %v", name, err)) + continue + } + result.SubscriptionsCreated++ + } +} + +// ------------------------------------------------------------------ +// helpers: build request structs from export entries +// ------------------------------------------------------------------ + +func buildCreatePlatformRequest(e ExportPlatformEntry) CreatePlatformRequest { + name := strings.TrimSpace(e.Name) + return CreatePlatformRequest{ + Name: &name, + StickyTTL: strPtr(e.StickyTTL), + RegexFilters: e.RegexFilters, + RegionFilters: e.RegionFilters, + AllocationPolicy: strPtr(e.AllocationPolicy), + ReverseProxyMissAction: strPtr(e.ReverseProxyMissAction), + ReverseProxyEmptyAccountBehavior: strPtr(e.ReverseProxyEmptyAccountBehavior), + ReverseProxyFixedAccountHeader: strPtr(e.ReverseProxyFixedAccountHeader), + } +} + +func buildPlatformPatch(e ExportPlatformEntry) map[string]any { + regexFilters := e.RegexFilters + if regexFilters == nil { + regexFilters = []string{} + } + regionFilters := e.RegionFilters + if regionFilters == nil { + regionFilters = []string{} + } + patch := map[string]any{ + "sticky_ttl": e.StickyTTL, + "regex_filters": regexFilters, + "region_filters": regionFilters, + "allocation_policy": e.AllocationPolicy, + "reverse_proxy_miss_action": e.ReverseProxyMissAction, + "reverse_proxy_empty_account_behavior": e.ReverseProxyEmptyAccountBehavior, + "reverse_proxy_fixed_account_header": e.ReverseProxyFixedAccountHeader, + } + return patch +} + +func buildCreateSubscriptionRequest(e ExportSubscriptionEntry) CreateSubscriptionRequest { + name := strings.TrimSpace(e.Name) + sourceType := e.SourceType + url := strings.TrimSpace(e.URL) + content := e.Content + enabled := e.Enabled + ephemeral := e.Ephemeral + return CreateSubscriptionRequest{ + Name: &name, + SourceType: &sourceType, + URL: &url, + Content: &content, + UpdateInterval: strPtr(e.UpdateInterval), + Enabled: &enabled, + Ephemeral: &ephemeral, + EphemeralNodeEvictDelay: strPtr(e.EphemeralNodeEvictDelay), + } +} + +func buildSubscriptionPatch(e ExportSubscriptionEntry) map[string]any { + patch := map[string]any{ + "name": strings.TrimSpace(e.Name), + "update_interval": e.UpdateInterval, + "enabled": e.Enabled, + "ephemeral": e.Ephemeral, + "ephemeral_node_evict_delay": e.EphemeralNodeEvictDelay, + } + if e.SourceType == "remote" { + patch["url"] = strings.TrimSpace(e.URL) + } + if e.SourceType == "local" { + patch["content"] = e.Content + } + return patch +} + +func strPtr(s string) *string { + if s == "" { + return nil + } + return &s +} diff --git a/internal/service/control_plane_leases.go b/internal/service/control_plane_leases.go index 1702e2ba..a93a4510 100644 --- a/internal/service/control_plane_leases.go +++ b/internal/service/control_plane_leases.go @@ -1,6 +1,7 @@ package service import ( + "sort" "strings" "time" @@ -20,6 +21,7 @@ type LeaseResponse struct { NodeHash string `json:"node_hash"` NodeTag string `json:"node_tag"` EgressIP string `json:"egress_ip"` + CreatedAt string `json:"created_at"` Expiry string `json:"expiry"` LastAccessed string `json:"last_accessed"` } @@ -31,6 +33,7 @@ func leaseToResponse(lease model.Lease, nodeTag string) LeaseResponse { NodeHash: lease.NodeHash, NodeTag: nodeTag, EgressIP: lease.EgressIP, + CreatedAt: time.Unix(0, lease.CreatedAtNs).UTC().Format(time.RFC3339Nano), Expiry: time.Unix(0, lease.ExpiryNs).UTC().Format(time.RFC3339Nano), LastAccessed: time.Unix(0, lease.LastAccessedNs).UTC().Format(time.RFC3339Nano), } @@ -63,6 +66,7 @@ func (s *ControlPlaneService) ListLeases(platformID string) ([]LeaseResponse, er Account: account, NodeHash: lease.NodeHash.Hex(), EgressIP: lease.EgressIP.String(), + CreatedAtNs: lease.CreatedAtNs, ExpiryNs: lease.ExpiryNs, LastAccessedNs: lease.LastAccessedNs, }, s.resolveLeaseNodeTag(lease.NodeHash))) @@ -148,12 +152,164 @@ func (s *ControlPlaneService) DeleteAllLeases(platformID string) error { return nil } +// BindLease binds (or rebinds) an account to a specific node on the given platform. +// The node must be routable on the platform. +func (s *ControlPlaneService) BindLease(platformID, account, nodeHashHex string) (*LeaseResponse, error) { + account = strings.TrimSpace(account) + if account == "" { + return nil, invalidArg("account: must be non-empty") + } + nodeHashHex = strings.TrimSpace(nodeHashHex) + h, err := node.ParseHex(nodeHashHex) + if err != nil { + return nil, invalidArg("node_hash: invalid format") + } + + plat, ok := s.Pool.GetPlatform(platformID) + if !ok { + return nil, notFound("platform not found") + } + + if !plat.View().Contains(h) { + return nil, notFound("node is not routable on this platform") + } + + entry, ok := s.Pool.GetEntry(h) + if !ok { + return nil, notFound("node not found") + } + egressIP := entry.GetEgressIP() + if !egressIP.IsValid() { + return nil, invalidArg("node has no egress IP") + } + + nowNs := time.Now().UnixNano() + ttlNs := plat.StickyTTLNs + if ttlNs <= 0 { + ttlNs = int64(24 * time.Hour) // default 24h + } + + ml := model.Lease{ + PlatformID: platformID, + Account: account, + NodeHash: h.Hex(), + EgressIP: egressIP.String(), + CreatedAtNs: nowNs, + ExpiryNs: nowNs + ttlNs, + LastAccessedNs: nowNs, + } + if err := s.Router.UpsertLease(ml); err != nil { + return nil, internal("bind lease", err) + } + + resp := leaseToResponse(ml, s.resolveLeaseNodeTag(h)) + return &resp, nil +} + // IPLoadEntry is the API response for IP load stats. type IPLoadEntry struct { EgressIP string `json:"egress_ip"` LeaseCount int64 `json:"lease_count"` } +// NodeLeaseResponse is the API response for a lease scoped to a specific node. +// Unlike LeaseResponse, it carries the owning platform so the caller can render +// cross-platform lease bindings for a single node. +type NodeLeaseResponse struct { + PlatformID string `json:"platform_id"` + PlatformName string `json:"platform_name"` + Account string `json:"account"` + NodeHash string `json:"node_hash"` + EgressIP string `json:"egress_ip"` + CreatedAt string `json:"created_at"` + Expiry string `json:"expiry"` + LastAccessed string `json:"last_accessed"` +} + +// ListLeasesByNode returns every lease bound to the given node hash. +// When platformID is non-empty, only leases under that platform are returned; +// otherwise leases across all platforms are aggregated. +// Results are sorted by CreatedAtNs descending (newest first). +func (s *ControlPlaneService) ListLeasesByNode(nodeHashHex, platformID string) ([]NodeLeaseResponse, error) { + nodeHashHex = strings.TrimSpace(nodeHashHex) + h, err := node.ParseHex(nodeHashHex) + if err != nil { + return nil, invalidArg("node_hash: invalid format") + } + if _, ok := s.Pool.GetEntry(h); !ok { + return nil, notFound("node not found") + } + + type entry struct { + resp NodeLeaseResponse + createdAtNs int64 + } + platformNameCache := make(map[string]string) + resolvePlatformName := func(pid string) string { + if name, ok := platformNameCache[pid]; ok { + return name + } + name := "" + if plat, ok := s.Pool.GetPlatform(pid); ok { + name = plat.Name + } + platformNameCache[pid] = name + return name + } + + var entries []entry + addLease := func(pid, account string, lease routing.Lease) { + if lease.NodeHash != h { + return + } + entries = append(entries, entry{ + resp: NodeLeaseResponse{ + PlatformID: pid, + PlatformName: resolvePlatformName(pid), + Account: account, + NodeHash: lease.NodeHash.Hex(), + EgressIP: lease.EgressIP.String(), + CreatedAt: time.Unix(0, lease.CreatedAtNs).UTC().Format(time.RFC3339Nano), + Expiry: time.Unix(0, lease.ExpiryNs).UTC().Format(time.RFC3339Nano), + LastAccessed: time.Unix(0, lease.LastAccessedNs).UTC().Format(time.RFC3339Nano), + }, + createdAtNs: lease.CreatedAtNs, + }) + } + + platformID = strings.TrimSpace(platformID) + if platformID != "" { + if _, ok := s.Pool.GetPlatform(platformID); !ok { + return nil, notFound("platform not found") + } + s.Router.RangeLeases(platformID, func(account string, lease routing.Lease) bool { + addLease(platformID, account, lease) + return true + }) + } else { + s.Router.RangeAllLeases(func(pid, account string, lease routing.Lease) bool { + addLease(pid, account, lease) + return true + }) + } + + sort.SliceStable(entries, func(i, j int) bool { + if entries[i].createdAtNs != entries[j].createdAtNs { + return entries[i].createdAtNs > entries[j].createdAtNs + } + if entries[i].resp.PlatformName != entries[j].resp.PlatformName { + return entries[i].resp.PlatformName < entries[j].resp.PlatformName + } + return entries[i].resp.Account < entries[j].resp.Account + }) + + result := make([]NodeLeaseResponse, 0, len(entries)) + for _, e := range entries { + result = append(result, e.resp) + } + return result, nil +} + // GetIPLoad returns IP load stats for a platform. func (s *ControlPlaneService) GetIPLoad(platformID string) ([]IPLoadEntry, error) { if _, ok := s.Pool.GetPlatform(platformID); !ok { diff --git a/internal/service/control_plane_nodes.go b/internal/service/control_plane_nodes.go index 56d7f91b..5997a666 100644 --- a/internal/service/control_plane_nodes.go +++ b/internal/service/control_plane_nodes.go @@ -15,15 +15,16 @@ import ( // NodeFilters holds query filters for listing nodes. type NodeFilters struct { - PlatformID *string - SubscriptionID *string - Enabled *bool - Region *string - CircuitOpen *bool - HasOutbound *bool - EgressIP *string - ProbedSince *time.Time - TagKeyword *string + PlatformID *string + SubscriptionID *string + Enabled *bool + ManuallyDisabled *bool + Region *string + CircuitOpen *bool + HasOutbound *bool + EgressIP *string + ProbedSince *time.Time + TagKeyword *string } // ListNodes returns nodes from the pool with optional filters. @@ -115,6 +116,25 @@ func (s *ControlPlaneService) ListNodes(filters NodeFilters) ([]NodeSummary, err if result == nil { result = []NodeSummary{} } + + if s.Router != nil { + var leaseLoads map[node.Hash]int64 + if filters.PlatformID != nil { + leaseLoads = s.Router.SnapshotNodeLoad(*filters.PlatformID) + } else { + leaseLoads = s.Router.SnapshotNodeLoadAll() + } + if len(leaseLoads) > 0 { + for i := range result { + h, err := node.ParseHex(result[i].NodeHash) + if err != nil { + continue + } + result[i].LeaseCount = leaseLoads[h] + } + } + } + return result, nil } @@ -123,6 +143,13 @@ func (s *ControlPlaneService) nodeEntryMatchesFilters( filters NodeFilters, subLookup node.SubLookupFunc, ) bool { + // Manually disabled filter (admin-controlled flag, independent of subscription state). + if filters.ManuallyDisabled != nil { + if entry.IsManuallyDisabled() != *filters.ManuallyDisabled { + return false + } + } + // Enabled/disabled filter. if filters.Enabled != nil { enabled := true @@ -255,3 +282,41 @@ func (s *ControlPlaneService) ProbeLatency(hashStr string) (*probe.LatencyProbeR } return result, nil } + +// SetNodeManualDisableResult is returned by SetNodeManualDisable. When the +// caller disables a node we report the number of bound leases that were +// released as a side effect, so the UI can surface it. +type SetNodeManualDisableResult struct { + ReleasedLeaseCount int `json:"released_lease_count"` +} + +// SetNodeManualDisable toggles the admin-controlled disable flag on a node. +// When transitioning to disabled, all leases currently bound to the node are +// immediately released so connected platforms reroute to other nodes. The +// flag is persisted via the node-dynamic dirty-set on the next flush. +func (s *ControlPlaneService) SetNodeManualDisable(hashStr string, disable bool) (SetNodeManualDisableResult, error) { + h, err := node.ParseHex(hashStr) + if err != nil { + return SetNodeManualDisableResult{}, invalidArg("node_hash: invalid format") + } + if _, ok := s.Pool.GetEntry(h); !ok { + return SetNodeManualDisableResult{}, notFound("node not found") + } + + // Flip the flag and trigger platform-view rebuilds so the node is removed + // from (or restored to) all routable views before we touch any leases. + s.Pool.SetNodeManualDisable(h, disable) + if s.Engine != nil { + s.Engine.MarkNodeDynamic(strings.ToLower(hashStr)) + } + + if !disable { + return SetNodeManualDisableResult{}, nil + } + + released := 0 + if s.Router != nil { + released = s.Router.DeleteLeasesByNode(h) + } + return SetNodeManualDisableResult{ReleasedLeaseCount: released}, nil +} diff --git a/internal/service/control_plane_platform.go b/internal/service/control_plane_platform.go index c0f8701b..ebe07c83 100644 --- a/internal/service/control_plane_platform.go +++ b/internal/service/control_plane_platform.go @@ -578,6 +578,7 @@ type NodeSummary struct { NodeHash string `json:"node_hash"` CreatedAt string `json:"created_at"` Enabled bool `json:"enabled"` + ManuallyDisabled bool `json:"manually_disabled"` DisplayTag string `json:"display_tag,omitempty"` HasOutbound bool `json:"has_outbound"` LastError string `json:"last_error,omitempty"` @@ -590,13 +591,14 @@ type NodeSummary struct { LastAuthorityLatencyProbeAttempt string `json:"last_authority_latency_probe_attempt,omitempty"` ReferenceLatencyMs *float64 `json:"reference_latency_ms,omitempty"` LastEgressUpdateAttempt string `json:"last_egress_update_attempt,omitempty"` + LeaseCount int64 `json:"lease_count"` Tags []NodeTag `json:"tags"` } // IsHealthyAndEnabled follows the node-summary health rule used by API/UI // aggregates: enabled, outbound-ready, and not circuit-open. func (n NodeSummary) IsHealthyAndEnabled() bool { - return n.Enabled && n.HasOutbound && n.CircuitOpenSince == nil + return n.Enabled && !n.ManuallyDisabled && n.HasOutbound && n.CircuitOpenSince == nil } type NodeTag struct { @@ -620,6 +622,7 @@ func (s *ControlPlaneService) nodeEntryToSummary(h node.Hash, entry *node.NodeEn ns.Enabled = !s.Pool.IsNodeDisabled(h) ns.DisplayTag = s.Pool.ResolveNodeDisplayTag(h) } + ns.ManuallyDisabled = entry.IsManuallyDisabled() if cos := entry.CircuitOpenSince.Load(); cos > 0 { t := time.Unix(0, cos).UTC().Format(time.RFC3339Nano) diff --git a/internal/state/migrations/cache/000003_nodes_dynamic_manually_disabled.down.sql b/internal/state/migrations/cache/000003_nodes_dynamic_manually_disabled.down.sql new file mode 100644 index 00000000..467ae64d --- /dev/null +++ b/internal/state/migrations/cache/000003_nodes_dynamic_manually_disabled.down.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS nodes_dynamic__old_schema ( + hash TEXT PRIMARY KEY, + failure_count INTEGER NOT NULL DEFAULT 0, + circuit_open_since INTEGER NOT NULL DEFAULT 0, + egress_ip TEXT NOT NULL DEFAULT '', + egress_region TEXT NOT NULL DEFAULT '', + egress_updated_at_ns INTEGER NOT NULL DEFAULT 0, + last_latency_probe_attempt_ns INTEGER NOT NULL DEFAULT 0, + last_authority_latency_probe_attempt_ns INTEGER NOT NULL DEFAULT 0, + last_egress_update_attempt_ns INTEGER NOT NULL DEFAULT 0 +); + +INSERT INTO nodes_dynamic__old_schema ( + hash, failure_count, circuit_open_since, egress_ip, egress_region, egress_updated_at_ns, + last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns +) +SELECT + hash, failure_count, circuit_open_since, egress_ip, egress_region, egress_updated_at_ns, + last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns +FROM nodes_dynamic; + +DROP TABLE nodes_dynamic; + +ALTER TABLE nodes_dynamic__old_schema RENAME TO nodes_dynamic; diff --git a/internal/state/migrations/cache/000003_nodes_dynamic_manually_disabled.up.sql b/internal/state/migrations/cache/000003_nodes_dynamic_manually_disabled.up.sql new file mode 100644 index 00000000..f6908bfc --- /dev/null +++ b/internal/state/migrations/cache/000003_nodes_dynamic_manually_disabled.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE nodes_dynamic +ADD COLUMN manually_disabled INTEGER NOT NULL DEFAULT 0; diff --git a/internal/state/repo_cache.go b/internal/state/repo_cache.go index ac2261dd..80639d43 100644 --- a/internal/state/repo_cache.go +++ b/internal/state/repo_cache.go @@ -86,6 +86,7 @@ func (r *CacheRepo) BulkUpsertNodesDynamic(nodes []model.NodeDynamic) error { n.LastLatencyProbeAttemptNs, n.LastAuthorityLatencyProbeAttemptNs, n.LastEgressUpdateAttemptNs, + n.ManuallyDisabled, ) return err }, @@ -109,7 +110,8 @@ func (r *CacheRepo) BulkDeleteNodesDynamic(hashes []string) error { func (r *CacheRepo) LoadAllNodesDynamic() ([]model.NodeDynamic, error) { rows, err := r.db.Query(` SELECT hash, failure_count, circuit_open_since, egress_ip, egress_region, egress_updated_at_ns, - last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns + last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns, + manually_disabled FROM nodes_dynamic`) if err != nil { return nil, err @@ -129,6 +131,7 @@ func (r *CacheRepo) LoadAllNodesDynamic() ([]model.NodeDynamic, error) { &n.LastLatencyProbeAttemptNs, &n.LastAuthorityLatencyProbeAttemptNs, &n.LastEgressUpdateAttemptNs, + &n.ManuallyDisabled, ); err != nil { return nil, err } @@ -398,6 +401,7 @@ func (r *CacheRepo) FlushTx(ops FlushOps) error { n.LastLatencyProbeAttemptNs, n.LastAuthorityLatencyProbeAttemptNs, n.LastEgressUpdateAttemptNs, + n.ManuallyDisabled, ) return err }}, @@ -453,9 +457,10 @@ const ( upsertNodesDynamicSQL = `INSERT INTO nodes_dynamic ( hash, failure_count, circuit_open_since, egress_ip, egress_region, egress_updated_at_ns, - last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns + last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns, last_egress_update_attempt_ns, + manually_disabled ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(hash) DO UPDATE SET failure_count = excluded.failure_count, circuit_open_since = excluded.circuit_open_since, @@ -464,7 +469,8 @@ const ( egress_updated_at_ns = excluded.egress_updated_at_ns, last_latency_probe_attempt_ns = excluded.last_latency_probe_attempt_ns, last_authority_latency_probe_attempt_ns = excluded.last_authority_latency_probe_attempt_ns, - last_egress_update_attempt_ns = excluded.last_egress_update_attempt_ns` + last_egress_update_attempt_ns = excluded.last_egress_update_attempt_ns, + manually_disabled = excluded.manually_disabled` upsertNodeLatencySQL = `INSERT INTO node_latency (node_hash, domain, ewma_ns, last_updated_ns) VALUES (?, ?, ?, ?) diff --git a/internal/topology/pool.go b/internal/topology/pool.go index d2bb1540..7f392c67 100644 --- a/internal/topology/pool.go +++ b/internal/topology/pool.go @@ -401,6 +401,22 @@ func (p *GlobalNodePool) IsNodeDisabled(hash node.Hash) bool { return entry.IsDisabledBySubscriptions(p.MakeSubLookup()) } +// SetNodeManualDisable flips the admin-controlled disable flag and triggers +// per-platform re-evaluation so the node is added to or removed from each +// platform's routable view in the same call. Returns true when the flag value +// actually changed (i.e. the caller transitioned the node). +func (p *GlobalNodePool) SetNodeManualDisable(hash node.Hash, disabled bool) bool { + entry, ok := p.GetEntry(hash) + if !ok || entry == nil { + return false + } + if entry.ManuallyDisabled.Swap(disabled) == disabled { + return false + } + p.notifyAllPlatformsDirty(hash) + return true +} + // MakeHealthyAndEnabledEvaluator builds a predicate for pool-context health // aggregates: the node must not be disabled by subscription state and must // satisfy the entry-local health checks. diff --git a/webui/index.html b/webui/index.html index 939be235..5edaa66b 100644 --- a/webui/index.html +++ b/webui/index.html @@ -7,6 +7,15 @@ Resin · Sticky Proxy Pool +
diff --git a/webui/package-lock.json b/webui/package-lock.json index 88505cff..96b10159 100644 --- a/webui/package-lock.json +++ b/webui/package-lock.json @@ -73,7 +73,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1666,7 +1665,6 @@ "integrity": "sha512-oH72nZRfDv9lADUBSo104Aq7gPHpQZc4BTx38r9xf9pg5LfP6EzSyH2n7qFmmxRQXh7YlUXODcYsg6PuTDSxGg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -1677,7 +1675,6 @@ "integrity": "sha512-ilcTH/UniCkMdtexkoCN0bI7pMcJDvmQFPvuPvmEaYA/NSfFTAgdUSLAoVjaRJm7+6PvcM+q1zYOwS4wTYMF9w==", "devOptional": true, "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -1743,7 +1740,6 @@ "integrity": "sha512-IgSWvLobTDOjnaxAfDTIHaECbkNlAlKv2j5SjpB2v7QHKv1FIfjwMy8FsDbVfDX/KjmCmYICcw7uGaXLhtsLNg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.56.0", "@typescript-eslint/types": "8.56.0", @@ -2008,7 +2004,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2117,7 +2112,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -2491,7 +2485,6 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -2911,7 +2904,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "@babel/runtime": "^7.28.4" }, @@ -3307,7 +3299,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -3369,7 +3360,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.4.tgz", "integrity": "sha512-9nfp2hYpCwOjAN+8TZFGhtWEwgvWHXqESH8qT89AT/lWklpLON22Lc8pEtnpsZz7VmawabSU0gCjnj8aC0euHQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -3379,7 +3369,6 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.2.4.tgz", "integrity": "sha512-AXJdLo8kgMbimY95O2aKQqsz2iWi9jMgKJhRBAxECE4IFxfcazB2LmzloIoibJI3C12IlY20+KFaLv+71bUJeQ==", "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -3392,7 +3381,6 @@ "resolved": "https://registry.npmjs.org/react-hook-form/-/react-hook-form-7.71.1.tgz", "integrity": "sha512-9SUJKCGKo8HUSsCO+y0CtqkqI5nNuaDqTxyqPsZPqIwudpj4rCrAz/jZV+jn57bx5gtZKOh3neQu94DXMc+w5w==", "license": "MIT", - "peer": true, "engines": { "node": ">=18.0.0" }, @@ -3443,7 +3431,6 @@ "resolved": "https://registry.npmjs.org/react-redux/-/react-redux-9.2.0.tgz", "integrity": "sha512-ROY9fvHhwOD9ySfrF0wmvu//bKCQ6AeZZq1nJNtbDC+kk5DuSuNX/n6YWYF/SYy7bSba4D4FSz8DJeKY/S/r+g==", "license": "MIT", - "peer": true, "dependencies": { "@types/use-sync-external-store": "^0.0.6", "use-sync-external-store": "^1.4.0" @@ -3538,8 +3525,7 @@ "version": "5.0.1", "resolved": "https://registry.npmjs.org/redux/-/redux-5.0.1.tgz", "integrity": "sha512-M9/ELqF6fy8FwmkpnF0S3YKOqMyoWJ4+CS5Efg2ct3oY9daQvd/Pc71FpGZsVsbl3Cpb+IIcjBDUnnyBdQbq4w==", - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/redux-thunk": { "version": "3.1.0", @@ -3747,7 +3733,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "devOptional": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -3865,7 +3850,6 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", @@ -3995,7 +3979,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz", "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/webui/src/components/AppShell.tsx b/webui/src/components/AppShell.tsx index 6e49f9c7..0d8fe42c 100644 --- a/webui/src/components/AppShell.tsx +++ b/webui/src/components/AppShell.tsx @@ -20,6 +20,7 @@ import { useAuthStore } from "../features/auth/auth-store"; import { getEnvConfig } from "../features/systemConfig/api"; import { useI18n } from "../i18n"; import { LanguageSwitcher } from "./LanguageSwitcher"; +import { ThemeToggle } from "./ThemeToggle"; type NavItem = { label: string; @@ -151,6 +152,7 @@ export function AppShell() { ) : (