Skip to content

Commit 6008726

Browse files
maheshsattala and mahesh_sattala authored
Analytics pipeline functions (#2355)
* added key_vaults in analytics_pipeline * added key_vaults in analytics_pipeline * added analytic_pipeline functions. * added analytic_pipeline functions. * added analytic_pipeline functions. * added analytic_pipeline functions. * added tests for coverage. * added tests for coverage. * added tests for coverage. * added tests for coverage. * added tests for coverage. * added tests for coverage. --------- Co-authored-by: mahesh_sattala <mahesh.sattala@digite.com>
1 parent babb3d9 commit 6008726

9 files changed

Lines changed: 483 additions & 5 deletions

File tree

kairon/events/definitions/analytic_pipeline_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def execute(self, event_id: str, **kwargs):
6161
"pipeline_name": pipeline_name,
6262
"callback_name": callback_name,
6363
"event_id": event_id,
64-
"slot": {"bot": self.bot},
64+
"slot": {"bot": self.bot, "user": self.user},
6565
"key_vault": key_vault
6666
}
6767

kairon/shared/cognition/data_objects.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,21 @@ def validate(self, clean=True):
146146

147147
def clean(self):
    """Normalize the collection name to a trimmed, lower-case form.

    Runs automatically on save; a falsy (empty/None) name is left untouched.
    """
    raw_name = self.collection_name
    if raw_name:
        self.collection_name = raw_name.strip().lower()
150+
151+
152+
@auditlogger.log
@push_notification.apply
class EmbeddingMetadata(Auditlog):
    # Records which embedding model/configuration produced the vectors stored
    # in a Qdrant collection for a given bot, so later runs can detect
    # model/config drift. Written by CallbackScriptUtility.create_vector_collection.
    collection_name = StringField(required=True)  # full Qdrant collection name, e.g. "<bot>_<vault>_faq_embd"
    bot = StringField(required=True)
    vector_config = DictField()  # e.g. {"size": 3072, "distance": Distance.COSINE}
    user = StringField(required=True)
    timestamp = DateTimeField(default=datetime.utcnow)
    knowledge_vault_name = StringField(required=True)  # logical (un-prefixed) collection name
    model_id = StringField(required=True)  # identifier of the embedding model used

    # Lookups are always scoped by bot + collection_name (see create_vector_collection).
    meta = {"indexes": [{"fields": ["bot", "collection_name"]}]}

    def clean(self):
        # Normalize for case-insensitive lookups. collection_name is
        # required=True, so unlike CognitionSchema.clean no falsy guard is
        # used here — NOTE(review): a missing value would raise AttributeError
        # before validation reports it.
        self.collection_name = self.collection_name.strip().lower()

