mj_bale_chat/main.py
2025-12-03 14:37:00 +00:00

633 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#################
from fastapi import FastAPI, Request, HTTPException
import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
from enum import Enum
#################
from base_model import BaleUpdate, QaChat
from bale_buttons import (
START_BUTTONS_INLINE,
BUTTON_TEXT_TO_CALLBACK,
BUTTON_TEXT_TO_CALLBACK_LIST,
MORE_BUTTON, CHAT_EFFORT_BUTTONS
)
from bale_massages import (
STARTMESSAGE,
ABOUT,
CONTACT_US,
JOST_OJO_TEXT,
HOURGLASS,
WAIT_TEXT,GOFT_OGO_TEXT
)
from utils import load_orjson, save_orjson, ElasticHelper, split_text_chunks
############## Create app
app = FastAPI()
##############
############## Global-Params
load_dotenv()

# Credentials and endpoints come from the environment (the .env file is
# loaded by load_dotenv() above).
# NOTE(review): if BALE_TOKEN is unset, TOKEN is None and every URL below is
# built with the literal text "None" - confirm deployment always sets it.
TOKEN = os.getenv("BALE_TOKEN")
ES_URL = os.getenv("ES_URL")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_USER_NAME = os.getenv("ES_USER_NAME")
ES_INDEX_NAME = os.getenv("ES_INDEX_NAME")
BASE_URL = f"https://tapi.bale.ai/bot{TOKEN}"

# Local data directory. exist_ok=True replaces the separate "exists?" check,
# so there is no window between checking and creating.
DATA_DIR = os.path.join(".", "_data_json")
os.makedirs(DATA_DIR, exist_ok=True)

GLOBAL_DOMAIN = "https://bl.tavasi.ir"  # f"https://YOUR_DOMAIN.com
WEBHOOK_URL = f"{GLOBAL_DOMAIN}/webhook/{TOKEN}"
SET_WEBHOOK_URL = f"https://tapi.bale.ai/bot{TOKEN}/setWebhook"

MAX_LIMIT_RAG = 100
STEP_RAG = 10

# Module-level per-chat stores keyed by chat id. BaleBot reads and writes
# USER_CHAT_EFFORT and USER_LAST_CHAT_QUERY; the others are not referenced
# anywhere else in the visible part of this file.
USER_STATE = {}
USER_LAST_QUERY = {}
USER_PAGINATION = {}
USER_CHAT_EFFORT = {}
USER_CHAT_LIMIT = {}
USER_LAST_CHAT_QUERY = {}

TIME_OUT = 60  # seconds; used as the HTTP client timeout
MAX_LEN = 4000  # slightly below the base limit, to be on the safe side

ES_HELPER = ElasticHelper(
    es_url=ES_URL,
    es_pass=ES_PASSWORD,
    es_user=ES_USER_NAME,
)
class UserState(str, Enum):
    """Conversation state tracked for each chat.

    Members also subclass ``str``, so they compare equal to their plain
    string values (``UserState.MAIN == "main"``).
    """

    # Default state for a chat that has no recorded state yet.
    MAIN = "main"
    # The next free-text message is treated as a search query.
    LAW_SEARCH = "law_search"
    # The next free-text message is treated as a chat question.
    LAW_CHAT = "law_chat"
    # A request is in flight; new messages get the "please wait" reply.
    BUSY = "busy"
