From f0138f759f0d3f660d630a538c4c57de2f1aa4ab Mon Sep 17 00:00:00 2001 From: Stan Girard Date: Tue, 7 May 2024 18:10:49 +0200 Subject: [PATCH 1/4] feat(celery): moved assistant summary to celery --- backend/celery_config.py | 3 + backend/modules/assistant/ito/ito.py | 77 ++++++---- backend/modules/assistant/ito/summary.py | 188 +++++++++++++---------- backend/modules/assistant/ito/tasks.py | 42 +++++ 4 files changed, 197 insertions(+), 113 deletions(-) create mode 100644 backend/modules/assistant/ito/tasks.py diff --git a/backend/celery_config.py b/backend/celery_config.py index 971594de44fc..561be493c788 100644 --- a/backend/celery_config.py +++ b/backend/celery_config.py @@ -8,6 +8,7 @@ celery = Celery(__name__) + if CELERY_BROKER_URL.startswith("sqs"): broker_transport_options = { CELERY_BROKER_QUEUE_NAME: { @@ -36,3 +37,5 @@ ) else: raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}") + +celery.autodiscover_tasks(["modules.assistant.ito"]) diff --git a/backend/modules/assistant/ito/ito.py b/backend/modules/assistant/ito/ito.py index 0a92fbb5f43c..877a647374f9 100644 --- a/backend/modules/assistant/ito/ito.py +++ b/backend/modules/assistant/ito/ito.py @@ -9,13 +9,13 @@ from fastapi import UploadFile from logger import get_logger -from modules.user.service.user_usage import UserUsage from modules.assistant.dto.inputs import InputAssistant from modules.assistant.ito.utils.pdf_generator import PDFGenerator, PDFModel from modules.chat.controller.chat.utils import update_user_usage from modules.contact_support.controller.settings import ContactsSettings from modules.upload.controller.upload_routes import upload_file from modules.user.entity.user_identity import UserIdentity +from modules.user.service.user_usage import UserUsage from packages.emails.send_email import send_email from pydantic import BaseModel from unidecode import unidecode @@ -62,31 +62,36 @@ def increase_usage_user(self): def calculate_pricing(self): return 20 - def generate_pdf(self, filename: str, title: str, content: str): - pdf_model = PDFModel(title=title, content=content) - pdf = PDFGenerator(pdf_model) - pdf.print_pdf() - pdf.output(filename, "F") - @abstractmethod async def process_assistant(self): pass + +async def uploadfile_to_file(uploadFile: UploadFile): + # Transform the UploadFile object to a file object with same name and content + tmp_file = NamedTemporaryFile(delete=False) + tmp_file.write(uploadFile.file.read()) + tmp_file.flush() # Make sure all data is written to disk + return tmp_file + + +class OutputHandler(BaseModel): async def send_output_by_email( self, - file: UploadFile, filename: str, + file: UploadFile, task_name: str, custom_message: str, brain_id: str = None, + user_email: str = None, ): settings = ContactsSettings() - file = await self.uploadfile_to_file(file) + file = await uploadfile_to_file(file) domain_quivr = os.getenv("QUIVR_DOMAIN", "https://chat.quivr.app/") with open(file.name, "rb") as f: mail_from = settings.resend_contact_sales_from - mail_to = self.current_user.email + mail_to = user_email body = f"""
Quivr Logo @@ -116,20 +121,34 @@ async def send_output_by_email( "subject": "Quivr Ingestion Processed", "reply_to": "no-reply@quivr.app", "html": body, - "attachments": [{"filename": filename, "content": list(f.read())}], + "attachments": [ + { + "filename": filename, + "content": list(f.read()), + "type": "application/pdf", + } + ], } logger.info(f"Sending email to {mail_to} with file {filename}") send_email(params) - async def uploadfile_to_file(self, uploadFile: UploadFile): - # Transform the UploadFile object to a file object with same name and content - tmp_file = NamedTemporaryFile(delete=False) - tmp_file.write(uploadFile.file.read()) - tmp_file.flush() # Make sure all data is written to disk - return tmp_file + def generate_pdf(self, filename: str, title: str, content: str): + pdf_model = PDFModel(title=title, content=content) + pdf = PDFGenerator(pdf_model) + pdf.print_pdf() + pdf.output(filename, "F") async def create_and_upload_processed_file( - self, processed_content: str, original_filename: str, file_description: str + self, + processed_content: str, + original_filename: str, + file_description: str, + content: str, + task_name: str, + custom_message: str, + brain_id: str = None, + email_activated: bool = False, + current_user: UserIdentity = None, ) -> dict: """Handles creation and uploading of the processed file.""" # remove any special characters from the filename that aren't http safe @@ -164,29 +183,25 @@ async def create_and_upload_processed_file( headers={"content-type": "application/pdf"}, ) - if self.input.outputs.email.activated: + logger.info(f"current_user: {current_user}") + if email_activated: await self.send_output_by_email( - file_to_upload, new_filename, + file_to_upload, "Summary", f"{file_description} of {original_filename}", - brain_id=( - self.input.outputs.brain.value - if ( - self.input.outputs.brain.activated - and self.input.outputs.brain.value - ) - else None - ), + brain_id=brain_id, + user_email=current_user["email"], ) # Reset to start of file before upload file_to_upload.file.seek(0) - if self.input.outputs.brain.activated: + UserIdentity(**current_user) + if brain_id: await upload_file( uploadFile=file_to_upload, - brain_id=self.input.outputs.brain.value, - current_user=self.current_user, + brain_id=brain_id, + current_user=current_user, chat_id=None, ) diff --git a/backend/modules/assistant/ito/summary.py b/backend/modules/assistant/ito/summary.py index c97740313199..cbd363339742 100644 --- a/backend/modules/assistant/ito/summary.py +++ b/backend/modules/assistant/ito/summary.py @@ -1,6 +1,7 @@ import tempfile from typing import List +from celery_config import celery from fastapi import UploadFile from langchain.chains import ( MapReduceDocumentsChain, @@ -23,9 +24,12 @@ Outputs, ) from modules.assistant.ito.ito import ITO +from modules.notification.dto.inputs import CreateNotification +from modules.notification.service.notification_service import NotificationService from modules.user.entity.user_identity import UserIdentity logger = get_logger(__name__) +notification_service = NotificationService() class SummaryAssistant(ITO): @@ -69,97 +73,117 @@ def check_input(self): return True async def process_assistant(self): - try: - self.increase_usage_user() - except Exception as e: - logger.error(f"Error increasing usage: {e}") - return {"error": str(e)} - - # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader - tmp_file = tempfile.NamedTemporaryFile(delete=False) - - # Write the file to the temporary file - tmp_file.write(self.files[0].file.read()) - - # Now pass the path of the temporary file to the loader - - loader = UnstructuredPDFLoader(tmp_file.name) - - tmp_file.close() - - data = loader.load() - - llm = ChatLiteLLM(model="gpt-3.5-turbo", max_tokens=2000) - - map_template = """The following is a document that has been divided into multiple sections: - {docs} - - Please carefully analyze each section and identify the following: - - 1. Main Themes: What are the overarching ideas or topics in this section? - 2. Key Points: What are the most important facts, arguments, or ideas presented in this section? - 3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entity, or other relevant information. - 4. People: Who are the key individuals mentioned in this section? What roles do they play? - 5. Reasoning: What logic or arguments are used to support the key points? - 6. Chapters: If the document is divided into chapters, what is the main focus of each chapter? - - Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text.""" - map_prompt = PromptTemplate.from_template(map_template) - map_chain = LLMChain(llm=llm, prompt=map_prompt) + notification_service.add_notification( + CreateNotification( + user_id=self.current_user.id, + status="info", + title=f"Creating Summary for {self.files[0].filename}", + ) + ) + # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader + tmp_file = tempfile.NamedTemporaryFile(delete=False) - # Reduce - reduce_template = """The following is a set of summaries for parts of the document: - {docs} - Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events. - Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points. - Please provide the final summary with sections using bold headers. - Sections should always be Summary and Key Points, but feel free to add more sections as needed. - Always use bold text for the sections headers. - Keep the same language as the documents. - Answer:""" - reduce_prompt = PromptTemplate.from_template(reduce_template) + # Write the file to the temporary file + tmp_file.write(self.files[0].file.read()) - # Run chain - reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt) + # Now pass the path of the temporary file to the loader - # Takes a list of documents, combines them into a single string, and passes this to an LLMChain - combine_documents_chain = StuffDocumentsChain( - llm_chain=reduce_chain, document_variable_name="docs" - ) + loader = UnstructuredPDFLoader(tmp_file.name) - # Combines and iteratively reduces the mapped documents - reduce_documents_chain = ReduceDocumentsChain( - # This is final chain that is called. - combine_documents_chain=combine_documents_chain, - # If documents exceed context for `StuffDocumentsChain` - collapse_documents_chain=combine_documents_chain, - # The maximum number of tokens to group documents into. - token_max=4000, - ) + tmp_file.close() - # Combining documents by mapping a chain over them, then combining results - map_reduce_chain = MapReduceDocumentsChain( - # Map chain - llm_chain=map_chain, - # Reduce chain - reduce_documents_chain=reduce_documents_chain, - # The variable name in the llm_chain to put the documents in - document_variable_name="docs", - # Return the results of the map steps in the output - return_intermediate_steps=False, - ) + data = loader.load() - text_splitter = CharacterTextSplitter.from_tiktoken_encoder( - chunk_size=1000, chunk_overlap=100 - ) - split_docs = text_splitter.split_documents(data) + text_splitter = CharacterTextSplitter.from_tiktoken_encoder( + chunk_size=1000, chunk_overlap=100 + ) + split_docs = text_splitter.split_documents(data) + logger.info(f"Split {len(split_docs)} documents") + # Jsonify the split docs + split_docs = [doc.to_json() for doc in split_docs] + ## Turn this into a task + brain_id = ( + self.input.outputs.brain.id + if self.input.outputs.brain.activated + else None + ) + email_activated = self.input.outputs.email.activated + celery.send_task( + name="task_summary", + args=( + split_docs, + self.files[0].filename, + brain_id, + email_activated, + self.current_user.model_dump(mode="json"), + ), + ) + except Exception as e: + logger.error(f"Error processing summary: {e}") + + +def map_reduce_chain(): + llm = ChatLiteLLM(model="gpt-3.5-turbo", max_tokens=2000) + + map_template = """The following is a document that has been divided into multiple sections: + {docs} + + Please carefully analyze each section and identify the following: + + 1. Main Themes: What are the overarching ideas or topics in this section? + 2. Key Points: What are the most important facts, arguments, or ideas presented in this section? + 3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entity, or other relevant information. + 4. People: Who are the key individuals mentioned in this section? What roles do they play? + 5. Reasoning: What logic or arguments are used to support the key points? + 6. Chapters: If the document is divided into chapters, what is the main focus of each chapter? + + Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text.""" + map_prompt = PromptTemplate.from_template(map_template) + map_chain = LLMChain(llm=llm, prompt=map_prompt) + + # Reduce + reduce_template = """The following is a set of summaries for parts of the document : + {docs} + Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events. + Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points. + Please provide the final summary with sections using bold headers. + Sections should always be Summary and Key Points, but feel free to add more sections as needed. + Always use bold text for the sections headers. + Keep the same language as the documents. + Answer:""" + reduce_prompt = PromptTemplate.from_template(reduce_template) + + # Run chain + reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt) + + # Takes a list of documents, combines them into a single string, and passes this to an LLMChain + combine_documents_chain = StuffDocumentsChain( + llm_chain=reduce_chain, document_variable_name="docs" + ) - content = map_reduce_chain.run(split_docs) + # Combines and iteratively reduces the mapped documents + reduce_documents_chain = ReduceDocumentsChain( + # This is final chain that is called. + combine_documents_chain=combine_documents_chain, + # If documents exceed context for `StuffDocumentsChain` + collapse_documents_chain=combine_documents_chain, + # The maximum number of tokens to group documents into. + token_max=4000, + ) - return await self.create_and_upload_processed_file( - content, self.files[0].filename, "Summary" - ) + # Combining documents by mapping a chain over them, then combining results + map_reduce_chain = MapReduceDocumentsChain( + # Map chain + llm_chain=map_chain, + # Reduce chain + reduce_documents_chain=reduce_documents_chain, + # The variable name in the llm_chain to put the documents in + document_variable_name="docs", + # Return the results of the map steps in the output + return_intermediate_steps=False, + ) + return map_reduce_chain def summary_inputs(): diff --git a/backend/modules/assistant/ito/tasks.py b/backend/modules/assistant/ito/tasks.py new file mode 100644 index 000000000000..08f190828adc --- /dev/null +++ b/backend/modules/assistant/ito/tasks.py @@ -0,0 +1,42 @@ +import asyncio + +from celery_config import celery +from langchain_core.documents import Document +from logger import get_logger + +from .ito import OutputHandler +from .summary import map_reduce_chain + +logger = get_logger(__name__) + + +@celery.task(name="task_summary") +def task_summary(split_docs, filename, brain_id, email_activated, current_user): + loop = asyncio.get_event_loop() + # turn split_docs into a list of Document objects + logger.info("split_docs: %s", split_docs) + split_docs = [ + Document( + page_content=doc["kwargs"]["page_content"], + metadata=doc["kwargs"]["metadata"], + ) + for doc in split_docs + if "kwargs" in doc + and "page_content" in doc["kwargs"] + and "metadata" in doc["kwargs"] + ] + content = map_reduce_chain().run(split_docs) + output_handler = OutputHandler() + return loop.run_until_complete( + output_handler.create_and_upload_processed_file( + content, + filename, + "Summary", + content, + "Summary", + "Summary", + brain_id, + email_activated, + current_user, + ) + ) From 87d7334d8413cf4b6ee3d22a7be75b1398d5ed36 Mon Sep 17 00:00:00 2001 From: Stan Girard Date: Wed, 8 May 2024 15:23:24 +0200 Subject: [PATCH 2/4] Add notification service and update notification status --- backend/modules/assistant/ito/ito.py | 14 ++++++++++++++ backend/modules/assistant/ito/summary.py | 3 ++- backend/modules/assistant/ito/tasks.py | 5 ++++- docker-compose.yml | 2 -- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/backend/modules/assistant/ito/ito.py b/backend/modules/assistant/ito/ito.py index 877a647374f9..4c1bc59d4b3e 100644 --- a/backend/modules/assistant/ito/ito.py +++ b/backend/modules/assistant/ito/ito.py @@ -13,6 +13,9 @@ from modules.assistant.ito.utils.pdf_generator import PDFGenerator, PDFModel from modules.chat.controller.chat.utils import update_user_usage from modules.contact_support.controller.settings import ContactsSettings +from modules.notification.dto.inputs import NotificationUpdatableProperties +from modules.notification.entity.notification import NotificationsStatusEnum +from modules.notification.service.notification_service import NotificationService from modules.upload.controller.upload_routes import upload_file from modules.user.entity.user_identity import UserIdentity from modules.user.service.user_usage import UserUsage @@ -22,6 +25,8 @@ logger = get_logger(__name__) +notification_service = NotificationService() + class ITO(BaseModel): input: InputAssistant @@ -149,6 +154,7 @@ async def create_and_upload_processed_file( brain_id: str = None, email_activated: bool = False, current_user: UserIdentity = None, + notification_id: str = None, ) -> dict: """Handles creation and uploading of the processed file.""" # remove any special characters from the filename that aren't http safe @@ -207,4 +213,12 @@ async def create_and_upload_processed_file( os.remove(new_filename) + notification_service.update_notification_by_id( + notification_id, + NotificationUpdatableProperties( + status=NotificationsStatusEnum.SUCCESS, + title=f"Summary of {original_filename} generated successfully", + ), + ) + return {"message": f"{file_description} generated successfully"} diff --git a/backend/modules/assistant/ito/summary.py b/backend/modules/assistant/ito/summary.py index cbd363339742..4adbc315242b 100644 --- a/backend/modules/assistant/ito/summary.py +++ b/backend/modules/assistant/ito/summary.py @@ -74,7 +74,7 @@ def check_input(self): async def process_assistant(self): try: - notification_service.add_notification( + notification = notification_service.add_notification( CreateNotification( user_id=self.current_user.id, status="info", @@ -117,6 +117,7 @@ async def process_assistant(self): brain_id, email_activated, self.current_user.model_dump(mode="json"), + notification.id, ), ) except Exception as e: diff --git a/backend/modules/assistant/ito/tasks.py b/backend/modules/assistant/ito/tasks.py index 08f190828adc..904d8f6a49d1 100644 --- a/backend/modules/assistant/ito/tasks.py +++ b/backend/modules/assistant/ito/tasks.py @@ -11,7 +11,9 @@ @celery.task(name="task_summary") -def task_summary(split_docs, filename, brain_id, email_activated, current_user): +def task_summary( + split_docs, filename, brain_id, email_activated, current_user, notification_id +): loop = asyncio.get_event_loop() # turn split_docs into a list of Document objects logger.info("split_docs: %s", split_docs) @@ -38,5 +40,6 @@ def task_summary(split_docs, filename, brain_id, email_activated, current_user): brain_id, email_activated, current_user, + notification_id, ) ) diff --git a/docker-compose.yml b/docker-compose.yml index 5f6f1a803dcd..8da1a6bdee80 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,8 +44,6 @@ services: - "--workers" - "6" restart: always - volumes: - - ./backend/:/code/ ports: - 5050:5050 From e9c850c03ff7e062b374c3e80ee6a3d7c8711dba Mon Sep 17 00:00:00 2001 From: Stan Girard Date: Wed, 22 May 2024 00:06:02 +0200 Subject: [PATCH 3/4] Update notification title to description in ITO module --- backend/modules/assistant/ito/ito.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/modules/assistant/ito/ito.py b/backend/modules/assistant/ito/ito.py index 34facab9f39f..873605349b32 100644 --- a/backend/modules/assistant/ito/ito.py +++ b/backend/modules/assistant/ito/ito.py @@ -217,7 +217,7 @@ async def create_and_upload_processed_file( notification_id, NotificationUpdatableProperties( status=NotificationsStatusEnum.SUCCESS, - title=f"Summary of {original_filename} generated successfully", + description=f"Summary of {original_filename} generated successfully", ), ) From 96f52e36a0a984604bd4408d0755b4b83522adb7 Mon Sep 17 00:00:00 2001 From: Stan Girard Date: Wed, 22 May 2024 15:50:41 +0200 Subject: [PATCH 4/4] Fix brain ID retrieval in SummaryAssistant --- backend/modules/assistant/ito/summary.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/modules/assistant/ito/summary.py b/backend/modules/assistant/ito/summary.py index fa47f4045ad7..509b271513c6 100644 --- a/backend/modules/assistant/ito/summary.py +++ b/backend/modules/assistant/ito/summary.py @@ -104,7 +104,7 @@ async def process_assistant(self): split_docs = [doc.to_json() for doc in split_docs] ## Turn this into a task brain_id = ( - self.input.outputs.brain.id + self.input.outputs.brain.value if self.input.outputs.brain.activated else None )