################# modularity ### import from external-package import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re, random from typing import Iterable, Any, List, Dict, Optional from core.base_model import * ### import from internal-file from router.bale.base_model import * from router.bale.bale_buttons import * from router.bale.bale_massages import * from router.bale.bale_manager import UserManager from core.operation import Operation from core.core import * from core.static import * from router.bale.config import * class BaleBotGeneral: """ کلاس اصلی ارتباط با Bale و متد های اصلی ارتباط """ def __init__( self, token: str, send_message_url: Optional[str] = None, delete_message_url: Optional[str] = None, update_message_url: Optional[str] = None, ): self.token = token self.send_message_url = ( send_message_url or f"https://tapi.bale.ai/bot{self.token}/sendMessage" ) self.delete_message_url = ( delete_message_url or f"https://tapi.bale.ai/bot{self.token}/deleteMessage" ) self.update_message_url = ( update_message_url or f"https://tapi.bale.ai/bot{self.token}/editMessageText" ) async def delete_bale_message(self, chat_id, message_id): payload = { "chat_id": chat_id, "message_id": message_id, } try: r = requests.post(self.delete_message_url, json=payload) print(f"Delete Message Status code:", r.status_code) except Exception: print("ERROR in Delete Message Status code:") traceback.print_exc() async def update_message_to_bale( self, user: BaleUser, text: str = None, buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, ): payload = { "chat_id": user.chat_id, "message_id": user.active_message_id, "text": text, "reply_markup": { # "keyboard": reply_markup, "resize_keyboard": True, "one_time_keyboard": False, }, } if buttons: payload["reply_markup"]["inline_keyboard"] = buttons try: r = requests.post(self.update_message_url, json=payload) if not r.ok: print("ERROR in Update Message Bale API Error:", r.status_code, r.text) print(f"Update Message Status code:", r.status_code) except Exception: print("ERROR in Update Message Status code:") traceback.print_exc() async def send_message_helper( self, user: BaleUser, update_message: bool = False, text: str = None, chunked_text: List[str] = None, structed_output: List = None, end_buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, is_save_active_message_id=False, ): _i = 0 if text: _i += 1 if chunked_text: _i += 1 if structed_output: _i += 1 if _i != 1: raise ValueError( "In send_message_helper Only Send One Of {text, chunked_text, structed_output}" ) if update_message == True: if user.active_message_id == 0: await self.send_message_helper( user=user, update_message=False, text=text, end_buttons=end_buttons, chunked_text=chunked_text, structed_output=structed_output, reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) else: if text: new_text = text if chunked_text: new_text = chunked_text[-1] if structed_output: new_text = structed_output[-1][0] await self.update_message_to_bale( user=user, text=new_text, buttons=end_buttons, reply_markup=reply_markup, ) else: if structed_output: for i, item in enumerate(structed_output, start=1): if i == len(structed_output) and len(end_buttons) > 0: # is_last item[1] += end_buttons await self.send_message_to_bale( user=user, text=item[0], buttons=item[1], reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) if chunked_text: _buttons = [] for i, item in enumerate(chunked_text, start=1): if i == len(chunked_text) and len(end_buttons) > 0: # is_last _buttons = end_buttons await self.send_message_to_bale( user=user, text=item, buttons=_buttons, reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) await asyncio.sleep(0.05) if text: await self.send_message_to_bale( user=user, text=text, buttons=end_buttons, reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) async def send_message_to_bale( self, user: BaleUser, text: str, buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, is_save_active_message_id=False, ): payload = { "chat_id": user.chat_id, "text": text, "reply_markup": { # "keyboard": reply_markup, "resize_keyboard": True, "one_time_keyboard": False, }, } if buttons: payload["reply_markup"]["inline_keyboard"] = buttons try: r = requests.post(self.send_message_url, json=payload) if not r.ok: print("ERROR in Send Message Bale API Error:", r.status_code, r.text) r_ = r.json() print(f"Send Message Status code:", r.status_code) if is_save_active_message_id: user.active_message_id = int(r_["result"]["message_id"]) except Exception: print("ERROR in Send Message Status code:") traceback.print_exc() class BaleBot(BaleBotGeneral): """ input → set_state → render_user_state """ def __init__( self, user_manager: UserManager, operation: Operation, es_index_name: str, token: str, es_helper: ElasticHelper, formatter: Formatter, back_end_url: str, request_manager: RequestManager, ): super().__init__( token=token, ) self.ui_handler = UIState( delay_time=0.4, token=token, ) self.freeze_limit = 3 self.operator = operation self.formatter = formatter self.request_manager = request_manager self.es_helper = es_helper self.token = token self.es_index_name = es_index_name self.user_manager = user_manager self.max_limit = 100 self.back_end_url = back_end_url """ deleteMessage message_id chat_id """ # self.user_state = self.make_state() # ------------------------------------------------------------------ # توابعی که در والد BaleBotGeneral هستند # ------------------------------------------------------------------ async def delete_bale_message(self, **kwargs): await super().delete_bale_message(**kwargs) async def update_message_to_bale(self, *args, **kwargs): await super().update_message_to_bale(*args, **kwargs) async def send_message_helper(self, *args, **kwargs): await super().send_message_helper(*args, **kwargs) async def send_message_to_bale(self, *args, **kwargs): await super().send_message_to_bale(*args, **kwargs) # ------------------------------------------------------------------ # توابع همین کلاس # ------------------------------------------------------------------ async def render_user_state(self, user: BaleUser): """Bus سیستم""" try: print(f"user.input_query.render_user_state >{user.input_query}<") print(f"user.state >{user.state_detail.state}<") if user.input_query == "": await self.send_message_helper( user=user, text=user.state_detail.message, end_buttons=user.state_detail.end_buttons, ) return {"ok": True} # if user.input_query != "" and not run_internal and # and user.last_input_query == "" # if user.last_input_query != "" and user.input_query != "" and "subject_unities": # await self.handle_advanced_check_conflict(user) if user.state_detail.handler: user.is_processing_lock = True handler = getattr(self, user.state_detail.handler) await handler(user) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) return {"ok": True} async def render_update(self, update: BaleUpdate): """ مهمترین تابع ربات بله ورودی شما یا به صورت متن ساده هست یا بصورت query render_message : متن render_callback : کوئری """ # 2️⃣ ساخت یا بازیابی user user = self.user_manager.get_or_create(update) print(f"render_update user -> {user.uc_id} user.is_processing_lock {user.is_processing_lock }") if user.is_processing_lock == False: if update.message: user.message_limit = 0 user.input_query = str(user.update.message.text or "").strip() # async def render_message(self, user: BaleUser): if user.input_query in ("/start", "main", "بازگشت به خانه"): user.input_query = "" await self.handle_main(user) return {"ok": True} elif user.input_query == "جستجو": user.state_detail = STATE_CONFIG["search_in_law"] user.input_query = "" elif user.input_query == "گفتگو": user.state_detail = STATE_CONFIG["chat_in_law"] user.input_query = "" # --- WorkFlow Logic by State await self.render_user_state(user) return {"ok": True} if update.callback_query: """ اینجا فقط باید مرحله و state عوض شود و داده ای وارد نمیشود!!! برای سوال پرسیدن از کاربر و ... """ user.message_limit = 0 user.is_call_back_query = True user.call_back_query = user.update.callback_query.data # user.update.callback_query.data run_internal = None if user.call_back_query == "main": await self.handle_main(user) return {"ok": True} # Dynamic Change Options if user.call_back_query == "more_limit": user.limit += 10 user.input_query = user.last_input_query if user.state_detail == STATE_CONFIG["logical_chat_in_law"]: user.state_detail = STATE_CONFIG["chat_in_law"] # Dynamic Change Options elif user.call_back_query == "more_effort": user.effort = "medium" user.input_query = user.last_input_query elif user.call_back_query == "logical_chat_in_law": # user.effort = "medium" user.input_query = user.last_input_query user.state_detail = STATE_CONFIG["logical_chat_in_law"] elif user.call_back_query.startswith("subject_unities:"): run_internal = "subject_unities" # subject_unities:qq:qq_983214 # Dynamic Change State else: user.state_detail = STATE_CONFIG[user.call_back_query] await self.render_user_state(user) return {"ok": True} else: if user.call_back_query == "main": await self.handle_main(user) return {"ok": True} # beta-mode if user.message_limit < self.freeze_limit: await self.send_message_helper( user=user, text=BUSY_TEXT, end_buttons=[[{"text":"توقف عملیات در حال اجراء", "callback_data":"main"}]] ) user.message_limit += 1 return {"ok": True} async def talk(self, user: BaleUser): pass async def handle_main(self, user: BaleUser): # if user.input_query != "": # "اگر کاربر پیام داد در حالت عادی به چت منتقل شود" # user.state_detail = STATE_CONFIG["chat_in_law"] # await self.render_user_state(user) # return {"ok": True} """ اگر کاربر پیامی نداده بود صفحه اصلی نمایش داده شود و داده ها پاک شود """ # ریست کردن پشته‌ها و نتایج user.last_query = "" user.last_result = "" user.stack.clear() user.input_query = "" user.sub_state = "" user.limit = 10 user.effort = "low" user.active_message_id = 0 user.message_limit = 0 message = "من ربات تست ام" await self.send_message_helper( user=user, text=message, # end_buttons=MAIN_BUTTON, ) async def choice_button(self, user: BaleUser, choice_buttons): # handle_conflict_law_writing_policy # handle_conflict_qanon_asasi # handle_conflict_general_policy # handle_conflict_all_qavanin result = await self.send_message_helper( user=user, text="یک مورد را انتخاب کنید:", end_buttons=choice_buttons, ) """ -> Response from render message -> return """ return None async def handle_search_in_law(self, user: BaleUser): user.is_processing_lock = True print(f"handle_search_in_law -> {user.input_query}") self.ui_handler.create(user=user, main_text="در حال جستجو") _buttons = [[HOME_BUTTON]] try: async for result in self.operator.stream_search_in_law( query=user.input_query, limit=user.limit, rerank_model="BAAI/bge-reranker-v2-m3", embed_model="BAAI/bge-m3", ): result = BMNewSemanticSearchOutput.model_validate(result["result"]) if isinstance(result, BMNewSemanticSearchOutput): self.ui_handler.cancel(user) chunked_text_ = self.formatter.form_search_in_law( header=f"برای پرسش: {user.input_query}\n\n", footer=f"مدت زمان بردارسازی {result.embed_model_time} مدت زمان تشابه یابی {result.cosine_similarity_time} مدت زمان تشابه یابی دقیق {result.rerank_time}", sections=result.result, ) if user.limit < self.max_limit: _buttons.insert(0, [MORE_LIMIT_BUTTON]) await self.send_message_helper( user=user, chunked_text=chunked_text_, end_buttons=_buttons, ) # else: # heartbeat_task except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: data = { "state": "search_in_law", "contents": [i.content for i in result.result], "scores": [i.score for i in result.result], "ref_ids": [i.id for i in result.result], "user_input": user.input_query, "metadata": { "embed_model_time": result.embed_model_time, "cosine_similarity_time": result.cosine_similarity_time, "rerank_time": result.rerank_time, }, } data["metadata"].update(result.metadata) await self.save_to_db(user, data=data) user.is_processing_lock = False user.last_input_query = user.input_query user.input_query = "" self.ui_handler.cancel(user) return {"ok": True} async def handle_search_in_law_rules(self, user: BaleUser): user.is_processing_lock = True _status_show_map = { "total_process": "تعداد کل موارد مورد بررسی", "total_processed": "تعداد موارد بررسی شده", } total_process = len(user.last_result) * user.limit es_data = None es_metadata = None print( f"user.limit {user.limit}", f"len(user.last_result) {len(user.last_result)}", f"total_process*****s {total_process}", ) try: if user.last_result: print(f"/////////////user.last_result", len(user.last_result)) await self.send_message_helper( user=user, text="در حال جستجو ...", update_message=True, is_save_active_message_id=True, ) _buttons = [ [HOME_BUTTON], [{"text": "بررسی مغایرت", "callback_data": "not_yet"}], ] filter_qanon_ids = [] # filter_qanon_ids باید از کاربر پرسیده شود async for step_data in self.operator.stream_rule_semantic_search( queries=[i.model_dump() for i in user.last_result], filter_qanon_ids=filter_qanon_ids, limit_rerank=user.limit, ): status = step_data.get("status") data = step_data.get("data") metadata = step_data.get("metadata") # time if status == "end": # print(f'*****data {data}') es_data = data[status] es_metadata = metadata data = [ SemanticSearchP2P.model_validate(i) for i in data[status] ] _res = self.formatter.form_search_in_law_rules( header="نتایج جستجوی اجزاء در قانون\n", body=data, ) print(f"len(_res) {len(_res) }") if user.limit < self.max_limit: _buttons.insert(0, [MORE_LIMIT_BUTTON]) if len(_res) == 1: await self.send_message_helper( user=user, chunked_text=_res, update_message=True, end_buttons=_buttons, ) else: for i, item in enumerate(_res): print(i) if i == 0: print("start-batch") await self.send_message_helper( user=user, text=item, update_message=True, ) elif i == len(_res) - 1: print("end-batch") await self.send_message_helper( user=user, text=item, end_buttons=_buttons, ) else: print("middle-batch") await self.send_message_helper( user=user, text=item, ) user.active_message_id = 0 else: # print( # f"status {status}\nText {type(_status_show_map[status])}\ndata type {type(data[status])}" # ) if status == "total_processed": _result = ( _status_show_map[status] + f": {data['total_processed']}/{total_process}" ) elif status == "total_process": _result = ( _status_show_map[status] + f": {data['total_process']}" ) await self.send_message_helper( user=user, text=_result, update_message=True, is_save_active_message_id=True, ) else: print("-----------------------No Found Any Last_data") if es_data: _data = { "state": "search_in_law_rules", "result": es_data, "metadata": {}, } _data["metadata"].update(es_metadata) await self.save_to_db(user, data=_data) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_rule_making(self, user: BaleUser): user.is_processing_lock = True self.ui_handler.create(user=user, main_text="در حال بررسی") _buttons = [ [ { "text": "مشابهت یابی در قانون", "callback_data": "not_yet", # "search_in_law_rules", } ], [HOME_BUTTON], ] _max_token = None try: f_data = None f_metadata = None async for step_data in self.operator.stream_rule_making( query=user.input_query, llm_name="gpt-oss-20b", effort=user.effort ): status = step_data.get("status") data = step_data.get("data") metadata = step_data.get("metadata") # time if status == "end": self.ui_handler.cancel(user) f_data = data[status] f_metadata = metadata res_ = await self.formatter.form_rule_making( _input=data[status], footer=f'تعداد توکن های مصرف شده {_max_token}', ) if user.effort != "medium": _buttons.insert(0, [MORE_EFFORT_BUTTON]) await self.send_message_helper( user=user, chunked_text=res_, end_buttons=_buttons, ) user.last_result = [ InputRule.model_validate(i) for i in data[status] ] if user.is_vip: _result2 = await self.formatter.form_chat( header=" *reasoning* :\n", llm_text=metadata["llm_reason"], ) await self.send_message_helper( user=user, chunked_text=_result2, ) else: _max_token = data["max_token"] if _max_token: self.ui_handler.main_text = [ f'تعداد توکن های مصرف شده {_max_token}', ] except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: if f_data: _data = { "state": "rule_making", "result": f_data, "llm_reason": metadata["llm_reason"], "metadata": { "llm_token":_max_token }, } print( f'_data {_data}' ) _data["metadata"].update(f_metadata) await self.save_to_db(user, data=_data) user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False self.ui_handler.cancel(user) return {"ok": True} async def handle_chat_in_law(self, user: BaleUser): user.is_processing_lock = True self.ui_handler.create(user=user, main_text=[ "در حال تحلیل", "در حال طبقه بندی", "در حال تفکر", "ساختار بندی", ], ) # print('asyncio') # sleep(80) # print('wait') _buttons = [[HOME_BUTTON]] try: async for _result in self.operator.stream_chat_in_law( query=user.input_query, effort=user.effort, limit=user.limit ): print(f"handle_chat_in_law **--**---*--*-*-*-***-**-*-*-*-** ") self.ui_handler.cancel(user) result = ChatLaw.model_validate(_result["result"]) user.last_result = result text_result = self.formatter.form_law_chat(result.answer) if result.answer_type == "legal_question": _b = [ { "text": "بررسی عمیق تر", "callback_data": "logical_chat_in_law", } ] if user.limit < self.max_limit: _b.append( { "text": "بررسی گسترده تر", "callback_data": "more_limit", } ) _buttons.insert(0, _b) await self.send_message_helper( user=user, chunked_text=text_result, end_buttons=_buttons ) if user.is_vip: print(f"user.is_vip {user.is_vip}") reason_result = await self.formatter.form_chat( header="model-reason", llm_text=result.llm_reason ) await self.send_message_helper( user=user, chunked_text=reason_result, ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: print(f"result {type(result)}") print(f"result.answer {type(result.answer)}") print(f"result.ref_ids {type(result.ref_ids)}") print(f"result.answer_type {type(result.answer_type)}") _data = { "state": "chat_in_law", "result": result.answer, "llm_reason": result.llm_reason, "metadata": { "ref_ids": result.ref_ids, "answer_type": result.answer_type, }, } await self.save_to_db(user=user, data=_data) user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False self.ui_handler.cancel(user) async def handle_qanon_title_repeat(self, user: BaleUser): user.is_processing_lock = True try: _result: List[TitleRepeat] = await self.operator.title_repeated( qanontitle=user.input_query ) print(f"*/*/*/*/*/*/*/*/*/*/*/result_ {_result}") _res = await self.formatter.form_title_repeated(_input=_result) _buttons = [[HOME_BUTTON]] await self.send_message_helper( user=user, chunked_text=_res, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_talk(self, user: BaleUser): user.is_processing_lock = True try: _result = await self.operator.talk(query=user.input_query) await self.send_message_helper( user=user, text=_result, end_buttons=MAIN_BUTTON ) except Exception as e: print("ERROR in handle_talk:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_logical_chat_in_law(self, user: BaleUser): user.is_processing_lock = True self.ui_handler.create(user=user, main_text=["در حال استخراج اجزاء"]) f_data = "" _button = [[HOME_BUTTON]] len_rules = 0 try: metadata = user.last_result["metadata"] except: metadata = {} try: async for _data in self.operator.stream_logical_chat_in_law( query=user.input_query, effort=user.effort, metadata=metadata, limit=20 ): status = _data.get("status") data = _data.get("data") # _metadata = _data.get("metadata") print( f"status {status}", ) if status == "rules": len_rules = data self.ui_handler.main_text = [f"تعداد اجزاء استخراج شده {data}"] elif status == "semantic": self.ui_handler.main_text = [ f"مشابهت‌ یابی اجزاء {user.limit*len_rules}/{data}" ] elif status == "llm": self.ui_handler.main_text = ["ساخت پاسخ نهایی"] self.ui_handler.cancel(user) _header = f"📝 پاسخ نهایی:\n" f_data = data["text"] response = await self.formatter.form_llm_answer_chat( data["text"], header=_header ) await self.send_message_helper( user, chunked_text=response, end_buttons=_button, ) if user.is_vip: response2 = await self.formatter.form_chat( data["llm_reason"], header="دلیل مدل برای خروجی" ) await self.send_message_helper( user, chunked_text=response2, ) # break except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False user.input_query = "" self.ui_handler.cancel(user) _data = { "state": "logical_chat_in_law", "result": f_data, "llm_reason": data["llm_reason"], "metadata": {}, } # _data['metadata'].update(_metadata) await self.save_to_db(user=user, data=_data) return {"ok": True} async def handle_conflict_qanon_asasi(self, user: BaleUser): user.is_processing_lock = True user.active_message_id = 0 try: print(f"effort=user.effort {user.effort}") _buttons = [[HOME_BUTTON]] if user.effort == "low": current_text = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n" seen_k = set() _buttons.insert(0, [MORE_EFFORT_BUTTON]) async for _data in self.operator.conflict_qanon_asasi_low( user.input_query, user.effort, user.limit ): status = _data.get("status") data = _data.get("data") print(f"status {type(status)}-{status}") data = {k: v for k, v in data.items() if k not in seen_k} print(f"data {len(data)} seen_k {len(list(seen_k))}") text = await self.formatter.form_constitution_low(data, status) text = text + "\n\n" seen_k.update(data.keys()) if status == 11: await self.send_message_helper( user, text=current_text, end_buttons=_buttons, update_message=True, is_save_active_message_id=True, ) else: text = [text[i : i + 50] for i in range(0, len(text), 50)] for word in text: current_text += word await self.send_message_helper( user, text=current_text, update_message=True, is_save_active_message_id=True, ) text = "" elif user.effort == "medium": print(f"==== handle_constitution ====") # chunked_response = await self.formatter.form_rule_making( # _input=data # ) await self.send_message_helper( user=user, text="در دست توسعه...", ) # async for _data in self.operator.conflict_qanon_asasi_steps( # user.input_query, user.effort, user.limit # ): # status = _data.get("status") # data = _data.get("data") # print(f"status {type(status)}-{status}") # print(f"semantich_search {user.effort}") # _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" # response = await self.formatter.form_ss_rules(data, header=_header) # await self.send_message_helper(user, chunked_text=response) # print(f"subject_unities {user.effort}") # _header = "نتایج اولیه مغایرت های احتمالی :\n" # print(f"data type {type(data)}") # try: # print(f"data -> RuleRelation") # _data = [RuleRelation.model_validate(i) for i in data] # except: # print(f"data -> String") # _data = data # chunked_text, _button, mapping_qs = ( # await self.formatter.form_subject_unity(_data, header=_header) # ) # await self.send_message_helper( # user=user, # chunked_text=chunked_text, # end_buttons=_button, # ) # user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_conflict_law_writing_policy(self, user: BaleUser): user.is_processing_lock = True _buttons = [[HOME_BUTTON]] self.ui_handler.create(user=user, main_text=[ "در حال تحلیل", "در حال طبقه بندی", "در حال تفکر", "ساختار بندی", ], ) try: _result = await self.operator.conflict_law_writing_policy( query=user.input_query, effort=user.effort ) result = await self.formatter.from_law_writing_policy( _input_dict=_result, header="نتیجه بررسی با سیاست های قانون گذاری" ) if user.effort != "medium": _buttons.insert(0, [MORE_EFFORT_BUTTON]) self.ui_handler.cancel(user) await self.send_message_helper( user=user, chunked_text=result, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_conflict_all_qavanin(self, user: BaleUser): user.is_processing_lock = True try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/all_qanon/qs_unity", ): step = step_data.get("step") data = step_data.get("data") is_first = step_data.get("is_first", False) is_last = step_data.get("is_last", False) _button = [[HOME_BUTTON]] print(f"==== handle_conflict_general_policy ====") if step == "rule_making": print(f"rule_making {user.effort}") chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.model_validate(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: # user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_conflict_general_policy(self, user: BaleUser): user.is_processing_lock = True try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/general_policy/qs_unity", ): step = step_data.get("step") data = step_data.get("data") is_first = step_data.get("is_first", False) is_last = step_data.get("is_last", False) _button = [[HOME_BUTTON]] print(f"==== handle_conflict_general_policy ====") if step == "rule_making": print(f"rule_making {user.effort}") chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.model_validate(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: # user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_advanced_check_conflict(self, user: BaleUser): user.is_processing_lock = True print(f"handle_advanced_check_conflict======================================") try: print(f"user.call_back_query {user.call_back_query}") step, _type, qq_title = user.call_back_query.split(":") if _type == "qq": print("A") groups = {} for qanon_title, item in user.subject_unities.items(): if qanon_title == qq_title: for i in item: groups[qanon_title] = i.db_rule.section_id print("B") if len(groups) > 1: _button = [] for i, (k, v) in enumerate(groups.items(), start=1): _button.append( [{"text": i, "callback_data": f"subject_unities:qs:{v}"}] ) await self.send_message_helper( user=user, text=f"برای ادامه یک مورد را از این قانون انتخاب کنید{k} انتخاب کنید", end_buttons=_button, ) else: user.call_back_query = f"subject_unities:qs:{groups[qq_title]}" print("ccccc") elif _type == "qs": print("AAA@@@") content = None for k, v in user.subject_unities.items(): if v.db_rule.section_id == qq_title: content = v.model_dump() print(f"content qs -> {content}") async for step_data in self.request_manager.stream_result( payload={ "rule_relation": content, "effort": user.effort, "mode_type": "bale", }, url="/conflict/unity_eval", ): print(f"qs {user.call_back_query}") step = step_data.get("step") data = step_data.get("data") # is_last = step_data.get("is_last") _button = [[HOME_BUTTON]] print(f"==== handle_advanced_check_conflict ====") if step == "step2": print(f"*********************Step2 {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" _data = RuleRelation.model_validate(data) chunked_text = await self.formatter.form_conflict_detection( _data, header=_header ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step3": print(f"*********************Step3") _data = RuleRelation.model_validate(data) chunked_text = ( await self.formatter.form_conflict_type_detection( _data, header=_header ) ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step4": print(f"*********************Step4") _data = RuleRelation.model_validate(data) chunked_text = ( await self.formatter.form_relation_identification( _data, header=_header ) ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step5": print(f"*********************Step5") _data = Evaluation.model_validate(data) chunked_text = await self.formatter.form_evaluation( _data, header=_header ) await self.send_message_helper(user=user, text=chunked_text) else: print(f"eerror in uknown step --> {step}") except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_stream_chat(self, user: BaleUser): """ stream_talk() ↓ async generator → chunk ↓ handle_stream_chat() ↓ send first message ↓ update_message (throttled) """ full_text = "" message_id = None last_update = 0 async for chunk in stream_talk(user.input_query): full_text += chunk now = time.time() # جلوگیری از اسپم API (مثلاً هر 0.6 ثانیه آپدیت) if now - last_update < 0.6: continue last_update = now if message_id is None: # اولین پیام message_id = await self.send_message_to_bale( user=user, text=full_text + " ▌", ) else: payload = BalePayload( chat_id=user.chat_id, message_id=message_id, text=full_text + " ▌" ) self.update_message_to_bale(user, payload) # آخر کار، بدون cursor if message_id: payload = BalePayload( chat_id=user.chat_id, message_id=message_id, text=full_text ) self.update_message_to_bale(user, payload) async def handle_beta(self, user: BaleUser): """ state: user.is_processing_lock: query: time.time first_name last_name """ try: text_1 = "سلام" text_2 = "سلام، عرض ادب و احترام" await self.send_message_helper( user=user, structed_output=[ ["این تست است ", [[{"text": "تستتت", "callback_data": "not_yet"}]]], [ "این تست اس2ت ", [[{"text": "تستت2ت", "callback_data": "not_yet"}]], ], ], end_buttons=[ [ {"text": "دکمه های نهایی", "callback_data": "not_yet"}, {"text": "دکمه های ن2هایی", "callback_data": "not_yet"}, {"text": "دکمه های ن23هایی", "callback_data": "not_yet"}, ], [{"text": "دکمه های آخر", "callback_data": "not_yet"}], ], ) except Exception as e: print("ERROR in handl_beta:", str(traceback.print_exc())) async def save_to_db(self, user: BaleUser, data: Dict): time_now = int(time.time()) _id = f"ble_{user.chat_id}_{time_now}" data.update( { "id": _id, "uc_id": user.uc_id, "message_id": user.active_message_id, "username": user.username, "is_bot": user.is_bot, "first_name": user.first_name, "last_name": user.last_name, "effort": user.effort, "limit": user.limit, "input_query": user.input_query, "timestamp": time_now, } ) try: es_res = self.es_helper.update_index_doc( is_update_state=False, index_name_o=self.es_index_name, eid=_id, data=data, ) # type_name, payload, request print(f"-- ES-Saved {es_res}") except Exception as e: print("++ failed save_to_Es ", str(traceback.print_exc())) class UIState(BaleBotGeneral): """ مسئول نمایش «در حال تحلیل…» و پاک‌کردن پیام اصلی در زمان لغو یا پایان کار است. برای سرگرم کردن کاربر یک روند کاملا مجزا برای نشان دادن پیام و حذف در انتها """ def __init__( self, token:str, delay_time: float = 0.4, ): super().__init__( token=token, ) """ :param bot: نمونهٔ BaleBot که متدهای API را در اختیار دارد :param user: کاربری که پیامش در حال پردازش است """ self.delay_time = delay_time self.done = False self._heartbeat_task: Dict[str, asyncio.Task] = {} # ------------------------------------------------------------------ # توابعی که در والد BaleBotGeneral هستند # ------------------------------------------------------------------ async def delete_bale_message(self, *args, **kwargs): await super().delete_bale_message(*args, **kwargs) async def send_message_helper(self, *args, **kwargs): await super().send_message_helper(*args, **kwargs) # ------------------------------------------------------------------ # توابع خود کلاس # ------------------------------------------------------------------ async def active(self, user: BaleUser,): await self.send_message_helper( user=user, text=self.main_text[0], update_message=False, is_save_active_message_id=True, ) # ------------------------------------------------------------------ # متدهای عمومی # ------------------------------------------------------------------ def cancel(self, user: BaleUser,): """ متوقف کردن حلقهٔ heartbeat و پاک‌کردن پیام اصلی """ self.done = True _user_task = self._heartbeat_task.get(user.uc_id, None) if _user_task: self._heartbeat_task[user.uc_id].cancel() # اگر هنوز در حال اجرا باشد # پاک‌کردن پیام اصلی asyncio.create_task( self.delete_bale_message( chat_id=user.chat_id, message_id=user.active_message_id, ) ) self.main_text = ["در حال تحلیل"] def create(self, user: BaleUser, main_text: str | list = ["در حال تحلیل"], waiting_list:list=[]): """ شروع حلقهٔ heartbeat """ if isinstance(main_text, str): self.main_text = [main_text] elif isinstance(main_text, list): self.main_text = main_text print( f'create create' ) self._heartbeat_task[user.uc_id] = asyncio.create_task(self.heartbeat_loop(user=user, waiting_list=waiting_list)) # ------------------------------------------------------------------ # حلقهٔ heartbeat # ------------------------------------------------------------------ async def heartbeat_loop(self, user: BaleUser, waiting_list=[] ): """ هر 0.2 ثانیه متن «در حال تحلیل…» را به‌روزرسانی می‌کند. """ await self.active(user=user) if len(waiting_list) == 0: waiting_list = [".", "..", "...", "...."] while not self.done: text = random.choice(self.main_text) for j in waiting_list: # ویرایش پیام _text = f"{j} {text}" await self.send_message_helper( user=user, text=_text, update_message=True, is_save_active_message_id=True, ) await asyncio.sleep(self.delay_time) # صبر 5 ثانیه # await asyncio.sleep(0.2)