633 lines
18 KiB
Python
633 lines
18 KiB
Python
#################
|
||
from fastapi import FastAPI, Request, HTTPException
|
||
import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re
|
||
from dotenv import load_dotenv
|
||
from pathlib import Path
|
||
from time import sleep
|
||
from enum import Enum
|
||
|
||
#################
|
||
from base_model import BaleUpdate, QaChat
|
||
from bale_buttons import (
|
||
START_BUTTONS_INLINE,
|
||
BUTTON_TEXT_TO_CALLBACK,
|
||
BUTTON_TEXT_TO_CALLBACK_LIST,
|
||
MORE_BUTTON, CHAT_EFFORT_BUTTONS
|
||
)
|
||
from bale_massages import (
|
||
STARTMESSAGE,
|
||
ABOUT,
|
||
CONTACT_US,
|
||
JOST_OJO_TEXT,
|
||
HOURGLASS,
|
||
WAIT_TEXT,GOFT_OGO_TEXT
|
||
)
|
||
from utils import load_orjson, save_orjson, ElasticHelper, split_text_chunks
|
||
|
||
|
||
############## Create app

app = FastAPI()

##############


############## Global-Params

# Values below are read from the environment (optionally populated from a .env file).
load_dotenv()

TOKEN = os.getenv("BALE_TOKEN")
ES_URL = os.getenv("ES_URL")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_USER_NAME = os.getenv("ES_USER_NAME")
ES_INDEX_NAME = os.getenv("ES_INDEX_NAME")

if not TOKEN:
    # Without a token every Bale URL below embeds the literal "None" and the
    # webhook route is registered at the guessable path /webhook/None.
    print("WARNING: BALE_TOKEN is not set; the bot cannot talk to Bale.")

BASE_URL = f"https://tapi.bale.ai/bot{TOKEN}"

DATA_DIR = os.path.join(".", "_data_json")
# exist_ok avoids the check-then-create race of os.path.exists() + os.makedirs().
os.makedirs(DATA_DIR, exist_ok=True)

GLOBAL_DOMAIN = "https://bl.tavasi.ir"  # e.g. https://YOUR_DOMAIN.com
WEBHOOK_URL = f"{GLOBAL_DOMAIN}/webhook/{TOKEN}"
SET_WEBHOOK_URL = f"https://tapi.bale.ai/bot{TOKEN}/setWebhook"

# NOTE(review): MAX_LIMIT_RAG, STEP_RAG and USER_STATE / USER_LAST_QUERY /
# USER_PAGINATION / USER_CHAT_LIMIT are not referenced anywhere else in this
# file; confirm nothing imports them before removing.
MAX_LIMIT_RAG = 100
STEP_RAG = 10

USER_STATE = {}
USER_LAST_QUERY = {}
USER_PAGINATION = {}
# chat_id -> effort level chosen via the chat-effort buttons
USER_CHAT_EFFORT = {}
USER_CHAT_LIMIT = {}
# chat_id -> last question sent to the chat backend (re-run by the buttons)
USER_LAST_CHAT_QUERY = {}

# Timeout in seconds for outgoing HTTP requests.
TIME_OUT = 60

MAX_LEN = 4000  # slightly below the platform's base message limit, to be safe

ES_HELPER = ElasticHelper(
    es_url=ES_URL,
    es_pass=ES_PASSWORD,
    es_user=ES_USER_NAME,
)
|
||
|
||
class UserState(str, Enum):
    """Routing state of one chat.

    The ``str`` mixin makes every member compare equal to its plain string value
    (e.g. ``UserState.MAIN == "main"``).
    """

    MAIN = "main"              # default / idle state
    LAW_SEARCH = "law_search"  # next free-text message is handled as a search query
    LAW_CHAT = "law_chat"      # next free-text message is sent to the chat backend
    BUSY = "busy"              # a backend request for this chat is in progress
