Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 121 additions & 32 deletions internal/managedobserve/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,14 @@ type DaemonOptions struct {
}

func RunDaemon(ctx context.Context, opts DaemonOptions) error {
loadedConfig, err := managedconfig.Load()
runtimeConfig, err := loadManagedRuntimeConfig(ctx)
if err != nil {
if errors.Is(err, managedconfig.ErrNotManaged) {
return err
}
return fmt.Errorf("load managed config: %w", err)
return err
}
installationState, err := installation.Ensure()
if err != nil {
return fmt.Errorf("ensure installation identity: %w", err)
}
installToken, err := managedconfig.ResolveInstallToken(ctx, loadedConfig.Config.Credentials.InstallTokenRef)
if err != nil {
return fmt.Errorf("resolve install token: %w", err)
}

socketPath := opts.SocketPath
if socketPath == "" {
Expand All @@ -66,7 +59,7 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error {

// The deployment-level mode from managed.json drives every hook edge:
// observe records would-decisions, enforce returns real denies.
mode, err := guardhookruntime.ParseMode(loadedConfig.Config.Mode)
mode, err := guardhookruntime.ParseMode(runtimeConfig.Mode)
if err != nil {
return fmt.Errorf("parse managed mode: %w", err)
}
Expand Down Expand Up @@ -97,38 +90,19 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error {

policyCtx, stopPolicyRefresh := context.WithCancel(ctx)
defer stopPolicyRefresh()
go githubpolicy.Run(policyCtx, githubpolicy.RunOptions{
Cache: policyCache,
CloudURL: loadedConfig.Config.CloudURL,
InstallToken: installToken,
Interval: opts.GithubPolicyInterval,
HTTPClient: opts.GithubPolicyClient,
Diagnostic: opts.Diagnostic,
})
go runGithubPolicyRefresh(policyCtx, opts, policyCache)

streamCtx, stopStream := context.WithCancel(ctx)
defer stopStream()
streamErr := make(chan error, 1)
go func() {
streamErr <- managedstream.Run(streamCtx, managedstream.Options{
DBPath: dbPath,
StatePath: opts.StreamStatePath,
CloudURL: loadedConfig.Config.CloudURL,
OrganizationID: loadedConfig.Config.OrganizationID,
InstallationID: installationState.InstallationID,
InstallToken: installToken,
DeviceLabel: loadedConfig.Config.Device.Label,
DeploymentVersion: managedconfig.DeploymentVersion,
Interval: opts.StreamInterval,
HTTPClient: opts.StreamHTTPClient,
Diagnostic: opts.Diagnostic,
})
streamErr <- runManagedStream(streamCtx, opts, dbPath, installationState.InstallationID)
}()

// Cowork observation runs alongside Claude Code in the same daemon, replaying
// in-VM Cowork tool events into the same localruntime socket as agent "cowork".
// Enabled via managed.json (cowork_enabled) or the env var override.
if loadedConfig.Config.CoworkEnabled || coworkobserve.Enabled() {
if runtimeConfig.CoworkEnabled || coworkobserve.Enabled() {
go coworkobserve.Run(ctx, coworkobserve.Options{
SocketPath: socketPath,
StatePath: filepath.Join(filepath.Dir(dbPath), "cowork-spool-offsets.json"),
Expand Down Expand Up @@ -162,6 +136,37 @@ func RunDaemon(ctx context.Context, opts DaemonOptions) error {
}
}

// managedRuntimeConfig is the flattened view of the managed deployment
// settings that the daemon and its background loops read. It is produced by
// loadManagedRuntimeConfig from the loaded managed config plus the resolved
// install token.
type managedRuntimeConfig struct {
	// Mode is the raw deployment mode string taken from the managed config.
	Mode string
	// CoworkEnabled mirrors the managed config's cowork toggle.
	CoworkEnabled bool

	// CloudURL is the hosted endpoint base URL taken from the managed config.
	CloudURL string
	// InstallToken is the credential resolved from the config's install token reference.
	InstallToken string

	// OrganizationID identifies the organization named in the managed config.
	OrganizationID string
	// DeviceLabel is the device label taken from the managed config.
	DeviceLabel string
}

// loadManagedRuntimeConfig loads the managed config, resolves the install
// token it references, and returns the values flattened into a
// managedRuntimeConfig.
//
// managedconfig.ErrNotManaged is returned unwrapped; any other load failure is
// wrapped with "load managed config", and a token resolution failure is
// wrapped with "resolve install token". The returned config is the zero value
// whenever the error is non-nil.
func loadManagedRuntimeConfig(ctx context.Context) (managedRuntimeConfig, error) {
	loaded, err := managedconfig.Load()
	switch {
	case err == nil:
		// Loaded successfully; continue with token resolution below.
	case errors.Is(err, managedconfig.ErrNotManaged):
		// Pass the sentinel through untouched so the message stays unprefixed.
		return managedRuntimeConfig{}, err
	default:
		return managedRuntimeConfig{}, fmt.Errorf("load managed config: %w", err)
	}

	cfg := loaded.Config
	token, err := managedconfig.ResolveInstallToken(ctx, cfg.Credentials.InstallTokenRef)
	if err != nil {
		return managedRuntimeConfig{}, fmt.Errorf("resolve install token: %w", err)
	}

	var out managedRuntimeConfig
	out.CloudURL = cfg.CloudURL
	out.OrganizationID = cfg.OrganizationID
	out.InstallToken = token
	out.DeviceLabel = cfg.Device.Label
	out.Mode = cfg.Mode
	out.CoworkEnabled = cfg.CoworkEnabled
	return out, nil
}

func idleTimeoutOrDefault(idleTimeout time.Duration) time.Duration {
if idleTimeout == 0 {
return DefaultIdleTimeout()
Expand All @@ -178,6 +183,90 @@ func cleanupStaleSessions(ctx context.Context, dbPath string, idleTimeout time.D
return store.CloseStaleDaemonObservedSessions(ctx, time.Now().UTC().Add(-idleTimeout))
}

// runGithubPolicyRefresh refreshes the GitHub policy cache once immediately
// and then again on every tick until ctx is cancelled.
//
// The period is opts.GithubPolicyInterval. A non-positive value falls back to
// githubpolicy.DefaultIntervalFromEnv(), because time.NewTicker panics on a
// non-positive duration and that panic would escape from the goroutine this
// loop runs on.
// NOTE(review): assumes DefaultIntervalFromEnv returns a positive duration — confirm.
func runGithubPolicyRefresh(ctx context.Context, opts DaemonOptions, cache *githubpolicy.Cache) {
	interval := opts.GithubPolicyInterval
	if interval <= 0 {
		interval = githubpolicy.DefaultIntervalFromEnv()
	}

	// Prime the cache before waiting for the first tick.
	refreshGithubPolicy(ctx, opts, cache)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			refreshGithubPolicy(ctx, opts, cache)
		}
	}
}

// refreshGithubPolicy performs one policy refresh cycle: it reloads the
// managed runtime config, fetches a policy snapshot with the freshly resolved
// cloud URL and install token, and applies the snapshot to cache.
//
// A config reload or fetch failure is recorded on the cache via
// MarkFetchFailed and reported through opts.Diagnostic. A failure to apply the
// snapshot is only reported.
func refreshGithubPolicy(ctx context.Context, opts DaemonOptions, cache *githubpolicy.Cache) {
	// abandon records the failure on the cache and reports it with the given format.
	abandon := func(format string, cause error) {
		cache.MarkFetchFailed(cause)
		opts.Diagnostic.Printf(format, cause)
	}

	cfg, err := loadManagedRuntimeConfig(ctx)
	if err != nil {
		abandon("github policy config reload: %v\n", err)
		return
	}

	snap, err := githubpolicy.FetchSnapshot(ctx, opts.GithubPolicyClient, cfg.CloudURL, cfg.InstallToken)
	if err != nil {
		abandon("github policy refresh: %v\n", err)
		return
	}

	applyErr := cache.Apply(snap, time.Now().UTC())
	if applyErr != nil {
		opts.Diagnostic.Printf("github policy persist: %v\n", applyErr)
	}
}

// runManagedStream flushes the managed stream once immediately and then again
// on every tick until ctx is cancelled, at which point it returns nil.
//
// The period is opts.StreamInterval. A non-positive value falls back to
// managedstream.DefaultIntervalFromEnv(), because time.NewTicker panics on a
// non-positive duration.
// NOTE(review): assumes DefaultIntervalFromEnv returns a positive duration — confirm.
func runManagedStream(ctx context.Context, opts DaemonOptions, dbPath, installationID string) error {
	interval := opts.StreamInterval
	if interval <= 0 {
		interval = managedstream.DefaultIntervalFromEnv()
	}

	// Flush once up front so pending records do not wait a full interval.
	flushManagedStream(ctx, opts, dbPath, installationID)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			flushManagedStream(ctx, opts, dbPath, installationID)
		}
	}
}

// flushManagedStream runs a single stream flush using options rebuilt from the
// current managed config. Both a config reload failure and a flush failure are
// reported through opts.Diagnostic; nothing is returned to the caller.
func flushManagedStream(ctx context.Context, opts DaemonOptions, dbPath, installationID string) {
	flushOpts, loadErr := managedStreamOptions(ctx, opts, dbPath, installationID)
	if loadErr != nil {
		opts.Diagnostic.Printf("managed stream config reload: %v\n", loadErr)
		return
	}

	flushErr := managedstream.Flush(ctx, flushOpts)
	if flushErr == nil {
		return
	}
	opts.Diagnostic.Printf("managed stream flush: %v\n", flushErr)
}

// managedStreamOptions reloads the managed runtime config and combines it with
// the daemon options, database path, and installation ID into the options for
// one managedstream flush. A reload error is returned unchanged together with
// zero-value options.
func managedStreamOptions(ctx context.Context, opts DaemonOptions, dbPath, installationID string) (managedstream.Options, error) {
	var streamOpts managedstream.Options

	cfg, err := loadManagedRuntimeConfig(ctx)
	if err != nil {
		return streamOpts, err
	}

	// Values supplied by the daemon itself.
	streamOpts.DBPath = dbPath
	streamOpts.StatePath = opts.StreamStatePath
	streamOpts.InstallationID = installationID
	streamOpts.DeploymentVersion = managedconfig.DeploymentVersion
	streamOpts.HTTPClient = opts.StreamHTTPClient
	streamOpts.Diagnostic = opts.Diagnostic

	// Values re-read from the managed config on every call.
	streamOpts.CloudURL = cfg.CloudURL
	streamOpts.OrganizationID = cfg.OrganizationID
	streamOpts.InstallToken = cfg.InstallToken
	streamOpts.DeviceLabel = cfg.DeviceLabel

	return streamOpts, nil
}

func cleanupInterval(idleTimeout time.Duration) time.Duration {
interval := idleTimeout / 2
if interval <= 0 {
Expand Down
91 changes: 91 additions & 0 deletions internal/managedobserve/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,97 @@ func TestDaemonStreamsLedgerBatches(t *testing.T) {
}
}

// TestDaemonRefreshesGithubPolicyInstallToken starts the daemon with a stale
// install token in KONTEXT_INSTALL_TOKEN, swaps the env var to a refreshed
// token once the socket is up, and waits for a policy snapshot request that
// carries the refreshed bearer token.
func TestDaemonRefreshesGithubPolicyInstallToken(t *testing.T) {
	policyTokens := make(chan string, 8)
	server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/api/v1/policy/github/snapshot":
			token := r.Header.Get("Authorization")
			// Non-blocking send so a full channel never stalls the handler.
			select {
			case policyTokens <- token:
			default:
			}
			if token != "Bearer refreshed-install-token" {
				w.WriteHeader(http.StatusUnauthorized)
				return
			}
			_ = json.NewEncoder(w).Encode(map[string]any{
				"schemaVersion":  "github-policy-snapshot-v1",
				"organizationId": "org_123",
				"providerKey":    "github",
				"mode":           "observe",
				"epoch":          1,
				"hash":           "hash-1",
				"rules":          []any{},
				"generatedAt":    "2026-06-15T00:00:00.000Z",
			})
		case "/api/v1/authorization-ledger/batches":
			w.WriteHeader(http.StatusAccepted)
		default:
			// Handlers run on server goroutines, where FailNow (and therefore
			// Fatalf) must not be called; record the failure and answer 404.
			t.Errorf("path = %q", r.URL.Path)
			http.NotFound(w, r)
		}
	}))
	t.Cleanup(server.Close)

	ctx, cancel := context.WithCancel(context.Background())
	dir := t.TempDir()
	socketDir, err := os.MkdirTemp("/tmp", "kontext-managedobserve-policy-refresh-test-*")
	if err != nil {
		t.Fatalf("MkdirTemp() error = %v", err)
	}
	t.Cleanup(func() { _ = os.RemoveAll(socketDir) })
	socketPath := filepath.Join(socketDir, "kontext.sock")
	dbPath := filepath.Join(dir, "guard.db")
	writeTestManagedConfigWithCloudURL(t, filepath.Join(dir, "managed.json"), server.URL)
	t.Setenv("KONTEXT_INSTALL_TOKEN", "stale-install-token")
	writeTestInstallation(t, filepath.Join(dir, "installation.json"))

	errCh := make(chan error, 1)
	go func() {
		errCh <- RunDaemon(ctx, DaemonOptions{
			SocketPath:           socketPath,
			DBPath:               dbPath,
			IdleTimeout:          time.Hour,
			StreamInterval:       time.Hour,
			StreamHTTPClient:     server.Client(),
			GithubPolicyInterval: 20 * time.Millisecond,
			GithubPolicyClient:   server.Client(),
		})
	}()

	// stop cancels the daemon and waits for it to exit. It is idempotent so it
	// can be called on the success path and again from Cleanup.
	stopped := false
	stop := func() {
		if stopped {
			return
		}
		stopped = true
		cancel()
		select {
		case err := <-errCh:
			if err != nil {
				t.Fatalf("RunDaemon() error = %v", err)
			}
		case <-time.After(2 * time.Second):
			t.Fatal("RunDaemon did not stop")
		}
	}
	t.Cleanup(stop)
	waitForSocket(t, socketPath, errCh)
	t.Setenv("KONTEXT_INSTALL_TOKEN", "refreshed-install-token")

	deadline := time.After(2 * time.Second)
	for {
		select {
		case token := <-policyTokens:
			if token == "Bearer refreshed-install-token" {
				stop()
				return
			}
		case <-deadline:
			t.Fatal("timed out waiting for refreshed policy token")
		}
	}
}

func TestCleanupIntervalNeverReturnsZero(t *testing.T) {
if got := cleanupInterval(time.Nanosecond); got != time.Nanosecond {
t.Fatalf("cleanupInterval(1ns) = %s, want 1ns", got)
Expand Down
6 changes: 2 additions & 4 deletions internal/managedstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func Flush(ctx context.Context, opts Options) error {
var hostedErr *hostedIngestError
if errors.As(err, &hostedErr) && shouldRetryWithSmallerBatch(hostedErr.StatusCode) {
if limit == 1 {
return err
return advancePastMinimumBatch(statePath, batch, fmt.Sprintf("hosted status %d", hostedErr.StatusCode), err)
}
Comment on lines 195 to 197

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Avoid skipping generic 413s

This branch saves the cursor past a one-record batch for any HTTP 413. A 413 can come from a proxy, load balancer, or misconfigured route rather than from the ledger service proving this specific record can never be accepted. In that case the daemon records the cursor before returning the error, so the next flush starts after this action and the ledger entry is never retried or streamed. Please only advance here when the client has a local payload-limit violation or a structured hosted response that identifies the single record as permanently too large.

nextLimit := reducedBatchLimit(limit)
opts.Diagnostic.Printf(
Expand Down Expand Up @@ -282,9 +282,7 @@ func reducedBatchLimit(limit int) int {
}

// shouldRetryWithSmallerBatch reports whether statusCode is a hosted ingest
// status for which the caller shrinks the batch limit and tries again.
// NOTE(review): this excerpt is a rendered diff, so both the removed
// three-status return and the added 413-only return appear below — confirm
// which one the file actually contains.
func shouldRetryWithSmallerBatch(statusCode int) bool {
return statusCode == http.StatusBadRequest ||
statusCode == http.StatusRequestEntityTooLarge ||
statusCode == http.StatusUnprocessableEntity
return statusCode == http.StatusRequestEntityTooLarge
}

func payloadLimitViolation(payload Payload, bodyBytes int) string {
Expand Down
Loading
Loading