mj_bale_chat/router/bale/bale_handle.py
2026-01-25 11:29:33 +00:00

1534 lines
57 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

################# modularity
### import from external-package
import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re, random
from typing import Iterable, Any, List, Dict, Optional
from core.base_model import *
### import from internal-file
from router.bale.base_model import *
from router.bale.bale_buttons import *
from router.bale.bale_massages import *
from router.bale.bale_manager import UserManager
from core.operation import Operation
from core.core import *
from core.static import *
from router.bale.config import *
class BaleBotGeneral:
"""
کلاس اصلی ارتباط با Bale و متد های اصلی ارتباط
"""
def __init__(
self,
token: str,
send_message_url: Optional[str] = None,
delete_message_url: Optional[str] = None,
update_message_url: Optional[str] = None,
):
self.token = token
self.send_message_url = (
send_message_url or f"https://tapi.bale.ai/bot{self.token}/sendMessage"
)
self.delete_message_url = (
delete_message_url or f"https://tapi.bale.ai/bot{self.token}/deleteMessage"
)
self.update_message_url = (
update_message_url
or f"https://tapi.bale.ai/bot{self.token}/editMessageText"
)
async def delete_bale_message(self, chat_id, message_id):
payload = {
"chat_id": chat_id,
"message_id": message_id,
}
try:
r = requests.post(self.delete_message_url, json=payload)
print(f"Delete Message Status code:", r.status_code)
except Exception:
print("ERROR in Delete Message Status code:")
traceback.print_exc()
async def update_message_to_bale(
self,
user: BaleUser,
text: str = None,
buttons: List = [],
reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST,
):
payload = {
"chat_id": user.chat_id,
"message_id": user.active_message_id,
"text": text,
"reply_markup": {
# "keyboard": reply_markup,
"resize_keyboard": True,
"one_time_keyboard": False,
},
}
if buttons:
payload["reply_markup"]["inline_keyboard"] = buttons
try:
r = requests.post(self.update_message_url, json=payload)
if not r.ok:
print("ERROR in Update Message Bale API Error:", r.status_code, r.text)
print(f"Update Message Status code:", r.status_code)
except Exception:
print("ERROR in Update Message Status code:")
traceback.print_exc()
async def send_message_helper(
self,
user: BaleUser,
update_message: bool = False,
text: str = None,
chunked_text: List[str] = None,
structed_output: List = None,
end_buttons: List = [],
reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST,
is_save_active_message_id=False,
):
_i = 0
if text:
_i += 1
if chunked_text:
_i += 1
if structed_output:
_i += 1
if _i != 1:
raise ValueError(
"In send_message_helper Only Send One Of {text, chunked_text, structed_output}"
)
if update_message == True:
if user.active_message_id == 0:
await self.send_message_helper(
user=user,
update_message=False,
text=text,
end_buttons=end_buttons,
chunked_text=chunked_text,
structed_output=structed_output,
reply_markup=reply_markup,
is_save_active_message_id=is_save_active_message_id,
)
else:
if text:
new_text = text
if chunked_text:
new_text = chunked_text[-1]
if structed_output:
new_text = structed_output[-1][0]
await self.update_message_to_bale(
user=user,
text=new_text,
buttons=end_buttons,
reply_markup=reply_markup,
)
else:
if structed_output:
for i, item in enumerate(structed_output, start=1):
if i == len(structed_output) and len(end_buttons) > 0: # is_last
item[1] += end_buttons
await self.send_message_to_bale(
user=user,
text=item[0],
buttons=item[1],
reply_markup=reply_markup,
is_save_active_message_id=is_save_active_message_id,
)
if chunked_text:
_buttons = []
for i, item in enumerate(chunked_text, start=1):
if i == len(chunked_text) and len(end_buttons) > 0: # is_last
_buttons = end_buttons
await self.send_message_to_bale(
user=user,
text=item,
buttons=_buttons,
reply_markup=reply_markup,
is_save_active_message_id=is_save_active_message_id,
)
await asyncio.sleep(0.05)
if text:
await self.send_message_to_bale(
user=user,
text=text,
buttons=end_buttons,
reply_markup=reply_markup,
is_save_active_message_id=is_save_active_message_id,
)
async def send_message_to_bale(
self,
user: BaleUser,
text: str,
buttons: List = [],
reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST,
is_save_active_message_id=False,
):
payload = {
"chat_id": user.chat_id,
"text": text,
"reply_markup": {
# "keyboard": reply_markup,
"resize_keyboard": True,
"one_time_keyboard": False,
},
}
if buttons:
payload["reply_markup"]["inline_keyboard"] = buttons
try:
r = requests.post(self.send_message_url, json=payload)
if not r.ok:
print("ERROR in Send Message Bale API Error:", r.status_code, r.text)
r_ = r.json()
print(f"Send Message Status code:", r.status_code)
if is_save_active_message_id:
user.active_message_id = int(r_["result"]["message_id"])
except Exception:
print("ERROR in Send Message Status code:")
traceback.print_exc()
class BaleBot(BaleBotGeneral):
"""
input → set_state → render_user_state
"""
def __init__(
self,
user_manager: UserManager,
operation: Operation,
es_index_name: str,
token: str,
es_helper: ElasticHelper,
formatter: Formatter,
back_end_url: str,
request_manager: RequestManager,
):
super().__init__(
token=token,
)
self.ui_handler = UIState(
delay_time=0.4,
token=token,
)
self.freeze_limit = 3
self.operator = operation
self.formatter = formatter
self.request_manager = request_manager
self.es_helper = es_helper
self.token = token
self.es_index_name = es_index_name
self.user_manager = user_manager
self.max_limit = 100
self.back_end_url = back_end_url
"""
deleteMessage
message_id
chat_id
"""
# self.user_state = self.make_state()
# ------------------------------------------------------------------
# توابعی که در والد BaleBotGeneral هستند
# ------------------------------------------------------------------
async def delete_bale_message(self, **kwargs):
await super().delete_bale_message(**kwargs)
async def update_message_to_bale(self, *args, **kwargs):
await super().update_message_to_bale(*args, **kwargs)
async def send_message_helper(self, *args, **kwargs):
await super().send_message_helper(*args, **kwargs)
async def send_message_to_bale(self, *args, **kwargs):
await super().send_message_to_bale(*args, **kwargs)
# ------------------------------------------------------------------
# توابع همین کلاس
# ------------------------------------------------------------------
async def render_user_state(self, user: BaleUser):
"""Bus سیستم"""
try:
print(f"user.input_query.render_user_state >{user.input_query}<")
print(f"user.state >{user.state_detail.state}<")
if user.input_query == "":
await self.send_message_helper(
user=user,
text=user.state_detail.message,
end_buttons=user.state_detail.end_buttons,
)
return {"ok": True}
# if user.input_query != "" and not run_internal and
# and user.last_input_query == ""
# if user.last_input_query != "" and user.input_query != "" and "subject_unities":
# await self.handle_advanced_check_conflict(user)
if user.state_detail.handler:
user.is_processing_lock = True
handler = getattr(self, user.state_detail.handler)
await handler(user)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
return {"ok": True}
async def render_update(self, update: BaleUpdate):
"""
مهمترین تابع ربات بله
ورودی شما یا به صورت متن ساده هست یا بصورت query
render_message : متن
render_callback : کوئری
"""
# 2⃣ ساخت یا بازیابی user
user = self.user_manager.get_or_create(update)
print(f"render_update user -> {user.uc_id} user.is_processing_lock {user.is_processing_lock }")
if user.is_processing_lock == False:
if update.message:
user.message_limit = 0
user.input_query = str(user.update.message.text or "").strip()
# async def render_message(self, user: BaleUser):
if user.input_query in ("/start", "main", "بازگشت به خانه"):
user.input_query = ""
await self.handle_main(user)
return {"ok": True}
elif user.input_query == "جستجو":
user.state_detail = STATE_CONFIG["search_in_law"]
user.input_query = ""
elif user.input_query == "گفتگو":
user.state_detail = STATE_CONFIG["chat_in_law"]
user.input_query = ""
# --- WorkFlow Logic by State
await self.render_user_state(user)
return {"ok": True}
if update.callback_query:
"""
اینجا فقط باید مرحله و state عوض شود و داده ای وارد نمیشود!!!
برای سوال پرسیدن از کاربر و ...
"""
user.message_limit = 0
user.is_call_back_query = True
user.call_back_query = user.update.callback_query.data
# user.update.callback_query.data
run_internal = None
if user.call_back_query == "main":
await self.handle_main(user)
return {"ok": True}
# Dynamic Change Options
if user.call_back_query == "more_limit":
user.limit += 10
user.input_query = user.last_input_query
if user.state_detail == STATE_CONFIG["logical_chat_in_law"]:
user.state_detail = STATE_CONFIG["chat_in_law"]
# Dynamic Change Options
elif user.call_back_query == "more_effort":
user.effort = "medium"
user.input_query = user.last_input_query
elif user.call_back_query == "logical_chat_in_law":
# user.effort = "medium"
user.input_query = user.last_input_query
user.state_detail = STATE_CONFIG["logical_chat_in_law"]
elif user.call_back_query.startswith("subject_unities:"):
run_internal = "subject_unities"
# subject_unities:qq:qq_983214
# Dynamic Change State
else:
user.state_detail = STATE_CONFIG[user.call_back_query]
await self.render_user_state(user)
return {"ok": True}
else:
if user.call_back_query == "main":
await self.handle_main(user)
return {"ok": True}
# beta-mode
if user.message_limit < self.freeze_limit:
await self.send_message_helper(
user=user,
text=BUSY_TEXT,
end_buttons=[[{"text":"توقف عملیات در حال اجراء", "callback_data":"main"}]]
)
user.message_limit += 1
return {"ok": True}
async def talk(self, user: BaleUser):
pass
async def handle_main(self, user: BaleUser):
# if user.input_query != "":
# "اگر کاربر پیام داد در حالت عادی به چت منتقل شود"
# user.state_detail = STATE_CONFIG["chat_in_law"]
# await self.render_user_state(user)
# return {"ok": True}
"""
اگر کاربر پیامی نداده بود صفحه اصلی نمایش داده شود و داده ها پاک شود
"""
# ریست کردن پشته‌ها و نتایج
user.last_query = ""
user.last_result = ""
user.stack.clear()
user.input_query = ""
user.sub_state = ""
user.limit = 10
user.effort = "low"
user.active_message_id = 0
user.message_limit = 0
message = "من ربات تست ام"
await self.send_message_helper(
user=user,
text=message, #
end_buttons=MAIN_BUTTON,
)
async def choice_button(self, user: BaleUser, choice_buttons):
# handle_conflict_law_writing_policy
# handle_conflict_qanon_asasi
# handle_conflict_general_policy
# handle_conflict_all_qavanin
result = await self.send_message_helper(
user=user,
text="یک مورد را انتخاب کنید:",
end_buttons=choice_buttons,
)
"""
-> Response from render message -> return
"""
return None
async def handle_search_in_law(self, user: BaleUser):
user.is_processing_lock = True
print(f"handle_search_in_law -> {user.input_query}")
self.ui_handler.create(user=user, main_text="در حال جستجو")
_buttons = [[HOME_BUTTON]]
try:
async for result in self.operator.stream_search_in_law(
query=user.input_query,
limit=user.limit,
rerank_model="BAAI/bge-reranker-v2-m3",
embed_model="BAAI/bge-m3",
):
result = BMNewSemanticSearchOutput.model_validate(result["result"])
if isinstance(result, BMNewSemanticSearchOutput):
self.ui_handler.cancel(user)
chunked_text_ = self.formatter.form_search_in_law(
header=f"برای پرسش: {user.input_query}\n\n",
footer=f"مدت زمان بردارسازی {result.embed_model_time} مدت زمان تشابه یابی {result.cosine_similarity_time} مدت زمان تشابه یابی دقیق {result.rerank_time}",
sections=result.result,
)
if user.limit < self.max_limit:
_buttons.insert(0, [MORE_LIMIT_BUTTON])
await self.send_message_helper(
user=user,
chunked_text=chunked_text_,
end_buttons=_buttons,
)
# else:
# heartbeat_task
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
data = {
"state": "search_in_law",
"contents": [i.content for i in result.result],
"scores": [i.score for i in result.result],
"ref_ids": [i.id for i in result.result],
"user_input": user.input_query,
"metadata": {
"embed_model_time": result.embed_model_time,
"cosine_similarity_time": result.cosine_similarity_time,
"rerank_time": result.rerank_time,
},
}
data["metadata"].update(result.metadata)
await self.save_to_db(user, data=data)
user.is_processing_lock = False
user.last_input_query = user.input_query
user.input_query = ""
self.ui_handler.cancel(user)
return {"ok": True}
async def handle_search_in_law_rules(self, user: BaleUser):
user.is_processing_lock = True
_status_show_map = {
"total_process": "تعداد کل موارد مورد بررسی",
"total_processed": "تعداد موارد بررسی شده",
}
total_process = len(user.last_result) * user.limit
es_data = None
es_metadata = None
print(
f"user.limit {user.limit}",
f"len(user.last_result) {len(user.last_result)}",
f"total_process*****s {total_process}",
)
try:
if user.last_result:
print(f"/////////////user.last_result", len(user.last_result))
await self.send_message_helper(
user=user,
text="در حال جستجو ...",
update_message=True,
is_save_active_message_id=True,
)
_buttons = [
[HOME_BUTTON],
[{"text": "بررسی مغایرت", "callback_data": "not_yet"}],
]
filter_qanon_ids = []
# filter_qanon_ids باید از کاربر پرسیده شود
async for step_data in self.operator.stream_rule_semantic_search(
queries=[i.model_dump() for i in user.last_result],
filter_qanon_ids=filter_qanon_ids,
limit_rerank=user.limit,
):
status = step_data.get("status")
data = step_data.get("data")
metadata = step_data.get("metadata")
# time
if status == "end":
# print(f'*****data {data}')
es_data = data[status]
es_metadata = metadata
data = [
SemanticSearchP2P.model_validate(i) for i in data[status]
]
_res = self.formatter.form_search_in_law_rules(
header="نتایج جستجوی اجزاء در قانون\n",
body=data,
)
print(f"len(_res) {len(_res) }")
if user.limit < self.max_limit:
_buttons.insert(0, [MORE_LIMIT_BUTTON])
if len(_res) == 1:
await self.send_message_helper(
user=user,
chunked_text=_res,
update_message=True,
end_buttons=_buttons,
)
else:
for i, item in enumerate(_res):
print(i)
if i == 0:
print("start-batch")
await self.send_message_helper(
user=user,
text=item,
update_message=True,
)
elif i == len(_res) - 1:
print("end-batch")
await self.send_message_helper(
user=user,
text=item,
end_buttons=_buttons,
)
else:
print("middle-batch")
await self.send_message_helper(
user=user,
text=item,
)
user.active_message_id = 0
else:
# print(
# f"status {status}\nText {type(_status_show_map[status])}\ndata type {type(data[status])}"
# )
if status == "total_processed":
_result = (
_status_show_map[status]
+ f": {data['total_processed']}/{total_process}"
)
elif status == "total_process":
_result = (
_status_show_map[status] + f": {data['total_process']}"
)
await self.send_message_helper(
user=user,
text=_result,
update_message=True,
is_save_active_message_id=True,
)
else:
print("-----------------------No Found Any Last_data")
if es_data:
_data = {
"state": "search_in_law_rules",
"result": es_data,
"metadata": {},
}
_data["metadata"].update(es_metadata)
await self.save_to_db(user, data=_data)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
return {"ok": True}
async def handle_rule_making(self, user: BaleUser):
user.is_processing_lock = True
self.ui_handler.create(user=user, main_text="در حال بررسی")
_buttons = [
[
{
"text": "مشابهت یابی در قانون",
"callback_data": "not_yet", # "search_in_law_rules",
}
],
[HOME_BUTTON],
]
_max_token = None
try:
f_data = None
f_metadata = None
async for step_data in self.operator.stream_rule_making(
query=user.input_query, llm_name="gpt-oss-20b", effort=user.effort
):
status = step_data.get("status")
data = step_data.get("data")
metadata = step_data.get("metadata")
# time
if status == "end":
self.ui_handler.cancel(user)
f_data = data[status]
f_metadata = metadata
res_ = await self.formatter.form_rule_making(
_input=data[status],
footer=f'تعداد توکن های مصرف شده {_max_token}',
)
if user.effort != "medium":
_buttons.insert(0, [MORE_EFFORT_BUTTON])
await self.send_message_helper(
user=user,
chunked_text=res_,
end_buttons=_buttons,
)
user.last_result = [
InputRule.model_validate(i) for i in data[status]
]
if user.is_vip:
_result2 = await self.formatter.form_chat(
header=" *reasoning* :\n",
llm_text=metadata["llm_reason"],
)
await self.send_message_helper(
user=user,
chunked_text=_result2,
)
else:
_max_token = data["max_token"]
if _max_token:
self.ui_handler.main_text = [
f'تعداد توکن های مصرف شده {_max_token}',
]
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
if f_data:
_data = {
"state": "rule_making",
"result": f_data,
"llm_reason": metadata["llm_reason"],
"metadata": {
"llm_token":_max_token
},
}
print(
f'_data {_data}'
)
_data["metadata"].update(f_metadata)
await self.save_to_db(user, data=_data)
user.last_input_query = user.input_query
user.input_query = ""
user.is_processing_lock = False
self.ui_handler.cancel(user)
return {"ok": True}
async def handle_chat_in_law(self, user: BaleUser):
user.is_processing_lock = True
self.ui_handler.create(user=user, main_text=[
"در حال تحلیل",
"در حال طبقه بندی",
"در حال تفکر",
"ساختار بندی",
],
)
# print('asyncio')
# sleep(80)
# print('wait')
_buttons = [[HOME_BUTTON]]
try:
async for _result in self.operator.stream_chat_in_law(
query=user.input_query, effort=user.effort, limit=user.limit
):
print(f"handle_chat_in_law **--**---*--*-*-*-***-**-*-*-*-** ")
self.ui_handler.cancel(user)
result = ChatLaw.model_validate(_result["result"])
user.last_result = result
text_result = self.formatter.form_law_chat(result.answer)
if result.answer_type == "legal_question":
_b = [
{
"text": "بررسی عمیق تر",
"callback_data": "logical_chat_in_law",
}
]
if user.limit < self.max_limit:
_b.append(
{
"text": "بررسی گسترده تر",
"callback_data": "more_limit",
}
)
_buttons.insert(0, _b)
await self.send_message_helper(
user=user, chunked_text=text_result, end_buttons=_buttons
)
if user.is_vip:
print(f"user.is_vip {user.is_vip}")
reason_result = await self.formatter.form_chat(
header="model-reason", llm_text=result.llm_reason
)
await self.send_message_helper(
user=user,
chunked_text=reason_result,
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
print(f"result {type(result)}")
print(f"result.answer {type(result.answer)}")
print(f"result.ref_ids {type(result.ref_ids)}")
print(f"result.answer_type {type(result.answer_type)}")
_data = {
"state": "chat_in_law",
"result": result.answer,
"llm_reason": result.llm_reason,
"metadata": {
"ref_ids": result.ref_ids,
"answer_type": result.answer_type,
},
}
await self.save_to_db(user=user, data=_data)
user.last_input_query = user.input_query
user.input_query = ""
user.is_processing_lock = False
self.ui_handler.cancel(user)
async def handle_qanon_title_repeat(self, user: BaleUser):
user.is_processing_lock = True
try:
_result: List[TitleRepeat] = await self.operator.title_repeated(
qanontitle=user.input_query
)
print(f"*/*/*/*/*/*/*/*/*/*/*/result_ {_result}")
_res = await self.formatter.form_title_repeated(_input=_result)
_buttons = [[HOME_BUTTON]]
await self.send_message_helper(
user=user, chunked_text=_res, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
return {"ok": True}
async def handle_talk(self, user: BaleUser):
user.is_processing_lock = True
try:
_result = await self.operator.talk(query=user.input_query)
await self.send_message_helper(
user=user, text=_result, end_buttons=MAIN_BUTTON
)
except Exception as e:
print("ERROR in handle_talk:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
return {"ok": True}
async def handle_logical_chat_in_law(self, user: BaleUser):
user.is_processing_lock = True
self.ui_handler.create(user=user, main_text=["در حال استخراج اجزاء"])
f_data = ""
_button = [[HOME_BUTTON]]
len_rules = 0
try:
metadata = user.last_result["metadata"]
except:
metadata = {}
try:
async for _data in self.operator.stream_logical_chat_in_law(
query=user.input_query, effort=user.effort, metadata=metadata, limit=20
):
status = _data.get("status")
data = _data.get("data")
# _metadata = _data.get("metadata")
print(
f"status {status}",
)
if status == "rules":
len_rules = data
self.ui_handler.main_text = [f"تعداد اجزاء استخراج شده {data}"]
elif status == "semantic":
self.ui_handler.main_text = [
f"مشابهت‌ یابی اجزاء {user.limit*len_rules}/{data}"
]
elif status == "llm":
self.ui_handler.main_text = ["ساخت پاسخ نهایی"]
self.ui_handler.cancel(user)
_header = f"📝 پاسخ نهایی:\n"
f_data = data["text"]
response = await self.formatter.form_llm_answer_chat(
data["text"], header=_header
)
await self.send_message_helper(
user,
chunked_text=response,
end_buttons=_button,
)
if user.is_vip:
response2 = await self.formatter.form_chat(
data["llm_reason"], header="دلیل مدل برای خروجی"
)
await self.send_message_helper(
user,
chunked_text=response2,
)
# break
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.is_processing_lock = False
user.input_query = ""
self.ui_handler.cancel(user)
_data = {
"state": "logical_chat_in_law",
"result": f_data,
"llm_reason": data["llm_reason"],
"metadata": {},
}
# _data['metadata'].update(_metadata)
await self.save_to_db(user=user, data=_data)
return {"ok": True}
async def handle_conflict_qanon_asasi(self, user: BaleUser):
user.is_processing_lock = True
user.active_message_id = 0
try:
print(f"effort=user.effort {user.effort}")
_buttons = [[HOME_BUTTON]]
if user.effort == "low":
current_text = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n"
seen_k = set()
_buttons.insert(0, [MORE_EFFORT_BUTTON])
async for _data in self.operator.conflict_qanon_asasi_low(
user.input_query, user.effort, user.limit
):
status = _data.get("status")
data = _data.get("data")
print(f"status {type(status)}-{status}")
data = {k: v for k, v in data.items() if k not in seen_k}
print(f"data {len(data)} seen_k {len(list(seen_k))}")
text = await self.formatter.form_constitution_low(data, status)
text = text + "\n\n"
seen_k.update(data.keys())
if status == 11:
await self.send_message_helper(
user,
text=current_text,
end_buttons=_buttons,
update_message=True,
is_save_active_message_id=True,
)
else:
text = [text[i : i + 50] for i in range(0, len(text), 50)]
for word in text:
current_text += word
await self.send_message_helper(
user,
text=current_text,
update_message=True,
is_save_active_message_id=True,
)
text = ""
elif user.effort == "medium":
print(f"==== handle_constitution ====")
# chunked_response = await self.formatter.form_rule_making(
# _input=data
# )
await self.send_message_helper(
user=user,
text="در دست توسعه...",
)
# async for _data in self.operator.conflict_qanon_asasi_steps(
# user.input_query, user.effort, user.limit
# ):
# status = _data.get("status")
# data = _data.get("data")
# print(f"status {type(status)}-{status}")
# print(f"semantich_search {user.effort}")
# _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
# response = await self.formatter.form_ss_rules(data, header=_header)
# await self.send_message_helper(user, chunked_text=response)
# print(f"subject_unities {user.effort}")
# _header = "نتایج اولیه مغایرت های احتمالی :\n"
# print(f"data type {type(data)}")
# try:
# print(f"data -> RuleRelation")
# _data = [RuleRelation.model_validate(i) for i in data]
# except:
# print(f"data -> String")
# _data = data
# chunked_text, _button, mapping_qs = (
# await self.formatter.form_subject_unity(_data, header=_header)
# )
# await self.send_message_helper(
# user=user,
# chunked_text=chunked_text,
# end_buttons=_button,
# )
# user.subject_unities = mapping_qs
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.last_input_query = user.input_query
user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_conflict_law_writing_policy(self, user: BaleUser):
user.is_processing_lock = True
_buttons = [[HOME_BUTTON]]
self.ui_handler.create(user=user, main_text=[
"در حال تحلیل",
"در حال طبقه بندی",
"در حال تفکر",
"ساختار بندی",
],
)
try:
_result = await self.operator.conflict_law_writing_policy(
query=user.input_query, effort=user.effort
)
result = await self.formatter.from_law_writing_policy(
_input_dict=_result, header="نتیجه بررسی با سیاست های قانون گذاری"
)
if user.effort != "medium":
_buttons.insert(0, [MORE_EFFORT_BUTTON])
self.ui_handler.cancel(user)
await self.send_message_helper(
user=user, chunked_text=result, end_buttons=_buttons
)
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.last_input_query = user.input_query
user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_conflict_all_qavanin(self, user: BaleUser):
user.is_processing_lock = True
try:
async for step_data in self.request_manager.stream_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
"limit": user.limit,
"mode_type": "bale",
},
url="/conflict/all_qanon/qs_unity",
):
step = step_data.get("step")
data = step_data.get("data")
is_first = step_data.get("is_first", False)
is_last = step_data.get("is_last", False)
_button = [[HOME_BUTTON]]
print(f"==== handle_conflict_general_policy ====")
if step == "rule_making":
print(f"rule_making {user.effort}")
chunked_response = await self.formatter.form_rule_making(
_input=data
)
await self.send_message_helper(
user=user,
update_message=False,
chunked_text=chunked_response,
)
elif step == "semantich_search":
print(f"semantich_search {user.effort}")
_header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
response = await self.formatter.form_ss_rules(data, header=_header)
await self.send_message_helper(user, chunked_text=response)
elif step == "subject_unities":
print(f"subject_unities {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
print(f"data type {type(data)}")
try:
print(f"data -> RuleRelation")
_data = [RuleRelation.model_validate(i) for i in data]
except:
print(f"data -> String")
_data = data
chunked_text, _button, mapping_qs = (
await self.formatter.form_subject_unity(_data, header=_header)
)
await self.send_message_helper(
user=user,
chunked_text=chunked_text,
end_buttons=_button,
)
user.subject_unities = mapping_qs
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
# user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_conflict_general_policy(self, user: BaleUser):
user.is_processing_lock = True
try:
async for step_data in self.request_manager.stream_result(
payload={
"section_content": user.input_query,
"effort": user.effort,
"limit": user.limit,
"mode_type": "bale",
},
url="/conflict/general_policy/qs_unity",
):
step = step_data.get("step")
data = step_data.get("data")
is_first = step_data.get("is_first", False)
is_last = step_data.get("is_last", False)
_button = [[HOME_BUTTON]]
print(f"==== handle_conflict_general_policy ====")
if step == "rule_making":
print(f"rule_making {user.effort}")
chunked_response = await self.formatter.form_rule_making(
_input=data
)
await self.send_message_helper(
user=user,
update_message=False,
chunked_text=chunked_response,
)
elif step == "semantich_search":
print(f"semantich_search {user.effort}")
_header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n"
response = await self.formatter.form_ss_rules(data, header=_header)
await self.send_message_helper(user, chunked_text=response)
elif step == "subject_unities":
print(f"subject_unities {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
print(f"data type {type(data)}")
try:
print(f"data -> RuleRelation")
_data = [RuleRelation.model_validate(i) for i in data]
except:
print(f"data -> String")
_data = data
chunked_text, _button, mapping_qs = (
await self.formatter.form_subject_unity(_data, header=_header)
)
await self.send_message_helper(
user=user,
chunked_text=chunked_text,
end_buttons=_button,
)
user.subject_unities = mapping_qs
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
# user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_advanced_check_conflict(self, user: BaleUser):
user.is_processing_lock = True
print(f"handle_advanced_check_conflict======================================")
try:
print(f"user.call_back_query {user.call_back_query}")
step, _type, qq_title = user.call_back_query.split(":")
if _type == "qq":
print("A")
groups = {}
for qanon_title, item in user.subject_unities.items():
if qanon_title == qq_title:
for i in item:
groups[qanon_title] = i.db_rule.section_id
print("B")
if len(groups) > 1:
_button = []
for i, (k, v) in enumerate(groups.items(), start=1):
_button.append(
[{"text": i, "callback_data": f"subject_unities:qs:{v}"}]
)
await self.send_message_helper(
user=user,
text=f"برای ادامه یک مورد را از این قانون انتخاب کنید{k} انتخاب کنید",
end_buttons=_button,
)
else:
user.call_back_query = f"subject_unities:qs:{groups[qq_title]}"
print("ccccc")
elif _type == "qs":
print("AAA@@@")
content = None
for k, v in user.subject_unities.items():
if v.db_rule.section_id == qq_title:
content = v.model_dump()
print(f"content qs -> {content}")
async for step_data in self.request_manager.stream_result(
payload={
"rule_relation": content,
"effort": user.effort,
"mode_type": "bale",
},
url="/conflict/unity_eval",
):
print(f"qs {user.call_back_query}")
step = step_data.get("step")
data = step_data.get("data")
# is_last = step_data.get("is_last")
_button = [[HOME_BUTTON]]
print(f"==== handle_advanced_check_conflict ====")
if step == "step2":
print(f"*********************Step2 {user.effort}")
_header = "نتایج اولیه مغایرت های احتمالی :\n"
_data = RuleRelation.model_validate(data)
chunked_text = await self.formatter.form_conflict_detection(
_data, header=_header
)
await self.send_message_helper(user=user, text=chunked_text)
elif step == "step3":
print(f"*********************Step3")
_data = RuleRelation.model_validate(data)
chunked_text = (
await self.formatter.form_conflict_type_detection(
_data, header=_header
)
)
await self.send_message_helper(user=user, text=chunked_text)
elif step == "step4":
print(f"*********************Step4")
_data = RuleRelation.model_validate(data)
chunked_text = (
await self.formatter.form_relation_identification(
_data, header=_header
)
)
await self.send_message_helper(user=user, text=chunked_text)
elif step == "step5":
print(f"*********************Step5")
_data = Evaluation.model_validate(data)
chunked_text = await self.formatter.form_evaluation(
_data, header=_header
)
await self.send_message_helper(user=user, text=chunked_text)
else:
print(f"eerror in uknown step --> {step}")
except Exception as e:
print("ERROR in handle_chat:", str(traceback.print_exc()))
await self.send_message_helper(user=user, text=ERROR_IN_PROCESS)
finally:
user.last_input_query = user.input_query
user.input_query = ""
user.is_processing_lock = False
return {"ok": True}
async def handle_stream_chat(self, user: BaleUser):
"""
stream_talk()
async generator → chunk
handle_stream_chat()
send first message
update_message (throttled)
"""
full_text = ""
message_id = None
last_update = 0
async for chunk in stream_talk(user.input_query):
full_text += chunk
now = time.time()
# جلوگیری از اسپم API (مثلاً هر 0.6 ثانیه آپدیت)
if now - last_update < 0.6:
continue
last_update = now
if message_id is None:
# اولین پیام
message_id = await self.send_message_to_bale(
user=user,
text=full_text + "",
)
else:
payload = BalePayload(
chat_id=user.chat_id, message_id=message_id, text=full_text + ""
)
self.update_message_to_bale(user, payload)
# آخر کار، بدون cursor
if message_id:
payload = BalePayload(
chat_id=user.chat_id, message_id=message_id, text=full_text
)
self.update_message_to_bale(user, payload)
async def handle_beta(self, user: BaleUser):
"""
state:
user.is_processing_lock:
query:
time.time
first_name
last_name
"""
try:
text_1 = "سلام"
text_2 = "سلام، عرض ادب و احترام"
await self.send_message_helper(
user=user,
structed_output=[
["این تست است ", [[{"text": "تستتت", "callback_data": "not_yet"}]]],
[
"این تست اس2ت ",
[[{"text": "تستت2ت", "callback_data": "not_yet"}]],
],
],
end_buttons=[
[
{"text": "دکمه های نهایی", "callback_data": "not_yet"},
{"text": "دکمه های ن2هایی", "callback_data": "not_yet"},
{"text": "دکمه های ن23هایی", "callback_data": "not_yet"},
],
[{"text": "دکمه های آخر", "callback_data": "not_yet"}],
],
)
except Exception as e:
print("ERROR in handl_beta:", str(traceback.print_exc()))
async def save_to_db(self, user: BaleUser, data: Dict):
time_now = int(time.time())
_id = f"ble_{user.chat_id}_{time_now}"
data.update(
{
"id": _id,
"uc_id": user.uc_id,
"message_id": user.active_message_id,
"username": user.username,
"is_bot": user.is_bot,
"first_name": user.first_name,
"last_name": user.last_name,
"effort": user.effort,
"limit": user.limit,
"input_query": user.input_query,
"timestamp": time_now,
}
)
try:
es_res = self.es_helper.update_index_doc(
is_update_state=False,
index_name_o=self.es_index_name,
eid=_id,
data=data,
)
# type_name, payload, request
print(f"-- ES-Saved {es_res}")
except Exception as e:
print("++ failed save_to_Es ", str(traceback.print_exc()))
class UIState(BaleBotGeneral):
"""
مسئول نمایش «در حال تحلیل…» و پاک‌کردن پیام اصلی
در زمان لغو یا پایان کار است.
برای سرگرم کردن کاربر
یک روند کاملا مجزا برای نشان دادن پیام و حذف در انتها
"""
def __init__(
self,
token:str,
delay_time: float = 0.4,
):
super().__init__(
token=token,
)
"""
:param bot: نمونهٔ BaleBot که متدهای API را در اختیار دارد
:param user: کاربری که پیامش در حال پردازش است
"""
self.delay_time = delay_time
self.done = False
self._heartbeat_task: Dict[str, asyncio.Task] = {}
# ------------------------------------------------------------------
# توابعی که در والد BaleBotGeneral هستند
# ------------------------------------------------------------------
async def delete_bale_message(self, *args, **kwargs):
await super().delete_bale_message(*args, **kwargs)
async def send_message_helper(self, *args, **kwargs):
await super().send_message_helper(*args, **kwargs)
# ------------------------------------------------------------------
# توابع خود کلاس
# ------------------------------------------------------------------
async def active(self, user: BaleUser,):
await self.send_message_helper(
user=user,
text=self.main_text[0],
update_message=False,
is_save_active_message_id=True,
)
# ------------------------------------------------------------------
# متدهای عمومی
# ------------------------------------------------------------------
def cancel(self, user: BaleUser,):
"""
متوقف کردن حلقهٔ heartbeat و پاک‌کردن پیام اصلی
"""
self.done = True
_user_task = self._heartbeat_task.get(user.uc_id, None)
if _user_task:
self._heartbeat_task[user.uc_id].cancel() # اگر هنوز در حال اجرا باشد
# پاک‌کردن پیام اصلی
asyncio.create_task(
self.delete_bale_message(
chat_id=user.chat_id,
message_id=user.active_message_id,
)
)
self.main_text = ["در حال تحلیل"]
def create(self, user: BaleUser, main_text: str | list = ["در حال تحلیل"], waiting_list:list=[]):
"""
شروع حلقهٔ heartbeat
"""
if isinstance(main_text, str):
self.main_text = [main_text]
elif isinstance(main_text, list):
self.main_text = main_text
print(
f'create create'
)
self._heartbeat_task[user.uc_id] = asyncio.create_task(self.heartbeat_loop(user=user, waiting_list=waiting_list))
# ------------------------------------------------------------------
# حلقهٔ heartbeat
# ------------------------------------------------------------------
async def heartbeat_loop(self, user: BaleUser, waiting_list=[] ):
"""
هر 0.2 ثانیه متن «در حال تحلیل…» را به‌روزرسانی می‌کند.
"""
await self.active(user=user)
if len(waiting_list) == 0:
waiting_list = [".", "..", "...", "...."]
while not self.done:
text = random.choice(self.main_text)
for j in waiting_list:
# ویرایش پیام
_text = f"{j} {text}"
await self.send_message_helper(
user=user,
text=_text,
update_message=True,
is_save_active_message_id=True,
)
await asyncio.sleep(self.delay_time)
# صبر 5 ثانیه
# await asyncio.sleep(0.2)