class SessionManager:
    """In-memory store of per-chat conversation state and pagination.

    ``states`` maps chat id -> ``UserState``; ``pagination`` maps chat id ->
    ``{"query": str, "limit": int, "step": int}``. Nothing is persisted, so
    all data is lost when the process restarts.

    Args:
        default_limit: Result limit assigned to a freshly stored query.
        step: Amount added to the limit by each ``increase_limit`` call.
        max_limit: Optional upper bound for the limit; ``None`` (the
            default) leaves the limit unbounded.
    """

    def __init__(self, default_limit: int = 10, step: int = 10, max_limit=None):
        self.states = {}
        self.pagination = {}
        self.default_limit = default_limit
        self.step = step
        self.max_limit = max_limit

    def init(self, chat_id: int):
        """Register ``chat_id`` in the MAIN state unless it already has a state."""
        self.states.setdefault(chat_id, UserState.MAIN)

    def get_state(self, chat_id: int):
        """Return the chat's state, or ``UserState.MAIN`` when none is stored."""
        return self.states.get(chat_id, UserState.MAIN)

    def set_state(self, chat_id: int, state: "UserState"):
        """Store ``state`` for the chat."""
        self.states[chat_id] = state

    def set_query(self, chat_id: int, query: str):
        """Store ``query`` for the chat and reset its limit and step to the defaults."""
        self.pagination[chat_id] = {
            "query": query,
            "limit": self.default_limit,
            "step": self.step,
        }

    def increase_limit(self, chat_id: int):
        """Raise the chat's limit by its step and return the pagination dict.

        Returns ``None`` when no query has been stored for the chat. When
        ``max_limit`` is configured the limit never exceeds it.
        """
        entry = self.pagination.get(chat_id)
        if entry is None:
            return None
        raised = entry["limit"] + entry["step"]
        if self.max_limit is not None:
            raised = min(raised, self.max_limit)
        entry["limit"] = raised
        return entry

    def get_pagination(self, chat_id: int):
        """Return the chat's pagination dict, or ``None`` if there is none."""
        return self.pagination.get(chat_id)

    def clear(self, chat_id: int):
        """Drop the chat's pagination and put it back in the MAIN state."""
        self.pagination.pop(chat_id, None)
        self.states[chat_id] = UserState.MAIN
