Skip to content

Commit e404f31

Browse files
upstream_mergerclaude
andcommitted
feat(storage): add decision points types and SQLite implementation
Add foundation types and storage support for decision points - human-in-the-loop gates that enable structured input during agent workflows. Types: - DecisionPoint: human-in-the-loop decision gate with iteration support - DecisionOption: structured option with ID, short label, and description Storage interface: - CreateDecisionPoint, GetDecisionPoint, UpdateDecisionPoint, ListPendingDecisions - Added to both Storage and Transaction interfaces SQLite implementation: - decision_points table with FK to issues, iteration chain via prior_id - Migration 041 creates table, 046 adds requested_by column - Full CRUD support in both storage.go and transaction.go Dolt/Memory stubs: - Added placeholder implementations returning "not implemented" errors - Full Dolt support planned for separate PR Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b191f27 commit e404f31

10 files changed

Lines changed: 561 additions & 4 deletions

File tree

internal/storage/dolt/store.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
_ "github.com/go-sql-driver/mysql"
3333

3434
"github.com/steveyegge/beads/internal/storage"
35+
"github.com/steveyegge/beads/internal/types"
3536
)
3637

3738
// DoltStore implements the Storage interface using Dolt
@@ -755,3 +756,27 @@ func cleanupSingleLock(lockPath string) error {
755756
// Non-empty or recent lock file - don't touch it
756757
return nil
757758
}
759+
760+
// CreateDecisionPoint creates a new decision point for an issue.
761+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
762+
func (s *DoltStore) CreateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
763+
return fmt.Errorf("decision points not yet implemented for Dolt backend")
764+
}
765+
766+
// GetDecisionPoint retrieves the decision point for an issue.
767+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
768+
func (s *DoltStore) GetDecisionPoint(ctx context.Context, issueID string) (*types.DecisionPoint, error) {
769+
return nil, fmt.Errorf("decision points not yet implemented for Dolt backend")
770+
}
771+
772+
// UpdateDecisionPoint updates an existing decision point.
773+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
774+
func (s *DoltStore) UpdateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
775+
return fmt.Errorf("decision points not yet implemented for Dolt backend")
776+
}
777+
778+
// ListPendingDecisions returns all decision points that haven't been responded to.
779+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
780+
func (s *DoltStore) ListPendingDecisions(ctx context.Context) ([]*types.DecisionPoint, error) {
781+
return nil, fmt.Errorf("decision points not yet implemented for Dolt backend")
782+
}

internal/storage/dolt/transaction.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,3 +470,27 @@ func scanIssueTx(ctx context.Context, tx *sql.Tx, id string) (*types.Issue, erro
470470

471471
return &issue, nil
472472
}
473+
474+
// CreateDecisionPoint creates a new decision point within the transaction.
475+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
476+
func (t *doltTransaction) CreateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
477+
return fmt.Errorf("decision points not yet implemented for Dolt backend")
478+
}
479+
480+
// GetDecisionPoint retrieves the decision point for an issue within the transaction.
481+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
482+
func (t *doltTransaction) GetDecisionPoint(ctx context.Context, issueID string) (*types.DecisionPoint, error) {
483+
return nil, fmt.Errorf("decision points not yet implemented for Dolt backend")
484+
}
485+
486+
// UpdateDecisionPoint updates an existing decision point within the transaction.
487+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
488+
func (t *doltTransaction) UpdateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
489+
return fmt.Errorf("decision points not yet implemented for Dolt backend")
490+
}
491+
492+
// ListPendingDecisions returns all decision points that haven't been responded to.
493+
// TODO(dolt-parity): Implement decision point support for Dolt backend.
494+
func (t *doltTransaction) ListPendingDecisions(ctx context.Context) ([]*types.DecisionPoint, error) {
495+
return nil, fmt.Errorf("decision points not yet implemented for Dolt backend")
496+
}