|
||
|
||
|
||
class SessionManager:
|
||
|
||
def __init__(self):
|
||
self.states = {}
|
||
self.pagination = {}
|
||
|
||
def init(self, chat_id: int):
|
||
if chat_id not in self.states:
|
||
self.states[chat_id] = UserState.MAIN
|
||
|
||
def get_state(self, chat_id: int):
|
||
return self.states.get(chat_id, UserState.MAIN)
|
||
|
||
def set_state(self, chat_id: int, state: UserState):
|
||
self.states[chat_id] = state
|
||
|
||
def set_query(self, chat_id: int, query: str):
|
||
self.pagination[chat_id] = {"query": query, "limit": 10, "step": 10}
|
||
|
||
def increase_limit(self, chat_id: int):
|
||
if chat_id in self.pagination:
|
||
self.pagination[chat_id]["limit"] += self.pagination[chat_id]["step"]
|
||
return self.pagination[chat_id]
|
||
return None
|
||
|
||
def get_pagination(self, chat_id: int):
|
||
return self.pagination.get(chat_id)
|
||
|
||
def clear(self, chat_id: int):
|
||
self.pagination.pop(chat_id, None)
|
||
self.states[chat_id] = UserState.MAIN
|
||
|
||
|
||
class BaleBot:
    """Route Bale webhook updates (text messages and inline-button callbacks).

    Per-chat routing state (MAIN / LAW_SEARCH / LAW_CHAT / BUSY) and pagination
    live in the injected ``SessionManager``; the chosen chat effort and the last
    chat question are kept in the module-level dicts ``USER_CHAT_EFFORT`` and
    ``USER_LAST_CHAT_QUERY``. Handlers return ``{"ok": True}`` for the webhook.
    """

    def __init__(self, session: SessionManager):
        self.session = session

    def _reject_if_busy(self, chat_id) -> bool:
        """If *chat_id* has a request in progress, send WAIT_TEXT and return True."""
        if self.session.get_state(chat_id) == UserState.BUSY:
            send_message(chat_id, WAIT_TEXT)
            return True
        return False

    async def handle_update(self, update: BaleUpdate):
        """Dispatch *update* to the message or callback handler.

        Update kinds the bot does not handle are acknowledged with ``{"ok": True}``.
        """
        if update.message:
            return await self.handle_message(update)

        if update.callback_query:
            return await self.handle_callback(update)

        return {"ok": True}

    async def handle_message(self, update: BaleUpdate):
        """Handle a text message: fixed commands first, then route by chat state."""
        chat_id = update.message.chat.id
        text = (update.message.text or "").strip()

        self.session.init(chat_id)

        # /start is checked before the BUSY guard, so it works even mid-request.
        if text == "/start":
            self.session.clear(chat_id)
            send_message(chat_id, STARTMESSAGE, buttons=START_BUTTONS_INLINE)
            return {"ok": True}

        if text == "تماس با ما":
            send_message(chat_id, CONTACT_US)
            return {"ok": True}

        if text == "درباره ما":
            send_message(chat_id, ABOUT)
            return {"ok": True}

        # Refuse new work while a previous request for this chat is still running.
        if self._reject_if_busy(chat_id):
            return {"ok": True}

        # Text equivalents of the law_search / law_chat button callbacks.
        if text == "جستجو":
            self.session.set_state(chat_id, UserState.LAW_SEARCH)
            send_message(chat_id, JOST_OJO_TEXT)
            return {"ok": True}

        if text == "گفتگو":
            self.session.set_state(chat_id, UserState.LAW_CHAT)
            send_message(chat_id, GOFT_OGO_TEXT)
            return {"ok": True}

        # Non-text messages arrive with empty text; without this guard they would
        # re-run the previous chat query or search for "".
        if not text:
            return {"ok": True}

        state = self.session.get_state(chat_id)

        if state == UserState.LAW_SEARCH:
            return await self.handle_search(chat_id, update, text)

        if state == UserState.LAW_CHAT:
            return await self.handle_chat(chat_id, text)

        # Any other state (MAIN): default to chat mode.
        self.session.set_state(chat_id, UserState.LAW_CHAT)
        return await self.handle_chat(chat_id, text)

    async def handle_callback(self, update: BaleUpdate):
        """Handle an inline-button press identified by ``callback_query.data``."""
        chat_id = update.callback_query.message.chat.id
        # data may be absent; "" matches none of the actions below.
        data = update.callback_query.data or ""

        if data == "law_search":
            self.session.set_state(chat_id, UserState.LAW_SEARCH)
            send_message(chat_id, JOST_OJO_TEXT)
            return {"ok": True}

        if data.startswith("chat_effort_"):
            if self._reject_if_busy(chat_id):
                return {"ok": True}

            # Remember the chosen effort for all later chat requests of this chat.
            USER_CHAT_EFFORT[chat_id] = data[len("chat_effort_"):]

            send_message(chat_id, "✅ بله حتما، لطفا کمی منتظر بمانید ...")

            # Re-run the last chat question with the new effort.
            last_query = USER_LAST_CHAT_QUERY.get(chat_id)
            if last_query:
                return await self.handle_chat(chat_id, last_query, again=True)

            return {"ok": True}

        if data == "chat_more":
            if self._reject_if_busy(chat_id):
                return {"ok": True}

            pag = self.session.increase_limit(chat_id)
            if not pag:
                send_message(chat_id, "❌ سوالی برای ادامه نیست")
                return {"ok": True}

            last_query = USER_LAST_CHAT_QUERY.get(chat_id)
            if last_query:
                # keep_limit=True: use the limit just raised above instead of
                # letting handle_chat reset the pagination to the default.
                return await self.handle_chat(chat_id, last_query, keep_limit=True)

            return {"ok": True}

        if data == "law_chat":
            self.session.set_state(chat_id, UserState.LAW_CHAT)
            send_message(chat_id, GOFT_OGO_TEXT)
            return {"ok": True}

        if data == "more":
            if self._reject_if_busy(chat_id):
                return {"ok": True}
            return await self.handle_more(chat_id)

        if data == "stop":
            self.session.clear(chat_id)
            send_message(chat_id, "✅ پایان نمایش")
            return {"ok": True}

        # Unknown action: acknowledge so the webhook never answers null.
        return {"ok": True}

    async def _send_search_results(self, chat_id, pag):
        """Run the semantic search described by *pag* and send the formatted hits.

        Exceptions propagate to the caller, which reports them to the user.
        """
        result = await result_semantic_search(
            text=pag["query"],
            limit=pag["limit"]
        )

        if isinstance(result, str):
            # result_semantic_search returns a plain error string on failure.
            send_message(chat_id, result)
            return

        chunked_text_ = get_in_form(
            title=pag["query"],
            sections=result["ss_answer"],
        )

        send_message(
            chat_id,
            chunked_text=chunked_text_,
            buttons=MORE_BUTTON
        )

    async def handle_search(self, chat_id, update, text):
        """Start a new search for *text* (limit reset to the default) and send the results."""
        self.session.set_state(chat_id, UserState.BUSY)
        self.session.set_query(chat_id, text)

        try:
            # Inside the try so a failed send cannot leave the chat stuck in BUSY.
            send_message(chat_id, "⏳ در حال جستجو...")
            await self._send_search_results(chat_id, self.session.get_pagination(chat_id))

        except Exception:
            traceback.print_exc()
            send_message(chat_id, "❌ خطا در جستجو")

        finally:
            # NOTE(review): this returns to MAIN, not LAW_SEARCH, so the next
            # free-text message is routed to chat; confirm one-shot search is intended.
            self.session.set_state(chat_id, UserState.MAIN)

        return {"ok": True}

    async def handle_more(self, chat_id):
        """Repeat the current search with a limit raised by one step."""
        pag = self.session.increase_limit(chat_id)

        if not pag:
            send_message(chat_id, "❌ درخواستی یافت نشد")
            return {"ok": True}

        self.session.set_state(chat_id, UserState.BUSY)

        try:
            await self._send_search_results(chat_id, pag)

        except Exception:
            traceback.print_exc()
            send_message(chat_id, "❌ خطا در افزایش نتایج")

        finally:
            self.session.set_state(chat_id, UserState.MAIN)

        return {"ok": True}

    async def handle_chat(self, chat_id: int, text: str, again=False, *, keep_limit=False):
        """Send a question to the chat backend and reply with the formatted answer.

        Args:
            chat_id: chat to answer.
            text: the question. Non-empty text replaces the stored pagination
                query (resetting its limit); empty text reuses the stored query.
            again: forwarded to ``result_chat`` (set when re-running after an
                effort change).
            keep_limit: when True and *text* equals the stored query, keep the
                stored (possibly enlarged) limit instead of resetting it.

        The chat is BUSY while the request runs and is left in LAW_CHAT afterwards.
        """
        self.session.set_state(chat_id, UserState.BUSY)

        try:
            # Inside the try so a failed send cannot leave the chat stuck in BUSY.
            send_message(chat_id, "⏳ در حال پردازش...")

            pag = self.session.get_pagination(chat_id)
            reuse = keep_limit and pag is not None and pag["query"] == text
            if not pag or (text.strip() and not reuse):
                self.session.set_query(chat_id, text)
                pag = self.session.get_pagination(chat_id)

            # Effort chosen via the effort buttons, "low" by default.
            effort = USER_CHAT_EFFORT.get(chat_id, "low")

            result = await result_chat(
                text=pag["query"],
                limit=pag["limit"],
                effort=effort,
                again=again
            )

            if isinstance(result, str):
                # result_chat returns a plain error/"no answer" string on failure.
                send_message(chat_id, result)
                return {"ok": True}

            answer = result['llm_answer']
            query_type = result['query_type']
            if isinstance(answer, str):
                answer = format_answer_bale(answer_text=answer)

            # Remember the question so the effort / "more" buttons can re-run it.
            USER_LAST_CHAT_QUERY[chat_id] = pag["query"]

            print('-=' * 10, effort, query_type)

            if query_type == 'legal_question' and effort != 'medium':
                send_message(
                    chat_id=chat_id,
                    chunked_text=answer,
                    buttons=CHAT_EFFORT_BUTTONS
                )
            else:
                send_message(
                    chat_id=chat_id,
                    chunked_text=answer
                )

        except Exception as e:
            print("ERROR in handle_chat:", e)
            send_message(chat_id, "❌ خطا در پردازش")

        finally:
            # Back to chat mode.
            self.session.set_state(chat_id, UserState.LAW_CHAT)

        return {"ok": True}
