Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apps/backend/cmd/kandev/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,22 @@ func startAgentInfrastructure(
return false
}

// Wire the soft-deleted-profile pre-flight into the watcher dispatch.
// Orphan watchers (their agent profile was soft-deleted by the
// reconciler when its agent type left the registry) self-heal on the
// next poll instead of looping on "profile not found" forever.
orchestratorSvc.SetProfileLookup(&profileLookupAdapter{store: repos.AgentSettings})

// Wire the watcher-dependency enumerator into the agent settings
// controller so the profile-delete UI can surface "this will also
// disable N watchers" before the user confirms.
agentSettingsController.SetWatcherDependencyChecker(&watcherDepsAdapter{
linear: services.Linear,
jira: services.Jira,
github: services.GitHub,
log: log,
})

// Wire GitHub service into orchestrator for PR auto-detection on push
if services.GitHub != nil {
orchestratorSvc.SetGitHubService(services.GitHub)
Expand Down
263 changes: 263 additions & 0 deletions apps/backend/cmd/kandev/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"crypto/sha1"
"database/sql"
"errors"
"fmt"
"os"
Expand All @@ -14,10 +15,13 @@ import (

"github.com/kandev/kandev/internal/agent/registry"
"github.com/kandev/kandev/internal/agent/runtime/lifecycle"
agentsettingscontroller "github.com/kandev/kandev/internal/agent/settings/controller"
settingsstore "github.com/kandev/kandev/internal/agent/settings/store"
"github.com/kandev/kandev/internal/common/config"
"github.com/kandev/kandev/internal/common/logger"
"github.com/kandev/kandev/internal/db"
"github.com/kandev/kandev/internal/events/bus"
githubpkg "github.com/kandev/kandev/internal/github"
jirapkg "github.com/kandev/kandev/internal/jira"
linearpkg "github.com/kandev/kandev/internal/linear"
"github.com/kandev/kandev/internal/orchestrator"
Expand Down Expand Up @@ -310,6 +314,10 @@ func (a *jiraServiceAdapter) ReleaseIssueWatchTask(ctx context.Context, watchID,
return a.svc.Store().ReleaseIssueWatchTask(ctx, watchID, issueKey)
}

func (a *jiraServiceAdapter) DisableIssueWatchWithError(ctx context.Context, watchID, cause string) error {
return a.svc.Store().DisableIssueWatchWithError(ctx, watchID, cause)
}

// linearServiceAdapter exposes the Linear service's issue-watch dedup methods
// to the orchestrator without leaking the rest of the package surface area.
type linearServiceAdapter struct {
Expand All @@ -328,6 +336,261 @@ func (a *linearServiceAdapter) ReleaseIssueWatchTask(ctx context.Context, watchI
return a.svc.Store().ReleaseIssueWatchTask(ctx, watchID, identifier)
}

func (a *linearServiceAdapter) DisableIssueWatchWithError(ctx context.Context, watchID, cause string) error {
return a.svc.Store().DisableIssueWatchWithError(ctx, watchID, cause)
}

// profileLookupAdapter satisfies orchestrator.ProfileLookup by delegating to
// the agent settings store. Returns (true, name, nil) when the row exists
// but is soft-deleted, which the orchestrator treats as a signal to self-heal
// the bound watcher (set enabled=0 + last_error). All other shapes — live
// row, missing row, driver error — are returned verbatim so the dispatch
// pipeline fails open on transient failures.
type profileLookupAdapter struct {
store settingsstore.Repository
}

func (a *profileLookupAdapter) LookupProfile(ctx context.Context, profileID string) (bool, string, error) {
if _, err := a.store.GetAgentProfile(ctx, profileID); err == nil {
return false, "", nil
} else if !errors.Is(err, sql.ErrNoRows) {
return false, "", err
}
profile, err := a.store.GetAgentProfileIncludingDeleted(ctx, profileID)
if err != nil || profile == nil || profile.DeletedAt == nil {
return false, "", err
}
return true, profile.Name, nil
}

// watcherDepsAdapter enumerates linear / jira / github_issue / github_review
// watcher rows that reference an agent profile. The profile-delete confirm
// dialog uses the list to render "this will also disable N watchers — are
// you sure?".
//
// Each integration's package is optional in dev mode; nil-safe so the
// adapter degrades gracefully when one isn't wired.
type watcherDepsAdapter struct {
linear *linearpkg.Service
jira *jirapkg.Service
github githubWatcherLister
log *logger.Logger
}

// githubWatcherLister is the slice of github.Service the adapter needs.
// Local alias so the adapter file stays free of the wider GitHubService
// orchestrator interface, which carries many unrelated methods. Uses the
// enabled-only listers so already-disabled (self-healed) watchers do not
// inflate the dependency count surfaced in ErrProfileInUseDetail. Disable
// methods are the same store-level helpers the dispatch coordinator's
// self-heal path uses.
type githubWatcherLister interface {
ListEnabledIssueWatches(ctx context.Context) ([]*githubpkg.IssueWatch, error)
ListEnabledReviewWatches(ctx context.Context) ([]*githubpkg.ReviewWatch, error)
DisableIssueWatchWithError(ctx context.Context, watchID, cause string) error
DisableReviewWatchWithError(ctx context.Context, watchID, cause string) error
}

func (a *watcherDepsAdapter) ListWatchersByAgentProfile(ctx context.Context, profileID string) ([]agentsettingscontroller.WatcherReference, error) {
if profileID == "" {
return nil, nil
}
var refs []agentsettingscontroller.WatcherReference
more, err := a.listLinearRefs(ctx, profileID)
if err != nil {
return nil, err
}
refs = append(refs, more...)
more, err = a.listJiraRefs(ctx, profileID)
if err != nil {
return nil, err
}
refs = append(refs, more...)
more, err = a.listGitHubRefs(ctx, profileID)
if err != nil {
return nil, err
}
refs = append(refs, more...)
return refs, nil
}

// DisableWatchersByAgentProfile enumerates the enabled watcher rows that
// reference profileID and flips each to enabled=0 with the supplied cause.
// Returns the list it disabled so the caller can log. Best-effort across
// integrations: a failure for one integration logs and continues to the
// next so a single broken store doesn't strand the rest of the user's
// orphaned watchers.
func (a *watcherDepsAdapter) DisableWatchersByAgentProfile(ctx context.Context, profileID, cause string) ([]agentsettingscontroller.WatcherReference, error) {
refs, err := a.ListWatchersByAgentProfile(ctx, profileID)
if err != nil {
return nil, err
}
disabled := make([]agentsettingscontroller.WatcherReference, 0, len(refs))
for _, ref := range refs {
if err := a.disableByKind(ctx, ref.Kind, ref.ID, cause); err != nil {
a.log.Warn("failed to disable referencing watcher",
zap.String("profile_id", profileID),
zap.String("watcher_kind", ref.Kind),
zap.String("watcher_id", ref.ID),
zap.Error(err))
continue
}
disabled = append(disabled, ref)
}
return disabled, nil
}

func (a *watcherDepsAdapter) disableByKind(ctx context.Context, kind, watchID, cause string) error {
switch kind {
case "linear":
if a.linear == nil {
return nil
}
return a.linear.Store().DisableIssueWatchWithError(ctx, watchID, cause)
case "jira":
if a.jira == nil {
return nil
}
return a.jira.Store().DisableIssueWatchWithError(ctx, watchID, cause)
case "github_issue":
if a.github == nil {
return nil
}
return a.github.DisableIssueWatchWithError(ctx, watchID, cause)
case "github_review":
if a.github == nil {
return nil
}
return a.github.DisableReviewWatchWithError(ctx, watchID, cause)
default:
return fmt.Errorf("unknown watcher kind: %s", kind)
}
}

func (a *watcherDepsAdapter) listLinearRefs(ctx context.Context, profileID string) ([]agentsettingscontroller.WatcherReference, error) {
if a.linear == nil {
return nil, nil
}
watches, err := a.linear.Store().ListEnabledIssueWatches(ctx)
if err != nil {
return nil, fmt.Errorf("linear watchers: %w", err)
}
var out []agentsettingscontroller.WatcherReference
for _, w := range watches {
if w.AgentProfileID == profileID {
out = append(out, agentsettingscontroller.WatcherReference{
ID: w.ID, Kind: "linear", Label: linearWatchLabel(w),
})
}
}
return out, nil
}

func (a *watcherDepsAdapter) listJiraRefs(ctx context.Context, profileID string) ([]agentsettingscontroller.WatcherReference, error) {
if a.jira == nil {
return nil, nil
}
watches, err := a.jira.Store().ListEnabledIssueWatches(ctx)
if err != nil {
return nil, fmt.Errorf("jira watchers: %w", err)
}
var out []agentsettingscontroller.WatcherReference
for _, w := range watches {
if w.AgentProfileID == profileID {
out = append(out, agentsettingscontroller.WatcherReference{
ID: w.ID, Kind: "jira", Label: truncateWatcherLabel(w.JQL),
})
}
}
return out, nil
}

func (a *watcherDepsAdapter) listGitHubRefs(ctx context.Context, profileID string) ([]agentsettingscontroller.WatcherReference, error) {
if a.github == nil {
return nil, nil
}
issues, err := a.github.ListEnabledIssueWatches(ctx)
if err != nil {
return nil, fmt.Errorf("github issue watchers: %w", err)
}
var out []agentsettingscontroller.WatcherReference
for _, w := range issues {
if w.AgentProfileID == profileID {
out = append(out, agentsettingscontroller.WatcherReference{
ID: w.ID, Kind: "github_issue", Label: githubIssueWatchLabel(w),
})
}
}
reviews, err := a.github.ListEnabledReviewWatches(ctx)
if err != nil {
return nil, fmt.Errorf("github review watchers: %w", err)
}
for _, w := range reviews {
if w.AgentProfileID == profileID {
out = append(out, agentsettingscontroller.WatcherReference{
ID: w.ID, Kind: "github_review", Label: githubReviewWatchLabel(w),
})
}
}
return out, nil
}

// linearWatchLabel renders a Linear filter into a short human-readable label
// for the confirmation dialog. The team key is the most-recognisable scoping
// fact; the query string is a fallback. Capped at watcherLabelMaxLen.
func linearWatchLabel(w *linearpkg.IssueWatch) string {
switch {
case w.Filter.TeamKey != "":
return truncateWatcherLabel("team " + w.Filter.TeamKey)
case w.Filter.Query != "":
return truncateWatcherLabel(w.Filter.Query)
default:
return "all teams"
}
}

func githubIssueWatchLabel(w *githubpkg.IssueWatch) string {
if w.CustomQuery != "" {
return truncateWatcherLabel(w.CustomQuery)
}
return truncateWatcherLabel(githubReposJoin(w.Repos))
}

func githubReviewWatchLabel(w *githubpkg.ReviewWatch) string {
if w.CustomQuery != "" {
return truncateWatcherLabel(w.CustomQuery)
}
return truncateWatcherLabel(githubReposJoin(w.Repos))
}

func githubReposJoin(repos []githubpkg.RepoFilter) string {
if len(repos) == 0 {
return "all repos"
}
parts := make([]string, 0, len(repos))
for _, r := range repos {
parts = append(parts, r.Owner+"/"+r.Name)
}
return strings.Join(parts, ", ")
}

const watcherLabelMaxLen = 80

// truncateWatcherLabel caps a watcher's human-readable label at
// watcherLabelMaxLen runes, appending "…" when truncation happens. Counted
// in runes (not bytes) so a JQL or filter ending in multi-byte UTF-8
// (Japanese, emoji) is not split mid-codepoint — slicing a byte string
// would emit invalid UTF-8 and the UI would render the replacement
// character.
func truncateWatcherLabel(s string) string {
s = strings.TrimSpace(s)
runes := []rune(s)
if len(runes) <= watcherLabelMaxLen {
return s
}
return string(runes[:watcherLabelMaxLen-1]) + "…"
}

// repoLocalPathUpdater adapts the task service's UpdateRepository to the executor.RepoUpdater interface.
type repoLocalPathUpdater struct {
svc *taskservice.Service
Expand Down
51 changes: 49 additions & 2 deletions apps/backend/internal/agent/runtime/lifecycle/profile_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,31 @@ package lifecycle

import (
"context"
"database/sql"
"errors"
"fmt"

"github.com/kandev/kandev/internal/agent/registry"
"github.com/kandev/kandev/internal/agent/settings/store"
)

// DeletedProfileError carries the soft-deleted profile's ID and name so the
// caller (watcher self-heal) can surface a human-readable cause. Wraps
// store.ErrAgentProfileDeleted — use errors.Is / errors.As.
type DeletedProfileError struct {
ProfileID string
ProfileName string
}

func (e *DeletedProfileError) Error() string {
if e.ProfileName != "" {
return fmt.Sprintf("agent profile %q (%s) was removed", e.ProfileName, e.ProfileID)
}
return fmt.Sprintf("agent profile %s was removed", e.ProfileID)
}

func (e *DeletedProfileError) Unwrap() error { return store.ErrAgentProfileDeleted }

// StoreProfileResolver implements ProfileResolver using the agent settings store
type StoreProfileResolver struct {
store store.Repository
Expand All @@ -20,11 +39,19 @@ func NewStoreProfileResolver(store store.Repository, reg *registry.Registry) *St
return &StoreProfileResolver{store: store, registry: reg}
}

// ResolveProfile looks up an agent profile by ID and returns the profile info
// ResolveProfile looks up an agent profile by ID and returns the profile info.
//
// When the row is missing, ResolveProfile retries via
// GetAgentProfileIncludingDeleted to disambiguate "never existed" (returns
// the original "profile not found" error) from "soft-deleted" (returns a
// *DeletedProfileError wrapping store.ErrAgentProfileDeleted so callers can
// trigger watcher self-heal).
func (r *StoreProfileResolver) ResolveProfile(ctx context.Context, profileID string) (*AgentProfileInfo, error) {
// Get the profile from the store
profile, err := r.store.GetAgentProfile(ctx, profileID)
if err != nil {
if deletedErr := r.checkSoftDeleted(ctx, profileID, err); deletedErr != nil {
return nil, deletedErr
}
return nil, fmt.Errorf("profile not found: %w", err)
}

Expand Down Expand Up @@ -55,6 +82,26 @@ func (r *StoreProfileResolver) ResolveProfile(ctx context.Context, profileID str
}, nil
}

// checkSoftDeleted returns a *DeletedProfileError when the missing-row error
// from GetAgentProfile resolves to an existing-but-soft-deleted row. Returns
// nil when the row really doesn't exist or the secondary lookup fails — the
// caller falls back to the original "profile not found" error.
func (r *StoreProfileResolver) checkSoftDeleted(ctx context.Context, profileID string, primaryErr error) error {
// Only retry on the well-known "missing row" signal. The store layer
// returns sql.ErrNoRows from QueryRow.Scan when the filtered SELECT
// matches nothing — that includes both "no row at all" and
// "row exists but deleted_at IS NOT NULL". Other errors (driver,
// permissions) are returned as-is by the caller.
if !errors.Is(primaryErr, sql.ErrNoRows) {
return nil
}
profile, err := r.store.GetAgentProfileIncludingDeleted(ctx, profileID)
if err != nil || profile == nil || profile.DeletedAt == nil {
return nil
}
return &DeletedProfileError{ProfileID: profile.ID, ProfileName: profile.Name}
}

// resolveAgentCapabilities looks up the agent in the registry and returns the
// effective model and whether the agent supports native session resume.
// The model comes straight from the profile; static per-agent defaults have
Expand Down
Loading
Loading