internal/storage/memory/memory.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1863,3 +1863,27 @@ func (m *MemoryStorage) MarkIssueDirty(ctx context.Context, issueID string) erro
18631863
m.dirty[issueID] = true
18641864
return nil
18651865
}
1866+
1867+
// CreateDecisionPoint creates a new decision point for an issue.
1868+
// Decision points are not supported in memory storage.
1869+
func (m *MemoryStorage) CreateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
1870+
return fmt.Errorf("decision points not supported in --no-db mode: use SQLite storage")
1871+
}
1872+
1873+
// GetDecisionPoint retrieves the decision point for an issue.
1874+
// Decision points are not supported in memory storage.
1875+
func (m *MemoryStorage) GetDecisionPoint(ctx context.Context, issueID string) (*types.DecisionPoint, error) {
1876+
return nil, fmt.Errorf("decision points not supported in --no-db mode: use SQLite storage")
1877+
}
1878+
1879+
// UpdateDecisionPoint updates an existing decision point.
1880+
// Decision points are not supported in memory storage.
1881+
func (m *MemoryStorage) UpdateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
1882+
return fmt.Errorf("decision points not supported in --no-db mode: use SQLite storage")
1883+
}
1884+
1885+
// ListPendingDecisions returns all decision points that haven't been responded to.
1886+
// Decision points are not supported in memory storage.
1887+
func (m *MemoryStorage) ListPendingDecisions(ctx context.Context) ([]*types.DecisionPoint, error) {
1888+
return nil, fmt.Errorf("decision points not supported in --no-db mode: use SQLite storage")
1889+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package sqlite
2+
3+
// TODO(dolt-parity): These DecisionPoint methods need corresponding implementations
4+
// in the dolt storage backend when it's added. The interface is defined in
5+
// storage/storage.go and must be implemented for all backends.
6+
7+
import (
8+
"context"
9+
"database/sql"
10+
"fmt"
11+
12+
"github.com/steveyegge/beads/internal/types"
13+
)
14+
15+
// CreateDecisionPoint creates a new decision point for an issue.
16+
func (s *SQLiteStorage) CreateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
17+
// Verify issue exists
18+
var exists bool
19+
err := s.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM issues WHERE id = ?)`, dp.IssueID).Scan(&exists)
20+
if err != nil {
21+
return fmt.Errorf("failed to check issue existence: %w", err)
22+
}
23+
if !exists {
24+
return fmt.Errorf("issue %s not found", dp.IssueID)
25+
}
26+
27+
// Convert empty strings to NULL for optional FK fields
28+
var priorID interface{}
29+
if dp.PriorID != "" {
30+
priorID = dp.PriorID
31+
}
32+
33+
// Insert decision point
34+
_, err = s.db.ExecContext(ctx, `
35+
INSERT INTO decision_points (
36+
issue_id, prompt, options, default_option, selected_option,
37+
response_text, responded_at, responded_by, iteration, max_iterations,
38+
prior_id, guidance, requested_by, created_at
39+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
40+
`, dp.IssueID, dp.Prompt, dp.Options, dp.DefaultOption, dp.SelectedOption,
41+
dp.ResponseText, dp.RespondedAt, dp.RespondedBy, dp.Iteration, dp.MaxIterations,
42+
priorID, dp.Guidance, dp.RequestedBy)
43+
if err != nil {
44+
return fmt.Errorf("failed to insert decision point: %w", err)
45+
}
46+
47+
return nil
48+
}
49+
50+
// GetDecisionPoint retrieves the decision point for an issue.
51+
func (s *SQLiteStorage) GetDecisionPoint(ctx context.Context, issueID string) (*types.DecisionPoint, error) {
52+
// Hold read lock during database operations to prevent reconnect() from
53+
// closing the connection mid-query (GH#607 race condition fix)
54+
s.reconnectMu.RLock()
55+
defer s.reconnectMu.RUnlock()
56+
57+
dp := &types.DecisionPoint{}
58+
err := s.db.QueryRowContext(ctx, `
59+
SELECT issue_id, prompt, options,
60+
COALESCE(default_option, ''), COALESCE(selected_option, ''),
61+
COALESCE(response_text, ''), responded_at, COALESCE(responded_by, ''),
62+
iteration, max_iterations,
63+
COALESCE(prior_id, ''), COALESCE(guidance, ''), COALESCE(requested_by, ''), created_at
64+
FROM decision_points
65+
WHERE issue_id = ?
66+
`, issueID).Scan(
67+
&dp.IssueID, &dp.Prompt, &dp.Options,
68+
&dp.DefaultOption, &dp.SelectedOption,
69+
&dp.ResponseText, &dp.RespondedAt, &dp.RespondedBy,
70+
&dp.Iteration, &dp.MaxIterations,
71+
&dp.PriorID, &dp.Guidance, &dp.RequestedBy, &dp.CreatedAt,
72+
)
73+
if err == sql.ErrNoRows {
74+
return nil, nil
75+
}
76+
if err != nil {
77+
return nil, fmt.Errorf("failed to query decision point: %w", err)
78+
}
79+
80+
return dp, nil
81+
}
82+
83+
// UpdateDecisionPoint updates an existing decision point.
84+
func (s *SQLiteStorage) UpdateDecisionPoint(ctx context.Context, dp *types.DecisionPoint) error {
85+
// Convert empty strings to NULL for optional FK fields
86+
var priorID interface{}
87+
if dp.PriorID != "" {
88+
priorID = dp.PriorID
89+
}
90+
91+
result, err := s.db.ExecContext(ctx, `
92+
UPDATE decision_points SET
93+
prompt = ?,
94+
options = ?,
95+
default_option = ?,
96+
selected_option = ?,
97+
response_text = ?,
98+
responded_at = ?,
99+
responded_by = ?,
100+
iteration = ?,
101+
max_iterations = ?,
102+
prior_id = ?,
103+
guidance = ?
104+
WHERE issue_id = ?
105+
`, dp.Prompt, dp.Options, dp.DefaultOption, dp.SelectedOption,
106+
dp.ResponseText, dp.RespondedAt, dp.RespondedBy,
107+
dp.Iteration, dp.MaxIterations, priorID, dp.Guidance, dp.IssueID)
108+
if err != nil {
109+
return fmt.Errorf("failed to update decision point: %w", err)
110+
}
111+
112+
rowsAffected, err := result.RowsAffected()
113+
if err != nil {
114+
return fmt.Errorf("failed to get rows affected: %w", err)
115+
}
116+
if rowsAffected == 0 {
117+
return fmt.Errorf("decision point not found for issue %s", dp.IssueID)
118+
}
119+
120+
return nil
121+
}
122+
123+
// ListPendingDecisions returns all decision points that haven't been responded to.
124+
func (s *SQLiteStorage) ListPendingDecisions(ctx context.Context) ([]*types.DecisionPoint, error) {
125+
// Hold read lock during database operations to prevent reconnect() from
126+
// closing the connection mid-query (GH#607 race condition fix)
127+
s.reconnectMu.RLock()
128+
defer s.reconnectMu.RUnlock()
129+
130+
rows, err := s.db.QueryContext(ctx, `
131+
SELECT issue_id, prompt, options,
132+
COALESCE(default_option, ''), COALESCE(selected_option, ''),
133+
COALESCE(response_text, ''), responded_at, COALESCE(responded_by, ''),
134+
iteration, max_iterations,
135+
COALESCE(prior_id, ''), COALESCE(guidance, ''), COALESCE(requested_by, ''), created_at
136+
FROM decision_points
137+
WHERE responded_at IS NULL
138+
ORDER BY created_at ASC
139+
`)
140+
if err != nil {
141+
return nil, fmt.Errorf("failed to query pending decisions: %w", err)
142+
}
143+
defer func() { _ = rows.Close() }()
144+
145+
var results []*types.DecisionPoint
146+
for rows.Next() {
147+
dp := &types.DecisionPoint{}
148+
err := rows.Scan(
149+
&dp.IssueID, &dp.Prompt, &dp.Options,
150+
&dp.DefaultOption, &dp.SelectedOption,
151+
&dp.ResponseText, &dp.RespondedAt, &dp.RespondedBy,
152+
&dp.Iteration, &dp.MaxIterations,
153+
&dp.PriorID, &dp.Guidance, &dp.RequestedBy, &dp.CreatedAt,
154+
)
155+
if err != nil {
156+
return nil, fmt.Errorf("failed to scan decision point: %w", err)
157+
}
158+
results = append(results, dp)
159+
}
160+
161+
if err := rows.Err(); err != nil {
162+
return nil, fmt.Errorf("error iterating decision points: %w", err)
163+
}
164+
165+
return results, nil
166+
}

internal/storage/sqlite/migrations.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ var migrationsList = []Migration{
5757
{"work_type_column", migrations.MigrateWorkTypeColumn},
5858
{"source_system_column", migrations.MigrateSourceSystemColumn},
5959
{"quality_score_column", migrations.MigrateQualityScoreColumn},
60+
{"decision_points_table", migrations.MigrateDecisionPointColumns},
61+
{"decision_requested_by_column", migrations.MigrateDecisionRequestedBy},
6062
}
6163

6264
// MigrationInfo contains metadata about a migration for inspection
@@ -120,7 +122,9 @@ func getMigrationDescription(name string) string {
120122
"crystallizes_column": "Adds crystallizes column for work economics (compounds vs evaporates) per Decision 006",
121123
"work_type_column": "Adds work_type column for work assignment model (mutex vs open_competition per Decision 006)",
122124
"source_system_column": "Adds source_system column for federation adapter tracking",
123-
"quality_score_column": "Adds quality_score column for aggregate quality (0.0-1.0) set by Refineries",
125+
"quality_score_column": "Adds quality_score column for aggregate quality (0.0-1.0) set by Refineries",
126+
"decision_points_table": "Adds decision_points table for human-in-the-loop choices",
127+
"decision_requested_by_column": "Adds requested_by column to decision_points for agent wake notifications",
124128
}
125129

126130
if desc, ok := descriptions[name]; ok {
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package migrations
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
)
7+
8+
// MigrateDecisionPointColumns creates the decision_points table for human-in-the-loop choices.
9+
// This is a separate table with FK to issues rather than columns on issues, because:
10+
// 1. Decision points have their own lifecycle (can be iterated multiple times)
11+
// 2. Many issues won't have decision points, avoiding sparse columns
12+
// 3. prior_id creates a chain of iterations requiring self-referential structure
13+
func MigrateDecisionPointColumns(db *sql.DB) error {
14+
var tableExists bool
15+
err := db.QueryRow(`
16+
SELECT COUNT(*) > 0
17+
FROM sqlite_master
18+
WHERE type='table' AND name='decision_points'
19+
`).Scan(&tableExists)
20+
if err != nil {
21+
return fmt.Errorf("failed to check decision_points table: %w", err)
22+
}
23+
24+
if tableExists {
25+
return nil
26+
}
27+
28+
_, err = db.Exec(`
29+
CREATE TABLE decision_points (
30+
issue_id TEXT PRIMARY KEY,
31+
prompt TEXT NOT NULL,
32+
options TEXT NOT NULL,
33+
default_option TEXT,
34+
selected_option TEXT,
35+
response_text TEXT,
36+
responded_at DATETIME,
37+
responded_by TEXT,
38+
iteration INTEGER DEFAULT 1,
39+
max_iterations INTEGER DEFAULT 3,
40+
prior_id TEXT,
41+
guidance TEXT,
42+
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
43+
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE,
44+
FOREIGN KEY (prior_id) REFERENCES issues(id) ON DELETE SET NULL
45+
);
46+
CREATE INDEX idx_decision_points_prior ON decision_points(prior_id);
47+
`)
48+
if err != nil {
49+
return fmt.Errorf("failed to create decision_points table: %w", err)
50+
}
51+
52+
return nil
53+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package migrations
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
)
7+
8+
// MigrateDecisionRequestedBy adds requested_by column to decision_points table.
9+
// This column stores the agent/session that created the decision, enabling
10+
// wake notifications when the decision is resolved.
11+
func MigrateDecisionRequestedBy(db *sql.DB) error {
12+
// Check if column already exists
13+
var colCount int
14+
err := db.QueryRow(`
15+
SELECT COUNT(*)
16+
FROM pragma_table_info('decision_points')
17+
WHERE name = 'requested_by'
18+
`).Scan(&colCount)
19+
if err != nil {
20+
return fmt.Errorf("failed to check for requested_by column: %w", err)
21+
}
22+
23+
if colCount > 0 {
24+
return nil // Column already exists
25+
}
26+
27+
_, err = db.Exec(`ALTER TABLE decision_points ADD COLUMN requested_by TEXT`)
28+
if err != nil {
29+
return fmt.Errorf("failed to add requested_by column: %w", err)
30+
}
31+
32+
return nil
33+
}

0 commit comments

Comments
 (0)