kairon/shared/concurrency/actors/analytics_runner.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def execute(self, source_code: Text, predefined_objects: Optional[Dict] = None,
4646
raise AppException(f"Validation failed: {e}")
4747

4848
bot = predefined_objects.get("slot", {}).get("bot")
49+
user = predefined_objects.get("slot", {}).get("user")
4950

5051
safe_objects = {
5152
"add_data": partial(PyscriptSharedUtility.add_data, bot=bot),
@@ -57,6 +58,9 @@ def execute(self, source_code: Text, predefined_objects: Optional[Dict] = None,
5758
"mark_as_processed": partial(CallbackScriptUtility.mark_as_processed, bot=bot),
5859
"update_data_analytics": partial(CallbackScriptUtility.update_data_analytics, bot=bot),
5960
"delete_data_analytics": partial(CallbackScriptUtility.delete_data_analytics, bot=bot),
61+
"extract_data": partial(CallbackScriptUtility.extract_data, bot=bot, user=user),
62+
"process_instruction": partial(CallbackScriptUtility.process_instruction, bot=bot, user=user),
63+
"create_vector_collection": partial(CallbackScriptUtility.create_vector_collection, bot=bot),
6064
"srtp_time": PyscriptUtility.srtptime,
6165
"srtf_time": PyscriptUtility.srtftime,
6266
"url_parse": PyscriptUtility.url_parse_quote_plus,

kairon/shared/pyscript/analytics_worker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ def main():
5050
"mark_as_processed": partial(CallbackScriptUtility.mark_as_processed, bot=bot),
5151
"update_data_analytics": partial(CallbackScriptUtility.update_data_analytics, bot=bot),
5252
"delete_data_analytics": partial(CallbackScriptUtility.delete_data_analytics, bot=bot),
53+
"extract_data": CallbackScriptUtility.extract_data,
54+
"process_instruction": CallbackScriptUtility.process_instruction,
55+
"create_vector_collection": CallbackScriptUtility.create_vector_collection,
5356
"srtp_time": PyscriptUtility.srtptime,
5457
"srtf_time": PyscriptUtility.srtftime,
5558
"url_parse": PyscriptUtility.url_parse_quote_plus,

kairon/shared/pyscript/callback_pyscript_utils.py

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,4 +467,158 @@ def update_data_analytics(collection_id: str, user: str, payload: dict, bot: str
467467
return {
468468
"message": "Record updated!",
469469
"data": {"_id": collection_id}
470-
}
470+
}
471+
472+
@staticmethod
def extract_data(input_source: str,
                 prompt: str = None,
                 result_type: str = "markdown",
                 llm_type: str = "openrouter",
                 high_res_ocr: bool = False,
                 language: str = "en",
                 bot: str = None,
                 user: str = None):
    """Parse a document through the LLM server's LlamaParse-backed endpoint.

    :param input_source: location/identifier of the document to parse.
    :param prompt: optional parsing instruction forwarded to the parser.
    :param result_type: output format requested from the parser (default "markdown").
    :param llm_type: LLM backend route segment (default "openrouter").
    :param high_res_ocr: enable high-resolution OCR on the parser side.
    :param language: document language hint (default "en").
    :param bot: bot id, used as the URL path segment.
    :param user: acting user, forwarded for auditing on the LLM server.
    :return: dict with "full_text" and "extracted_data" from the parse result.
    :raises Exception: on a non-200 response or when the server reports failure.
    """
    import requests

    llm_server_url = Utility.environment['llm']['url']
    # Bound the call so a stuck parse cannot hang the analytics worker;
    # llm.request_timeout is defined in system.yaml for this purpose.
    request_timeout = Utility.environment['llm'].get('request_timeout', 30)

    payload = {
        "input_source": input_source,
        "llama_parser_api_key": Utility.environment['llama_parse']['key'],
        "result_type": result_type,
        "high_res_ocr": high_res_ocr,
        "language": language,
        "parsing_instruction": prompt,
        "user": user,
        "llm_type": llm_type
    }

    response = requests.post(
        f"{llm_server_url}/{bot}/parse/{llm_type}",
        json=payload,
        timeout=request_timeout
    )

    if response.status_code != 200:
        raise Exception(response.text)

    response = response.json()

    if not response.get("success"):
        raise Exception(response)

    # Guard: a success response with a missing/None "data" key previously
    # raised AttributeError on the .get calls below.
    result = response.get("data") or {}

    return {
        "full_text": result.get("full_text"),
        "extracted_data": result.get("extracted_data")
    }
516+
517+
518+
@staticmethod
def process_instruction(data_list, prompt, operation_type, model_id, llm_type: str = "openrouter",
                        bot: str = None, user: str = None):
    """Run an embedding or completion operation against the LLM server.

    :param data_list: texts to embed, or (for completion) a list whose first
        element is substituted into *prompt* as ``{document}``.
    :param prompt: completion prompt template; unused for embeddings.
    :param operation_type: "embedding" selects the embedding route; anything
        else runs the completion route.
    :param model_id: model identifier passed to the LLM server.
    :param llm_type: LLM backend route segment (default "openrouter").
    :param bot: bot id used in the URL path.
    :param user: acting user, forwarded for auditing.
    :return: ``{"embeddings": ...}`` for embeddings, otherwise the server's
        ``formatted_response``.
    :raises Exception: when no LLM secret is configured for *llm_type*.
    :raises requests.HTTPError: on a non-2xx server response.
    """
    import requests
    from kairon.shared.admin.data_objects import LLMSecret

    # Look up the secret for the requested backend. The original hard-coded
    # llm_type="openrouter" here, silently ignoring the parameter for any
    # other backend; behavior is unchanged for the default value.
    doc = LLMSecret.objects(llm_type=llm_type).first()
    if not doc:
        # Fail with a clear message instead of AttributeError on doc.api_key.
        raise Exception(f"No LLM secret configured for llm_type '{llm_type}'")
    api_key = Utility.decrypt_message(doc.api_key)

    llm_server_url = Utility.environment['llm']['url']
    # Bound both routes so a stalled LLM call cannot hang the worker.
    request_timeout = Utility.environment['llm'].get('request_timeout', 30)

    if operation_type == "embedding":
        payload = {
            "text": data_list,
            "user": user,
            "kwargs": {
                "model": model_id,
                "api_key": api_key
            }
        }

        response = requests.post(f"{llm_server_url}/{bot}/aembedding/{llm_type}",
                                 json=payload, timeout=request_timeout)
        response.raise_for_status()
        response = response.json()
        logger.info(response)

        return {
            "embeddings": response
        }

    # Completion path: format the first document into the prompt template.
    text_input = data_list[0]
    final_prompt = prompt.format(document=text_input)
    payload = {
        "user": user,
        "hyperparameters": {"temperature": 0, "model": model_id},
        "messages": [{"role": "user", "content": final_prompt}]
    }
    response = requests.post(f"{llm_server_url}/{bot}/completion/{llm_type}",
                             json=payload, timeout=request_timeout)

    response.raise_for_status()
    response = response.json()
    extracted_data = response['formatted_response']

    logger.info(response)
    logger.info(extracted_data)

    return extracted_data
571+
572+
573+
@staticmethod
def create_vector_collection(collection_name, model_id: str, user: str, emb_size: int = 3072,
                             overwrite: bool = False, metadata: list = None, bot: str = None):
    """Create (or overwrite) a bot-scoped Qdrant collection and its metadata docs.

    The Qdrant collection is named ``{bot}_{collection_name}_faq_embd``; the
    un-prefixed *collection_name* is stored as the knowledge-vault name. A
    CognitionSchema document is (re)created alongside the collection, and an
    EmbeddingMetadata document records the embedding model/config used.

    :param collection_name: logical (un-prefixed) collection / vault name.
    :param model_id: embedding model identifier to record.
    :param user: acting user, stored on the metadata documents.
    :param emb_size: embedding vector size (default 3072).
    :param overwrite: drop and recreate the collection if it already exists.
    :param metadata: optional list of ColumnMetadata field dicts for the schema.
    :param bot: bot id used to namespace the collection.
    :return: dict message — "collection created successfully" or
        "collection already exists" (when it exists and overwrite is False).
    """
    from kairon.shared.cognition.data_objects import CognitionSchema, EmbeddingMetadata, ColumnMetadata
    from qdrant_client.models import VectorParams, Distance
    from qdrant_client import QdrantClient

    db_url = Utility.environment['vector']['db']
    knowledge_vault_name = collection_name
    collection_name = f"{bot}_{collection_name}_faq_embd"

    client = QdrantClient(url=db_url)
    try:
        collections = client.get_collections().collections
        exists = any(c.name == collection_name for c in collections)
        embed_config = {
            "size": emb_size,
            "distance": Distance.COSINE
        }
        vector_config = VectorParams(**embed_config)

        if exists and overwrite:
            client.delete_collection(collection_name=collection_name)
            # Drop the stale schema so the recreated collection starts clean.
            stale_schema = CognitionSchema.objects(bot=bot, collection_name=knowledge_vault_name).first()
            if stale_schema:
                stale_schema.delete()

        if not exists or overwrite:
            client.create_collection(
                collection_name=collection_name,
                vectors_config=vector_config
            )
            schema_doc = CognitionSchema(bot=bot, user=user)
            schema_doc.metadata = [ColumnMetadata(**meta) for meta in (metadata or [])]
            schema_doc.collection_name = knowledge_vault_name
            schema_doc.save()
        else:
            return {
                "message": "collection already exists"
            }
    finally:
        # Defect fix: the original never closed the client, leaking the
        # underlying HTTP connection on every call.
        client.close()

    existing = EmbeddingMetadata.objects(bot=bot, collection_name=collection_name, model_id=model_id,
                                         knowledge_vault_name=knowledge_vault_name).first()
    if not existing:
        EmbeddingMetadata(bot=bot, collection_name=collection_name, model_id=model_id,
                          knowledge_vault_name=knowledge_vault_name, user=user,
                          vector_config=embed_config).save()
    return {
        "message": "collection created successfully"
    }

requirements/dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ deepdiff==7.0.1
1212
pytest-cov==5.0.0
1313
pytest-html==4.1.1
1414
pytest-aioresponses==0.3.0
15-
aioresponses==0.7.6
15+
aioresponses==0.7.6

requirements/prod.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,5 @@ nltk
7474
blacksheep==2.0.7
7575
fastembed==0.5.1
7676
markdown-pdf==1.7
77-
genson==1.3.0
77+
genson==1.3.0
78+
qdrant-client==1.13.3

system.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ llm:
227227
url: ${LLM_SERVER_URL:http://localhost}
228228
request_timeout: ${LLM_REQUEST_TIMEOUT:30}
229229

230+
llama_parse:
231+
key: ${LLAMA_PARSER_API_KEY:"test-key"}
232+
230233
vector:
231234
db: ${VECTOR_DB:http://localhost:6333}
232235
key: ${VECTOR_DB_KEY}

0 commit comments

Comments
 (0)