class BaleBot:
    """Routes Bale updates (text messages and inline-button callbacks).

    Per-chat state and pagination live in the injected ``SessionManager``;
    the chosen chat effort and the last chat question live in the module
    dicts ``USER_CHAT_EFFORT`` and ``USER_LAST_CHAT_QUERY``. Every handler
    returns ``{"ok": True}``, which the webhook sends back as its response.

    NOTE(review): ``send_message`` is a synchronous HTTP call made from
    these coroutines, so it blocks the event loop while it runs.
    """

    # Prefix of the callback data sent by the chat-effort buttons.
    _EFFORT_PREFIX = "chat_effort_"

    def __init__(self, session: "SessionManager"):
        self.session = session

    async def handle_update(self, update: "BaleUpdate"):
        """Dispatch an update to the message handler or the callback handler."""
        if update.message:
            return await self.handle_message(update)
        if update.callback_query:
            return await self.handle_callback(update)
        # Neither a message nor a callback: acknowledge so the webhook never
        # answers with a null body.
        return {"ok": True}

    async def handle_message(self, update: "BaleUpdate"):
        """Handle a text message: fixed commands first, then state-based routing."""
        chat_id = update.message.chat.id
        text = (update.message.text or "").strip()
        self.session.init(chat_id)

        # A message without text has nothing to search or ask; forwarding it
        # would send an empty query to the backend.
        if not text:
            return {"ok": True}

        # /start
        if text == "/start":
            self.session.clear(chat_id)
            send_message(chat_id, STARTMESSAGE, buttons=START_BUTTONS_INLINE)
            return {"ok": True}
        if text == "تماس با ما":
            send_message(chat_id, CONTACT_US)
            return {"ok": True}
        if text == "درباره ما":
            send_message(chat_id, ABOUT)
            return {"ok": True}

        # A previous request is still running for this chat.
        if self.session.get_state(chat_id) == UserState.BUSY:
            send_message(chat_id, WAIT_TEXT)
            return {"ok": True}

        # Typed button labels behave like a click on the matching button.
        if text == "جستجو":
            self.session.set_state(chat_id, UserState.LAW_SEARCH)
            send_message(chat_id, JOST_OJO_TEXT)
            return {"ok": True}
        if text == "گفتگو":
            self.session.set_state(chat_id, UserState.LAW_CHAT)
            send_message(chat_id, GOFT_OGO_TEXT)
            return {"ok": True}

        # Route free text according to the chat's current state.
        state = self.session.get_state(chat_id)
        if state == UserState.LAW_SEARCH:
            return await self.handle_search(chat_id, update, text)
        if state != UserState.LAW_CHAT:
            # Any other state falls back to chat mode.
            self.session.set_state(chat_id, UserState.LAW_CHAT)
        return await self.handle_chat(chat_id, text)

    async def handle_callback(self, update: "BaleUpdate"):
        """Handle an inline-button press identified by its callback data."""
        chat_id = update.callback_query.message.chat.id
        # Guard against a callback without data so .startswith cannot fail.
        data = update.callback_query.data or ""

        if data == "law_search":
            self.session.set_state(chat_id, UserState.LAW_SEARCH)
            send_message(chat_id, JOST_OJO_TEXT)
            return {"ok": True}

        if data.startswith(self._EFFORT_PREFIX):
            USER_CHAT_EFFORT[chat_id] = data[len(self._EFFORT_PREFIX):]
            send_message(chat_id, "✅ بله حتما، لطفا کمی منتظر بمانید ...")
            # Re-run the last question with the newly selected effort.
            last_query = USER_LAST_CHAT_QUERY.get(chat_id)
            if last_query:
                return await self.handle_chat(chat_id, last_query, again=True)
            return {"ok": True}

        if data == "chat_more":
            pag = self.session.increase_limit(chat_id)
            if not pag:
                send_message(chat_id, "❌ سوالی برای ادامه نیست")
                return {"ok": True}
            last_query = USER_LAST_CHAT_QUERY.get(chat_id)
            if last_query:
                return await self.handle_chat(chat_id, last_query)
            return {"ok": True}

        if data == "law_chat":
            self.session.set_state(chat_id, UserState.LAW_CHAT)
            send_message(chat_id, GOFT_OGO_TEXT)
            return {"ok": True}

        if data == "more":
            return await self.handle_more(chat_id)

        if data == "stop":
            self.session.clear(chat_id)
            send_message(chat_id, "✅ پایان نمایش")
            return {"ok": True}

        # Unknown callback data: acknowledge instead of returning None.
        return {"ok": True}

    async def handle_search(self, chat_id, update, text):
        """Run a semantic search for ``text`` and send the formatted results."""
        self.session.set_state(chat_id, UserState.BUSY)
        self.session.set_query(chat_id, text)
        send_message(chat_id, "⏳ در حال جستجو...")
        try:
            pag = self.session.get_pagination(chat_id)
            result = await result_semantic_search(
                text=pag["query"],
                limit=pag["limit"],
            )
            chunked_text_ = get_in_form(
                title=pag["query"],
                sections=result["ss_answer"],
            )
            send_message(
                chat_id,
                chunked_text=chunked_text_,
                buttons=MORE_BUTTON,
            )
        except Exception as e:
            send_message(chat_id, "❌ خطا در جستجو")
            print(e)
        finally:
            # The BUSY flag must be released whether or not the search worked.
            self.session.set_state(chat_id, UserState.MAIN)
        return {"ok": True}

    async def handle_more(self, chat_id):
        """Repeat the stored search with a larger limit ("more" button)."""
        pag = self.session.increase_limit(chat_id)
        if not pag:
            send_message(chat_id, "❌ درخواستی یافت نشد")
            return {"ok": True}
        self.session.set_state(chat_id, UserState.BUSY)
        try:
            result = await result_semantic_search(
                text=pag["query"],
                limit=pag["limit"],
            )
            chunked_text_ = get_in_form(
                title=pag["query"],
                sections=result["ss_answer"],
            )
            send_message(
                chat_id,
                chunked_text=chunked_text_,
                buttons=MORE_BUTTON,
            )
        except Exception as e:
            # Log the cause; previously the exception was dropped silently.
            print("ERROR in handle_more:", e)
            send_message(chat_id, "❌ خطا در افزایش نتایج")
        finally:
            self.session.set_state(chat_id, UserState.MAIN)
        return {"ok": True}

    async def handle_chat(self, chat_id: int, text: str, again=False):
        """Send ``text`` to the chat backend and deliver the answer.

        Args:
            chat_id: Chat to answer.
            text: Question text. When it equals the stored query, the
                existing pagination (including an enlarged limit) is reused.
            again: Forwarded unchanged to ``result_chat``.
        """
        # Enter the BUSY state while the request is in flight.
        self.session.set_state(chat_id, UserState.BUSY)
        send_message(chat_id, "⏳ در حال پردازش...")
        try:
            pag = self.session.get_pagination(chat_id)
            # Store the text as the query when there is none yet, or when the
            # user typed something new. Re-running the same query (the
            # "chat_more" and effort buttons) must keep the current limit;
            # resetting it here made those buttons ineffective.
            typed_new_query = bool(text.strip()) and bool(pag) and text != pag["query"]
            if not pag or typed_new_query:
                self.session.set_query(chat_id, text)
                pag = self.session.get_pagination(chat_id)
            limit = pag["limit"]

            # Effort comes from the module-level store (default "low").
            effort = USER_CHAT_EFFORT.get(chat_id, "low")
            result = await result_chat(
                text=pag["query"],
                limit=limit,
                effort=effort,
                again=again,
            )
            if not isinstance(result, dict):
                # result_chat returns a plain message string when the backend
                # call fails; show it instead of failing on result[...] below.
                send_message(chat_id, str(result))
                return {"ok": True}

            text_ = result['llm_answer']
            query_type = result.get('query_type')
            if isinstance(text_, str):
                text_ = format_answer_bale(answer_text=text_)

            # Remember the last question for the button callbacks.
            USER_LAST_CHAT_QUERY[chat_id] = pag["query"]
            print('-=' * 10, effort, query_type)
            if query_type == 'legal_question' and effort != 'medium':
                send_message(
                    chat_id=chat_id,
                    chunked_text=text_,
                    buttons=CHAT_EFFORT_BUTTONS,
                )
            else:
                send_message(
                    chat_id=chat_id,
                    chunked_text=text_,
                )
        except Exception as e:
            print("ERROR in handle_chat:", e)
            send_message(chat_id, "❌ خطا در پردازش")
        finally:
            # Return to chat mode (runs on the early return above as well).
            self.session.set_state(chat_id, UserState.LAW_CHAT)
        return {"ok": True}