|
||
|
||
|
||
|
||
WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/"


def _split_oversized(text: str, max_len: int) -> list:
    """Split *text* into non-empty pieces of at most *max_len* chars, preferring whitespace."""
    pieces = []
    while len(text) > max_len:
        cut = max(text.rfind("\n", 0, max_len + 1), text.rfind(" ", 0, max_len + 1))
        if cut <= 0:
            # No usable whitespace: hard cut (at least one char so the loop advances).
            cut = max(max_len, 1)
        head = text[:cut].rstrip()
        if head:
            pieces.append(head)
        text = text[cut:].lstrip()
    if text:
        pieces.append(text)
    return pieces


def get_in_form(title: str, sections: list, max_len: int = 4000):
    """Format semantic-search hits as Bale messages of at most *max_len* characters.

    The first message starts with a "برای پرسش: <title>" header. Each section is
    rendered as "<n>: <content>" followed by a markdown source link to
    ``WEB_LINK + id``. Sections are packed into as few messages as fit, and any
    message that is still too long (a huge section or title) is split further.

    Args:
        title: the user's query, echoed in the header.
        sections: dicts read for their "content" and "id" keys; ``None`` is
            treated as no sections.
        max_len: per-message character limit.

    Returns:
        list[str]: the messages; with no sections, only the header.
    """
    chunks = []
    current = f"برای پرسش: {title}\n\n"
    ref_text = "«منبع»"

    for i, data in enumerate(sections or [], start=1):
        sec_text = data.get("content", "")
        idx = data.get("id")

        ref = f"[{ref_text}]({WEB_LINK}{idx})"
        block = f"{i}: {sec_text}\n{ref}\n\n"

        # Adding this item would exceed the limit: start a new chunk.
        if len(current) + len(block) > max_len:
            chunks.append(current.rstrip())
            current = ""

        current += block

    if current.strip():
        chunks.append(current.rstrip())

    # A single section can exceed max_len on its own; split such chunks so
    # no message goes over the limit.
    return [piece for chunk in chunks for piece in _split_oversized(chunk, max_len)]
