93 lines
2.5 KiB
Python
93 lines
2.5 KiB
Python
|
|
from router.bale.base_model import *
|
|
from core.core import *
|
|
|
|
def encode_uc(update: BaleUpdate) -> str:
    """Build the ``"<username-or-id>:<chat_id>"`` key identifying a user in a chat.

    The sender and chat are taken from ``update.message`` when it is set,
    otherwise from ``update.callback_query`` (whose chat is read from the
    message the callback is attached to).

    Returns:
        The composed key, or the literal ``"unknown"`` when the update carries
        neither a message nor a callback query.
    """
    message = update.message
    if message:
        sender = message.from_user
        conversation = message.chat
    else:
        callback = update.callback_query
        if not callback:
            return "unknown"
        sender = callback.from_user
        conversation = callback.message.chat

    # Fall back to the numeric user id when the sender has no username.
    identity = sender.username or sender.id

    # Only the chat id (nothing else from the chat) goes into the key.
    return f"{identity}:{conversation.id}"
|
|
|
|
|
|
def decode_uc(uc_id: str) -> tuple:
    """Split a key of the form ``"<username-or-id>:<chat_id>"`` into its parts.

    Input:  ``'username:chat_id'`` or ``'user_id:chat_id'``
    Output: ``(username, chat_id)`` tuple.

    The first component is always returned as a string. The chat id is
    converted to ``int`` when it is an optionally negative run of digits, so
    keys built from negative chat ids decode back to integers; any other value
    is returned unchanged as a string.

    Raises:
        ValueError: if ``uc_id`` contains no ``":"`` separator or the numeric
            part cannot be converted.
    """
    try:
        # Split on the first colon only; anything after it belongs to chat_id.
        username, chat_id = uc_id.split(":", 1)

        # ``str.isdigit`` is False for a leading minus sign, so test the digits
        # without it and let ``int`` parse the signed text.
        unsigned = chat_id[1:] if chat_id.startswith("-") else chat_id
        if unsigned.isdigit():
            return username, int(chat_id)
        return username, chat_id
    except ValueError as exc:
        raise ValueError(f"decode_uc: invalid uc_id {uc_id!r}") from exc
|
|
|
|
|
|
def extract_user_info(update: BaleUpdate) -> Dict:
    """Collect the sender's details from an update into a flat dictionary.

    The sender and chat come from ``update.message`` when it is set, otherwise
    from ``update.callback_query`` (the chat being that of the message the
    callback is attached to). Both kinds of update produce the same set of
    keys, and ``last_name`` is filled from the sender in both cases.

    Returns:
        A dict with the keys ``uc_id``, ``chat_id``, ``user_id``, ``username``,
        ``first_name``, ``last_name``, ``is_bot`` and ``update``.

    Raises:
        ValueError: if the update has neither a message nor a callback query.
    """
    if update.message:
        u = update.message.from_user
        chat = update.message.chat
    elif update.callback_query:
        u = update.callback_query.from_user
        chat = update.callback_query.message.chat
    else:
        raise ValueError("No user info in update")

    return {
        # encode_uc already returns a string, so no extra conversion is needed.
        "uc_id": encode_uc(update),
        "chat_id": chat.id,
        "user_id": u.id,
        "username": u.username,
        "first_name": u.first_name,
        # Missing or empty last names are normalised to "".
        "last_name": getattr(u, "last_name", None) or "",
        "is_bot": u.is_bot,
        "update": update,
    }
|
|
|
|
|
|
class UserManager:
    """In-memory registry of ``BaleUser`` objects keyed by their ``uc_id``.

    On construction two data files are read with ``load_orjson``: one whose
    content is used for VIP username membership checks, and one stored as
    ``temporary_data``.
    """

    # Default file locations, used when the constructor is given no override.
    DEFAULT_VIP_PATH = (
        "/home/sabr/back_new/mj_bale_chat_test/mj_bale_chat/vip_username.json"
    )
    DEFAULT_TEMP_PATH = "/home/sabr/back_new/mj_bale_chat_test/mj_bale_chat/temp.json"

    def __init__(
        self,
        *,
        vip_path: str = DEFAULT_VIP_PATH,
        temp_path: str = DEFAULT_TEMP_PATH,
    ):
        """Create an empty registry and load the two data files.

        Args:
            vip_path: File passed to ``load_orjson`` for the VIP usernames.
            temp_path: File passed to ``load_orjson`` for ``temporary_data``.
        """
        self.users: Dict[str, BaleUser] = {}
        # NOTE(review): presumably a collection of usernames; it is only used
        # for ``in`` membership tests here. Confirm against the file contents.
        self.list_vip_username = load_orjson(vip_path)
        self.temporary_data = load_orjson(temp_path)

    def get_or_create(self, update: BaleUpdate) -> BaleUser:
        """Return the user for ``update``, creating it on first sight.

        A new ``BaleUser`` is built from ``extract_user_info(update)``, with
        ``is_vip=True`` added when the username is in ``list_vip_username``.
        For both new and existing users, ``user.update`` is set to the given
        update before returning.

        Raises:
            ValueError: propagated from ``extract_user_info`` when the update
                carries no user information.
        """
        user_data = extract_user_info(update)
        uc_id = user_data["uc_id"]

        if user_data["username"] in self.list_vip_username:
            user_data["is_vip"] = True

        if uc_id not in self.users:
            self.users[uc_id] = BaleUser(**user_data)

        user = self.users[uc_id]
        # Keep the stored user pointing at the most recent update.
        user.update = update
        return user
|