Skip to content

Commit

Permalink
Merge pull request #48 from intelligentnode/39-revamp-the-flow
Browse files Browse the repository at this point in the history
39 revamp the flow
  • Loading branch information
intelligentnode authored Feb 8, 2024
2 parents 559e8ce + 007e43a commit 6379ba0
Show file tree
Hide file tree
Showing 21 changed files with 307 additions and 52 deletions.
8 changes: 6 additions & 2 deletions PIPREADME.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ You can create a flow of tasks executed by different AI models. Here's an exampl

```python
from intelli.flow.agents.agent import Agent
from intelli.flow.task import Task
from intelli.flow.tasks.task import Task
from intelli.flow.sequence_flow import SequenceFlow
from intelli.flow.input.task_input import TextTaskInput
from intelli.flow.processors.basic_processor import TextProcessor
Expand All @@ -49,6 +49,8 @@ flow = SequenceFlow([task1, task2, task3], log=True)
final_result = flow.start()
```

To build async flows with multiple paths, refer to the [flow tutorial](https://github.com/intelligentnode/Intelli/wiki/Flows).

## Create Chatbot
Switch between multiple chatbot providers without changing your code.

Expand All @@ -65,6 +67,8 @@ def call_chatbot(provider, model=None):
openai_bot = Chatbot(YOUR_OPENAI_API_KEY, "openai")
response = openai_bot.chat(input)

return response

# call openai
call_chatbot("openai", "gpt-4")

Expand All @@ -76,7 +80,7 @@ call_chatbot("gemini")
```


## Chat With Docs
## Connect Your Docs With Chatbot
IntelliPy allows you to chat with your docs using multiple LLMs. To connect your data, visit the [IntelliNode App](https://app.intellinode.ai/), start a project using the Document option, upload your documents or images, and copy the generated One Key. This key will be used to connect the chatbot to your uploaded data.

```python
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ You can create a flow of tasks executed by different AI models. Here's an exampl

```python
from intelli.flow.agents.agent import Agent
from intelli.flow.task import Task
from intelli.flow.tasks.task import Task
from intelli.flow.sequence_flow import SequenceFlow
from intelli.flow.input.task_input import TextTaskInput
from intelli.flow.processors.basic_processor import TextProcessor
Expand All @@ -56,6 +56,8 @@ flow = SequenceFlow([task1, task2, task3], log=True)
final_result = flow.start()
```

To build async flows with multiple paths, refer to the [flow tutorial](https://github.com/intelligentnode/Intelli/wiki/Flows).

## Create Chatbot
Switch between multiple chatbot providers without changing your code.

Expand All @@ -72,6 +74,8 @@ def call_chatbot(provider, model=None):
openai_bot = Chatbot(YOUR_OPENAI_API_KEY, "openai")
response = openai_bot.chat(input)

return response

# call openai
call_chatbot("openai", "gpt-4")

Expand All @@ -83,7 +87,7 @@ call_chatbot("gemini")
```


## Chat With Docs
## Connect Your Docs With Chatbot
IntelliPy allows you to chat with your docs using multiple LLMs. To connect your data, visit the [IntelliNode App](https://app.intellinode.ai/), start a project using the Document option, upload your documents or images, and copy the generated One Key. This key will be used to connect the chatbot to your uploaded data.

```python
Expand Down
5 changes: 5 additions & 0 deletions instructions/run_integration_text.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ python3 -m unittest intelli.test.integration.test_chatbot
# chatbot azure
python3 -m unittest intelli.test.integration.test_azure_chatbot

# chatbot with data
python3 -m unittest intelli.test.integration.test_chatbot_with_data

## flows
# basic flow
python3 -m unittest intelli.test.integration.test_flow_sequence
# map flow
python3 -m unittest intelli.test.integration.test_flow_map
2 changes: 2 additions & 0 deletions instructions/useful_comands.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# print the project tree
tree -I '__pycache__|test|Instructions|assets'
23 changes: 13 additions & 10 deletions intelli/flow/agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,45 @@
from abc import ABC, abstractmethod

from intelli.controller.remote_image_model import RemoteImageModel
from intelli.flow.types import AgentTypes
from intelli.function.chatbot import Chatbot
from intelli.model.input.chatbot_input import ChatModelInput
from intelli.controller.remote_image_model import RemoteImageModel
from intelli.model.input.image_input import ImageModelInput
from abc import ABC, abstractmethod
from intelli.flow.types import AgentTypes
from intelli.flow.input.agent_input import AgentInput, TextAgentInput, ImageAgentInput


class BasicAgent(ABC):

@abstractmethod
def execute(self, agent_input):
pass


class Agent(BasicAgent):
def __init__(self, agent_type, provider, mission, model_params, options=None):

if agent_type not in AgentTypes._value2member_map_:
raise ValueError("Incorrect agent type. Accepted types in AgentTypes.")

self.type = agent_type
self.provider = provider
self.mission = mission
self.model_params = model_params
self.options = options


def execute(self, agent_input):
def execute(self, agent_input: AgentInput):

# Check the agent type and call the appropriate function
if self.type == AgentTypes.TEXT.value:
chatbot = Chatbot(self.model_params['key'], self.provider, self.options)
chat_input = ChatModelInput(self.mission, model=self.model_params.get('model'))
chat_input.add_user_message(agent_input)
chat_input.add_user_message(agent_input.desc)
result = chatbot.chat(chat_input)[0]
elif self.type == AgentTypes.IMAGE.value:
image_model = RemoteImageModel(self.model_params['key'], self.provider)
image_input = ImageModelInput(prompt=agent_input, model=self.model_params.get('model'))
image_input = ImageModelInput(prompt=agent_input.desc, model=self.model_params.get('model'))
result = image_model.generate_images(image_input)
else:
raise ValueError(f"Unsupported agent type: {self.type}.")

return result
71 changes: 71 additions & 0 deletions intelli/flow/flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
import networkx as nx
from intelli.utils.logging import Logger
from functools import partial


class Flow:
def __init__(self, tasks, map_paths, log=False):
self.tasks = tasks
self.map_paths = map_paths
self.graph = nx.DiGraph()
self.output = {}
self.logger = Logger(log)
self._prepare_graph()

def _prepare_graph(self):
# Initialize the graph with tasks as nodes
for task_name in self.tasks:
self.graph.add_node(task_name)

# Add edges based on map_paths to define dependencies
for parent_task, dependencies in self.map_paths.items():
for child_task in dependencies:
self.graph.add_edge(parent_task, child_task)

# Check for cycles in the graph
if not nx.is_directed_acyclic_graph(self.graph):
raise ValueError("The dependency graph has cycles, please revise map_paths.")

async def _execute_task(self, task_name):
self.logger.log(f'---- execute task {task_name} ---- ')
task = self.tasks[task_name]
predecessor_outputs = []
predecessor_types = set()

# Gather inputs and types from previous tasks based on the graph
for pred in self.graph.predecessors(task_name):
if pred in self.output:
predecessor_outputs.append(self.output[pred]['output'])
predecessor_types.add(self.output[pred]['type'])
else:
print(f"Warning: Output for predecessor task '{pred}' not found. Skipping...")

self.logger.log(f'The number of combined inputs for task {task_name} is {len(predecessor_outputs)}')
merged_input = " ".join(predecessor_outputs)
merged_type = next(iter(predecessor_types)) if len(predecessor_types) == 1 else None

# Execute task with merged input
loop = asyncio.get_event_loop()
execute_task = partial(task.execute, merged_input, input_type=merged_type)

# Run the synchronous function
await loop.run_in_executor(None, execute_task)

# Collect outputs and types
self.output[task_name] = {'output': task.output, 'type': task.output_type}

async def start(self, max_workers=10):
ordered_tasks = list(nx.topological_sort(self.graph))
task_coroutines = {task_name: self._execute_task(task_name) for task_name in ordered_tasks}
async with asyncio.Semaphore(max_workers):
for task_name in ordered_tasks:
await task_coroutines[task_name]

# Filter the outputs (and types) of excluded tasks
filtered_output = {
task_name: { 'output': self.output[task_name]['output'], 'type': self.output[task_name]['type'] }
for task_name in ordered_tasks if not self.tasks[task_name].exclude
}

return filtered_output
15 changes: 15 additions & 0 deletions intelli/flow/input/agent_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class AgentInput:
def __init__(self, desc=None, img=None, audio=None):
self.desc = desc
self.img = img
self.audio = audio


class TextAgentInput(AgentInput):
def __init__(self, desc):
super().__init__(desc=desc)


class ImageAgentInput(AgentInput):
def __init__(self, desc, img):
super().__init__(desc=desc, img=img)
2 changes: 2 additions & 0 deletions intelli/flow/input/task_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ def __init__(self, desc=None, img=None, audio=None):
self.img = img
self.audio = audio


class TextTaskInput(TaskInput):
def __init__(self, desc):
super().__init__(desc=desc)


class ImageTaskInput(TaskInput):
def __init__(self, desc, img):
super().__init__(desc=desc, img=img)
15 changes: 8 additions & 7 deletions intelli/flow/sequence_flow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from intelli.utils.logging import Logger


class SequenceFlow:
def __init__(self, order, log=False):
self.order = order
Expand All @@ -11,19 +12,19 @@ def start(self):

flow_input = None
flow_input_type = None

for index, task in enumerate(self.order, start=1):

# log
self.logger.log(f"- Executing task: {task.desc}")
self.logger.log_head(f"- Executing task: {task.desc}")

task.execute(flow_input, flow_input_type)

if not task.exclude:
result[f'task{index}'] = task.output

# define the input for next step
flow_input = task.output
flow_input_type = task.output_type

return result
Empty file added intelli/flow/tasks/__init__.py
Empty file.
27 changes: 17 additions & 10 deletions intelli/flow/task.py → intelli/flow/tasks/task.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from intelli.flow.template.basic_template import TextInputTemplate
from intelli.flow.types import AgentTypes, InputTypes
from intelli.utils.logging import Logger
from intelli.flow.input.agent_input import AgentInput, TextAgentInput, ImageAgentInput


class Task:
def __init__(self, task_input, agent, exclude=False, pre_process=None,
def __init__(self, task_input, agent, exclude=False, pre_process=None,
post_process=None, template=None, log=False):
self.desc = task_input.desc
self.agent = agent
Expand All @@ -21,28 +23,33 @@ def execute(self, input_data=None, input_type=None):

# logging
if input_type in [InputTypes.TEXT.value, InputTypes.IMAGE.value]:
self.logger.log('- Inside the task with input data head: ', input_data)
elif input_type == InputTypes.IMAGE.value and self.agent.type in [AgentTypes.TEXT.value, AgentTypes.IMAGE.value]:
self.logger.log('- Inside the task. the previous step input not supported')
self.logger.log_head('- Inside the task with input data head: ', input_data)
elif input_type == InputTypes.IMAGE.value and self.agent.type in [AgentTypes.TEXT.value,
AgentTypes.IMAGE.value]:
self.logger.log_head('- Inside the task. the previous step input not supported')

# Run task pre procesing
if self.pre_process:
input_data = self.pre_process(input_data)

# Apply template
if input_data and input_type in [InputTypes.TEXT.value, InputTypes.IMAGE.value]:
agent_input = self.template.apply_input(input_data)
agent_text = self.template.apply_input(input_data)
# log
self.logger.log('- Input data with template: ', agent_input)
self.logger.log_head('- Input data with template: ', agent_text)
else:
agent_input = self.desc
agent_text = self.desc

# Check the agent type and call the appropriate function
result = self.agent.execute(agent_input)
result = self.agent.execute(TextAgentInput(agent_text))

# log
self.logger.log('- The task output head: ', result)
if self.agent.type in [AgentTypes.TEXT.value]:
self.logger.log_head('- The task output head: ', result)
else:
self.logger.log('- The task output count: ', len(result))

if self.post_process:
result = self.post_process(result)

self.output = result
11 changes: 8 additions & 3 deletions intelli/flow/template/basic_template.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod


class Template(ABC):

@abstractmethod
def apply_input(self, data):
pass
Expand All @@ -10,11 +11,15 @@ def apply_input(self, data):
def apply_output(self, data):
pass


class TextInputTemplate(Template):

def __init__(self, template_text: str):
def __init__(self, template_text: str, previous_input_tag='context', user_request_tag='user request'):
if '{0}' not in template_text:
template_text = template_text + ' {0}'
context = previous_input_tag + ': {0}\n'
request = user_request_tag + ': ' + template_text
template_text = context+request

self.template_text = template_text.strip()

def apply_input(self, data):
Expand Down
2 changes: 2 additions & 0 deletions intelli/flow/types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from enum import Enum


class AgentTypes(Enum):
TEXT = 'text'
IMAGE = 'image'


class InputTypes(Enum):
TEXT = 'text'
IMAGE = 'image'
2 changes: 1 addition & 1 deletion intelli/function/chatbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def _augment_with_semantic_search(self, chat_input):
).strip()

# Load the static prompt template for an augmented chatbot response.
augmented_message_template = self.system_helper.load_prompt("augmented_chatbot")
augmented_message_template = self.system_helper.load_static_prompt("augmented_chatbot")
augmented_message = augmented_message_template.replace("${semantic_search}", context_data).replace("${user_query}", last_user_message)

# Replace the content of the last user message with the augmented message in the ChatModelInput.
Expand Down
Empty file added intelli/resource/__init__.py
Empty file.
Loading

0 comments on commit 6379ba0

Please sign in to comment.