################# modularity ### import from external-package from fastapi import FastAPI, Request, HTTPException import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re from dotenv import load_dotenv from pathlib import Path from time import sleep from enum import Enum import time from typing import Dict, Optional from dataclasses import dataclass ### import from internal-file from utils.base_model import * from utils.bale_buttons import * from utils.bale_massages import * from utils.workflow import * from utils.static import * from utils.config import STATE_CONFIG, STATE, BUSY_TEXT,MAIN_BUTTON, STATE_REGISTERY, build_buttons_form from typing import Iterable, Any, List def extract_user_info(update: BaleUpdate) -> dict: uc_id = encode_uc(update) user_id, chat_id = decode_uc(uc_id) if update.message: u = update.message.from_user return { "uc_id": str(uc_id), "chat_id": int(chat_id), "user_id": user_id, "username": u.username, "first_name": u.first_name, "last_name": u.last_name or "", "is_bot": u.is_bot, "update": update, } if update.callback_query: u = update.callback_query.from_user return { "uc_id": str(uc_id), "chat_id": int(chat_id), "user_id": user_id, "username": u.username, "first_name": u.first_name, "last_name": "", "is_bot": u.is_bot, "update": update, } raise ValueError("No user info in update") class UserManager: def __init__(self): self.users: Dict[str, BaleUser] = {} def get_or_create(self, update: BaleUpdate) -> BaleUser: user_data = extract_user_info(update) uc_id = user_data["uc_id"] if uc_id not in self.users: self.users[uc_id] = BaleUser( **user_data, ) user = self.users[uc_id] user.update = update return user class BaleBot: """ input → set_state → render_user_state """ def __init__( self, user_manager: UserManager, es_index_name: str, token: str, es_helper: ElasticHelper, formatter: Formatter, back_end_url: str, request_manager: RequestManager, ): self.formatter = formatter self.request_manager = request_manager self.es_helper = es_helper self.token = token self.es_index_name = es_index_name self.user_manager = user_manager self.max_limit = 100 self.back_end_url = back_end_url self.update_message_url = ( f"https://tapi.bale.ai/bot{self.token}/editMessageText" ) self.send_message_url = f"https://tapi.bale.ai/bot{self.token}/sendMessage" # self.user_state = self.make_state() async def render_user_state(self, user: BaleUser, run_internal=None): """Buse سیستم""" try: state_detail = user.state_detail if user.input_query == "": await self.send_message_helper( user=user, text=state_detail.message, end_buttons=state_detail.end_buttons, ) return {"ok": True} if run_internal == "subject_unities": await self.handle_advanced_check_conflict(user) elif not run_internal and state_detail.handler: handler = getattr(self, state_detail.handler) await handler(user) except: await self.handle_main(user) return {"ok": True} async def update_message_to_bale( self, user: BaleUser, text: str = None, buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, ): payload = { "chat_id": user.chat_id, "message_id": user.last_message_id, "text": text, "reply_markup": { # "keyboard": reply_markup, "resize_keyboard": True, "one_time_keyboard": False, }, } if buttons: payload["reply_markup"]["inline_keyboard"] = buttons try: r = requests.post(self.update_message_url, json=payload) if not r.ok: print("Bale API Error:", r.status_code, r.text) r_ = r.json() print(f"Send:", r.status_code) # user.last_message_id = int(r_["result"]["message_id"]) except Exception: print("ERROR in update_message_to_bale:") traceback.print_exc() async def send_message_helper( self, user: BaleUser, update_message: bool = False, text: str = None, chunked_text: List[str] = None, structed_output: List = None, end_buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, ): _i = 0 if text: _i += 1 if chunked_text: _i += 1 if structed_output: _i += 1 if _i != 1: raise ValueError( "In send_message_helper Only Send One Of {text, chunked_text, structed_output}" ) if update_message == True: if user.last_message_id == 0: await self.send_message_helper( user=user, update_message=False, text=text, end_buttons=end_buttons, chunked_text=chunked_text, structed_output=structed_output, reply_markup=reply_markup, ) else: if text: new_text = text if chunked_text: new_text = chunked_text[-1] if structed_output: new_text = structed_output[-1][0] await self.update_message_to_bale( user=user, text=new_text, buttons=end_buttons, reply_markup=reply_markup, ) else: if structed_output: for i, item in enumerate(structed_output, start=1): if i == len(structed_output) and len(end_buttons) > 0: # is_last item[1] += end_buttons await self.send_message_to_bale( user=user, text=item[0], buttons=item[1], reply_markup=reply_markup, ) if chunked_text: _buttons = [] for i, item in enumerate(chunked_text, start=1): if i == len(chunked_text) and len(end_buttons) > 0: # is_last _buttons = end_buttons await self.send_message_to_bale( user=user, text=item, buttons=_buttons, reply_markup=reply_markup, ) if text: await self.send_message_to_bale( user=user, text=text, buttons=end_buttons, reply_markup=reply_markup, ) async def send_message_to_bale( self, user: BaleUser, text: str, buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, ): print(f"send_message_to_bale--- {text}") print(f"send_message_to_bale buttons--- {buttons}") payload = { "chat_id": user.chat_id, "text": text, "reply_markup": { # "keyboard": reply_markup, "resize_keyboard": True, "one_time_keyboard": False, }, } if buttons: payload["reply_markup"]["inline_keyboard"] = buttons try: r = requests.post(self.send_message_url, json=payload) if not r.ok: print("Bale API Error:", r.status_code, r.text) r_ = r.json() print(f"Send:", r.status_code) user.last_message_id = int(r_["result"]["message_id"]) except Exception: print("ERROR in send_message_to_bale:") traceback.print_exc() async def render_update(self, update: BaleUpdate): """ مهمترین تابع ربات بله ورودی شما یا به صورت متن ساده هست یا بصورت query render_message : متن render_callback : کوئری """ # 2️⃣ ساخت یا بازیابی user user = self.user_manager.get_or_create(update) print(f"render_update user.state_detail.state -> {user.state_detail}") if user.is_processing_lock == False: if update.message: print(f"render_message") user.input_query = str(user.update.message.text or "").strip() # async def render_message(self, user: BaleUser): if user.input_query in ("/start", "main", "بازگشت به خانه"): await self.handle_main(user) user.input_query = "" return {"ok": True} if user.input_query == "جستجو": user.state_detail = STATE_CONFIG["search_in_law"] user.input_query = "" if user.input_query == "گفتگو": user.state_detail = STATE_CONFIG["chat_in_law"] user.input_query = "" # --- WorkFlow Logic by State await self.render_user_state(user) return {"ok": True} if update.callback_query: """ اینجا فقط باید مرحله و state عوض شود و داده ای وارد نمیشود!!! برای سوال پرسیدن از کاربر و ... """ print("render_callback") user.is_call_back_query = True user.call_back_query = user.update.callback_query.data # user.update.callback_query.data run_internal = None if user.call_back_query == "main": user.input_query = "" await self.handle_main(user) return {"ok": True} # Dynamic Change Options if user.call_back_query == "more_limit": user.limit += 10 # Dynamic Change Options elif user.call_back_query == "more_effort": user.effort = "medium" elif user.call_back_query.startswith("subject_unities:"): run_internal = "subject_unities" # subject_unities:qq:qq_983214 # Dynamic Change State else: user.state_detail = STATE_CONFIG[user.call_back_query] await self.render_user_state(user, run_internal) return {"ok": True} else: # beta-mode await self.send_message_helper(user=user, text=BUSY_TEXT) return {"ok": True} async def talk(self, user: BaleUser): pass async def handle_main(self, user: BaleUser): # ریست کردن پشته‌ها و نتایج """ """ ################################### SAVE2ELASTIC ################################### user.last_query = "" user.last_result = "" user.stack.clear() user.input_query = "" user.sub_state = "" user.limit = 10 user.effort = "low" user.last_message_id = 0 message = """👋 سلام دوست عزیز! 🤗 به دستیار هوشمند قانون یار خوش آمدید! فقط کافیه به من بگید چه کمکی از دستم برمیاد!""" await self.send_message_helper( user=user, text=message, # end_buttons=MAIN_BUTTON, ) async def handle_search_in_law(self, user: BaleUser): user.is_processing_lock = True user.last_query = user.input_query try: result = await self.request_manager.get_result( payload={ "section_content": user.input_query, "limit": user.limit, "effort": user.effort, }, url="/semantic_search/run_semantic_search", ) # print(f'result rag {result} {type(result["ss_answer"])}') chunked_text_ = self.formatter.form_search_in_law( title=user.input_query, sections=result["answer"], ) # print(f'chunked_text_ rag {chunked_text_}') _buttons = [[HOME_BUTTON]] if user.limit < self.max_limit: _buttons.insert(0, [MORE_LIMIT_BUTTON]) await self.send_message_helper( user=user, chunked_text=chunked_text_, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_rule_making(self, user: BaleUser): user.is_processing_lock = True try: result = await self.request_manager.get_result( payload={ "section_content": user.input_query, "limit": user.limit, "effort": user.effort, }, url="/rule_making", ) print(f"handle_rule_making {result}") res_ = await self.formatter.form_rule_making(_input=result) _buttons = [[HOME_BUTTON]] if user.effort != "medium": _buttons.append([MORE_EFFORT_BUTTON]) await self.send_message_helper( user=user, chunked_text=res_, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_qanon_title_repeat(self, user: BaleUser): user.is_processing_lock = True try: result_ = await title_repeated(qanontitle=user.input_query) res_ = await self.formatter.form_title_repeated(data=result_) _buttons = [[HOME_BUTTON]] await self.send_message_helper( user=user, chunked_text=res_, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_conflict_qanon_asasi(self, user: BaleUser): user.is_processing_lock = True user.last_message_id = 0 user.effort = "medium" try: print(f"effort=user.effort {user.effort}") async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/constitution", ): step = step_data.get("step") data = step_data.get("data") is_first = step_data.get("is_first", False) is_last = step_data.get("is_last", False) print(f"is_first {is_first} is_last {is_last}") # is_last = step_data.get("is_last") _button = [[HOME_BUTTON]] print(f"==== handle_constitution ====") if step == "low": print(f"low {user.effort}") chunked_response = await self.formatter.form_constitution(data) # await self.send_message_helper(user, chunked_text=response) # if is_last: if is_first is True: await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif is_last is True: _button.append([MORE_EFFORT_BUTTON]) await self.send_message_helper( user=user, update_message=True, chunked_text=chunked_response, end_buttons=_button, ) else: await self.send_message_helper( user=user, update_message=True, chunked_text=chunked_response, ) elif step == "rule_making": print(f"rule_making {user.effort}") chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.parse_obj(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_conflict_law_writing_policy(self, user: BaleUser): user.is_processing_lock = True try: user.last_query = user.input_query _result = await self.request_manager.get_result( payload={ "section_content": user.input_query, "effort": user.effort, }, url="/conflict/law_writing_policy", ) result = await self.formatter.from_law_writing_policy( _input_dict=_result, header="نتیجه بررسی با سیاست های قانون گذاری" ) _buttons = [[HOME_BUTTON]] if user.effort != "medium": _buttons.insert(0, [MORE_EFFORT_BUTTON]) await self.send_message_helper( user=user, chunked_text=result, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_conflict_all_qavanin(self, user: BaleUser): user.is_processing_lock = True try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/all_qanon/qs_unity", ): step = step_data.get("step") data = step_data.get("data") is_first = step_data.get("is_first", False) is_last = step_data.get("is_last", False) _button = [[HOME_BUTTON]] print(f"==== handle_conflict_general_policy ====") if step == "rule_making": print(f"rule_making {user.effort}") chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.parse_obj(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_conflict_general_policy(self, user: BaleUser): user.is_processing_lock = True try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/general_policy/qs_unity", ): step = step_data.get("step") data = step_data.get("data") is_first = step_data.get("is_first", False) is_last = step_data.get("is_last", False) _button = [[HOME_BUTTON]] print(f"==== handle_conflict_general_policy ====") if step == "rule_making": print(f"rule_making {user.effort}") chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.parse_obj(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_advanced_check_conflict(self, user: BaleUser): user.is_processing_lock = True print(f"handle_advanced_check_conflict======================================") try: step, _type, qq_title = user.call_back_query.split(":") if _type == "qq": groups = {} for qanon_title, item in user.subject_unities.items(): if qanon_title == qq_title: for i in item: groups[qanon_title] = i.db_rule.section_id if len(groups) > 1: _button = [] for i, (k, v) in enumerate(groups.items(), start=1): _button.append( [{"text": i, "callback_data": f"subject_unities:qs:{v}"}] ) await self.send_message_helper( user=user, text=f"برای ادامه یک مورد را از این قانون انتخاب کنید{k} انتخاب کنید", end_buttons=_button, ) else: user.call_back_query = f"subject_unities:qs:{groups[qq_title]}" elif _type == "qs": content = None for k, v in user.subject_unities.items(): if v.db_rule.section_id == qq_title: content = v.model_dump() print(f"content qs -> {content}") async for step_data in self.request_manager.stream_result( payload={ "rule_relation": content, "effort": user.effort, "mode_type": "bale", }, url="/conflict/unity_eval", ): print(f"qs {user.call_back_query}") step = step_data.get("step") data = step_data.get("data") # is_last = step_data.get("is_last") _button = [[HOME_BUTTON]] print(f"==== handle_advanced_check_conflict ====") if step == "step2": print(f"*********************Step2 {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" _data = RuleRelation.parse_obj(data) chunked_text = await self.formatter.form_conflict_detection( _data, header=_header ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step3": print(f"*********************Step3") _data = RuleRelation.parse_obj(data) chunked_text = ( await self.formatter.form_conflict_type_detection( _data, header=_header ) ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step4": print(f"*********************Step4") _data = RuleRelation.parse_obj(data) chunked_text = ( await self.formatter.form_relation_identification( _data, header=_header ) ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step5": print(f"*********************Step5") _data = Evaluation.parse_obj(data) chunked_text = await self.formatter.form_evaluation( _data, header=_header ) await self.send_message_helper(user=user, text=chunked_text) else: print(f"eerror in uknown step --> {step}") except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_chat_in_law( self, user: BaleUser ) -> List[BalePayload]: # -> List[List[str, dict]] user.is_processing_lock = True user.last_query = user.input_query # گرفتن آخرین runtime effort = user.effort try: # اگر runtime تغییر کرده یا نتیجه قبلی وجود ندارد → درخواست جدید result = await self.request_manager.get_result( payload={ "section_content": user.input_query, "effort": effort, "limit": user.limit, "mode_type": "bale", }, url="/semantic_search/run_chat", ) print(f"===================={result}") text_result = self.formatter.form_law_chat( result["answer"] # , llm_answer["source"] ) _buttons = [[HOME_BUTTON]] _b = [] if user.limit < self.max_limit: _b += [MORE_LIMIT_BUTTON] if effort != "medium": _b += [MORE_EFFORT_BUTTON] if len(_b) > 0: _buttons.insert(0, _b) await self.send_message_helper( user=user, chunked_text=text_result, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False async def handle_stream_chat(self, user: BaleUser): """ stream_talk() ↓ async generator → chunk ↓ handle_stream_chat() ↓ send first message ↓ update_message (throttled) """ full_text = "" message_id = None last_update = 0 async for chunk in stream_talk(user.input_query): full_text += chunk now = time.time() # جلوگیری از اسپم API (مثلاً هر 0.6 ثانیه آپدیت) if now - last_update < 0.6: continue last_update = now if message_id is None: # اولین پیام message_id = await self.send_message_to_bale( user=user, text=full_text + " ▌", ) else: payload = BalePayload( chat_id=user.chat_id, message_id=message_id, text=full_text + " ▌" ) self.update_message_to_bale(user, payload) # آخر کار، بدون cursor if message_id: payload = BalePayload( chat_id=user.chat_id, message_id=message_id, text=full_text ) self.update_message_to_bale(user, payload) async def handle_logical_chat_in_law(self, user: BaleUser): user.is_processing_lock = True try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, }, url="/stream/chat_logical", ): step = step_data.get("step") data = step_data.get("data") if step == "rule_making": _header = f"✅ مرحله استخراج اجزاء حقوقی متن انجام شد.\nتعداد جزء ها: {len(data)}\n اجزاء حقوقی:\n" response = await self.formatter.form_rule_making( data, header=_header ) await self.send_message_helper(user, chunked_text=response) elif step == "semantic_search": _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "llm_answer": _button = [[HOME_BUTTON]] if user.effort != "medium": _button.insert(0, [MORE_EFFORT_BUTTON]) _header = f"📝 پاسخ نهایی:\n" response = await self.formatter.form_llm_answer_chat( data, header=_header ) print(f"response {response}") await self.send_message_helper( user, chunked_text=response, end_buttons=_button ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_beta(self, user: BaleUser): """ state: user.is_processing_lock: query: time.time first_name last_name """ try: text_1 = "سلام" text_2 = "سلام، عرض ادب و احترام" await self.send_message_helper( user=user, structed_output=[ ["این تست است ", [[{"text": "تستتت", "callback_data": "not_yet"}]]], [ "این تست اس2ت ", [[{"text": "تستت2ت", "callback_data": "not_yet"}]], ], ], end_buttons=[ [ {"text": "دکمه های نهایی", "callback_data": "not_yet"}, {"text": "دکمه های ن2هایی", "callback_data": "not_yet"}, {"text": "دکمه های ن23هایی", "callback_data": "not_yet"}, ], [{"text": "دکمه های آخر", "callback_data": "not_yet"}], ], ) except Exception as e: print("ERROR in handl_beta:", str(traceback.print_exc())) async def save_to_es(self, data: QaChat): # print("save_to_es data rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr") try: es_res = self.es_helper.update_index_doc( is_update_state=False, index_name_o=self.es_index_name, eid=data.id, data=data.model_dump(), ) # type_name, payload, request print(f"Saved {es_res}") except Exception as e: print("save_to_es ", str(traceback.print_exc()))