# Base URL of a section's detail page; the section id is appended to it.
WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/"


def get_in_form(title: str, sections: list, max_len: int = 4000):
    """Format search hits as numbered, source-linked message chunks.

    Args:
        title: The user's query; shown in the header of the first chunk.
        sections: Dicts that are read through their ``"content"`` and
            ``"id"`` keys. ``None`` is treated as "no results".
        max_len: Length at which a new chunk is started.

    Returns:
        A list of strings. Items are never split, so a single item longer
        than ``max_len`` still produces an oversized chunk.
        NOTE(review): such a chunk may exceed the messenger's size limit.
    """
    chunks = []
    current = f"برای پرسش: {title}\n\n"
    ref_text = "«منبع»"
    # `or ()` keeps a missing result list from raising inside enumerate().
    for i, data in enumerate(sections or (), start=1):
        sec_text = data.get("content", "")
        idx = data.get("id")
        # Markdown-style link to the section's page.
        ref = f"[{ref_text}]({WEB_LINK}{idx})"
        # Full text of this item.
        block = f"{i}: {sec_text}\n{ref}\n\n"
        # Start a new chunk when this item would push the current one past
        # the allowed length.
        if len(current) + len(block) > max_len:
            chunks.append(current.rstrip())
            current = ""
        current += block
    # Flush the last chunk.
    if current.strip():
        chunks.append(current.rstrip())
    return chunks
def format_answer_bale(answer_text: str, max_len: int = 4000):
    """Turn source codes in a model answer into links and chunk the result.

    Every parenthesised group made only of ASCII letters, digits,
    underscores, commas and spaces - optionally prefixed with ``منبع`` - is
    replaced by one Markdown link per comma-separated code, e.g.
    ``(منبع: qs2117427)`` or ``(qs123, qs456)``.
    NOTE(review): the pattern also matches ordinary parentheses such as
    ``(2)`` or ``(a)``; confirm that is intended.

    Args:
        answer_text: Raw answer text.
        max_len: Maximum length of a single returned chunk.

    Returns:
        ``[text]`` when the linked text fits in ``max_len``; otherwise the
        chunks produced by ``chunked_simple_text``.
    """
    ref_text = "«منبع»"
    # One parenthesised group holding one or more codes.
    pattern = r"\((?:منبع[: ]+)?([a-zA-Z0-9_, ]+)\)"

    def make_link(src):
        return f"[{ref_text}]({WEB_LINK}{src})"

    def replace_source(m):
        # Split multi-code groups and drop empty entries such as the tail of
        # "(qs1, )".
        codes = [c.strip() for c in m.group(1).split(",")]
        codes = [c for c in codes if c]
        if not codes:
            # Nothing but spaces/commas inside the parentheses: leave the
            # text untouched rather than linking to the bare base URL.
            return m.group(0)
        return ", ".join(make_link(code) for code in codes)

    answer_text = re.sub(pattern, replace_source, answer_text)

    # Short enough to go out as one message.
    if len(answer_text) <= max_len:
        return [answer_text]

    # Too long: reuse the module's sentence-based splitter instead of keeping
    # a second copy of the same loop here.
    return chunked_simple_text(answer_text, max_len)
