mj_bale_chat/router/bale/bale_handle.py

1053 lines
38 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

################# modularity
### import from external-package
from fastapi import FastAPI, Request, HTTPException
import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
from enum import Enum
import time
from typing import Dict, Optional
from dataclasses import dataclass
### import from internal-file
from router.bale.base_model import *
from router.bale.bale_buttons import *
from router.bale.bale_massages import *
from core.core import *
from core.static import *
from router.bale.config import STATE_CONFIG, STATE, BUSY_TEXT,MAIN_BUTTON, STATE_REGISTERY, build_buttons_form
from typing import Iterable, Any, List
def extract_user_info(update: BaleUpdate) -> dict:
uc_id = encode_uc(update)
user_id, chat_id = decode_uc(uc_id)
if update.message:
u = update.message.from_user
return {
"uc_id": str(uc_id),
"chat_id": int(chat_id),
"user_id": user_id,
"username": u.username,
"first_name": u.first_name,
"last_name": u.last_name or "",
"is_bot": u.is_bot,
"update": update,
}
if update.callback_query:
u = update.callback_query.from_user
return {
"uc_id": str(uc_id),
"chat_id": int(chat_id),
"user_id": user_id,
"username": u.username,
"first_name": u.first_name,
"last_name": "",
"is_bot": u.is_bot,
"update": update,
}
raise ValueError("No user info in update")
class UserManager:
def __init__(self):
self.users: Dict[str, BaleUser] = {}
def get_or_create(self, update: BaleUpdate) -> BaleUser:
user_data = extract_user_info(update)
uc_id = user_data["uc_id"]
if uc_id not in self.users:
self.users[uc_id] = BaleUser(
**user_data,
)
user = self.users[uc_id]
user.update = update
return user
class BaleBot:
"""
input → set_state → render_user_state
"""
def __init__(
self,
user_manager: UserManager,
es_index_name: str,
token: str,
es_helper: ElasticHelper,
formatter: Formatter,
back_end_url: str,
request_manager: RequestManager,
):
self.formatter = formatter
self.request_manager = request_manager
self.es_helper = es_helper
self.token = token
self.es_index_name = es_index_name
self.user_manager = user_manager
self.max_limit = 100
self.back_end_url = back_end_url
self.update_message_url = (
f"https://tapi.bale.ai/bot{self.token}/editMessageText"
)
self.send_message_url = f"https://tapi.bale.ai/bot{self.token}/sendMessage"
# self.user_state = self.make_state()
async def render_user_state(self, user: BaleUser, run_internal=None):
"""Buse سیستم"""
try:
state_detail = user.state_detail
if user.input_query == "":
await self.send_message_helper(
user=user,
text=state_detail.message,
end_buttons=state_detail.end_buttons,
)
return {"ok": True}
# if user.input_query != "" and not run_internal and
if run_internal == "subject_unities":
await self.handle_advanced_check_conflict(user)
elif not run_internal and state_detail.handler:
handler = getattr(self, state_detail.handler)
await handler(user)
except:
await self.handle_chat_in_law(user)
return {"ok": True}
async def update_message_to_bale(
self,
user: BaleUser,
text: str = None,
buttons: List = [],
reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST,
):
payload = {
"chat_id": user.chat_id,
"message_id": user.last_message_id,
"text": text,
"reply_markup": {
# "keyboard": reply_markup,
"resize_keyboard": True,
"one_time_keyboard": False,
},
}
if buttons:
payload["reply_markup"]["inline_keyboard"] = buttons
try:
r = requests.post(self.update_message_url, json=payload)
if not r.ok:
print("Bale API Error:", r.status_code, r.text)
r_ = r.json()
print(f"Send:", r.status_code)
# user.last_message_id = int(r_["result"]["message_id"])
except Exception:
print("ERROR in update_message_to_bale:")
traceback.print_exc()
async def send_message_helper(
self,
user: BaleUser,
update_message: bool = False,
text: str = None,
chunked_text: List[str] = None,
structed_output: List = None,
end_buttons: List = [],
reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST,
):
_i = 0
if text:
_i += 1
if chunked_text:
_i += 1
if structed_output:
_i += 1
if _i != 1:
raise ValueError(
"In send_message_helper Only Send One Of {text, chunked_text, structed_output}"
)
if update_message == True:
if user.last_message_id == 0:
await self.send_message_helper(
user=user,
update_message=False,
text=text,
end_buttons=end_buttons,
chunked_text=chunked_text,
structed_output=structed_output,
reply_markup=reply_markup,
)
else:
if text:
new_text = text
if chunked_text:
new_text = chunked_text[-1]
if structed_output:
new_text = structed_output[-1][0]
await self.update_message_to_bale(
user=user,
text=new_text,
buttons=end_buttons,
reply_markup=reply_markup,
)
else:
if structed_output:
for i, item in enumerate(structed_output, start=1):
if i == len(structed_output) and len(end_buttons) > 0: # is_last
item[1] += end_buttons
await self.send_message_to_bale(
user=user,
text=item[0],
buttons=item[1],
reply_markup=reply_markup,
)
if chunked_text:
_buttons = []
for i, item in enumerate(chunked_text, start=1):
if i == len(chunked_text) and len(end_buttons) > 0: # is_last
_buttons = end_buttons
await self.send_message_to_bale(
user=user,
text=item,
buttons=_buttons,
reply_markup=reply_markup,
)
if text:
await self.send_message_to_bale(
user=user,
text=text,
buttons=end_buttons,
reply_markup=reply_markup,
)
async def send_message_to_bale(
self,
user: BaleUser,
text: str,
buttons: List = [],
reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST,
):
print(f"send_message_to_bale--- {text}")
print(f"send_message_to_bale buttons--- {buttons}")
payload = {
"chat_id": user.chat_id,
"text": text,
"reply_markup": {
# "keyboard": reply_markup,
"resize_keyboard": True,
"one_time_keyboard": False,
},
}
if buttons:
payload["reply_markup"]["inline_keyboard"] = buttons
try:
r = requests.post(self.send_message_url, json=payload)
if not r.ok:
print("Bale API Error:", r.status_code, r.text)
r_ = r.json()
print(f"Send:", r.status_code)
user.last_message_id = int(r_["result"]["message_id"])
except Exception:
print("ERROR in send_message_to_bale:")
traceback.print_exc()
async def render_update(self, update: BaleUpdate):
"""
مهمترین تابع ربات بله
ورودی شما یا به صورت متن ساده هست یا بصورت query
render_message : متن
render_callback : کوئری
"""
# 2⃣ ساخت یا بازیابی user
user = self.user_manager.get_or_create(update)
print(f"render_update user.state_detail.state -> {user.state_detail}")
if user.is_processing_lock == False:
if update.message:
print(f"render_message")
user.input_query = str(user.update.message.text or "").strip()
# async def render_message(self, user: BaleUser):
if user.input_query in ("/start", "main", "بازگشت به خانه"):
await self.handle_main(user)
user.input_query = ""
return {"ok": True}
if user.input_query == "جستجو":
user.state_detail = STATE_CONFIG["search_in_law"]
user.input_query = ""
if user.input_query == "گفتگو":
user.state_detail = STATE_CONFIG["chat_in_law"]
user.input_query = ""
# --- WorkFlow Logic by State
await self.render_user_state(user)
return {"ok": True}
if update.callback_query:
"""
اینجا فقط باید مرحله و state عوض شود و داده ای وارد نمیشود!!!
برای سوال پرسیدن از کاربر و ...
"""
print("render_callback")
user.is_call_back_query = True
user.call_back_query = user.update.callback_query.data
# user.update.callback_query.data
run_internal = None
if user.call_back_query == "main":
user.input_query = ""
await self.handle_main(user)
return {"ok": True}
# Dynamic Change Options
if user.call_back_query == "more_limit":
user.limit += 10
# Dynamic Change Options
elif user.call_back_query == "more_effort":
user.effort = "medium"
elif user.call_back_query.startswith("subject_unities:"):
run_internal = "subject_unities"
# subject_unities:qq:qq_983214
# Dynamic Change State
else:
user.state_detail = STATE_CONFIG[user.call_back_query]
await self.render_user_state(user, run_internal)
return {"ok": True}
else:
# beta-mode
await self.send_message_helper(user=user, text=BUSY_TEXT)
return {"ok": True}
async def talk(self, user: BaleUser):
pass
async def handle_main(self, user: BaleUser):
# ریست کردن پشته‌ها و نتایج
""" """
################################### SAVE2ELASTIC ###################################
user.last_query = ""
user.last_result = ""
user.stack.clear()
user.input_query = ""
user.sub_state = ""
user.limit = 10
user.effort = "low"
user.last_message_id = 0
message = """👋 سلام دوست عزیز! 🤗
به دستیار هوشمند قانون یار خوش آمدید!
فقط کافیه به من بگید چه کمکی از دستم برمیاد!"""
await self.send_message_helper(
user=user,
text=message, #
end_buttons=MAIN_BUTTON,
)
async def handle_search_in_law(self, user: BaleUser):
user.is_processing_lock = True
user.last_query = user.input_query
try:
result = await self.request_manager.get_result(
payload={
"section_content": user.input_query,
"limit": user.limit,
"effort": user.effort,
},
url="/semantic_search/run_semantic_search",
)
# print(f'result rag {result} {type(result["ss_answer"])}')
chunked_text_ = self.formatter.form_search_in_law(
title=user.input_query,
sections=result["answer"],
)
# print(f'chunked_text_ rag {chunked_text_}')
_buttons = [[HOME_BUTTON]]
if user.limit < self.max_limit:
_buttons.insert(0, [MORE_LIMIT_BUTTON])
await self.send_message_helper(
user=user, chunked_text=chunked_text_, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_rule_making(self, user: BaleUser):
user.is_processing_lock = True
user.effort = "medium"
try:
result = await self.request_manager.get_result(
payload={
"section_content": user.input_query,
"limit": user.limit,
"effort": user.effort,
},
url="/rule_making",
)
print(f"handle_rule_making {result}")
res_ = await self.formatter.form_rule_making(_input=result)
_buttons = [[HOME_BUTTON]]
if user.effort != "medium":
_buttons.append([MORE_EFFORT_BUTTON])
await self.send_message_helper(
user=user, chunked_text=res_, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_qanon_title_repeat(self, user: BaleUser):
user.is_processing_lock = True
try:
result_ = await title_repeated(qanontitle=user.input_query)
res_ = await self.formatter.form_title_repeated(data=result_)
_buttons = [[HOME_BUTTON]]
await self.send_message_helper(
user=user, chunked_text=res_, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
user.input_query = ""
return {"ok": True}
async def handle_talk(self, user: BaleUser):
user.is_processing_lock = True
try:
result = await self.request_manager.get_result(
payload={
"user_input": user.input_query,
},
url="/talk",
)
# answer
# answer_type
await self.send_message_helper(
user=user, chunked_text=result['answer'], end_buttons=MAIN_BUTTON
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
return {"ok": True}
async def handle_conflict_qanon_asasi(self, user: BaleUser):
user.is_processing_lock = True
user.last_message_id = 0
try:
print(f"effort=user.effort {user.effort}")
async for step_data in self.request_manager.stream_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
"limit": user.limit,
"mode_type": "bale",
},
url="/conflict/constitution",
):
step = step_data.get("step")
data = step_data.get("data")
is_first = step_data.get("is_first", False)
is_last = step_data.get("is_last", False)
print(f"is_first {is_first} is_last {is_last}")
# is_last = step_data.get("is_last")
_button = [[HOME_BUTTON]]
print(f"==== handle_constitution ====")
if step == "low":
print(f"low {user.effort}")
chunked_response = await self.formatter.form_constitution(data)
# await self.send_message_helper(user, chunked_text=response)
# if is_last:
if is_first is True:
await self.send_message_helper(
user=user,
update_message=False,
chunked_text=chunked_response,
)
elif is_last is True:
_button.append([MORE_EFFORT_BUTTON])
await self.send_message_helper(
user=user,
update_message=True,
chunked_text=chunked_response,
end_buttons=_button,
)
else:
await self.send_message_helper(
user=user,
update_message=True,
chunked_text=chunked_response,
)
elif step == "rule_making":
print(f"rule_making {user.effort}")
chunked_response = await self.formatter.form_rule_making(
_input=data
)
await self.send_message_helper(
user=user,
update_message=False,
chunked_text=chunked_response,
)
elif step == "semantich_search":
print(f"semantich_search {user.effort}")
_header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
response = await self.formatter.form_ss_rules(data, header=_header)
await self.send_message_helper(user, chunked_text=response)
elif step == "subject_unities":
print(f"subject_unities {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
print(f"data type {type(data)}")
try:
print(f"data -> RuleRelation")
_data = [RuleRelation.parse_obj(i) for i in data]
except:
print(f"data -> String")
_data = data
chunked_text, _button, mapping_qs = (
await self.formatter.form_subject_unity(_data, header=_header)
)
await self.send_message_helper(
user=user,
chunked_text=chunked_text,
end_buttons=_button,
)
user.subject_unities = mapping_qs
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
user.input_query = ""
return {"ok": True}
async def handle_conflict_law_writing_policy(self, user: BaleUser):
user.is_processing_lock = True
try:
user.last_query = user.input_query
_result = await self.request_manager.get_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
},
url="/conflict/law_writing_policy",
)
result = await self.formatter.from_law_writing_policy(
_input_dict=_result, header="نتیجه بررسی با سیاست های قانون گذاری"
)
_buttons = [[HOME_BUTTON]]
if user.effort != "medium":
_buttons.insert(0, [MORE_EFFORT_BUTTON])
await self.send_message_helper(
user=user, chunked_text=result, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
# user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_conflict_all_qavanin(self, user: BaleUser):
user.is_processing_lock = True
try:
async for step_data in self.request_manager.stream_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
"limit": user.limit,
"mode_type": "bale",
},
url="/conflict/all_qanon/qs_unity",
):
step = step_data.get("step")
data = step_data.get("data")
is_first = step_data.get("is_first", False)
is_last = step_data.get("is_last", False)
_button = [[HOME_BUTTON]]
print(f"==== handle_conflict_general_policy ====")
if step == "rule_making":
print(f"rule_making {user.effort}")
chunked_response = await self.formatter.form_rule_making(
_input=data
)
await self.send_message_helper(
user=user,
update_message=False,
chunked_text=chunked_response,
)
elif step == "semantich_search":
print(f"semantich_search {user.effort}")
_header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
response = await self.formatter.form_ss_rules(data, header=_header)
await self.send_message_helper(user, chunked_text=response)
elif step == "subject_unities":
print(f"subject_unities {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
print(f"data type {type(data)}")
try:
print(f"data -> RuleRelation")
_data = [RuleRelation.parse_obj(i) for i in data]
except:
print(f"data -> String")
_data = data
chunked_text, _button, mapping_qs = (
await self.formatter.form_subject_unity(_data, header=_header)
)
await self.send_message_helper(
user=user,
chunked_text=chunked_text,
end_buttons=_button,
)
user.subject_unities = mapping_qs
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
# user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_conflict_general_policy(self, user: BaleUser):
user.is_processing_lock = True
try:
async for step_data in self.request_manager.stream_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
"limit": user.limit,
"mode_type": "bale",
},
url="/conflict/general_policy/qs_unity",
):
step = step_data.get("step")
data = step_data.get("data")
is_first = step_data.get("is_first", False)
is_last = step_data.get("is_last", False)
_button = [[HOME_BUTTON]]
print(f"==== handle_conflict_general_policy ====")
if step == "rule_making":
print(f"rule_making {user.effort}")
chunked_response = await self.formatter.form_rule_making(
_input=data
)
await self.send_message_helper(
user=user,
update_message=False,
chunked_text=chunked_response,
)
elif step == "semantich_search":
print(f"semantich_search {user.effort}")
_header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
response = await self.formatter.form_ss_rules(data, header=_header)
await self.send_message_helper(user, chunked_text=response)
elif step == "subject_unities":
print(f"subject_unities {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
print(f"data type {type(data)}")
try:
print(f"data -> RuleRelation")
_data = [RuleRelation.parse_obj(i) for i in data]
except:
print(f"data -> String")
_data = data
chunked_text, _button, mapping_qs = (
await self.formatter.form_subject_unity(_data, header=_header)
)
await self.send_message_helper(
user=user,
chunked_text=chunked_text,
end_buttons=_button,
)
user.subject_unities = mapping_qs
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
# user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_advanced_check_conflict(self, user: BaleUser):
user.is_processing_lock = True
print(f"handle_advanced_check_conflict======================================")
try:
print(f'user.call_back_query {user.call_back_query}')
step, _type, qq_title = user.call_back_query.split(":")
if _type == "qq":
print('A')
groups = {}
for qanon_title, item in user.subject_unities.items():
if qanon_title == qq_title:
for i in item:
groups[qanon_title] = i.db_rule.section_id
print('B')
if len(groups) > 1:
_button = []
for i, (k, v) in enumerate(groups.items(), start=1):
_button.append(
[{"text": i, "callback_data": f"subject_unities:qs:{v}"}]
)
await self.send_message_helper(
user=user,
text=f"برای ادامه یک مورد را از این قانون انتخاب کنید{k} انتخاب کنید",
end_buttons=_button,
)
else:
user.call_back_query = f"subject_unities:qs:{groups[qq_title]}"
print('ccccc')
elif _type == "qs":
print('AAA@@@')
content = None
for k, v in user.subject_unities.items():
if v.db_rule.section_id == qq_title:
content = v.model_dump()
print(f"content qs -> {content}")
async for step_data in self.request_manager.stream_result(
payload={
"rule_relation": content,
"effort": user.effort,
"mode_type": "bale",
},
url="/conflict/unity_eval",
):
print(f"qs {user.call_back_query}")
step = step_data.get("step")
data = step_data.get("data")
# is_last = step_data.get("is_last")
_button = [[HOME_BUTTON]]
print(f"==== handle_advanced_check_conflict ====")
if step == "step2":
print(f"*********************Step2 {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
_data = RuleRelation.parse_obj(data)
chunked_text = await self.formatter.form_conflict_detection(
_data, header=_header
)
await self.send_message_helper(user=user, text=chunked_text)
elif step == "step3":
print(f"*********************Step3")
_data = RuleRelation.parse_obj(data)
chunked_text = (
await self.formatter.form_conflict_type_detection(
_data, header=_header
)
)
await self.send_message_helper(user=user, text=chunked_text)
elif step == "step4":
print(f"*********************Step4")
_data = RuleRelation.parse_obj(data)
chunked_text = (
await self.formatter.form_relation_identification(
_data, header=_header
)
)
await self.send_message_helper(user=user, text=chunked_text)
elif step == "step5":
print(f"*********************Step5")
_data = Evaluation.parse_obj(data)
chunked_text = await self.formatter.form_evaluation(
_data, header=_header
)
await self.send_message_helper(user=user, text=chunked_text)
else:
print(f"eerror in uknown step --> {step}")
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
# user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_chat_in_law(
self, user: BaleUser
) -> List[BalePayload]: # -> List[List[str, dict]]
user.is_processing_lock = True
user.last_query = user.input_query
try:
# اگر runtime تغییر کرده یا نتیجه قبلی وجود ندارد → درخواست جدید
result = await self.request_manager.get_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
"limit": user.limit,
"mode_type": "bale",
},
url="/semantic_search/run_chat",
)
print(f"===================={result}")
text_result = self.formatter.form_law_chat(
result["answer"] # , llm_answer["source"]
)
_buttons = [[HOME_BUTTON]]
_b = []
if result['answer_type'] == 'legal_question':
if user.limit < self.max_limit:
_b += [MORE_LIMIT_BUTTON]
if user.effort != "medium":
_b += [MORE_EFFORT_BUTTON]
if len(_b) > 0:
_buttons.insert(0, _b)
await self.send_message_helper(
user=user, chunked_text=text_result, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.input_query = ""
user.is_processing_lock = False
async def handle_stream_chat(self, user: BaleUser):
"""
stream_talk()
async generator → chunk
handle_stream_chat()
send first message
update_message (throttled)
"""
full_text = ""
message_id = None
last_update = 0
async for chunk in stream_talk(user.input_query):
full_text += chunk
now = time.time()
# جلوگیری از اسپم API (مثلاً هر 0.6 ثانیه آپدیت)
if now - last_update < 0.6:
continue
last_update = now
if message_id is None:
# اولین پیام
message_id = await self.send_message_to_bale(
user=user,
text=full_text + "",
)
else:
payload = BalePayload(
chat_id=user.chat_id, message_id=message_id, text=full_text + ""
)
self.update_message_to_bale(user, payload)
# آخر کار، بدون cursor
if message_id:
payload = BalePayload(
chat_id=user.chat_id, message_id=message_id, text=full_text
)
self.update_message_to_bale(user, payload)
async def handle_logical_chat_in_law(self, user: BaleUser):
user.is_processing_lock = True
try:
async for step_data in self.request_manager.stream_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
},
url="/stream/chat_logical",
):
step = step_data.get("step")
data = step_data.get("data")
if step == "rule_making":
_header = f"✅ مرحله استخراج اجزاء حقوقی متن انجام شد.\nتعداد جزء ها: {len(data)}\n اجزاء حقوقی:\n"
response = await self.formatter.form_rule_making(
data, header=_header
)
await self.send_message_helper(user, chunked_text=response)
elif step == "semantic_search":
_header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
response = await self.formatter.form_ss_rules(data, header=_header)
await self.send_message_helper(user, chunked_text=response)
elif step == "llm_answer":
_button = [[HOME_BUTTON]]
if user.effort != "medium":
_button.insert(0, [MORE_EFFORT_BUTTON])
_header = f"📝 پاسخ نهایی:\n"
response = await self.formatter.form_llm_answer_chat(
data, header=_header
)
print(f"response {response}")
await self.send_message_helper(
user, chunked_text=response, end_buttons=_button
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_beta(self, user: BaleUser):
"""
state:
user.is_processing_lock:
query:
time.time
first_name
last_name
"""
try:
text_1 = "سلام"
text_2 = "سلام، عرض ادب و احترام"
await self.send_message_helper(
user=user,
structed_output=[
["این تست است ", [[{"text": "تستتت", "callback_data": "not_yet"}]]],
[
"این تست اس2ت ",
[[{"text": "تستت2ت", "callback_data": "not_yet"}]],
],
],
end_buttons=[
[
{"text": "دکمه های نهایی", "callback_data": "not_yet"},
{"text": "دکمه های ن2هایی", "callback_data": "not_yet"},
{"text": "دکمه های ن23هایی", "callback_data": "not_yet"},
],
[{"text": "دکمه های آخر", "callback_data": "not_yet"}],
],
)
except Exception as e:
print("ERROR in handl_beta:", str(traceback.print_exc()))
async def save_to_es(self, data: QaChat):
# print("save_to_es data rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr")
try:
es_res = self.es_helper.update_index_doc(
is_update_state=False,
index_name_o=self.es_index_name,
eid=data.id,
data=data.model_dump(),
)
# type_name, payload, request
print(f"Saved {es_res}")
except Exception as e:
print("save_to_es ", str(traceback.print_exc()))