from router.bale.base_model import *
from core.core import *

# Default locations of the JSON files loaded by UserManager.  They are kept as
# defaults so existing ``UserManager()`` call sites behave exactly as before.
_DEFAULT_VIP_USERNAME_PATH = (
    "/home/sabr/back_new/mj_bale_chat_test/mj_bale_chat/vip_username.json"
)
_DEFAULT_TEMP_DATA_PATH = (
    "/home/sabr/back_new/mj_bale_chat_test/mj_bale_chat/temp.json"
)


def encode_uc(update: BaleUpdate) -> str:
    """Build the ``'username:chat_id'`` key identifying a user in a chat.

    The sender and chat are taken from ``update.message`` when present,
    otherwise from ``update.callback_query``.  When the sender has no
    username, the numeric user id is used in its place.

    Returns:
        ``"<username or user id>:<chat id>"``, or the literal ``"unknown"``
        when the update carries neither a message nor a callback query.
    """
    if update.message:
        user = update.message.from_user
        chat = update.message.chat
    elif update.callback_query:
        user = update.callback_query.from_user
        chat = update.callback_query.message.chat
    else:
        return "unknown"

    username = user.username or user.id
    chat_id = chat.id  # only the chat id is part of the key
    return f"{username}:{chat_id}"


def decode_uc(uc_id: str) -> tuple:
    """Split a key produced by :func:`encode_uc` back into its two parts.

    Args:
        uc_id: ``'username:chat_id'`` or ``'user_id:chat_id'``.

    Returns:
        A ``(username, chat_id)`` tuple.  ``chat_id`` is converted to ``int``
        when it is an optionally negative decimal number and is otherwise
        returned unchanged as a string.

    Raises:
        ValueError: if ``uc_id`` contains no ``':'`` separator.
    """
    username, separator, chat_id = uc_id.partition(":")
    if not separator:
        raise ValueError(
            f"decode_uc: expected 'username:chat_id', got {uc_id!r}"
        )

    # Accept a leading minus sign so negative chat ids round-trip to int.
    # NOTE(review): assumes chat ids can be negative (as group chats are in
    # Telegram-style bot APIs) -- confirm against the Bale API.
    digits = chat_id[1:] if chat_id.startswith("-") else chat_id
    if digits.isdecimal():
        return username, int(chat_id)
    return username, chat_id


def extract_user_info(update: BaleUpdate) -> Dict:
    """Flatten the sender/chat details of an update into a plain dict.

    The returned keys are ``uc_id``, ``chat_id``, ``user_id``, ``username``,
    ``first_name``, ``last_name``, ``is_bot`` and ``update``; the dict is
    later expanded into ``BaleUser(**...)`` by ``UserManager.get_or_create``.

    Raises:
        ValueError: if the update has neither a message nor a callback query.
    """
    if update.message:
        sender = update.message.from_user
        chat = update.message.chat
    elif update.callback_query:
        sender = update.callback_query.from_user
        chat = update.callback_query.message.chat
    else:
        raise ValueError("No user info in update")

    return {
        "uc_id": encode_uc(update),
        "chat_id": chat.id,
        "user_id": sender.id,
        "username": sender.username,
        "first_name": sender.first_name,
        # Both update kinds now report the sender's real last name; a missing
        # or empty value is normalised to "".
        "last_name": getattr(sender, "last_name", None) or "",
        "is_bot": sender.is_bot,
        "update": update,
    }


class UserManager:
    """In-memory registry of ``BaleUser`` objects keyed by their ``uc_id``."""

    def __init__(
        self,
        vip_username_path: str = _DEFAULT_VIP_USERNAME_PATH,
        temporary_data_path: str = _DEFAULT_TEMP_DATA_PATH,
    ):
        """Create an empty registry and load the two JSON side files.

        Args:
            vip_username_path: file passed to ``load_orjson`` to obtain the
                collection of VIP usernames.
            temporary_data_path: file passed to ``load_orjson`` to obtain
                ``temporary_data`` (not read anywhere in this module).
        """
        self.users: Dict[str, BaleUser] = {}
        self.list_vip_username = load_orjson(vip_username_path)
        self.temporary_data = load_orjson(temporary_data_path)

    def get_or_create(self, update: BaleUpdate) -> BaleUser:
        """Return the user behind ``update``, creating it on first sight.

        A newly created user receives ``is_vip=True`` when its username is in
        ``list_vip_username``.  In every case the user's ``update`` attribute
        is refreshed to the update passed in.

        Raises:
            ValueError: propagated from ``extract_user_info`` when the update
                carries no user information.
        """
        user_data = extract_user_info(update)
        uc_id = user_data["uc_id"]

        if user_data["username"] in self.list_vip_username:
            user_data["is_vip"] = True

        user = self.users.get(uc_id)
        if user is None:
            user = BaleUser(**user_data)
            self.users[uc_id] = user

        user.update = update
        return user