|
||
|
||
|
||
|
||
def format_answer_bale(answer_text: str, max_len: int = 4000):
    """Replace inline source codes in a model answer with links and split it for Bale.

    Parenthesised references such as ``(qs2117427)``, ``(qs1, qs2)`` or
    ``(منبع: qs2117427)`` are replaced by markdown links
    ``[«منبع»](WEB_LINK + code)``. Without the explicit ``منبع`` prefix, a
    parenthesis counts as a source list only when every comma-separated item looks
    like an id (letters and digits, no spaces). Ordinary text such as ``(1)``,
    ``(2, 3)`` or ``(Article 5)`` is therefore left untouched.

    Args:
        answer_text: model output text.
        max_len: maximum characters per returned chunk.

    Returns:
        list[str]: ``[answer_text]`` (after link replacement) when it fits in
        *max_len*, otherwise the chunks produced by ``chunked_simple_text``.
    """
    ref_text = "«منبع»"

    def make_link(src):
        return f"[{ref_text}]({WEB_LINK}{src})"

    # Any parenthesis holding one or more ASCII codes, optionally prefixed by "منبع:".
    # e.g. (qs123) or (qs123, qs456, qs789)
    pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)"

    def looks_like_id(code):
        return (
            " " not in code
            and any(ch.isalpha() for ch in code)
            and any(ch.isdigit() for ch in code)
        )

    def replace_source(m):
        codes = [c.strip() for c in m.group(1).split(",")]
        codes = [c for c in codes if c]
        explicit = "منبع" in m.group(0)
        if not codes or (not explicit and not all(looks_like_id(c) for c in codes)):
            # Not a source reference: keep the original text.
            return m.group(0)
        return ", ".join(make_link(code) for code in codes)

    answer_text = re.sub(pattern, replace_source, answer_text)

    if len(answer_text) <= max_len:
        return [answer_text]

    return chunked_simple_text(answer_text, max_len=max_len)