def send_message(
    chat_id: int,
    text: str = None,
    buttons=None,  # inline buttons
    chunked_text=None,
):
    """Send one or more messages to a chat through the ``sendMessage`` API.

    Args:
        chat_id: Target chat.
        text: Text to send; split with ``split_text_chunks`` when
            ``chunked_text`` is not given.
        buttons: Optional inline keyboard attached to the last message.
        chunked_text: Pre-split chunks, sent one message per chunk. A single
            string is accepted and sent as one message.

    The reply keyboard built from ``BUTTON_TEXT_TO_CALLBACK_LIST`` (plus
    ``buttons`` when given) is attached to the last message only. HTTP error
    statuses are printed; network exceptions propagate to the caller.
    """
    url = f"https://tapi.bale.ai/bot{TOKEN}/sendMessage"

    if chunked_text is None:
        chunks = split_text_chunks(text)
    elif isinstance(chunked_text, str):
        # Iterating a bare string would send one message per character.
        chunks = [chunked_text]
    else:
        chunks = list(chunked_text)

    reply_keyboard = [
        [{"text": btn} for btn in row] for row in BUTTON_TEXT_TO_CALLBACK_LIST
    ]

    last_index = len(chunks) - 1
    for i, chunk in enumerate(chunks):
        payload = {
            "chat_id": chat_id,
            "text": chunk,
        }
        # Attach the keyboards to the final message only.
        if i == last_index:
            reply_markup = {
                "keyboard": reply_keyboard,
                "resize_keyboard": True,
                "one_time_keyboard": False,
            }
            if buttons:
                reply_markup["inline_keyboard"] = buttons
            payload["reply_markup"] = reply_markup

        # Without a timeout an unresponsive API would block this call forever.
        r = requests.post(url, json=payload, timeout=TIME_OUT)
        if not r.ok:
            # Surface rejected messages (e.g. too long) instead of dropping
            # the response unread.
            print("Send failed:", r.status_code, r.text)
async def save_to_es(data: QaChat):
    """Write ``data`` to the Elasticsearch index, best effort.

    The document id is ``data.id`` and the body is ``data.model_dump()``.
    Failures are printed with a traceback and not re-raised.
    """
    try:
        ES_HELPER.update_index_doc(
            is_update_state=False,
            index_name_o=ES_INDEX_NAME,
            eid=data.id,
            data=data.model_dump(),
        )
    except Exception:
        # `except Exception` rather than a bare `except`, so task
        # cancellation and interpreter-exit signals are not swallowed.
        print("save_to_es - 000000000000000000000000000000000000000000000")
        traceback.print_exc()
def initialize_webhook():
    """Register this bot's webhook URL with the Bale server.

    This step is mandatory for the bot: it tells the Bale server which
    address to deliver this token's updates to. The status code and the
    response body are printed.
    """
    params = {"url": WEBHOOK_URL}
    # Bounded wait so startup cannot hang on an unreachable API.
    r = requests.get(SET_WEBHOOK_URL, params=params, timeout=TIME_OUT)
    print("Webhook set status:", r.status_code)
    try:
        body = r.json()
    except ValueError:
        # Non-JSON body (e.g. an HTML error page): print it raw instead of
        # crashing before the server starts.
        body = r.text
    print("Webhook set response:", body)
def chunked_simple_text(answer_text, max_len=4000):
    """Split text into chunks of at most ``max_len`` characters.

    The text is cut at ``". "`` sentence boundaries and sentences are packed
    greedily, joined by a single space. The period removed by the split is
    put back on every sentence except the last, so the text's own ending is
    left as written. A sentence longer than ``max_len`` is hard-split.

    Args:
        answer_text: Text to split; ``None`` or empty yields ``[]``.
        max_len: Maximum chunk length; must be at least 1.

    Returns:
        A list of non-empty strings, each no longer than ``max_len``.

    Raises:
        ValueError: If ``max_len`` is smaller than 1.
    """
    if max_len < 1:
        raise ValueError("max_len must be a positive integer")
    if not answer_text:
        return []

    parts = answer_text.split(". ")
    last_index = len(parts) - 1
    chunks = []
    current = ""
    for index, part in enumerate(parts):
        sentence = part.strip()
        if not sentence:
            # Produced by a trailing ". " or repeated separators.
            continue
        if index != last_index and not sentence.endswith("."):
            sentence += "."

        candidate = f"{current} {sentence}" if current else sentence
        if len(candidate) <= max_len:
            current = candidate
            continue

        # The sentence does not fit: flush what has been collected (only if
        # there is something, so no empty chunk is emitted).
        if current:
            chunks.append(current)
        # Hard-split a sentence that is too long on its own.
        while len(sentence) > max_len:
            piece = sentence[:max_len].strip()
            if piece:
                chunks.append(piece)
            sentence = sentence[max_len:].strip()
        current = sentence

    if current:
        chunks.append(current)
    return chunks
