# File: interactive_pipeline.yaml
# Interactive Pipeline with User Input and Approval
# Demonstrates human-in-the-loop workflows with real data processing
id: interactive_pipeline
name: Interactive Sales Data Processing Pipeline
description: Process sales data with user-selected transformations, approval gates, and feedback
version: "2.0.0"
parameters:
  input_file:
    type: string
    default: "data/sales_data.csv"
  output_dir:
    type: string
    default: "examples/outputs/interactive_pipeline"
steps:
  - id: get_processing_options
    tool: user-prompt
    action: execute
    parameters:
      prompt: "Select data processing method"
      input_type: "choice"
      choices: ["aggregate", "filter", "transform", "analyze"]
      default: "aggregate"
      context: "cli"
  - id: get_specific_operation
    tool: user-prompt
    action: execute
    parameters:
      prompt: |
        {% if get_processing_options.value == 'aggregate' %}
        Select aggregation type
        {% elif get_processing_options.value == 'filter' %}
        Select filter criteria
        {% elif get_processing_options.value == 'transform' %}
        Select transformation type
        {% else %}
        Select analysis type
        {% endif %}
      input_type: "choice"
      choices: |
        {% if get_processing_options.value == 'aggregate' %}
        ["by_category", "by_region", "by_date", "top_products"]
        {% elif get_processing_options.value == 'filter' %}
        ["high_value", "electronics_only", "recent_orders", "top_customers"]
        {% elif get_processing_options.value == 'transform' %}
        ["add_totals", "calculate_margins", "normalize_prices", "pivot_data"]
        {% else %}
        ["summary_stats", "sales_trends", "customer_analysis", "product_performance"]
        {% endif %}
      default: |
        {% if get_processing_options.value == 'aggregate' %}by_category{% elif get_processing_options.value == 'filter' %}high_value{% elif get_processing_options.value == 'transform' %}add_totals{% else %}summary_stats{% endif %}
      context: "cli"
    dependencies:
      - get_processing_options
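    # Illustration only, assuming the Jinja templates are rendered before the
    # user-prompt tool reads its parameters. If the first prompt returned
    # 'filter', the templated fields of this step would resolve to roughly:
    #   prompt:  "Select filter criteria"
    #   choices: ["high_value", "electronics_only", "recent_orders", "top_customers"]
    #   default: "high_value"
    # How the rendered choices string is turned back into a list is up to the
    # tool and is not specified in this file.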
  - id: get_output_format
    tool: user-prompt
    action: execute
    parameters:
      prompt: "Select output format"
      input_type: "choice"
      choices: ["csv", "json", "markdown"]
      default: "csv"
      context: "cli"
    dependencies:
      - get_specific_operation
  - id: read_data
    tool: filesystem
    action: read
    parameters:
      path: "{{ output_dir }}/{{ input_file }}"
    dependencies:
      - get_output_format
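    # Worked example, using only the parameter defaults declared at the top of
    # this file: the read path (output_dir, a slash, then input_file) renders to
    #   examples/outputs/interactive_pipeline/data/sales_data.csv
    # so with the defaults the input CSV is expected under the output
    # directory. When overriding input_file, pass a path relative to
    # output_dir (an inference from the template, not a documented rule).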
  - id: process_data
    action: generate_text
    parameters:
      prompt: |
        Process this CSV sales data according to the following instructions:
        Data:
        {{ read_data.result.content }}
        Processing Method: {{ get_processing_options.value }}
        Specific Operation: {{ get_specific_operation.value }}
        Instructions:
        {% if get_processing_options.value == 'aggregate' %}
        {% if get_specific_operation.value == 'by_category' %}
        Group by category and calculate:
        - Total quantity sold
        - Total revenue (quantity * unit_price)
        - Average unit price
        - Number of transactions
        {% elif get_specific_operation.value == 'by_region' %}
        Group by region and calculate:
        - Total sales revenue
        - Number of orders
        - Average order value
        - Most popular product
        {% elif get_specific_operation.value == 'by_date' %}
        Group by date and show:
        - Daily revenue
        - Number of transactions
        - Best selling product of the day
        {% else %}
        Find the top 5 products by total revenue with columns:
        - Product name
        - Total quantity sold
        - Total revenue
        - Average price
        {% endif %}
        {% elif get_processing_options.value == 'filter' %}
        {% if get_specific_operation.value == 'high_value' %}
        Filter to show only orders where (quantity * unit_price) > 500.
        Include all original columns plus a 'total_value' column.
        {% elif get_specific_operation.value == 'electronics_only' %}
        Filter to show only Electronics category items.
        Sort by total value (quantity * unit_price) descending.
        {% elif get_specific_operation.value == 'recent_orders' %}
        Filter to show only orders from January 20, 2024 onwards.
        Sort by date descending.
        {% else %}
        Show top 3 customers by total purchase amount with:
        - Customer ID
        - Number of orders
        - Total amount spent
        - Favorite category
        {% endif %}
        {% elif get_processing_options.value == 'transform' %}
        {% if get_specific_operation.value == 'add_totals' %}
        Add these new columns to each row:
        - total_value = quantity * unit_price
        - tax_amount = total_value * 0.08
        - final_amount = total_value + tax_amount
        {% elif get_specific_operation.value == 'calculate_margins' %}
        Add profit margin columns:
        - cost_basis = unit_price * 0.7 for Electronics, unit_price * 0.6 for Furniture
        - profit = (unit_price - cost_basis) * quantity
        - margin_percent = (profit / (unit_price * quantity)) * 100
        {% elif get_specific_operation.value == 'normalize_prices' %}
        Normalize all unit prices to an index where the highest price = 100.
        Add columns: original_price, normalized_price, price_index
        {% else %}
        Create a pivot table with:
        - Dates as rows
        - Categories as columns
        - Sum of quantities as values
        {% endif %}
        {% else %}
        {% if get_specific_operation.value == 'summary_stats' %}
        Provide summary statistics:
        - Total revenue
        - Total orders
        - Average order value
        - Best selling product
        - Best sales day
        - Top customer
        {% elif get_specific_operation.value == 'sales_trends' %}
        Analyze sales trends:
        - Daily average sales
        - Growth rate from first to last day
        - Best and worst days
        - Weekend vs weekday performance
        {% elif get_specific_operation.value == 'customer_analysis' %}
        Analyze customers:
        - Total unique customers
        - Repeat customer rate
        - Average purchase per customer
        - Top 3 spenders
        - Most popular category per customer segment
        {% else %}
        Product performance analysis:
        - Best seller by quantity
        - Top revenue generator
        - Category breakdown (% of total sales)
        - Price point analysis
        {% endif %}
        {% endif %}
        Output format: {{ get_output_format.value }}
        {% if get_output_format.value == 'csv' %}
        Format as CSV with headers. Use comma separators.
        {% elif get_output_format.value == 'json' %}
        Format as valid JSON array of objects.
        {% else %}
        Format as a markdown table with a title and summary.
        {% endif %}
        IMPORTANT: Output ONLY the processed data in the requested format, no explanations or markdown code blocks.
      model: <AUTO>
      max_tokens: 2000
    dependencies:
      - read_data
  - id: create_summary
    action: generate_text
    parameters:
      prompt: |
        Create a concise business summary based on this data processing:
        Original data: {{ read_data.result.content.split('\n') | length - 1 }} rows of sales data
        Processing type: {{ get_processing_options.value }} - {{ get_specific_operation.value }}
        Processed results:
        {{ process_data | truncate(500) }}
        Write exactly 2-3 sentences summarizing:
        1. What processing was done
        2. Key findings with specific numbers from the results above
        Do NOT ask for more data or say you need more information. Use only the data shown above.
        Write in past tense. Be direct and specific.
      model: <AUTO>
      max_tokens: 200
    dependencies:
      - process_data
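    # Note on the row-count expression used in the prompt above (split the
    # file content on '\n', take the length, subtract 1 for the header).
    # Jinja applies the length filter before the subtraction. Worked example
    # with a hypothetical file holding a header and 3 data rows:
    #   no trailing newline -> 4 pieces, 4 - 1 = 3 rows
    #   trailing newline    -> 5 pieces (the last is empty), 5 - 1 = 4 rows
    # So the count may be one too high when the CSV ends with a newline. This
    # assumes the read step returns the file content unmodified.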
  - id: approve_results
    tool: approval-gate
    action: execute
    parameters:
      title: "Review Processed Data"
      content: |
        ## Processing Summary
        {{ create_summary }}
        ## Processed Data (first 1000 chars)
        ```
        {{ process_data | truncate(1000) }}
        ```
        Approve to save the results?
      format: "text"
      allow_modifications: true
      require_reason: true
      context: "cli"
    dependencies:
      - create_summary
  - id: save_if_approved
    tool: filesystem
    action: write
    parameters:
      path: "{{ output_dir }}/data/processed_{{ get_specific_operation.value }}.{{ get_output_format.value }}"
      content: "{{ approve_results.modified_content | default(process_data) }}"
    dependencies:
      - approve_results
    condition: "{{ approve_results.approved }}"
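    # Worked example of the write path above, assuming the default output_dir
    # and the default answers to each prompt (by_category, csv):
    #   examples/outputs/interactive_pipeline/data/processed_by_category.csv
    # Choosing 'markdown' as the output format would give the file a
    # '.markdown' extension, since the format value is used verbatim as the
    # suffix. The step is skipped when approve_results.approved is false.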
  - id: collect_feedback
    tool: feedback-collection
    action: execute
    parameters:
      title: "Pipeline Experience Feedback"
      questions:
        - id: "data_quality"
          text: "Rate the quality of data processing"
          type: "rating"
          scale: 5
        - id: "ease_of_use"
          text: "How easy was the pipeline to use?"
          type: "rating"
          scale: 5
        - id: "processing_useful"
          text: "Was the processing useful for your needs?"
          type: "boolean"
        - id: "would_use_again"
          text: "Would you use this pipeline again?"
          type: "boolean"
        - id: "suggestions"
          text: "Any suggestions for improvement?"
          type: "text"
      required_questions: ["data_quality", "ease_of_use", "would_use_again"]
      anonymous: false
      save_to_file: "{{ output_dir }}/feedback/pipeline_feedback.json"
      context: "cli"
    dependencies:
      - save_if_approved
  - id: generate_summary
    tool: filesystem
    action: write
    parameters:
      path: "{{ output_dir }}/summary.md"
      content: |
        # Pipeline Execution Summary
        ## Processing Configuration
        - **Method**: {{ get_processing_options.value }}
        - **Operation**: {{ get_specific_operation.value }}
        - **Output Format**: {{ get_output_format.value }}
        ## Data Summary
        - **Input File**: {{ input_file }}
        - **Rows Processed**: {{ read_data.result.content.split('\n') | length - 1 }}
        ## Business Insights
        {{ create_summary }}
        ## Approval Status
        - **Status**: {{ 'Approved' if approve_results.approved else 'Rejected' }}
        {% if approve_results.approved %}
        - **Output File**: data/processed_{{ get_specific_operation.value }}.{{ get_output_format.value }}
        {% else %}
        - **Rejection Reason**: {{ approve_results.rejection_reason }}
        {% endif %}
        ## User Feedback
        - **Average Rating**: {{ collect_feedback.summary.rating_average | round(1) }}/5
        - **Would Use Again**: {{ 'Yes' if collect_feedback.summary.boolean_summary.would_use_again else 'No' }}
        - **Processing Useful**: {{ 'Yes' if collect_feedback.summary.boolean_summary.processing_useful else 'No' }}
        ## Timestamp
        Generated at: {{ now() }}
    dependencies:
      - collect_feedback