|
||
|
||
|
||
|
||
def send_message(
    chat_id: int,
    text: str = None,
    buttons=None,  # inline buttons
    chunked_text=None,
):
    """Send a message to *chat_id* through Bale's ``sendMessage`` endpoint.

    Args:
        chat_id: destination chat.
        text: message text, split with ``split_text_chunks``; ignored when
            *chunked_text* is given.
        buttons: optional inline-keyboard rows, attached to the last chunk only.
        chunked_text: pre-split list of message texts, sent in order.

    Each chunk is POSTed separately. Only the last one carries ``reply_markup``:
    the reply keyboard built from ``BUTTON_TEXT_TO_CALLBACK_LIST``, plus *buttons*
    as ``inline_keyboard``.

    Delivery is best-effort. A network error is printed and the remaining chunks
    are skipped; an error status (>= 400) is printed. Nothing is raised or returned.

    NOTE(review): ``requests`` is synchronous, so each call blocks the asyncio event
    loop of the async handlers (up to ``TIME_OUT`` seconds per chunk).
    """
    url = f"{BASE_URL}/sendMessage"

    chunks = split_text_chunks(text) if chunked_text is None else chunked_text

    reply_keyboard = [
        [{"text": btn} for btn in row] for row in BUTTON_TEXT_TO_CALLBACK_LIST
    ]

    last_index = len(chunks) - 1

    for i, chunk in enumerate(chunks):
        payload = {
            "chat_id": chat_id,
            "text": chunk,
        }

        # Only the last message carries the keyboards.
        if i == last_index:
            reply_markup = {
                "keyboard": reply_keyboard,
                "resize_keyboard": True,
                "one_time_keyboard": False,
            }
            if buttons:
                reply_markup["inline_keyboard"] = buttons
            payload["reply_markup"] = reply_markup

        try:
            r = requests.post(url, json=payload, timeout=TIME_OUT)
        except requests.RequestException as exc:
            print("Send failed:", exc)
            return

        if not r.ok:
            print("Send failed:", r.status_code, r.text[:200])
