################# modularity ### import from external-package import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re, random from typing import Iterable, Any, List, Dict, Optional from core.base_model import * ### import from internal-file from router.bale.base_model import * from router.bale.bale_buttons import * from router.bale.bale_massages import * from router.bale.bale_manager import UserManager from core.operation import Operation from core.core import * from core.static import * from router.bale.config import * class BaleBotBase: """ کلاس اصلی ارتباط با Bale و متد های اصلی ارتباط """ def __init__( self, send_message_url: str, delete_message_url: str, update_message_url: str, ): self.send_message_url = send_message_url self.delete_message_url = delete_message_url self.update_message_url = update_message_url async def delete_bale_message(self, chat_id, message_id): payload = { "chat_id": chat_id, "message_id": message_id, } try: r = requests.post(self.delete_message_url, json=payload) print(f"Delete Message Status code:", r.status_code) except Exception: print(f"Delete Message Status code:", r.status_code, f"r {r}") print("ERROR in Delete Message Status code:") traceback.print_exc() async def update_bale_message( self, user: BaleUser, text: str = None, buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, ): payload = { "chat_id": user.chat_id, "message_id": user.active_message_id, "text": text, "reply_markup": { # "keyboard": reply_markup, "resize_keyboard": True, "one_time_keyboard": False, }, } if buttons: payload["reply_markup"]["inline_keyboard"] = buttons try: r = requests.post(self.update_message_url, json=payload) print(f"Update Message Status code:", r.status_code) except Exception: print("ERROR in Update Message Bale API Error:", r.status_code, r) print("ERROR in Update Message Status code:") traceback.print_exc() async def serialize(self, obj): if isinstance(obj, BaseModel): return obj.model_dump() elif isinstance(obj, list): return [await self.serialize(i) for i in obj] elif isinstance(obj, dict): return {k: await self.serialize(v) for k, v in obj.items()} else: return obj async def normalize_messages( self, text: Optional[str] = None, chunked_text: Optional[List[str]] = None, structured_text: Optional[List] = None, end_buttons: List[BaleButton] = [], ) -> List[BaleMessage]: messages: List[BaleMessage] = [] if structured_text: for text_, buttons in structured_text: messages.append(BaleMessage(text=text_, buttons=buttons)) elif chunked_text: for chunk in chunked_text: messages.append(BaleMessage(text=chunk)) elif text: messages.append(BaleMessage(text=text)) # attach end buttons to last message if messages and end_buttons: messages[-1].buttons.extend(end_buttons) return messages async def send_message_helper( self, user: BaleUser, update_message: bool = False, text: Optional[str] = None, chunked_text: Optional[List[str]] = None, structured_text: Optional[List] = None, end_buttons: List[List[BaleButton]] = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, is_save_active_message_id=False, ): _i = 0 if text: _i += 1 if chunked_text: _i += 1 if structured_text: _i += 1 if _i != 1: raise ValueError( "In send_message_helper Only Send One Of {text, chunked_text, structured_text}" ) messages: List[BaleMessage] = await self.normalize_messages( text=text, chunked_text=chunked_text, structured_text=structured_text, end_buttons=end_buttons, ) if update_message == True: if user.active_message_id == 0: await self.send_message_helper( user=user, update_message=False, text=text, end_buttons=end_buttons, chunked_text=chunked_text, structured_text=structured_text, reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) else: if text: new_text = text if chunked_text: new_text = chunked_text[-1] if structured_text: new_text = structured_text[-1][0] await self.update_bale_message( user=user, text=new_text, buttons=end_buttons, reply_markup=reply_markup, ) else: for i, msg in enumerate(messages): await self.send_bale_message( user=user, text=msg.text, buttons=msg.buttons, reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) if i < len(messages) - 1: await asyncio.sleep(D_TIME_CHAT) async def send_bale_message( self, user: BaleUser, text: str, buttons: List[List[BaleButton]] | List[List[Dict]] = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, is_save_active_message_id=False, ): payload = { "chat_id": user.chat_id, "text": text, "reply_markup": { # "keyboard": reply_markup, "resize_keyboard": True, "one_time_keyboard": False, }, } if buttons: payload["reply_markup"]["inline_keyboard"] = await self.serialize(buttons) try: r = requests.post(self.send_message_url, json=payload) print(f"Send Message Status code:", r.status_code) r = r.json() if is_save_active_message_id: user.active_message_id = int(r["result"]["message_id"]) except Exception: # print("ERROR in Send Message Bale API Error:", r.status_code, r) print("ERROR in Send Message Status code:") traceback.print_exc() class BaleBotUI(BaleBotBase): """ مسئول نمایش «در حال تحلیل…» و پاک‌کردن پیام اصلی در زمان لغو یا پایان کار است. برای سرگرم کردن کاربر یک روند کاملا مجزا برای نشان دادن پیام و حذف در انتها """ def __init__( self, send_message_url: str, delete_message_url: str, update_message_url: str, delay_time: float, ): super().__init__( send_message_url=send_message_url, delete_message_url=delete_message_url, update_message_url=update_message_url, ) """ :param bot: نمونهٔ BaleBotCore که متدهای API را در اختیار دارد :param user: کاربری که پیامش در حال پردازش است """ self.delay_time = delay_time self.done = False self._heartbeat_task: Dict[str, asyncio.Task] = {} # ------------------------------------------------------------------ # توابعی که در والد BaleBotBase هستند # ------------------------------------------------------------------ async def delete_bale_message(self, *args, **kwargs): await super().delete_bale_message(*args, **kwargs) async def send_message_helper(self, *args, **kwargs): await super().send_message_helper(*args, **kwargs) # ------------------------------------------------------------------ # توابع خود کلاس # ------------------------------------------------------------------ async def active( self, user: BaleUser, ): await self.send_message_helper( user=user, text=self.main_text[0], update_message=False, is_save_active_message_id=True, ) # ------------------------------------------------------------------ # متدهای عمومی # ------------------------------------------------------------------ def cancel( self, user: BaleUser, ): """ متوقف کردن حلقهٔ heartbeat و پاک‌کردن پیام اصلی """ self.done = True _user_task = self._heartbeat_task.get(user.uc_id, None) if _user_task: self._heartbeat_task[user.uc_id].cancel() # اگر هنوز در حال اجرا باشد # پاک‌کردن پیام اصلی asyncio.create_task( self.delete_bale_message( chat_id=user.chat_id, message_id=user.active_message_id, ) ) self.main_text = ["در حال تحلیل"] def create( self, user: BaleUser, main_text: str | list = ["در حال تحلیل"], waiting_list: list = [], ): """ شروع حلقهٔ heartbeat """ if isinstance(main_text, str): self.main_text = [main_text] elif isinstance(main_text, list): self.main_text = main_text print(f"create create") self._heartbeat_task[user.uc_id] = asyncio.create_task( self.heartbeat_loop(user=user, waiting_list=waiting_list) ) # ------------------------------------------------------------------ # حلقهٔ heartbeat # ------------------------------------------------------------------ async def heartbeat_loop(self, user: BaleUser, waiting_list=[]): """ هر 0.2 ثانیه متن «در حال تحلیل…» را به‌روزرسانی می‌کند. """ await self.active(user=user) if len(waiting_list) == 0: waiting_list = [".", "..", "...", "...."] while not self.done: text = random.choice(self.main_text) for j in waiting_list: # ویرایش پیام _text = f"{j} {text}" await self.send_message_helper( user=user, text=_text, update_message=True, is_save_active_message_id=True, ) await asyncio.sleep(self.delay_time) # صبر 5 ثانیه # await asyncio.sleep(0.2) class StackManager: # --- push مرحله جدید def push( self, output_pure_data, user: BaleUser, step_name: str, metadata: Dict, output_formed_data: Union[PreviousMessage, None] = None, input_data=None, runtime_ms: int = -1, ): """ وارد کردن داده داخل پشته رفتار: اگر input_data برابر با None بود پس داده از مرحله قبل آمده است پس index مرحله قبل را در این فیلد وارد میکنیم """ try: if not input_data: input_data = len(user.stack) user.stack.append( StackItem( index=len(user.stack), input_data=input_data, output_pure_data=output_pure_data, output_formed_data=output_formed_data, step_name=step_name, metadata=metadata, runtime_ms=runtime_ms, ) ) except Exception as e: print(f"StackManager>push {traceback.print_exc()}") # --- آخرین مرحله def last_item(self, user: BaleUser) -> StackItem | None: """گرفتن آخرین داده داخل پشته""" return user.stack[-1] if user.stack else None def get_last_item_metadata(self, user: BaleUser) -> Dict | None: """گرفتن آخرین داده داخل پشته""" return dict(user.stack[-1].metadata) if user.stack else None # --- مرحله قبل def prev_item(self, user: BaleUser) -> StackItem | None: """گرفتن داده مرحله قبل از پشته""" return user.stack[-2] if len(user.stack) >= 2 else None # --- گرفتن مرحله با نام def get(self, user: BaleUser, name: str) -> StackItem | None: """گرفتن داده بر اساس نام داده از پشته""" for item in reversed(user.stack): if item.step_name == name: return item return None # --- حذف مراحل اضافی (cleanup) def cleanup(self, user: BaleUser): """حذف پشته""" user.stack = [] # --- ثبت runtime def set_runtime(self, user: BaleUser, index: int, runtime_ms: float): """وارد کردن runtime""" if 0 <= index < len(user.stack): user.stack[index].runtime_ms = runtime_ms class BaleBotCore(BaleBotBase): """ input → set_state → render_user_state """ def __init__( self, user_manager: UserManager, operation: Operation, es_index_name: str, es_helper: ElasticHelper, formatter: Formatter, back_end_url: str, request_manager: RequestManager, send_message_url: str, delete_message_url: str, update_message_url: str, ): super().__init__( send_message_url=send_message_url, delete_message_url=delete_message_url, update_message_url=update_message_url, ) self.ui_handler = BaleBotUI( delay_time=D_TIME_UI_TEMP_CHAT, send_message_url=send_message_url, delete_message_url=delete_message_url, update_message_url=update_message_url, ) self.stack: List[StackItem] = [] self.freeze_limit = REQUEST_FREEZE_LIMIT self.operator = operation self.formatter = formatter self.request_manager = request_manager self.es_helper = es_helper self.es_index_name = es_index_name self.user_manager = user_manager self.max_limit = MAX_RAG_LIMIT self.back_end_url = back_end_url # self.user_state = self.make_state() @property def stack_manager(self) -> StackManager: return StackManager() # ------------------------------------------------------------------ # توابعی که در والد BaleBotBase هستند # ------------------------------------------------------------------ async def delete_bale_message(self, **kwargs): await super().delete_bale_message(**kwargs) async def update_bale_message(self, *args, **kwargs): await super().update_bale_message(*args, **kwargs) async def send_message_helper( self, user: BaleUser, update_message: bool = False, text: str = None, chunked_text: List[str] = None, structured_text: List = None, end_buttons: List = [], reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, is_save_active_message_id: bool = False, ): await super().send_message_helper( user=user, update_message=update_message, text=text, chunked_text=chunked_text, structured_text=structured_text, end_buttons=end_buttons, reply_markup=reply_markup, is_save_active_message_id=is_save_active_message_id, ) async def send_bale_message(self, *args, **kwargs): await super().send_bale_message(*args, **kwargs) # ------------------------------------------------------------------ # توابع همین کلاس # ------------------------------------------------------------------ async def render_user_state(self, user: BaleUser): """Bus سیستم""" try: _state = user.state_detail.state if user.state_detail else None _input_query = user.input_query if user.input_query else "" print(f"user.input_query.render_user_state >{_input_query}<") print(f"user.state >{_state}<") if user.input_query == "" and user.state_detail.message != "": print(f"") await self.send_message_helper( user=user, text=user.state_detail.message, end_buttons=user.state_detail.end_buttons, ) return {"ok": True} if user.state_detail.handler: user.is_processing_lock = True # print(f'fdffffffffff {user.state_detail.handler}') handler = getattr(self, user.state_detail.handler) await handler(user) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) return {"ok": True} async def render_update(self, update: BaleUpdate): """ مهمترین تابع ربات بله ورودی شما یا به صورت متن ساده هست یا بصورت query render_message : متن render_callback : کوئری """ # 2️⃣ ساخت یا بازیابی user user = self.user_manager.get_or_create(update) # print(f"render_update user -> {user.uc_id} user.is_processing_lock {user.is_processing_lock }") if user.is_processing_lock == False: if update.message: user.message_limit = 0 user.input_query = str(user.update.message.text or "").strip() # async def render_message(self, user: BaleUser): if user.input_query in ("/start", "main", "بازگشت به خانه"): user.input_query = "" await self.handle_main(user) return {"ok": True} elif user.input_query == "جستجو": user.state_detail = STATE_CONFIG["search_in_law"] user.input_query = "" elif user.input_query == "گفتگو": user.state_detail = STATE_CONFIG["chat_in_law"] user.input_query = "" # --- WorkFlow Logic by State await self.render_user_state(user) return {"ok": True} if update.callback_query: """ اینجا فقط باید مرحله و state عوض شود و داده ای وارد نمیشود!!! برای سوال پرسیدن از کاربر و ... """ user.message_limit = 0 user.is_call_back_query = True user.call_back_query = user.update.callback_query.data # user.update.callback_query.data call_back_query = user.call_back_query if call_back_query == "main": await self.handle_main(user) return {"ok": True} elif call_back_query.startswith("show_stack_data:"): stack_data_name = call_back_query.split(":")[1] await self.show_stack_data( user=user, _data_key_name=stack_data_name ) return {"ok": True} elif call_back_query.startswith("subject_unities:"): _, _type, _id = call_back_query.split(":") if _type == "qq": map_qq_qs = self.stack_manager.get_last_item_metadata(user)[ "map_qq_qs" ] qs_data = map_qq_qs[_id] print(f"qs_data {qs_data}") if len(qs_data) == 1: print("A" * 15) user.call_back_query = f"subject_unities:qs:{list(qs_data)[0].db_rule.section_id}" user.state_detail = STATE_CONFIG[ "subject_unities_to_evalution" ] else: print("B") _button = [] for i, item in enumerate(qs_data, start=1): _button.append( [ { "text": i, "callback_data": f"subject_unities:qs:{item.db_rule.section_id}", } ] ) _t1 = f"برای ادامه یک مورد را از این قانون انتخاب کنید{item.db_rule.qanon_title} انتخاب کنید" print("B" * 10) print(f"_t1 {_t1}") print("B" * 10) await self.send_message_helper( user=user, text=_t1, end_buttons=_button, ) print("C" * 10) return {"ok": True} elif _type == "qs": user.state_detail = STATE_CONFIG["subject_unities_to_evalution"] # Dynamic Change Options elif user.call_back_query == "more_limit": user.limit += 10 user.input_query = user.last_input_query if user.state_detail == STATE_CONFIG["logical_chat_in_law"]: user.state_detail = STATE_CONFIG["chat_in_law"] # Dynamic Change Options elif user.call_back_query == "more_effort": user.effort = "medium" user.input_query = user.last_input_query elif user.call_back_query == "logical_chat_in_law": # user.effort = "medium" user.input_query = user.last_input_query user.state_detail = STATE_CONFIG["logical_chat_in_law"] # Dynamic Change State else: print(f"Dynamic Change State -> {user.state_detail}") user.state_detail = STATE_CONFIG[user.call_back_query] await self.render_user_state(user) return {"ok": True} else: if user.call_back_query == "main": await self.handle_main(user) return {"ok": True} # beta-mode if user.message_limit < self.freeze_limit: await self.send_message_helper( user=user, text=BUSY_TEXT, end_buttons=[[STOP_PROCCESS]] ) user.message_limit += 1 return {"ok": True} async def handle_previous_message(self, user: BaleUser): """ دسترسی به پیام مرحله قبل """ print("=============================== handle_previous_message") try: # if user.call_back_query == "previous_message": if user.previous_message: _type = user.previous_message.type if _type == "text": await self.send_message_helper( user=user, text=user.previous_message.message, end_buttons=user.previous_message.buttons, ) elif _type == "structured_text": await self.send_message_helper( user=user, structured_text=user.previous_message.message, end_buttons=user.previous_message.buttons, ) elif _type == "chunked_text": await self.send_message_helper( user=user, chunked_text=user.previous_message.message, end_buttons=user.previous_message.buttons, ) else: await self.handle_main(user) else: await self.handle_main(user) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_main(self, user: BaleUser): # if user.input_query != "": # "اگر کاربر پیام داد در حالت عادی به چت منتقل شود" # user.state_detail = STATE_CONFIG["chat_in_law"] # await self.render_user_state(user) # return {"ok": True} """ اگر کاربر پیامی نداده بود صفحه اصلی نمایش داده شود و داده ها پاک شود """ # ریست کردن پشته‌ها و نتایج user.last_query = "" user.last_result = "" user.stack.clear() user.input_query = "" user.sub_state = "" user.limit = 10 user.effort = "low" user.active_message_id = 0 user.message_limit = 0 message = "من ربات تست ام" await self.send_message_helper( user=user, text=message, # end_buttons=MAIN_BUTTON, ) async def handle_search_in_law(self, user: BaleUser): user.is_processing_lock = True print(f"handle_search_in_law -> {user.input_query}") self.ui_handler.create(user=user, main_text="در حال جستجو") _buttons = [[HOME_BUTTON]] try: async for result in self.operator.stream_search_in_law( query=user.input_query, limit=user.limit, rerank_model="BAAI/bge-reranker-v2-m3", embed_model="BAAI/bge-m3", ): result = BMNewSemanticSearchOutput.model_validate(result["result"]) if isinstance(result, BMNewSemanticSearchOutput): self.ui_handler.cancel(user) chunked_text_ = self.formatter.form_search_in_law( header=f"برای پرسش: {user.input_query}\n\n", footer=f"مدت زمان بردارسازی {result.embed_model_time} مدت زمان تشابه یابی {result.cosine_similarity_time} مدت زمان تشابه یابی دقیق {result.rerank_time}", sections=result.result, ) if user.limit < self.max_limit: _buttons.insert(0, [MORE_LIMIT_BUTTON]) await self.send_message_helper( user=user, chunked_text=chunked_text_, end_buttons=_buttons, ) # else: # heartbeat_task except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: data = { "state": "search_in_law", "contents": [i.content for i in result.result], "scores": [i.score for i in result.result], "ref_ids": [i.id for i in result.result], "user_input": user.input_query, "metadata": { "embed_model_time": result.embed_model_time, "cosine_similarity_time": result.cosine_similarity_time, "rerank_time": result.rerank_time, }, } data["metadata"].update(result.metadata) await self.save_to_db(user, data=data) user.is_processing_lock = False user.last_input_query = user.input_query user.input_query = "" self.ui_handler.cancel(user) return {"ok": True} async def handle_search_in_law_rules(self, user: BaleUser): user.is_processing_lock = True _status_show_map = { "total_process": "تعداد کل موارد مورد بررسی", "total_processed": "تعداد موارد بررسی شده", } total_process = len(user.last_result) * user.limit es_data = None es_metadata = None print( f"user.limit {user.limit}", f"len(user.last_result) {len(user.last_result)}", f"total_process*****s {total_process}", ) try: if user.last_result: print(f"/////////////user.last_result", len(user.last_result)) await self.send_message_helper( user=user, text="در حال جستجو ...", update_message=True, is_save_active_message_id=True, ) _buttons = [ [HOME_BUTTON], [{"text": "بررسی مغایرت", "callback_data": "not_yet"}], ] filter_qanon_ids = [] # filter_qanon_ids باید از کاربر پرسیده شود async for step_data in self.operator.stream_rule_semantic_search( queries=[i.model_dump() for i in user.last_result], filter_qanon_ids=filter_qanon_ids, limit_rerank=user.limit, ): status = step_data.get("status") data = step_data.get("data") metadata = step_data.get("metadata") # time if status == "end": # print(f'*****data {data}') es_data = data[status] es_metadata = metadata data = [ SemanticSearchP2P.model_validate(i) for i in data[status] ] _res = self.formatter.form_search_in_law_rules( header="نتایج جستجوی اجزاء در قانون\n", body=data, ) print(f"len(_res) {len(_res) }") if user.limit < self.max_limit: _buttons.insert(0, [MORE_LIMIT_BUTTON]) if len(_res) == 1: await self.send_message_helper( user=user, chunked_text=_res, update_message=True, end_buttons=_buttons, ) else: for i, item in enumerate(_res): print(i) if i == 0: print("start-batch") await self.send_message_helper( user=user, text=item, update_message=True, ) elif i == len(_res) - 1: print("end-batch") await self.send_message_helper( user=user, text=item, end_buttons=_buttons, ) else: print("middle-batch") await self.send_message_helper( user=user, text=item, ) user.active_message_id = 0 else: # print( # f"status {status}\nText {type(_status_show_map[status])}\ndata type {type(data[status])}" # ) if status == "total_processed": _result = ( _status_show_map[status] + f": {data['total_processed']}/{total_process}" ) elif status == "total_process": _result = ( _status_show_map[status] + f": {data['total_process']}" ) await self.send_message_helper( user=user, text=_result, update_message=True, is_save_active_message_id=True, ) else: print("-----------------------No Found Any Last_data") if es_data: _data = { "state": "search_in_law_rules", "result": es_data, "metadata": {}, } _data["metadata"].update(es_metadata) await self.save_to_db(user, data=_data) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_rule_making(self, user: BaleUser): user.is_processing_lock = True self.ui_handler.create(user=user, main_text="در حال بررسی") _buttons = [ [ { "text": "مشابهت یابی در قانون", "callback_data": "not_yet", # "search_in_law_rules", } ], [HOME_BUTTON], ] _max_token = None try: f_data = None f_metadata = None async for step_data in self.operator.stream_rule_making( query=user.input_query, llm_name="gpt-oss-20b", effort=user.effort ): status = step_data.get("status") data = step_data.get("data") metadata = step_data.get("metadata") # time if status == "end": self.ui_handler.cancel(user) f_data = data[status] f_metadata = metadata res_ = await self.formatter.form_rule_making( _input=data[status], footer=f"تعداد توکن های مصرف شده {_max_token}", ) if user.effort != "medium": _buttons.insert(0, [MORE_EFFORT_BUTTON]) await self.send_message_helper( user=user, chunked_text=res_, end_buttons=_buttons, ) user.last_result = [ InputRule.model_validate(i) for i in data[status] ] if user.is_vip: _result2 = await self.formatter.form_chat( header=" *reasoning* :\n", llm_text=metadata["llm_reason"], ) await self.send_message_helper( user=user, chunked_text=_result2, ) else: _max_token = data["max_token"] if _max_token: self.ui_handler.main_text = [ f"تعداد توکن های مصرف شده {_max_token}", ] except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: if f_data: _data = { "state": "rule_making", "result": f_data, "llm_reason": metadata["llm_reason"], "metadata": {"llm_token": _max_token}, } print(f"_data {_data}") _data["metadata"].update(f_metadata) await self.save_to_db(user, data=_data) user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False self.ui_handler.cancel(user) return {"ok": True} async def handle_chat_in_law(self, user: BaleUser): user.is_processing_lock = True self.ui_handler.create( user=user, main_text=[ "در حال تحلیل", "در حال طبقه بندی", "در حال تفکر", "ساختار بندی", ], ) # print('asyncio') # sleep(80) # print('wait') _buttons = [[HOME_BUTTON]] try: async for _result in self.operator.stream_chat_in_law( query=user.input_query, effort=user.effort, limit=user.limit ): print(f"handle_chat_in_law **--**---*--*-*-*-***-**-*-*-*-** ") self.ui_handler.cancel(user) result = ChatLaw.model_validate(_result["result"]) user.last_result = result text_result = self.formatter.form_law_chat(result.answer) if result.answer_type == "legal_question": _b = [ { "text": "بررسی عمیق تر", "callback_data": "logical_chat_in_law", } ] if user.limit < self.max_limit: _b.append( { "text": "بررسی گسترده تر", "callback_data": "more_limit", } ) _buttons.insert(0, _b) await self.send_message_helper( user=user, chunked_text=text_result, end_buttons=_buttons ) if user.is_vip: print(f"user.is_vip {user.is_vip}") reason_result = await self.formatter.form_chat( header="model-reason", llm_text=result.llm_reason ) await self.send_message_helper( user=user, chunked_text=reason_result, ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: print(f"result {type(result)}") print(f"result.answer {type(result.answer)}") print(f"result.ref_ids {type(result.ref_ids)}") print(f"result.answer_type {type(result.answer_type)}") _data = { "state": "chat_in_law", "result": result.answer, "llm_reason": result.llm_reason, "metadata": { "ref_ids": result.ref_ids, "answer_type": result.answer_type, }, } await self.save_to_db(user=user, data=_data) user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False self.ui_handler.cancel(user) async def handle_qanon_title_repeat(self, user: BaleUser): user.is_processing_lock = True try: _result: List[TitleRepeat] = await self.operator.title_repeated( qanontitle=user.input_query ) print(f"*/*/*/*/*/*/*/*/*/*/*/result_ {_result}") _res = await self.formatter.form_title_repeated(_input=_result) _buttons = [[HOME_BUTTON]] await self.send_message_helper( user=user, chunked_text=_res, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_talk(self, user: BaleUser): user.is_processing_lock = True try: _result = await self.operator.talk(query=user.input_query) await self.send_message_helper( user=user, text=_result, end_buttons=MAIN_BUTTON ) except Exception as e: print("ERROR in handle_talk:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False return {"ok": True} async def handle_logical_chat_in_law(self, user: BaleUser): user.is_processing_lock = True self.ui_handler.create(user=user, main_text=["در حال استخراج اجزاء"]) f_data = "" _button = [[HOME_BUTTON]] len_rules = 0 try: metadata = user.last_result["metadata"] except: metadata = {} try: async for _data in self.operator.stream_logical_chat_in_law( query=user.input_query, effort=user.effort, metadata=metadata, limit=20 ): status = _data.get("status") data = _data.get("data") # _metadata = _data.get("metadata") print( f"status {status}", ) if status == "rules": len_rules = data self.ui_handler.main_text = [f"تعداد اجزاء استخراج شده {data}"] elif status == "semantic": self.ui_handler.main_text = [ f"مشابهت‌ یابی اجزاء {user.limit*len_rules}/{data}" ] elif status == "llm": self.ui_handler.main_text = ["ساخت پاسخ نهایی"] self.ui_handler.cancel(user) _header = f"📝 پاسخ نهایی:\n" f_data = data["text"] response = await self.formatter.form_llm_answer_chat( data["text"], header=_header ) await self.send_message_helper( user, chunked_text=response, end_buttons=_button, ) if user.is_vip: response2 = await self.formatter.form_chat( data["llm_reason"], header="دلیل مدل برای خروجی" ) await self.send_message_helper( user, chunked_text=response2, ) # break except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.is_processing_lock = False user.input_query = "" self.ui_handler.cancel(user) _data = { "state": "logical_chat_in_law", "result": f_data, "llm_reason": data["llm_reason"], "metadata": {}, } # _data['metadata'].update(_metadata) await self.save_to_db(user=user, data=_data) return {"ok": True} async def handle_conflict_qanon_asasi(self, user: BaleUser): user.is_processing_lock = True user.active_message_id = 0 try: print(f"effort=user.effort {user.effort}") _buttons = [[HOME_BUTTON]] if user.effort == "low": current_text = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n" seen_k = set() _buttons.insert(0, [MORE_EFFORT_BUTTON]) async for _data in self.operator.conflict_qanon_asasi_low( user.input_query, user.effort, user.limit ): status = _data.get("status") data = _data.get("data") print(f"status {type(status)}-{status}") data = {k: v for k, v in data.items() if k not in seen_k} print(f"data {len(data)} seen_k {len(list(seen_k))}") text = await self.formatter.form_constitution_low(data, status) text = text + "\n\n" seen_k.update(data.keys()) if status == 11: await self.send_message_helper( user, text=current_text, end_buttons=_buttons, update_message=True, is_save_active_message_id=True, ) else: text = [text[i : i + 50] for i in range(0, len(text), 50)] for word in text: current_text += word await self.send_message_helper( user, text=current_text, update_message=True, is_save_active_message_id=True, ) text = "" elif user.effort == "medium": print(f"==== handle_constitution ====") # chunked_response = await self.formatter.form_rule_making( # _input=data # ) await self.send_message_helper( user=user, text="در دست توسعه...", ) # async for _data in self.operator.conflict_qanon_asasi_steps( # user.input_query, user.effort, user.limit # ): # status = _data.get("status") # data = _data.get("data") # print(f"status {type(status)}-{status}") # print(f"semantich_search {user.effort}") # _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" # response = await self.formatter.form_ss_rules(data, header=_header) # await self.send_message_helper(user, chunked_text=response) # print(f"subject_unities {user.effort}") # _header = "نتایج اولیه مغایرت های احتمالی :\n" # print(f"data type {type(data)}") # try: # print(f"data -> RuleRelation") # _data = [RuleRelation.model_validate(i) for i in data] # except: # print(f"data -> String") # _data = data # chunked_text, _button, mapping_qs = ( # await self.formatter.form_subject_unity(_data, header=_header) # ) # await self.send_message_helper( # user=user, # chunked_text=chunked_text, # end_buttons=_button, # ) # user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_conflict_law_writing_policy(self, user: BaleUser): user.is_processing_lock = True _buttons = [[HOME_BUTTON]] self.ui_handler.create( user=user, main_text=[ "در حال تحلیل", "در حال طبقه بندی", "در حال تفکر", "ساختار بندی", ], ) try: _result = await self.operator.conflict_law_writing_policy( query=user.input_query, effort=user.effort ) result = await self.formatter.from_law_writing_policy( _input_dict=_result, header="نتیجه بررسی با سیاست های قانون گذاری" ) if user.effort != "medium": _buttons.insert(0, [MORE_EFFORT_BUTTON]) self.ui_handler.cancel(user) await self.send_message_helper( user=user, chunked_text=result, end_buttons=_buttons ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_subject_unities_to_evalution(self, user: BaleUser): user.is_processing_lock = True _buttons = [[HOME_BUTTON, PREVIOUS_BUTTON]] _b = [] try: qs_id = user.call_back_query.split(":")[-1] # subject_unities:qs:qs2794301 print(f"/************* handle_subject_unities_to_evalution qs_id {qs_id}") map_qq_qs = self.stack_manager.get(user, name="subject_unities").metadata[ "map_qq_qs" ] qs_data = None for k, v in map_qq_qs.items(): for item in v: if item.db_rule.section_id == qs_id: qs_data = item break qs_data = RuleRelation.model_validate(qs_data) self.ui_handler.create( user=user, main_text=[ "در حال بررسی", "تفکر", "جمع بندی", ], ) print(f"Send 2 /conflict/unity_eval ") async for step_data in self.request_manager.stream_result( payload={ "rule_relation": qs_data.model_dump(), "effort": user.effort, "mode_type": "bale", }, url="/conflict/unity_eval", ): step = step_data.get("step") data = step_data.get("data") metadata = step_data.get("metadata") # is_last = step_data.get("is_last") print(f"==== handle_advanced_check_conflict ====") print(f"step {step}") # print(f"data {data}") print("==" * 20) if step == "ConflictDetection": print(f"*********************Step2 {user.effort}") _header = "نتیجه تشخیص مغایرت" self.ui_handler.main_text = [_header] _data = RuleRelation.model_validate(data) chunked_text = await self.formatter.form_conflict_detection(_data) _metadata = {} _metadata["reasoning_content"] = metadata["reasoning_content"] _metadata["total_token"] = ( f"کل توکن های مصرف شده: {metadata['total_token']}" ) self.stack_manager.push( user=user, output_pure_data=_data, step_name="ConflictDetection", metadata=_metadata, output_formed_data=PreviousMessage( message=chunked_text, type="chunked_text" ), ) _b.append( BaleButton( text="تشخیص مغایرت", callback_data="show_stack_data:ConflictDetection", ) ) elif step == "ConflictTypeDetection": print(f"*********************Step3") _header = "نتیجه تشخیص نوع مغایرت" self.ui_handler.main_text = [_header] _data = RuleRelation.model_validate(data) chunked_text = await self.formatter.form_conflict_type_detection( _data ) _metadata = {} _metadata["reasoning_content"] = metadata["reasoning_content"] _metadata["total_token"] = ( f"کل توکن های مصرف شده: {metadata['total_token']}" ) self.stack_manager.push( user=user, output_pure_data=_data, step_name="ConflictTypeDetection", metadata=_metadata, output_formed_data=PreviousMessage( message=chunked_text, type="chunked_text" ), ) _b.append( BaleButton( text="تشخیص نوع مغایرت", callback_data="show_stack_data:ConflictTypeDetection", ) ) elif step == "RelationIdentification": print(f"*********************Step4") _header = "ارتباط یابی مغایرتی" self.ui_handler.main_text = [_header] _data = RuleRelation.model_validate(data) chunked_text = await self.formatter.form_relation_identification( _data, header=_header ) _metadata = {} _metadata["reasoning_content"] = metadata["reasoning_content"] _metadata["total_token"] = ( f"کل توکن های مصرف شده: {metadata['total_token']}" ) self.stack_manager.push( user=user, output_pure_data=_data, step_name="RelationIdentification", metadata=_metadata, output_formed_data=PreviousMessage( message=chunked_text, type="chunked_text" ), ) _b.append( BaleButton( text="ارتباط یابی مغایرت", callback_data="show_stack_data:RelationIdentification", ) ) _header = "نهایی سازی" self.ui_handler.main_text = [_header] elif step == "Evaluation": print(f"*********************Step5") self.ui_handler.cancel(user) _data = Evaluation.model_validate(data) chunked_text = await self.formatter.form_evaluation( _data, header=_header ) _metadata = {} _metadata["reasoning_content"] = metadata["reasoning_content"] _metadata["total_token"] = ( f"کل توکن های مصرف شده: {metadata['total_token']}" ) print(f"chunked_text {type(chunked_text)} == {chunked_text}") self.stack_manager.push( user=user, output_pure_data=_data, step_name="Evaluation", metadata=_metadata, output_formed_data=PreviousMessage( message=chunked_text, buttons=_buttons, type="text" ), ) _buttons.append(_b) await self.send_message_helper( user=user, text=chunked_text, end_buttons=_buttons ) else: print(f"eerror in uknown step --> {step}") except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def show_stack_data( self, user: BaleUser, _data_key_name=None, metadata_key=None ): user.is_processing_lock = True _buttons = [[HOME_BUTTON, PREVIOUS_BUTTON]] try: print(f"_data_key_name ={_data_key_name}=") if _data_key_name: data = self.stack_manager.get(user, _data_key_name) else: data = self.stack_manager.last_item(user) # data = self.stack_manager.get_last_item_metadata(user) if data: if data.output_formed_data: if data.output_formed_data.type == "text": await self.send_message_helper( user=user, text=data.output_formed_data.message, end_buttons=data.output_formed_data.buttons, ) elif data.output_formed_data.type == "chunked_text": await self.send_message_helper( user=user, chunked_text=data.output_formed_data.message, end_buttons=data.output_formed_data.buttons, ) elif data.output_formed_data.type == "structured_text": await self.send_message_helper( user=user, structured_text=data.output_formed_data.message, end_buttons=data.output_formed_data.buttons, ) else: if _data_key_name == "rule_making": chunked_text = await self.formatter.form_rule_making( _input=data.output_pure_data ) elif _data_key_name == "semantich_search": chunked_text = await self.formatter.form2_ss_rules( data.output_pure_data, header="نتایج جستجوی معنایی " ) elif _data_key_name == "ConflictDetection": chunked_text = await self.formatter.form_conflict_detection( data.output_pure_data, ) elif _data_key_name == "ConflictTypeDetection": chunked_text = ( await self.formatter.form_conflict_type_detection( data.output_pure_data ) ) elif _data_key_name == "RelationIdentification": chunked_text = ( await self.formatter.form_relation_identification( data.output_pure_data, ) ) elif _data_key_name == "Evaluation": chunked_text = await self.formatter.form_evaluation( data.output_pure_data ) else: chunked_text = ["متاسفانه؛ داده مرتبط یافت نشد"] print(f"_data_key_name {_data_key_name}") print( f"chunked_text {type(chunked_text)} -index-0 {type(chunked_text[0])}" ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_buttons ) else: await self.send_message_helper( user=user, text="متاسفانه؛ داده مرتبط یافت نشد" ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_summary_conflict_all_qavanin( self, user: BaleUser, _button: List = [[HOME_BUTTON]] ): b1 = { "text": "اجزاء متن ورودی", "callback_data": "show_stack_data:rule_making", } b2 = { "text": "اجزاء مرتبط از قانون", "callback_data": "show_stack_data:semantich_search", } _button.append([b1, b2]) print(f"******************* handle_summary_conflict_all_qavanin") user.is_processing_lock = True self.ui_handler.create( user=user, main_text=[ "در حال بررسی", "تفکر", "جمع بندی", ], ) try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/all_qanon/qs_unity", ): step = step_data.get("step") data = step_data.get("data") _metadata = step_data.get("metadata", {}) print(f"==== handle_conflict_all_qavanin step {step} ====") if step == "rule_making": self.stack_manager.push( user=user, input_data=user.input_query, output_pure_data=data, step_name="rule_making", metadata=_metadata, ) # List[InputRule] self.ui_handler.main_text = [ f"تعداد اجزاء تشخیص داده شده در متن ورودی {len(data)}" ] elif step == "semantich_search": self.stack_manager.push( user=user, output_pure_data=[ SemanticSearchP2P.model_validate(i) for i in data ], step_name="semantich_search", metadata=_metadata, ) # List[SemanticSearchP2P] self.ui_handler.main_text = [ f"تعداد مستندات مرتبط یافت شده در قوانین {len(data)}", "در حال بررسی مغایرت های احتمالی در قوانین", ] elif step == "subject_unities": self.ui_handler.cancel(user) _header = "نتایج بررسی وحدت موضوعی :\n" try: _data = [RuleRelation.model_validate(i) for i in data] except: _data = data structured_text, map_qq_qs = ( await self.formatter.structed_form_subject_unity( _input=_data, header=_header, end_button=_button ) ) await self.send_message_helper( user=user, structured_text=structured_text, ) user.last_result = _data user.previous_message = PreviousMessage( message=structured_text, type="structured_text", ) _metadata.update({"map_qq_qs": map_qq_qs}) self.stack_manager.push( user=user, output_pure_data=_data, output_formed_data=PreviousMessage( message=structured_text, type="structured_text" ), step_name="subject_unities", metadata=_metadata, ) except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False self.ui_handler.cancel(user) return {"ok": True} async def handle_full_conflict_all_qavanin( self, user: BaleUser, _button: List = [[HOME_BUTTON]] ): print(f";;;;;;;;;;;;;;;;;;;; handle_full_conflict_all_qavanin") user.is_processing_lock = True self.ui_handler.create( user=user, main_text=[ "در حال بررسی", "تفکر", "جمع بندی", ], ) try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/all_qanon/qs_unity", ): step = step_data.get("step") data = step_data.get("data") metadata = step_data.get("metadata") print(f"==== handle_conflict_all_qavanin ====") if step == "rule_making": chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": self.ui_handler.cancel(user) print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.model_validate(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False self.ui_handler.cancel(user) return {"ok": True} async def handle_conflict_general_policy(self, user: BaleUser): user.is_processing_lock = True try: async for step_data in self.request_manager.stream_result( payload={ "section_content": user.input_query, "effort": user.effort, "limit": user.limit, "mode_type": "bale", }, url="/conflict/general_policy/qs_unity", ): step = step_data.get("step") data = step_data.get("data") is_first = step_data.get("is_first", False) is_last = step_data.get("is_last", False) _button = [[HOME_BUTTON]] print(f"==== handle_conflict_general_policy ====") if step == "rule_making": print(f"rule_making {user.effort}") chunked_response = await self.formatter.form_rule_making( _input=data ) await self.send_message_helper( user=user, update_message=False, chunked_text=chunked_response, ) elif step == "semantich_search": print(f"semantich_search {user.effort}") _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" response = await self.formatter.form_ss_rules(data, header=_header) await self.send_message_helper(user, chunked_text=response) elif step == "subject_unities": print(f"subject_unities {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" print(f"data type {type(data)}") try: print(f"data -> RuleRelation") _data = [RuleRelation.model_validate(i) for i in data] except: print(f"data -> String") _data = data chunked_text, _button, mapping_qs = ( await self.formatter.form_subject_unity(_data, header=_header) ) await self.send_message_helper( user=user, chunked_text=chunked_text, end_buttons=_button, ) user.subject_unities = mapping_qs except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: # user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_advanced_check_conflict(self, user: BaleUser): user.is_processing_lock = True print(f"handle_advanced_check_conflict======================================") try: print(f"user.call_back_query {user.call_back_query}") step, _type, qq_title = user.call_back_query.split(":") if _type == "qq": print("A") groups = {} for qanon_title, item in user.subject_unities.items(): if qanon_title == qq_title: for i in item: groups[qanon_title] = i.db_rule.section_id print("B") if len(groups) > 1: _button = [] for i, (k, v) in enumerate(groups.items(), start=1): _button.append( [{"text": i, "callback_data": f"subject_unities:qs:{v}"}] ) await self.send_message_helper( user=user, text=f"برای ادامه یک مورد را از این قانون انتخاب کنید{k} انتخاب کنید", end_buttons=_button, ) else: user.call_back_query = f"subject_unities:qs:{groups[qq_title]}" print("ccccc") elif _type == "qs": print("AAA@@@") content = None for k, v in user.subject_unities.items(): if v.db_rule.section_id == qq_title: content = v.model_dump() print(f"content qs -> {content}") async for step_data in self.request_manager.stream_result( payload={ "rule_relation": content, "effort": user.effort, "mode_type": "bale", }, url="/conflict/unity_eval", ): print(f"qs {user.call_back_query}") step = step_data.get("step") data = step_data.get("data") # is_last = step_data.get("is_last") _button = [[HOME_BUTTON]] print(f"==== handle_advanced_check_conflict ====") if step == "step2": print(f"*********************Step2 {user.effort}") _header = "نتایج اولیه مغایرت های احتمالی :\n" _data = RuleRelation.model_validate(data) chunked_text = await self.formatter.form_conflict_detection( _data, header=_header ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step3": print(f"*********************Step3") _data = RuleRelation.model_validate(data) chunked_text = ( await self.formatter.form_conflict_type_detection( _data, header=_header ) ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step4": print(f"*********************Step4") _data = RuleRelation.model_validate(data) chunked_text = ( await self.formatter.form_relation_identification( _data, header=_header ) ) await self.send_message_helper(user=user, text=chunked_text) elif step == "step5": print(f"*********************Step5") _data = Evaluation.model_validate(data) chunked_text = await self.formatter.form_evaluation( _data, header=_header ) await self.send_message_helper(user=user, text=chunked_text) else: print(f"eerror in uknown step --> {step}") except Exception as e: print("ERROR in handle_chat:", str(traceback.print_exc())) await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) finally: user.last_input_query = user.input_query user.input_query = "" user.is_processing_lock = False return {"ok": True} async def handle_stream_chat(self, user: BaleUser): """ stream_talk() ↓ async generator → chunk ↓ handle_stream_chat() ↓ send first message ↓ update_message (throttled) """ full_text = "" message_id = None last_update = 0 async for chunk in stream_talk(user.input_query): full_text += chunk now = time.time() # جلوگیری از اسپم API (مثلاً هر 0.6 ثانیه آپدیت) if now - last_update < 0.6: continue last_update = now if message_id is None: # اولین پیام message_id = await self.send_bale_message( user=user, text=full_text + " ▌", ) else: payload = BalePayload( chat_id=user.chat_id, message_id=message_id, text=full_text + " ▌" ) self.update_bale_message(user, payload) # آخر کار، بدون cursor if message_id: payload = BalePayload( chat_id=user.chat_id, message_id=message_id, text=full_text ) self.update_bale_message(user, payload) async def handle_beta(self, user: BaleUser): """ state: user.is_processing_lock: query: time.time first_name last_name """ try: text_1 = "سلام" text_2 = "سلام، عرض ادب و احترام" await self.send_message_helper( user=user, structured_text=[ ["این تست است ", [[{"text": "تستتت", "callback_data": "not_yet"}]]], [ "این تست اس2ت ", [[{"text": "تستت2ت", "callback_data": "not_yet"}]], ], ], end_buttons=[ [ {"text": "دکمه های نهایی", "callback_data": "not_yet"}, {"text": "دکمه های ن2هایی", "callback_data": "not_yet"}, {"text": "دکمه های ن23هایی", "callback_data": "not_yet"}, ], [{"text": "دکمه های آخر", "callback_data": "not_yet"}], ], ) except Exception as e: print("ERROR in handl_beta:", str(traceback.print_exc())) async def save_to_db(self, user: BaleUser, data: Dict): time_now = int(time.time()) _id = f"ble_{user.chat_id}_{time_now}" data.update( { "id": _id, "uc_id": user.uc_id, "message_id": user.active_message_id, "username": user.username, "is_bot": user.is_bot, "first_name": user.first_name, "last_name": user.last_name, "effort": user.effort, "limit": user.limit, "input_query": user.input_query, "timestamp": time_now, } ) try: es_res = self.es_helper.update_index_doc( is_update_state=False, index_name_o=self.es_index_name, eid=_id, data=data, ) # type_name, payload, request print(f"-- ES-Saved {es_res}") except Exception as e: print("++ failed save_to_Es ", str(traceback.print_exc()))