`docs/swarms/structs/sequential_workflow.md` (209 additions, 1 deletion)

The SequentialWorkflow now includes a powerful **sequential awareness** feature.

## Methods

### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, team_awareness: bool = False, streaming_callback: Optional[Callable[[str], None]] = None, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)`

The constructor initializes the `SequentialWorkflow` object with enhanced sequential awareness capabilities.

- `team_awareness` (`bool`, optional): **NEW**: Enables sequential awareness features. Defaults to `False`.
- `time_enabled` (`bool`, optional): **NEW**: Enables timestamps in conversation. Defaults to `False`.
- `message_id_on` (`bool`, optional): **NEW**: Enables message IDs in conversation. Defaults to `False`.
- `streaming_callback` (`Callable[[str], None]`, optional): **NEW**: Callback invoked with each token as it streams from the agents, enabling real-time output processing. Defaults to `None`.
- `*args`: Variable length argument list.
- `**kwargs`: Arbitrary keyword arguments.

The `run` method now includes enhanced logging to track sequential awareness.
5. **Professional Workflows**: Mimics real-world team collaboration patterns

The SequentialWorkflow with sequential awareness represents a significant advancement in multi-agent coordination, enabling more sophisticated and professional workflows that closely mirror human team collaboration patterns.

# SequentialWorkflow Streaming Callback Documentation

## **NEW: Streaming Callback Feature**

The SequentialWorkflow now includes a powerful **streaming callback** feature that allows you to receive and process tokens in real-time as the workflow executes. This enables real-time streaming of agent responses, making it ideal for interactive applications and live monitoring of workflow progress.

### What the Streaming Callback Does

- **Real-time Token Streaming**: Receive individual tokens as they are generated by agents
- **Live Progress Monitoring**: Track workflow execution progress in real-time
- **Interactive Applications**: Enable streaming responses in chat applications or live demos
- **Custom Processing**: Implement custom logic for handling streaming tokens (buffering, filtering, etc.)

## `streaming_callback` Parameter

Integrates streaming callback functionality into the SequentialWorkflow for real-time token processing.

### Parameters:
- `streaming_callback` (`Optional[Callable[[str], None]]`): A callback function that receives streaming tokens in real-time. The function should accept a single string parameter (the token) and return None. Defaults to `None`.

### Callback Function Signature:
```python
def callback_function(token: str) -> None:
    pass
```
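
Any callable matching this signature works. As a minimal sketch, a token printer can be built from the built-in `print` with `functools.partial` (the name `print_stream` is illustrative, not part of the API):

```python
from functools import partial

# A token printer: no trailing newline, flushed immediately so
# output appears as soon as each token arrives.
print_stream = partial(print, end="", flush=True)

print_stream("Hello, ")
print_stream("world")
```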

## **Usage Example with Streaming Callback:**

```python
from swarms import Agent, SequentialWorkflow

# Buffer must exist before the callback is first invoked
buffer = []

def streaming_callback(token: str) -> None:
    """
    Custom streaming callback that buffers tokens and prints them
    when a threshold is reached or a newline is encountered.
    """
    buffer.append(token)
    if len(buffer) >= 20 or token.endswith("\n"):
        print("".join(buffer), end="", flush=True)
        buffer.clear()

# Initialize agents for a research and analysis workflow
research_agent = Agent(
    agent_name="Research Agent",
    system_prompt="Conduct thorough research on the given topic and provide comprehensive findings.",
    model_name="gpt-4o",
    max_loops=1,
)

analysis_agent = Agent(
    agent_name="Analysis Agent",
    system_prompt="Analyze the research findings and provide actionable insights and conclusions.",
    model_name="gpt-4o-mini",
    max_loops=1,
)

# Create workflow with streaming callback
workflow = SequentialWorkflow(
    agents=[research_agent, analysis_agent],
    max_loops=1,
    team_awareness=True,
    streaming_callback=streaming_callback,  # Enable real-time streaming
    time_enabled=True,
    message_id_on=True,
)

# Execute workflow with live streaming
result = workflow.run(
    "Research the latest advancements in quantum computing and analyze their potential impact on cryptography"
)

print(f"\n\nFinal Result: {result}")
```

### Expected Output:
- Output appears in real time, streaming partial results as they are generated.
- Chunks of text are printed to the terminal as soon as they are available.
- Each agent's output is shown in sequence (e.g., research findings, then analysis).
- The final result is printed at the end after all agents have finished.
- There may be brief pauses between streamed outputs as each agent completes their step.


## **Advanced Streaming Examples**

### **Example 1: File Logging with Streaming**
```python
def file_logging_callback(token: str) -> None:
    """Stream tokens to a log file in real-time."""
    with open("workflow_stream.log", "a", encoding="utf-8") as f:
        f.write(token)

workflow = SequentialWorkflow(
    agents=[research_agent, analysis_agent],
    streaming_callback=file_logging_callback,
)
```

### **Example 2: Progress Bar Integration**
```python
import sys

def progress_callback(token: str) -> None:
    """Write tokens directly to stdout as they stream in."""
    sys.stdout.write(token)
    sys.stdout.flush()

# Use in your workflow
workflow = SequentialWorkflow(
    agents=[agent1, agent2, agent3],
    streaming_callback=progress_callback,
)
```

### **Example 3: Token Filtering and Processing**
```python
def smart_callback(token: str) -> None:
    """Filter and process tokens based on custom logic."""
    # Skip whitespace-only tokens
    if token.strip():
        # Highlight key terms in yellow using ANSI escape codes
        if any(keyword in token.lower() for keyword in ["error", "warning", "success"]):
            print(f"\033[93m{token}\033[0m", end="", flush=True)
        else:
            print(token, end="", flush=True)

workflow = SequentialWorkflow(
    agents=[agent1, agent2],
    streaming_callback=smart_callback,
)
```

## **How Streaming Callback Works**

### 1. **Real-Time Token Reception**
As each agent in the workflow generates responses, tokens are immediately passed to your callback function:

```python
# Tokens flow like this:
# Agent1: "Research" -> callback("Research")
# Agent1: " shows" -> callback(" shows")
# Agent1: " that" -> callback(" that")
# Agent2: "Analysis" -> callback("Analysis")
# ...and so on
```

### 2. **Non-Blocking Execution**
The streaming callback is invoked as soon as tokens become available, so your code can react immediately rather than waiting for an agent's full response. Keep the callback fast, since a slow callback can delay token delivery.

### 3. **Memory Efficient**
Tokens are processed individually, making it memory-efficient for long-running workflows.
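
When even a full transcript is too much to retain, a bounded buffer caps memory use. A minimal sketch, assuming a rolling window is all you need (the `tail` deque and the window size of 50 are illustrative choices, not part of the API):

```python
from collections import deque

# Keep only the 50 most recent tokens; older ones are discarded
# automatically once the deque reaches its maximum length.
tail = deque(maxlen=50)

def tail_callback(token: str) -> None:
    """Retain a rolling window of recent tokens for inspection."""
    tail.append(token)
```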

## **Benefits of Streaming Callback**

1. **Real-Time User Experience**: Users see responses as they're generated, improving perceived performance
2. **Live Monitoring**: Track workflow progress and agent outputs in real-time
3. **Interactive Applications**: Perfect for chat interfaces, dashboards, and live demos
4. **Debugging**: Monitor agent outputs token-by-token for debugging purposes
5. **Custom Integration**: Easily integrate with logging systems, progress bars, or custom UI components

The streaming callback feature transforms the SequentialWorkflow into a powerful tool for real-time AI applications, enabling seamless integration with modern streaming interfaces and live monitoring systems.

## **Notes:**

- **Backward Compatibility**: Existing workflows continue to work without changes when `streaming_callback=None`
- **Performance**: Streaming adds minimal overhead while providing significant real-time benefits
- **Error Handling**: Implement proper error handling in your callback function to prevent workflow interruption
- **Thread Safety**: Ensure your callback function is thread-safe if used in concurrent workflows
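
The last two notes can be combined in one pattern: a sketch of a callback that never raises and hands tokens off through a thread-safe queue (`token_queue`, `safe_callback`, and `drain_tokens` are illustrative names, not part of the SequentialWorkflow API):

```python
import queue

# Tokens are enqueued by the callback (possibly from a worker thread)
# and drained elsewhere; queue.Queue is thread-safe by design.
token_queue: queue.Queue = queue.Queue()

def safe_callback(token: str) -> None:
    """Never let a callback error propagate into the workflow."""
    try:
        token_queue.put(token)
    except Exception:
        pass  # swallow errors here; optionally log them instead

def drain_tokens() -> str:
    """Collect everything enqueued so far into one string."""
    parts = []
    while not token_queue.empty():
        parts.append(token_queue.get())
    return "".join(parts)
```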

## **Integration Examples**

### **WebSocket Streaming**
```python
import asyncio
import websockets  # assumes the `websockets` package is installed

# Placeholder: assign an established WebSocket connection in your application
websocket_connection = None

async def websocket_callback(token: str) -> None:
    """Send tokens via WebSocket for real-time web updates."""
    if websocket_connection:
        await websocket_connection.send(token)

# In your async workflow (requires a running event loop,
# since asyncio.create_task schedules the coroutine on it)
workflow = SequentialWorkflow(
    agents=[agent1, agent2],
    streaming_callback=lambda t: asyncio.create_task(websocket_callback(t)),
)
```

### **Database Streaming**
```python
# `db` is assumed to be a database client defined elsewhere,
# exposing an `insert_tokens` method for batch inserts.
token_buffer = []

def database_callback(token: str) -> None:
    """Buffer tokens and batch-insert them for real-time analytics."""
    token_buffer.append(token)
    if len(token_buffer) >= 100:
        # Batch insert to reduce database round-trips
        db.insert_tokens(token_buffer.copy())
        token_buffer.clear()

workflow = SequentialWorkflow(
    agents=[agent1, agent2, agent3],
    streaming_callback=database_callback,
)
```

Using a streaming callback in SequentialWorkflow enables real-time visibility into agent outputs, making it ideal for interactive applications and live monitoring. This feature enhances user experience and debugging by allowing immediate feedback and seamless integration with modern interfaces.
## **Complete Example Script**

The PR also adds a standalone example script. Note that it uses the richer per-agent callback signature `Callable[[str, str, bool], None]` (agent name, text chunk, and a completion flag) and passes the callback to `workflow.run()` rather than the constructor:

```python
import time
from typing import Callable

from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow


def create_streaming_callback() -> Callable[[str, str, bool], None]:
    """
    Create a streaming callback that shows live paragraph formation.
    """
    agent_buffers = {}
    paragraph_count = {}

    def streaming_callback(agent_name: str, chunk: str, is_final: bool):
        timestamp = time.strftime("%H:%M:%S")

        # Initialize buffers for new agents
        if agent_name not in agent_buffers:
            agent_buffers[agent_name] = ""
            paragraph_count[agent_name] = 1
            print(f"\n🎬 [{timestamp}] {agent_name} starting...")
            print("=" * 60)

        if chunk.strip():
            # Split chunk into tokens (words/punctuation)
            tokens = chunk.replace("\n", " \n ").split()

            for token in tokens:
                # Handle paragraph breaks
                if token == "\n":
                    if agent_buffers[agent_name].strip():
                        print(
                            f"\n📄 [{timestamp}] {agent_name} - Paragraph {paragraph_count[agent_name]} Complete:"
                        )
                        print(f"{agent_buffers[agent_name].strip()}")
                        print("=" * 60)
                        paragraph_count[agent_name] += 1
                        agent_buffers[agent_name] = ""
                else:
                    # Add token to buffer and show live accumulation
                    agent_buffers[agent_name] += token + " "
                    print(
                        f"\r[{timestamp}] {agent_name} | {agent_buffers[agent_name].strip()}",
                        end="",
                        flush=True,
                    )

        if is_final:
            print()  # New line after live updates
            # Print any remaining content as the final paragraph
            if agent_buffers[agent_name].strip():
                print(
                    f"\n✅ [{timestamp}] {agent_name} COMPLETED - Final Paragraph:"
                )
                print(f"{agent_buffers[agent_name].strip()}")
                print()
            print(f"🎯 [{timestamp}] {agent_name} finished processing")
            print(f"📊 Total paragraphs processed: {paragraph_count[agent_name] - 1}")
            print("=" * 60)

    return streaming_callback


def create_agents():
    """Create specialized agents for the workflow."""
    return [
        Agent(
            agent_name="Research_Agent",
            agent_description="Specialized in gathering and analyzing information",
            system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.",
            model_name="gpt-4o-mini",
            max_loops=1,
            streaming_on=True,
        ),
        Agent(
            agent_name="Analysis_Agent",
            agent_description="Expert at analyzing data and drawing insights",
            system_prompt="You are an analysis expert. Break down complex information and provide clear insights.",
            model_name="gpt-4o-mini",
            max_loops=1,
            streaming_on=True,
        ),
    ]


if __name__ == "__main__":
    print("🎯 SEQUENTIAL WORKFLOW STREAMING DEMO")
    print("=" * 50)

    # Create agents and workflow
    agents = create_agents()
    workflow = SequentialWorkflow(
        id="research_analysis_workflow",
        name="Research Analysis Workflow",
        description="A sequential workflow that researches and analyzes topics",
        agents=agents,
        max_loops=1,
        output_type="str",
        multi_agent_collab_prompt=True,
    )

    # Define the task
    task = "What are the latest advancements in AI?"

    print(f"Task: {task.strip()}")

    # Create streaming callback
    streaming_callback = create_streaming_callback()

    print("\n🎬 EXECUTING WITH STREAMING CALLBACKS...")
    print("Watch real-time agent outputs below:\n")

    # Execute with streaming
    result = workflow.run(
        task=task,
        streaming_callback=streaming_callback,
    )

    print("\n🎉 EXECUTION COMPLETED!")
    print("\n📊 FINAL RESULT:")
    print("-" * 50)
    print(result)
```