|
||
|
||
|
||
async def save_to_es(data: QaChat):
    """Index *data* into ``ES_INDEX_NAME`` under ``data.id`` via ``ES_HELPER``.

    Returns whatever ``ES_HELPER.update_index_doc`` returns, or ``None`` if it
    raised an ``Exception`` (the traceback is printed). Cancellation and other
    ``BaseException``s are no longer swallowed.

    NOTE(review): ``update_index_doc`` is called synchronously (no await), so if
    it performs blocking I/O it blocks the event loop; verify.
    """
    try:
        return ES_HELPER.update_index_doc(
            is_update_state=False,
            index_name_o=ES_INDEX_NAME,
            eid=data.id,
            data=data.model_dump(),
        )
    except Exception:
        print("save_to_es - 000000000000000000000000000000000000000000000")
        traceback.print_exc()
        return None
|
||
|
||
|
||
def initialize_webhook():
    """Register ``WEBHOOK_URL`` with Bale's ``setWebhook`` endpoint.

    This is mandatory for the bot: it tells the Bale server which server to
    deliver requests for this token to. Network errors propagate (the request
    is bounded by ``TIME_OUT``). A non-JSON response body is printed as raw text
    instead of crashing.
    """
    params = {"url": WEBHOOK_URL}

    r = requests.get(SET_WEBHOOK_URL, params=params, timeout=TIME_OUT)

    print("Webhook set status:", r.status_code)

    try:
        print("Webhook set response:", r.json())
    except ValueError:
        # Body was not JSON (e.g. an HTML error page from a proxy).
        print("Webhook set response:", r.text)
|
||
|
||
|
||
|
||
def chunked_simple_text(answer_text, max_len=4000):
    """Split *answer_text* into chunks of at most *max_len* characters.

    The text is split on ". " and sentences are packed greedily, joined by a
    single space. The period consumed by the split is restored on every sentence
    except the last, which is kept as written. A sentence longer than *max_len*
    is itself split, preferring whitespace boundaries. No empty chunk is
    returned.

    Args:
        answer_text: text to split.
        max_len: maximum characters per chunk.

    Returns:
        list[str]: the chunks (empty for empty input).
    """
    step = max(max_len, 1)

    def wrap(sentence):
        # Yield pieces of *sentence* no longer than max_len.
        while len(sentence) > max_len:
            cut = sentence.rfind(" ", 0, max_len + 1)
            if cut <= 0:
                cut = step
            head = sentence[:cut].rstrip()
            if head:
                yield head
            sentence = sentence[cut:].lstrip()
        if sentence:
            yield sentence

    pieces = answer_text.split(". ")
    last = len(pieces) - 1
    chunks = []
    current = ""

    for i, piece in enumerate(pieces):
        sentence = piece.strip()
        if i < last:
            sentence += "."  # restore the period removed by split(". ")
        for part in wrap(sentence):
            if current and len(current) + len(part) > max_len:
                chunks.append(current.strip())
                current = ""
            current += part + " "

    if current.strip():
        chunks.append(current.strip())

    return chunks
