1+ import asyncio
2+ import json
3+ from typing import List
4+
15from fastapi import APIRouter , HTTPException
26from fastapi .responses import StreamingResponse
37from sqlmodel import select
48
59from apps .chat .curd .chat import list_chats , get_chat_with_records , create_chat , save_question , save_answer , rename_chat , \
6- delete_chat
7- from apps .chat .models .chat_model import CreateChat , ChatRecord , RenameChat , Chat
8- from apps .chat .schemas .chat_base_schema import LLMConfig
9- from apps .chat .schemas .chat_schema import ChatQuestion
10- from apps .chat .schemas .llm import AgentService
11- from apps .datasource .crud .datasource import get_table_obj_by_ds
10+ delete_chat , list_records
11+ from apps .chat .models .chat_model import CreateChat , ChatRecord , RenameChat , Chat , ChatQuestion
12+ from apps .chat .task .llm import LLMService
13+ from apps .datasource .crud .datasource import get_table_schema
1214from apps .datasource .models .datasource import CoreDatasource
1315from apps .system .models .system_model import AiModelDetail
1416from common .core .deps import SessionDep , CurrentUser
15- import json
16- import asyncio
1717
1818router = APIRouter (tags = ["Data Q&A" ], prefix = "/chat" )
1919
@@ -96,14 +96,7 @@ async def stream_sql(session: SessionDep, current_user: CurrentUser, request_que
9696 detail = "No available datasource configuration found"
9797 )
9898
99- record : ChatRecord
100- try :
101- record = save_question (session = session , current_user = current_user , question = request_question )
102- except Exception as e1 :
103- raise HTTPException (
104- status_code = 400 ,
105- detail = str (e1 )
106- )
99+ request_question .engine = ds .type_name
107100
108101 # Get available AI model
109102 aimodel = session .exec (select (AiModelDetail ).where (
@@ -112,93 +105,54 @@ async def stream_sql(session: SessionDep, current_user: CurrentUser, request_que
112105 )).first ()
113106 if not aimodel :
114107 raise HTTPException (
115- status_code = 400 ,
108+ status_code = 500 ,
116109 detail = "No available AI model configuration found"
117110 )
118111
119- # Use Tongyi Qianwen
120- tongyi_config = LLMConfig (
121- model_type = "openai" ,
122- model_name = aimodel .name ,
123- api_key = aimodel .api_key ,
124- api_base_url = aimodel .endpoint ,
125- additional_params = {"temperature" : aimodel .temperature }
126- )
127- # llm_service = LLMService(tongyi_config)
128- llm_service = AgentService (tongyi_config , ds )
129-
130- # Use Custom VLLM model
131- """ vllm_config = LLMConfig(
132- model_type="vllm",
133- model_name="your-model-path",
134- additional_params={
135- "max_new_tokens": 200,
136- "temperature": 0.3
137- }
138- )
139- vllm_service = LLMService(vllm_config) """
140- """ result = llm_service.generate_sql(question)
141- return result """
142-
112+ history_records : List [ChatRecord ] = list_records (session = session , current_user = current_user ,
113+ chart_id = request_question .chat_id )
143114 # get schema
144- schema_str = ""
145- table_objs = get_table_obj_by_ds (session = session , ds = ds )
146- db_name = table_objs [0 ].schema
147- schema_str += f"【DB_ID】 { db_name } \n 【Schema】\n "
148- for obj in table_objs :
149- schema_str += f"# Table: { db_name } .{ obj .table .table_name } "
150- table_comment = ''
151- if obj .table .custom_comment :
152- table_comment = obj .table .custom_comment .strip ()
153- if table_comment == '' :
154- schema_str += '\n [\n '
155- else :
156- schema_str += f", { table_comment } \n [\n "
157-
158- field_list = []
159- for field in obj .fields :
160- field_comment = ''
161- if field .custom_comment :
162- field_comment = field .custom_comment .strip ()
163- if field_comment == '' :
164- field_list .append (f"({ field .field_name } :{ field .field_type } )" )
165- else :
166- field_list .append (f"({ field .field_name } :{ field .field_type } , { field_comment } )" )
167- schema_str += ",\n " .join (field_list )
168- schema_str += '\n ]\n '
169-
170- print (schema_str )
171-
172- async def event_generator ():
173- all_text = ''
174- try :
175- async for chunk in llm_service .async_generate (question , schema_str ):
176- data = json .loads (chunk .replace ('data: ' , '' ))
177-
178- if data ['type' ] in ['final' , 'tool_result' ]:
179- content = data ['content' ]
180- print ('-- ' + content )
181- all_text += content
182- for char in content :
183- yield f"data: { json .dumps ({'type' : 'char' , 'content' : char })} \n \n "
184- await asyncio .sleep (0.05 )
185-
186- if 'html' in data :
187- yield f"data: { json .dumps ({'type' : 'html' , 'content' : data ['html' ]})} \n \n "
188- else :
189- yield chunk
190-
191- except Exception as e :
192- all_text = 'Exception:' + str (e )
193- yield f"data: { json .dumps ({'type' : 'error' , 'content' : str (e )})} \n \n "
194-
195- try :
196- save_answer (session = session , id = record .id , answer = all_text )
197- except Exception as e :
198- raise HTTPException (
199- status_code = 500 ,
200- detail = str (e )
201- )
115+ request_question .db_schema = get_table_schema (session = session , ds = ds )
116+ llm_service = LLMService (request_question , history_records , ds , aimodel )
117+
118+ llm_service .init_record (session = session , current_user = current_user )
119+
120+ def run_task ():
121+ sql_res = llm_service .generate_sql (session = session )
122+ for chunk in sql_res :
123+ yield json .dumps ({'content' : chunk , 'type' : 'sql' }) + '\n \n '
124+ yield json .dumps ({'type' : 'info' , 'msg' : 'sql generated' }) + '\n \n '
125+
126+ # async def event_generator():
127+ # all_text = ''
128+ # try:
129+ # async for chunk in llm_service.async_generate(question, request_question.db_schema):
130+ # data = json.loads(chunk.replace('data: ', ''))
131+ #
132+ # if data['type'] in ['final', 'tool_result']:
133+ # content = data['content']
134+ # print('-- ' + content)
135+ # all_text += content
136+ # for char in content:
137+ # yield f"data: {json.dumps({'type': 'char', 'content': char})}\n\n"
138+ # await asyncio.sleep(0.05)
139+ #
140+ # if 'html' in data:
141+ # yield f"data: {json.dumps({'type': 'html', 'content': data['html']})}\n\n"
142+ # else:
143+ # yield chunk
144+ #
145+ # except Exception as e:
146+ # all_text = 'Exception:' + str(e)
147+ # yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
148+ #
149+ # try:
150+ # save_answer(session=session, id=record.id, answer=all_text)
151+ # except Exception as e:
152+ # raise HTTPException(
153+ # status_code=500,
154+ # detail=str(e)
155+ # )
202156
203157 # return EventSourceResponse(event_generator(), headers={"Content-Type": "text/event-stream"})
204- return StreamingResponse (event_generator (), media_type = "text/event-stream" )
158+ return StreamingResponse (run_task (), media_type = "text/event-stream" )
0 commit comments