async def result_chat(text, limit=10, effort="low", again=False):
    """POST a chat question to the RAG service and return its ``result``.

    Args:
        text: The question, sent as ``query``.
        limit: Sent as ``limit``.
        effort: Sent as ``effort``.
        again: Sent as ``again``.

    Returns:
        The ``"result"`` value of the JSON response. When the key is missing,
        or when the request fails for any reason, a user-facing message
        string is returned instead, so callers must check the type.
    """
    url = "http://2.188.15.101:8009/run_chat"
    payload = {
        "query": text,
        "limit": limit,
        "effort": effort,
        "again": again,
        "mode_type": "bale",
    }
    try:
        async with httpx.AsyncClient(timeout=TIME_OUT) as client:
            response = await client.post(url, json=payload)
            response.raise_for_status()
            data = response.json()
            return data.get("result", "❌ پاسخی دریافت نشد")
    except Exception as e:
        # repr() keeps the exception type visible; str() alone can be empty
        # for some errors, which left nothing useful in the log.
        print(f"❌ خطای RAG:\n{e!r}")
        return "❌ ارتباط با سرور قطع می‌باشد"
async def result_semantic_search(text, limit):
    """POST a semantic-search query to the RAG service and return its ``result``.

    Args:
        text: The search text, sent as ``query``.
        limit: Sent as ``limit``.

    Returns:
        The ``"result"`` value of the JSON response (``None`` when the key is
        missing). When the request fails for any reason, a user-facing
        message string is returned instead, so callers must check the type.
    """
    url = "http://2.188.15.101:8009/run_semantic_search"
    try:
        async with httpx.AsyncClient(timeout=TIME_OUT) as client:
            response = await client.post(url, json={"query": text, "limit": limit})
            response.raise_for_status()
            # response.json() returns the decoded payload.
            data = response.json()
            return data.get("result")
    except Exception as e:
        # repr() keeps the exception type visible; str() alone can be empty
        # for some errors, which left nothing useful in the log.
        print(f"❌ خطای RAG:\n{e!r}")
        return "ارتباط با بالا قطع می باشد❌"
# Module-level singletons: one in-memory session store, and the bot instance
# the webhook route below dispatches every update to.
session = SessionManager()
bot = BaleBot(session)
@app.post(f"/webhook/{TOKEN}")
async def webhook(request: Request):
    """Receive an update from Bale and hand it to the bot.

    A body that is not valid JSON, or that does not validate as a
    ``BaleUpdate``, is logged and acknowledged with ``{"ok": True}``.
    """
    try:
        # Reading the body is inside the try so a malformed JSON payload is
        # acknowledged like any other parse error instead of raising.
        raw = await request.json()
        update = BaleUpdate(**raw)
    except Exception as e:
        print("❌ Parse Error", e)
        return {"ok": True}

    print(f'update {update}')
    result = await bot.handle_update(update)
    # Always answer with a JSON object, even if a handler returned nothing.
    return result if result is not None else {"ok": True}
if __name__ == "__main__":
    # Register the webhook first, then start the server that receives the
    # updates delivered to it.
    print("\n====== Bot Status: ======")
    initialize_webhook()
    print("\n====== Bot Webhook Server Running ======")
    uvicorn.run("main:app", host="0.0.0.0", port=8005, reload=True)
# if __name__ == "__main__":
# # 1⃣ ابتدا وبهوک را ست می‌کنیم
# print("\n====== Bot Status: ======")
# initialize_webhook()
# # 2⃣ بعد سرور FastAPI را اجرا می‌کنیم تا پیام‌ها را دریافت کند
# print("\n====== Bot Webhook Server Running ======")
# uvicorn.run("main:app", host="0.0.0.0", port=8005, reload=True)