|
||
|
||
|
||
async def result_chat(text, limit=10, effort="low", again=False, *, url="http://2.188.15.101:8009/run_chat"):
    """Ask the chat backend to answer *text*.

    POSTs ``{"query", "limit", "effort", "again", "mode_type": "bale"}`` as JSON
    to *url*, with a ``TIME_OUT``-second timeout.

    Args:
        text: the question.
        limit: number of retrieved passages requested.
        effort: effort level forwarded to the backend.
        again: forwarded flag (set when re-running a question).
        url: backend endpoint; defaults to the previously hard-coded address.

    Returns:
        The ``"result"`` field of the JSON response (``BaleBot.handle_chat`` reads
        its ``"llm_answer"`` and ``"query_type"`` keys). If the field is missing it
        returns "❌ پاسخی دریافت نشد". If the request or decoding raises an
        ``Exception``, the error is printed and "❌ ارتباط با سرور قطع میباشد" is
        returned.
    """
    payload = {
        "query": text,
        "limit": limit,
        "effort": effort,
        "again": again,
        "mode_type": "bale",
    }

    try:
        async with httpx.AsyncClient(timeout=TIME_OUT) as client:
            response = await client.post(url, json=payload)
            response.raise_for_status()
            data = response.json()
            return data.get("result", "❌ پاسخی دریافت نشد")

    except Exception as e:
        print(f"❌ خطای RAG:\n{str(e)}")
        return "❌ ارتباط با سرور قطع میباشد"
|
||
|
||
async def result_semantic_search(text, limit, *, url="http://2.188.15.101:8009/run_semantic_search"):
    """Run a semantic search for *text* on the backend.

    POSTs ``{"query": text, "limit": limit}`` as JSON to *url*, with a
    ``TIME_OUT``-second timeout.

    Args:
        text: search query.
        limit: number of hits requested.
        url: backend endpoint; defaults to the previously hard-coded address.

    Returns:
        The ``"result"`` field of the JSON response, or ``None`` if absent. Callers
        read its ``"ss_answer"`` list. If the request or decoding raises an
        ``Exception``, the error is printed and a plain error string is returned.
    """
    try:
        async with httpx.AsyncClient(timeout=TIME_OUT) as client:
            response = await client.post(url, json={"query": text, "limit": limit})
            response.raise_for_status()
            # response.json() returns plain decoded data (dict), not a model object.
            data = response.json()
            return data.get("result")

    except Exception as e:
        print(f"❌ خطای RAG:\n{str(e)}")
        return "ارتباط با بالا قطع می باشد❌"
|
||
|
||
|
||
|
||
|
||
session = SessionManager()

bot = BaleBot(session)


@app.post(f"/webhook/{TOKEN}")
async def webhook(request: Request):
    """Receive one Bale update and hand it to ``bot``.

    Always answers ``{"ok": True}``. Malformed bodies and handler exceptions are
    printed instead of surfacing as HTTP 500 responses.
    """
    try:
        raw = await request.json()
        update = BaleUpdate(**raw)
    except Exception as e:
        print("❌ Parse Error", e)
        return {"ok": True}

    print(f'update {update}')

    try:
        response = await bot.handle_update(update)
    except Exception:
        traceback.print_exc()
        return {"ok": True}

    return response if response is not None else {"ok": True}
|
||
|
||
if __name__ == "__main__":
    # Register the webhook first so Bale knows where to deliver updates,
    # then start the FastAPI server that receives them.
    status_banner = "\n====== Bot Status: ======"
    server_banner = "\n====== Bot Webhook Server Running ======"

    print(status_banner)
    initialize_webhook()

    print(server_banner)
    # NOTE(review): "main:app" assumes this module is importable as `main`, and
    # reload=True is a development setting; confirm both for deployment.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8005,
        reload=True,
    )
|
||
|
||
|