From f062fba5715a217d6a5b74cdb1abb3c2a837ebe6 Mon Sep 17 00:00:00 2001 From: hsafaei Date: Tue, 30 Dec 2025 11:39:44 +0000 Subject: [PATCH] be4 es-connect --- .gitignore | 1 + app.py | 123 +++ bale_buttons.py | 42 - bale_massages.py | 35 - base_model.py | 116 --- dependencies.py | 26 + main.py | 632 ----------- requirements.txt => requierments.txt | 2 + router/bale_bot.py | 40 + sample_env | 0 utils.py | 131 --- utils/bale_buttons.py | 10 + utils/bale_massages.py | 39 + utils/base_model.py | 288 ++++++ utils/config.py | 146 +++ utils/main.py | 1011 ++++++++++++++++++ utils/static.py | 26 + utils/workflow.py | 1437 ++++++++++++++++++++++++++ 18 files changed, 3149 insertions(+), 956 deletions(-) mode change 100644 => 100755 .gitignore create mode 100755 app.py delete mode 100644 bale_buttons.py delete mode 100644 bale_massages.py delete mode 100644 base_model.py create mode 100755 dependencies.py delete mode 100644 main.py rename requirements.txt => requierments.txt (61%) mode change 100644 => 100755 create mode 100755 router/bale_bot.py mode change 100644 => 100755 sample_env delete mode 100644 utils.py create mode 100755 utils/bale_buttons.py create mode 100755 utils/bale_massages.py create mode 100755 utils/base_model.py create mode 100644 utils/config.py create mode 100755 utils/main.py create mode 100755 utils/static.py create mode 100755 utils/workflow.py diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 index 67becb4..57b9156 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ app/lib _temp.py _data_json/ __pycache__/* +run_dev.bash diff --git a/app.py b/app.py new file mode 100755 index 0000000..ad1c842 --- /dev/null +++ b/app.py @@ -0,0 +1,123 @@ +# app.py +from fastapi import FastAPI +from contextlib import asynccontextmanager +from utils.workflow import ElasticHelper, Formatter, RequestManager +from fastapi.middleware.cors import CORSMiddleware +from router.bale_bot import router as bale_router +from router.bale_bot import initialize_webhook +from utils.main import BaleBot, UserManager +from dotenv import load_dotenv +import os + + +# --- Lifespan manager --- +@asynccontextmanager +async def lifespan(app: FastAPI): + # 🚀 Startup — همه موارد مقداردهی اولیه باید اینجا باشند + print("🚀 Starting up Bale-Bot Backend system...") + + # فقط یک بار در ابتدای برنامه `.env` را بارگذاری کنیم + load_dotenv() + + TOKEN = os.getenv("BALE_TOKEN") + ES_URL = os.getenv("ES_URL") + ES_PASSWORD = os.getenv("ES_PASSWORD") + ES_USER_NAME = os.getenv("ES_USER_NAME") + ES_INDEX_NAME = os.getenv("ES_INDEX_NAME") + GLOBAL_DOMAIN = os.getenv("GLOBAL_DOMAIN") + BACK_END_HOST= os.getenv("BACK_END_HOST") + BACK_END_PORT= os.getenv("BACK_END_PORT") + BACK_END_URL = f"http://{BACK_END_HOST}:{BACK_END_PORT}" + + # 🔍 گرفتن تنظیمات + required_vars = { + "TOKEN": TOKEN, + "ES_URL": ES_URL, + "ES_PASSWORD": ES_PASSWORD, + "ES_USER_NAME": ES_USER_NAME, + "ES_INDEX_NAME": ES_INDEX_NAME, + "GLOBAL_DOMAIN": GLOBAL_DOMAIN, + } + missing = [k for k, v in required_vars.items() if not v] + if missing: + raise EnvironmentError( + f"Missing required environment variables: {missing}\nPLZ add the required field in the .env file !" + ) + + app.state.es_helper = ElasticHelper( + es_url=ES_URL, + es_pass=ES_PASSWORD, + es_user=ES_USER_NAME, + ) + + app.state.base_url = f"https://tapi.bale.ai/bot{TOKEN}" + app.state.webhook_url = f"{GLOBAL_DOMAIN}/webhook/{TOKEN}" + app.state.set_webhook_url = f"https://tapi.bale.ai/bot{TOKEN}/setWebhook" + app.state.es_index_name = ES_INDEX_NAME + app.state.bale_token = TOKEN + + # اجباری است برای ربات + initialize_webhook( + webhook_url=app.state.webhook_url, + set_webhook_url=app.state.set_webhook_url, + ) + + app.state.user_manager = UserManager() + app.state.formatter = Formatter() + app.state.request_manager = RequestManager( + host_url=BACK_END_URL, + ) + # بله بات + bale_bot = BaleBot( + user_manager=app.state.user_manager, + es_helper=app.state.es_helper, + es_index_name=app.state.es_index_name, + token=app.state.bale_token, + formatter= app.state.formatter, + back_end_url = BACK_END_URL, + request_manager = app.state.request_manager, + ) + app.state.bale_bot = bale_bot + print("=== Bale-Bot Initialized ===") + + yield # برنامه در این حالت اجرا می‌شود + + print("🛑 Shutting down Bale-Bot Backend system...") + + +# --- Application factory --- +def create_app() -> FastAPI: + app = FastAPI( + title="Bale-Bot Backend", + version="0.1.0", + lifespan=lifespan, + # برای محیط production، می‌توانید docs + redoc را غیرفعال کنید + # docs_url=None, redoc_url=None + ) + + # 🌐 CORS + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # فقط در dev — در prod لیست دقیق دامنه‌ها را بگذارید + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + # 🏠 Health routes + @app.get("/", include_in_schema=False) + async def health_check(): + return {"status": "ok", "message": "Bale-Bot Backend is running"} + + @app.get("/ping", include_in_schema=False) + async def ping(): + return {"pong": True} + + # 📦 روت‌ها + app.include_router(bale_router) + + return app + + +############## Create app +app = create_app() diff --git a/bale_buttons.py b/bale_buttons.py deleted file mode 100644 index 83ec74e..0000000 --- a/bale_buttons.py +++ /dev/null @@ -1,42 +0,0 @@ - -START_BUTTONS_INLINE = [ - [{"text": "گفتگو", "callback_data": "law_chat"}], - [{"text": "جستجو", "callback_data": "law_search"}], - [ - # {"text": "راهنما", "callback_data": "help"}, - {"text": "درباره ما", "callback_data": "about"}, - {"text": "تماس با ما", "callback_data": "contact"}, - ], -] - - -BUTTON_TEXT_TO_CALLBACK = { - "جستجو": "law_search", - "گفتگو": "law_chat", - # "راهنما": "help", - "درباره ما": "about", - "تماس با ما": "contact", -} -BUTTON_TEXT_TO_CALLBACK_LIST = [ - [ - "جستجو", - "گفتگو", - # "راهنما", - "درباره ما", - "تماس با ما", - ], -] - -MORE_BUTTON = [[{"text": "🔽 نمایش نتایج بیشتر", "callback_data": f"more"}]] - - -CHAT_EFFORT_BUTTONS = [ - [ - # {"text": "⚡ سریع", "callback_data": "chat_effort_low"}, - {"text": "🧠 بررسی عمیق تر", "callback_data": "chat_effort_medium"}, - # {"text": "⚖ نرمال", "callback_data": "chat_effort_medium"}, - ] - # [ - # {"text": "➕ ادامه پاسخ", "callback_data": "chat_more"} - # ] -] diff --git a/bale_massages.py b/bale_massages.py deleted file mode 100644 index d1152c6..0000000 --- a/bale_massages.py +++ /dev/null @@ -1,35 +0,0 @@ -STARTMESSAGE = """👋 سلام دوست عزیز! 🤗 -به دستیار هوشمند قانون یار خوش آمدید! -فقط کافیه به من بگید چه کمکی از دستم برمیاد!""" - - - -ABOUT = """ -من ربات گفتگوگر حقوقی هستم که روی قوانین رسمی جمهوری اسلامی ایران از سامانه قانون‌یار مجلس شورای اسلامی توسعه یافتم. -لذا به هر سوال و گفتگویی طبق همان منابع پاسخ می‌دهم -نشانی سامانه منبع در زیر آمده است -[qanonyar.parliran.ir](https://qanonyar.parliran.ir) - -کارفرما : مرکز فناوری مجلس شورای اسلامی ایران - -""" -CONTACT_US = """لطفا برای ارتباط با ما از طریق مرکز فناوری مجلس شورای اسلامی ایران اقدام فرمایید""" - - - - - -JOST_OJO_TEXT = """📚✨ هر پرسش حقوقی که از قوانین کشور دارید یا هر متن حقوقی که براتون مهم هست، همینجا بفرستید تا با استناد به قوانین با شما گفتگو کنم.. - -📝 می‌تونید: -• موضوع کلی رو بگید -• سؤال بپرسید -• یا متن حقوقی که داری برام بنویسید - -🎯 هرچقدر دقیق‌تر و واضح‌تر توضیح بدید، بهتر می‌توانم راهنمایی‌ کنم. -""" -HOURGLASS = "⏳" # استیکر ساعت شنی -WAIT_TEXT = f"{HOURGLASS} لطفاً کمی صبر کنید...\nدر حال پردازش درخواست قبلی شما هستم." - - -GOFT_OGO_TEXT = "💬 آماده گفتگو هستم" \ No newline at end of file diff --git a/base_model.py b/base_model.py deleted file mode 100644 index f6a0b69..0000000 --- a/base_model.py +++ /dev/null @@ -1,116 +0,0 @@ -from pydantic import BaseModel, Field -from typing import List, Optional - - -class BaleStartMessageForm(BaseModel): - id: int - is_bot: bool = False - first_name: str - last_name: Optional[str] = None - username: Optional[str] = None - - -class BaleStartMessageChat(BaseModel): - id: int - type: str - username: Optional[str] = None - first_name: Optional[str] = None - - -class BaleStartMessage(BaseModel): - message_id: int - from_user: BaleStartMessageForm = Field(..., alias="from") - date: int - chat: BaleStartMessageChat - text: str - entities: List[dict] = [] - - class Config: - populate_by_name = True - - -class BaleCallbackFrom(BaseModel): - id: int - is_bot: bool - first_name: str - username: Optional[str] = None - - -class BaleCallbackMessage(BaseModel): - message_id: int - chat: BaleStartMessageChat - text: Optional[str] - - -class BaleCallbackQuery(BaseModel): - id: str - from_user: BaleCallbackFrom = Field(..., alias="from") - message: BaleCallbackMessage - data: str - - class Config: - populate_by_name = True - - -class BaleUpdate(BaseModel): - update_id: int - message: Optional[BaleStartMessage] = None - callback_query: Optional[BaleCallbackQuery] = None - - class Config: - exclude_none = True - - -class QaChatSingle(BaseModel): - id: str - chat_id: int - user_query: str - model_key: str - model_effort: str - - retrived_passage: str - retrived_ref_ids: str - retrived_duration: Optional[int] = 0 - prompt_type: str = "question" - llm_duration: int - full_duration: Optional[int] = 0 - time_create: Optional[int] = 0 - used_ref_ids: Optional[str] = "" - status_text: Optional[str] = "" - status: Optional[int] = 0 - prompt_answer: str - other_info: dict | None - - -class QaChatBlock(BaseModel): - id: str - title: str - user_id: str - is_premium: bool - chat: QaChatSingle - total_token: int - is_end: bool - - -class QaChat(BaseModel): - id: str - chat_id: int - title: Optional[str] = "" - user_id: str - user_query: str - query_type: str = "question" # llm -> greeting, other, legal_question | rag -> question - full_duration: Optional[float] = 0 - other_info: Optional[dict] = "" - - ss_ref_ids: Optional[List[str]] = "" - ss_model_key: Optional[str] = "" - ss_duration: Optional[float] = 0 - ss_answer: Optional[str] = "" - - llm_ref_ids: Optional[List[str]] = [] - llm_model_key: Optional[str] = "" - llm_duration: Optional[float] = 0 - llm_answer: Optional[str] = "" - - status_text: Optional[str] = "" - status: Optional[int] = 0 diff --git a/dependencies.py b/dependencies.py new file mode 100755 index 0000000..3ac1783 --- /dev/null +++ b/dependencies.py @@ -0,0 +1,26 @@ +# dependencies.py +from fastapi import Depends, Request +from utils.workflow import ElasticHelper + +def _get_es_helper(request: Request) -> ElasticHelper: + return request.app.state.es_helper + +def _get_base_url(request: Request) -> str: + return request.app.state.base_url + +def _get_webhook_url(request: Request) -> str: + return request.app.state.webhook_url + +def _get_base_set_webhook_url(request: Request) -> str: + return request.app.state.set_webhook_url + +def _get_base_es_index_name(request: Request) -> str: + return request.app.state.es_index_name + +def _get_bale_token(request: Request) -> str: + return request.app.state.bale_token + +def _get_bale_bot(request: Request) -> str: + return request.app.state.bale_bot + + diff --git a/main.py b/main.py deleted file mode 100644 index a8c79ab..0000000 --- a/main.py +++ /dev/null @@ -1,632 +0,0 @@ -################# -from fastapi import FastAPI, Request, HTTPException -import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re -from dotenv import load_dotenv -from pathlib import Path -from time import sleep -from enum import Enum - -################# -from base_model import BaleUpdate, QaChat -from bale_buttons import ( - START_BUTTONS_INLINE, - BUTTON_TEXT_TO_CALLBACK, - BUTTON_TEXT_TO_CALLBACK_LIST, - MORE_BUTTON, CHAT_EFFORT_BUTTONS -) -from bale_massages import ( - STARTMESSAGE, - ABOUT, - CONTACT_US, - JOST_OJO_TEXT, - HOURGLASS, - WAIT_TEXT,GOFT_OGO_TEXT -) -from utils import load_orjson, save_orjson, ElasticHelper, split_text_chunks - - -############## Create app -app = FastAPI() -############## - -############## Global-Params -load_dotenv() - -TOKEN = os.getenv("BALE_TOKEN") -ES_URL = os.getenv("ES_URL") -ES_PASSWORD = os.getenv("ES_PASSWORD") -ES_USER_NAME = os.getenv("ES_USER_NAME") -ES_INDEX_NAME = os.getenv("ES_INDEX_NAME") - -BASE_URL = f"https://tapi.bale.ai/bot{TOKEN}" -DATA_DIR = os.path.join(".", "_data_json") -if not os.path.exists(DATA_DIR): - os.makedirs(DATA_DIR) - -GLOBAL_DOMAIN = "https://bl.tavasi.ir" # f"https://YOUR_DOMAIN.com -WEBHOOK_URL = f"{GLOBAL_DOMAIN}/webhook/{TOKEN}" -SET_WEBHOOK_URL = f"https://tapi.bale.ai/bot{TOKEN}/setWebhook" - -MAX_LIMIT_RAG = 100 -STEP_RAG = 10 -USER_STATE = {} -USER_LAST_QUERY = {} -USER_PAGINATION = {} -USER_CHAT_EFFORT = {} -USER_CHAT_LIMIT = {} -USER_LAST_CHAT_QUERY = {} - -TIME_OUT = 60 -MAX_LEN = 4000 # کمی کمتر از حد پایه امن‌تر است - -ES_HELPER = ElasticHelper( - es_url=ES_URL, - es_pass=ES_PASSWORD, - es_user=ES_USER_NAME, -) - -class UserState(str, Enum): - MAIN = "main" - LAW_SEARCH = "law_search" - LAW_CHAT = "law_chat" - BUSY = "busy" - - -class SessionManager: - - def __init__(self): - self.states = {} - self.pagination = {} - - def init(self, chat_id: int): - if chat_id not in self.states: - self.states[chat_id] = UserState.MAIN - - def get_state(self, chat_id: int): - return self.states.get(chat_id, UserState.MAIN) - - def set_state(self, chat_id: int, state: UserState): - self.states[chat_id] = state - - def set_query(self, chat_id: int, query: str): - self.pagination[chat_id] = {"query": query, "limit": 10, "step": 10} - - def increase_limit(self, chat_id: int): - if chat_id in self.pagination: - self.pagination[chat_id]["limit"] += self.pagination[chat_id]["step"] - return self.pagination[chat_id] - return None - - def get_pagination(self, chat_id: int): - return self.pagination.get(chat_id) - - def clear(self, chat_id: int): - self.pagination.pop(chat_id, None) - self.states[chat_id] = UserState.MAIN - - -class BaleBot: - - def __init__(self, session: SessionManager): - self.session = session - - async def handle_update(self, update: BaleUpdate): - - if update.message: - return await self.handle_message(update) - - if update.callback_query: - return await self.handle_callback(update) - - async def handle_message(self, update: BaleUpdate): - chat_id = update.message.chat.id - text = (update.message.text or "").strip() - - self.session.init(chat_id) - - # /start - if text == "/start": - self.session.clear(chat_id) - send_message(chat_id, STARTMESSAGE, buttons=START_BUTTONS_INLINE) - return {"ok": True} - - if text == "تماس با ما": - send_message(chat_id, CONTACT_US) - return {"ok": True} - - if text == "درباره ما": - send_message(chat_id, ABOUT) - return {"ok": True} - - # ✅ اگر BUSY بود - if self.session.get_state(chat_id) == UserState.BUSY: - send_message(chat_id, WAIT_TEXT) - return {"ok": True} - - # ✅ شبیه‌سازی کلیک روی دکمه‌ها با متن - if text == "جستجو": - self.session.set_state(chat_id, UserState.LAW_SEARCH) - send_message(chat_id, JOST_OJO_TEXT) - return {"ok": True} - - if text == "گفتگو": - self.session.set_state(chat_id, UserState.LAW_CHAT) - send_message(chat_id, GOFT_OGO_TEXT) - return {"ok": True} - - # وضعیت فعلی کاربر - state = self.session.get_state(chat_id) - - if state == UserState.LAW_SEARCH: - return await self.handle_search(chat_id, update, text) - - elif state == UserState.LAW_CHAT: - return await self.handle_chat(chat_id, text) - - # اگر هیچکدوم نبود → پیش‌فرض بزن بره گفتگو - else: - self.session.set_state(chat_id, UserState.LAW_CHAT) - return await self.handle_chat(chat_id, text) - - - async def handle_callback(self, update: BaleUpdate): - chat_id = update.callback_query.message.chat.id - data = update.callback_query.data - - if data == "law_search": - self.session.set_state(chat_id, UserState.LAW_SEARCH) - send_message(chat_id, JOST_OJO_TEXT) - return {"ok": True} - - if data.startswith("chat_effort_"): - level = data.replace("chat_effort_", "") - USER_CHAT_EFFORT[chat_id] = level - - send_message(chat_id, f"✅ بله حتما، لطفا کمی منتظر بمانید ...") - - # اجرای مجدد آخرین پرسش - last_query = USER_LAST_CHAT_QUERY.get(chat_id) - if last_query: - return await self.handle_chat(chat_id, last_query, again=True) - - return {"ok": True} - - - if data == "chat_more": - pag = self.session.increase_limit(chat_id) - - if not pag: - send_message(chat_id, "❌ سوالی برای ادامه نیست") - return {"ok": True} - - last_query = USER_LAST_CHAT_QUERY.get(chat_id) - if last_query: - return await self.handle_chat(chat_id, last_query) - - return {"ok": True} - - - - if data == "law_chat": - self.session.set_state(chat_id, UserState.LAW_CHAT) - send_message(chat_id, GOFT_OGO_TEXT) - return {"ok": True} - - if data == "more": - return await self.handle_more(chat_id) - - if data == "stop": - self.session.clear(chat_id) - send_message(chat_id, "✅ پایان نمایش") - return {"ok": True} - - async def handle_search(self, chat_id, update, text): - - self.session.set_state(chat_id, UserState.BUSY) - - self.session.set_query(chat_id, text) - - send_message(chat_id, "⏳ در حال جستجو...") - - try: - pag = self.session.get_pagination(chat_id) - - result = await result_semantic_search( - text=pag["query"], - limit=pag["limit"] - ) - - # print(f'result rag {result} {type(result["ss_answer"])}') - chunked_text_ = get_in_form( - title=pag["query"], - sections=result["ss_answer"], - ) - # print(f'chunked_text_ rag {chunked_text_}') - - send_message( - chat_id, - chunked_text=chunked_text_, - buttons=MORE_BUTTON - ) - - except Exception as e: - send_message(chat_id, "❌ خطا در جستجو") - print(e) - - finally: - self.session.set_state(chat_id, UserState.MAIN) - - return {"ok": True} - - async def handle_more(self, chat_id): - - pag = self.session.increase_limit(chat_id) - - if not pag: - send_message(chat_id, "❌ درخواستی یافت نشد") - return {"ok": True} - - self.session.set_state(chat_id, UserState.BUSY) - - try: - result = await result_semantic_search( - text=pag["query"], - limit=pag["limit"] - ) - - chunked_text_ = get_in_form( - title=pag["query"], - sections=result["ss_answer"], - ) - send_message( - chat_id, - chunked_text=chunked_text_, - buttons=MORE_BUTTON - ) - - except Exception as e: - send_message(chat_id, "❌ خطا در افزایش نتایج") - - finally: - self.session.set_state(chat_id, UserState.MAIN) - - return {"ok": True} - - async def handle_chat(self, chat_id: int, text: str, again=False): - - # رفتن به حالت BUSY - self.session.set_state(chat_id, UserState.BUSY) - - send_message(chat_id, "⏳ در حال پردازش...") - - try: - # اگر اولین پیام است، آن را به عنوان query ذخیره کن - pag = self.session.get_pagination(chat_id) - if not pag: - self.session.set_query(chat_id, text) - pag = self.session.get_pagination(chat_id) - else: - # اگر query قبلا بوده ولی کاربر چیز جدیدی تایپ کرده - if text.strip(): - self.session.set_query(chat_id, text) - pag = self.session.get_pagination(chat_id) - - limit = pag["limit"] - - # effort را از حافظه بیرونی بگیر (یا پیش فرض) - effort = USER_CHAT_EFFORT.get(chat_id, "low") - - - result = await result_chat( - text=pag["query"], - limit=limit, - effort=effort, - again=again - ) - # print(f'result {result}') - text_ = result['llm_answer'] - query_type = result['query_type'] - if isinstance(text_, str): - text_ = format_answer_bale(answer_text=text_) - - # print(f'text_ {text_}') - # ذخیره آخرین سوال برای دکمه‌ها - USER_LAST_CHAT_QUERY[chat_id] = pag["query"] - - print ( '-='*10, effort, query_type) - if query_type == 'legal_question' and effort != 'medium' : - send_message( - chat_id=chat_id, - chunked_text=text_, - buttons=CHAT_EFFORT_BUTTONS - ) - else : - send_message( - chat_id=chat_id, - chunked_text=text_ - ) - - - - except Exception as e: - print("ERROR in handle_chat:", e) - send_message(chat_id, "❌ خطا در پردازش") - - finally: - # بازگشت به حالت چت - self.session.set_state(chat_id, UserState.LAW_CHAT) - - return {"ok": True} - - - -WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/" - -def get_in_form(title: str, sections: list, max_len: int = 4000): - chunks = [] - current = f"برای پرسش: {title}\n\n" - ref_text = "«منبع»" - - for i, data in enumerate(sections, start=1): - sec_text = data.get("content", "") - idx = data.get("id") - - # ساخت ref کامل - ref = f"[{ref_text}]({WEB_LINK}{idx})" - # متن کامل آیتم - block = f"{i}: {sec_text}\n{ref}\n\n" - - # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید - if len(current) + len(block) > max_len: - chunks.append(current.rstrip()) - current = "" - - current += block - - # آخرین چانک را هم اضافه کن - if current.strip(): - chunks.append(current.rstrip()) - - return chunks - - - -def format_answer_bale(answer_text: str, max_len: int = 4000): - """ - answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد - sources: مثل ['qs2117427'] - """ - ref_text = "«منبع»" - - def make_link(src): - return f"[{ref_text}]({WEB_LINK}{src})" - - # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد - # مثلا: (qs123) یا (qs123, qs456, qs789) - pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" - - def replace_source(m): - content = m.group(1) - codes = [c.strip() for c in content.split(",")] # جداسازی چند کد - links = [make_link(code) for code in codes] - full_match = m.group(0) - # if "منبع" in full_match: - # print(f'Found explicit source(s): {links}') - # else: - # print(f'Found implicit source(s): {links}') - return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان - - # جایگزینی در متن - answer_text = re.sub(pattern, replace_source, answer_text) - - # اگر طول کمتر از max_len بود → تمام - if len(answer_text) <= max_len: - return [answer_text] - - # تقسیم متن اگر طول زیاد شد - chunks = [] - current = "" - - sentences = answer_text.split(". ") - for sentence in sentences: - st = sentence.strip() - if not st.endswith("."): - st += "." - - if len(current) + len(st) > max_len: - chunks.append(current.strip()) - current = "" - - current += st + " " - - if current.strip(): - chunks.append(current.strip()) - - return chunks - - - -def send_message( - chat_id: int, - text: str = None, - buttons=None, # inline buttons - chunked_text=None, -): - url = f"https://tapi.bale.ai/bot{TOKEN}/sendMessage" - - if chunked_text is None: - chunks = split_text_chunks(text) - else: - chunks = chunked_text - - reply_keyboard = [ - [{"text": btn} for btn in row] for row in BUTTON_TEXT_TO_CALLBACK_LIST - ] - - total = len(chunks) - - for i, chunk in enumerate(chunks): - is_last = (i == total - 1) - - payload = { - "chat_id": chat_id, - "text": chunk, - } - - # فقط برای پیام آخر کیبورد بفرست - if is_last: - - reply_markup = { - "keyboard": reply_keyboard, - "resize_keyboard": True, - "one_time_keyboard": False, - } - - if buttons: - reply_markup["inline_keyboard"] = buttons - - payload["reply_markup"] = reply_markup - - r = requests.post(url, json=payload) - - # try: - # r.json() - # print("Send:", r.status_code) - # except: - # print("Send:", r.status_code) - - -async def save_to_es(data: QaChat): - # print("save_to_es data rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr") - try: - - es_res = ES_HELPER.update_index_doc( - is_update_state=False, - index_name_o=ES_INDEX_NAME, - eid=data.id, - data=data.model_dump(), - ) - # type_name, payload, request - # print(f"Saved {es_res}") - except: - print("save_to_es - 000000000000000000000000000000000000000000000") - traceback.print_exc() - - -def initialize_webhook(): - """ - این تابع برای ربات اجباری است - به سرور بله مشخص میکند که به چه server ایی درخواست ها را با این توکن داده شده ارسال کند - """ - params = {"url": WEBHOOK_URL} - r = requests.get(SET_WEBHOOK_URL, params=params) - print("Webhook set status:", r.status_code) - print("Webhook set response:", r.json()) - - - -def chunked_simple_text(answer_text, max_len=4000): - chunks = [] - current = "" - - sentences = answer_text.split(". ") - for sentence in sentences: - st = sentence.strip() - if not st.endswith("."): - st += "." - - if len(current) + len(st) > max_len: - chunks.append(current.strip()) - current = "" - - current += st + " " - - if current.strip(): - chunks.append(current.strip()) - - return chunks - - -async def result_chat(text, limit=10, effort="low", again=False): - url = "http://2.188.15.101:8009/run_chat" - - try: - async with httpx.AsyncClient(timeout=TIME_OUT) as client: - response = await client.post( - url, - json={ - "query": text, - "limit": limit, - "effort": effort, - "again": again, - "mode_type":"bale" - - } - ) - response.raise_for_status() - data = response.json() - result = data.get("result", "❌ پاسخی دریافت نشد") - - # print('results_chat ',type(result)) - return result - - except Exception as e: - print(f"❌ خطای RAG:\n{str(e)}") - return "❌ ارتباط با سرور قطع می‌باشد" - -async def result_semantic_search(text, limit): - - url = "http://2.188.15.101:8009/run_semantic_search" - - try: - async with httpx.AsyncClient(timeout=TIME_OUT) as client: - response = await client.post(url, json={"query": text, "limit": limit}) - response.raise_for_status() - data = response.json() # ⚠️ اینجا response.json() فقط داده می‌دهد، شیء نیست - result = data.get("result") - # result = chunked_simple_text(result) - return result - - except Exception as e: - print(f"❌ خطای RAG:\n{str(e)}") - return "ارتباط با بالا قطع می باشد❌" - - - - -session = SessionManager() -bot = BaleBot(session) - - -@app.post(f"/webhook/{TOKEN}") -async def webhook(request: Request): - raw = await request.json() - - try: - update = BaleUpdate(**raw) - except Exception as e: - print("❌ Parse Error", e) - return {"ok": True} - - print(f'update {update}') - - return await bot.handle_update(update) - -if __name__ == "__main__": - print("\n====== Bot Status: ======") - initialize_webhook() - - print("\n====== Bot Webhook Server Running ======") - uvicorn.run("main:app", host="0.0.0.0", port=8005, reload=True) - - -# if __name__ == "__main__": -# # 1️⃣ ابتدا وبهوک را ست می‌کنیم -# print("\n====== Bot Status: ======") -# initialize_webhook() - -# # 2️⃣ بعد سرور FastAPI را اجرا می‌کنیم تا پیام‌ها را دریافت کند -# print("\n====== Bot Webhook Server Running ======") -# uvicorn.run("main:app", host="0.0.0.0", port=8005, reload=True) - - diff --git a/requirements.txt b/requierments.txt old mode 100644 new mode 100755 similarity index 61% rename from requirements.txt rename to requierments.txt index 83ebe60..017b6be --- a/requirements.txt +++ b/requierments.txt @@ -1 +1,3 @@ elasticsearch==8.13.2 +nltk +pydantic diff --git a/router/bale_bot.py b/router/bale_bot.py new file mode 100755 index 0000000..fa61442 --- /dev/null +++ b/router/bale_bot.py @@ -0,0 +1,40 @@ +# router.bale_bot.py +from fastapi import Depends, APIRouter, Request +import requests, traceback +from utils.main import BaleBot, BaleUpdate +from dependencies import _get_bale_token, _get_bale_bot + + +############## +router = APIRouter(tags=["bale-bot"]) +############## + + +# @app.post(f"/webhook/{TOKEN}") +@router.post("/webhook/{token}", description="ربات قانون یار") +async def webhook( + request: Request, + token: str = Depends(_get_bale_token), + bale_bot: BaleBot = Depends(_get_bale_bot), +): + raw = await request.json() + + try: + update = BaleUpdate(**raw) + except Exception as e: + print("❌ Parse Error", e) + return {"ok": True} + + # print(f"update {update}") + return await bale_bot.render_update(update) + + +def initialize_webhook(webhook_url, set_webhook_url): + """ + این تابع برای ربات اجباری است + به سرور بله مشخص میکند که به چه server ایی درخواست ها را با این توکن داده شده ارسال کند + """ + params = {"url": webhook_url} + r = requests.get(set_webhook_url, params=params) + print("Webhook set status:", r.status_code) + print("Webhook set response:", r.json()) diff --git a/sample_env b/sample_env old mode 100644 new mode 100755 diff --git a/utils.py b/utils.py deleted file mode 100644 index 9f7aa24..0000000 --- a/utils.py +++ /dev/null @@ -1,131 +0,0 @@ -from elasticsearch import Elasticsearch, helpers -import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time - -from pathlib import Path - -from time import sleep - -# ------------------------ Global-params - -def load_orjson(path: str | Path): - path = Path(path) - with path.open("rb") as f: # باید باینری باز بشه برای orjson - return orjson.loads(f.read()) - - -def save_orjson(path, data): - with open(path, "wb") as f: - f.write( - orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) - ) - - -def split_text_chunks(text: str, max_len: int = 4000): - """Split a long text into safe chunks.""" - return [text[i : i + max_len] for i in range(0, len(text), max_len)] - - -class ElasticHelper: - """ - کلاس ElasticHelper: - نوع ورودی: بدون ورودی مستقیم در تعریف کلاس - نوع خروجی: شیء از نوع ElasticHelper - عملیات: - - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند - - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند - """ - - counter = 0 - total = 0 - id = "" - path_mappings = os.getcwd() + "/repo/_other/" - - def __init__( - self, - es_url="http://127.0.0.1:6900", - es_pass="", - es_user="elastic", - path_mappings="", - ): - """ - نوع ورودی: - - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" - - es_pass: رمز عبور (str) - پیش‌فرض خالی - - es_user: نام کاربری (str) - پیش‌فرض "elastic" - - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی - نوع خروجی: شیء ElasticHelper - عملیات: - - اتصال به Elasticsearch را برقرار می‌کند - - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند - - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) - - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود - """ - if path_mappings: - self.path_mappings = path_mappings - - if es_pass == "": - self.es = Elasticsearch(es_url) - else: - self.es = Elasticsearch( - es_url, - basic_auth=(es_user, es_pass), - verify_certs=False, - ) - # print(es_url) - # print(self.es) - - self.success_connect = False - for a in range(0, 10): - try: - if not self.es.ping(): - print("Elastic Connection Not ping, sleep 30 s : ", a) - sleep(5) - continue - else: - self.success_connect = True - break - - except Exception as e: - break - if not self.success_connect: - print("******", "not access to elastic service") - return - - self.counter = 0 - self.total = 0 - self.id = "" - - def search(self, **params): - try: - res = self.es.search(**params) - except: - return {"hits": {"hits": []}} - return res - - def get_document(self, index_name, id): - res = self.es.get(index=index_name, id=id) - return res - - def exist_document(self, index_name, id): - res = self.es.exists(index=index_name, id=id) - return res - - def update_index_doc(self, is_update_state, index_name_o, eid, data): - """ - نوع ورودی: - - is_update_state: تعیین عملیات (update یا index) (bool) - - index_name_o: نام اندیس (str) - - eid: شناسه سند (str) - - data: داده‌های سند (dict) - نوع خروجی: پاسخ Elasticsearch (dict) - عملیات: - - اگر is_update_state=True باشد: سند را آپدیت می‌کند - - در غیر این صورت: سند جدید ایجاد می‌کند - """ - if is_update_state: - resp = self.es.update(index=index_name_o, id=eid, doc=data) - # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) - else: - resp = self.es.index(index=index_name_o, id=eid, document=data) - return resp - diff --git a/utils/bale_buttons.py b/utils/bale_buttons.py new file mode 100755 index 0000000..753a3ef --- /dev/null +++ b/utils/bale_buttons.py @@ -0,0 +1,10 @@ +BACK_BUTTON = {"text": "⬅️ مرحله قبل", "callback_data": "workflow_back"} +HOME_BUTTON = {"text": "🏠 خانه", "callback_data": "main"} +MORE_LIMIT_BUTTON = {"text": "بارگذاری نتایج بیشتر 10+", "callback_data": "more_limit"} +MORE_EFFORT_BUTTON = {"text": "🧠 بررسی عمیق تر", "callback_data": "more_effort"} + +BUTTON_TEXT_TO_CALLBACK_LIST = [ + {"text": "جستجو"}, + {"text": "گفتگو"}, + {"text": "بازگشت به خانه"}, +] diff --git a/utils/bale_massages.py b/utils/bale_massages.py new file mode 100755 index 0000000..b30b1b6 --- /dev/null +++ b/utils/bale_massages.py @@ -0,0 +1,39 @@ +BASE_MESSAGE = { + "not_yet": """این قسمت در دست توسعه است...""", + "contact_us": """لطفا برای ارتباط با ما از طریق مرکز فناوری مجلس شورای اسلامی ایران اقدام فرمایید""", + "about_us": """ +من ربات گفتگوگر حقوقی هستم که روی قوانین رسمی جمهوری اسلامی ایران از *سامانه قانون‌یار مجلس شورای اسلامی* توسعه یافتم. +لذا به هر سوال و گفتگویی طبق همان منابع پاسخ می‌دهم +نشانی سامانه منبع در زیر آمده است +[qanonyar.parliran.ir](https://qanonyar.parliran.ir) + +*کارفرما : *مرکز فناوری مجلس شورای اسلامی ایران""", + "main": """👋 سلام دوست عزیز! 🤗 +به دستیار هوشمند قانون یار خوش آمدید! +فقط کافیه به من بگید چه کمکی از دستم برمیاد!""", + "search_in_law": """📚✨ هر پرسش حقوقی که از قوانین کشور دارید یا هر متن حقوقی که براتون مهم هست، همینجا بفرستید تا با استناد به قوانین با شما گفتگو کنم.. + +📝 می‌تونید: +• موضوع کلی رو بگید +• سؤال بپرسید +• یا متن حقوقی که داری برام بنویسید + +🎯 هرچقدر دقیق‌تر و واضح‌تر توضیح بدید، بهتر می‌توانم راهنمایی‌ کنم. +""", + "law_chat": """💬 با احترام، آماده گفتگو هستم""", + "law_chat_logical": """💬 با عرض ادب و احترام، آماده گفتگو حقوقی دقیق هستم""", + "law_writing_policy": """برای بررسی مغایرت با سیاست های قانون گذاری متن ورودی را ارسال نمایید.""", + "constitution": """برای بررسی مغایرت با اصول مهم قانون اساسی؛ لطفا ماده قانونی را وارد نمایید.""", + "general_policy": """برای بررسی مغایرت با سیاست های کلی نظام متن ورودی را ارسال نمایید.""", + "conflic_all_qq": """برای بررسی مغایرت با تمام قوانین؛ متن ورودی را ارسال نمایید.""", + "busy": """⏳ تا اتمام پردازش قبلی منتظر بمانید ⏳""", + "qanon_title_repeat": """لطفا عنوان قانون مورد نظر را ارسال نمایید:""", + "rule_making": """متن مورد نظر را وارد کنید :""", +} + +END_BOT = "✅ پایان نمایش" +USER_WAIT = "✅ بله حتما، لطفا کمی منتظر بمانید ..." +ERROR_IN_PROCESS = "❌ خطا در پردازش" +NO_MORE_QUESTION = "❌ سوالی برای ادامه نیست" +HOURGLASS = "⏳" # استیکر ساعت شنی +WAIT_TEXT = f"{HOURGLASS} لطفاً کمی صبر کنید...\nدر حال پردازش درخواست قبلی شما هستم." diff --git a/utils/base_model.py b/utils/base_model.py new file mode 100755 index 0000000..3e6be1e --- /dev/null +++ b/utils/base_model.py @@ -0,0 +1,288 @@ +from pydantic import BaseModel, Field +from typing import List, Optional, Literal, Union, Dict, Any +from utils.bale_buttons import BUTTON_TEXT_TO_CALLBACK_LIST +import json +from typing import Optional, Callable, List, Any +from pydantic import BaseModel + + +class BaleStartMessageForm(BaseModel): + id: int + is_bot: bool = False + first_name: str + last_name: Optional[str] = None + username: Optional[str] = None + + +class BaleStartMessageChat(BaseModel): + id: int + type: str + username: Optional[str] = None + first_name: Optional[str] = None + + +class BaleStartMessage(BaseModel): + message_id: int + from_user: BaleStartMessageForm = Field(..., alias="from") + date: int + chat: BaleStartMessageChat + text: str + entities: List[dict] = [] + + class Config: + populate_by_name = True + + +class BaleCallbackFrom(BaseModel): + id: int + is_bot: bool + first_name: str + username: Optional[str] = None + + +class BaleCallbackMessage(BaseModel): + message_id: int + chat: BaleStartMessageChat + text: Optional[str] + + +class BaleCallbackQuery(BaseModel): + id: str + from_user: BaleCallbackFrom = Field(..., alias="from") + message: BaleCallbackMessage + data: str + + class Config: + populate_by_name = True + + +class BaleUpdate(BaseModel): + update_id: int + message: Optional[BaleStartMessage] = None + callback_query: Optional[BaleCallbackQuery] = None + + class Config: + exclude_none = True + + +class QaChatSingle(BaseModel): + id: str + chat_id: int + user_query: str + model_key: str + model_effort: str + + retrived_passage: str + retrived_ref_ids: str + retrived_duration: Optional[int] = 0 + prompt_type: str = "question" + llm_duration: int + full_duration: Optional[int] = 0 + time_create: Optional[int] = 0 + used_ref_ids: Optional[str] = "" + status_text: Optional[str] = "" + status: Optional[int] = 0 + prompt_answer: str + other_info: dict | None + + +class QaChatBlock(BaseModel): + id: str + title: str + user_id: str + is_premium: bool + chat: QaChatSingle + total_token: int + is_end: bool + + +class QaChat(BaseModel): + time_stamp: int = 0 + id: str + chat_id: int + title: Optional[str] = "" + user_id: str + user_query: str + query_type: str = ( + "question" # llm -> greeting, other, legal_question | rag -> question + ) + full_duration: Optional[float] = 0 + other_info: Optional[dict] = "" + + ss_ref_ids: Optional[List[str]] = "" + ss_model_key: Optional[str] = "" + ss_duration: Optional[float] = 0 + ss_answer: Optional[str] = "" + + llm_ref_ids: Optional[List[str]] = [] + llm_model_key: Optional[str] = "" + llm_duration: Optional[float] = 0 + llm_answer: Optional[str] = "" + + status_text: Optional[str] = "" + status: Optional[int] = 0 + + +class ConflictDetection(BaseModel): + explanation_of_conflict: str + has_confict: bool = False + + +class ConflictTypeDetection(BaseModel): + conflict_type: Literal["Doctrinal_conflict", "Practical_conflict"] + explanation_of_type: str + + +class RelationIdentification(BaseModel): + reasoning: str + relation_type: Literal[ + "مطلق مقدم، مقید موخر", + "مقید مقدم، مطلق موخر", + "تکرار حکم", + "تعارض مستقر", + "بدون تعارض", + ] +class Evaluation(BaseModel): + is_subject_unity_assessment_correct: bool + is_conflict_detection_correct: bool + is_conflict_type_detection_correct: bool + is_relation_type_detection_correct: bool + valid_relation_type: str + comments: str + +class SingleRuleRelation(BaseModel): + rule_id: str + rule_content: str + rule_type: str + + section_id: str + section_content: str + + qanon_etebar: Optional[str] = None + qanon_id: Optional[str] = None + qanon_title: Optional[str] = None + + state_etebar: Optional[str] = None + + date: Optional[str] = None + + def __str__(self): + dict_ = { + # "rule_id": self.rule_id, + "متن حکم": self.rule_content, + "نوع حکم": self.rule_type, + "متن ماده قانونی": self.section_content, + } + # Return the dictionary as a formatted JSON string + return json.dumps(dict_, ensure_ascii=False, indent=2) + + +class SubjectUnity(BaseModel): + has_subject_unity: Literal["yes", "no", "yes_under_assumptions"] + required_assumptions: Optional[str] = None + reasoning: Optional[str] + + +class RuleRelation(BaseModel): + # ----- input rule + in_rule: SingleRuleRelation + + # ----- semantic-search close rule + db_rule: SingleRuleRelation + + # ----- subject-unity data + subject_unity: SubjectUnity | None = None + + # ----- conflict-detection data + conflict_detection: ConflictDetection | None = None + + # ----- conflict-type-detection data + conflict_type_detection: ConflictTypeDetection | None = None + + # ----- relation-identification data + relation_identification: RelationIdentification | None = None + +class StateDetail(BaseModel): + state: str + message: str + button_text: str + end_buttons: List = [] + inline_buttons: Optional[List[List[dict]]] = None + + handler : str = None + allow_empty_input: bool = True + +class BaleUser(BaseModel): + uc_id: str + chat_id: int + user_id: str + update: BaleUpdate + username: str + is_bot: bool = False + first_name: str = "" + last_name: str = "" + + rule_relation: RuleRelation | None = None + subject_unities:Dict = {} + + # ---- defaults + effort: Literal["medium", "low"] = "low" + limit: int = 10 + permission: str = "normal" + + # ---- runtime + is_processing_lock : bool = False + is_call_back_query : bool = False + state_detail : StateDetail = None + last_message_id : int = 0 + + input_query: str = "" # ورودی کاربر + call_back_query: str = "" # ورودی کاربر + _query_type: str = "" # ورودی کاربر + sub_state: str = "" # برای روندی ها + + all_qq: bool = False + + # ---- memory + last_query: Dict[str, str] = {} # آخرین سوال کاربر + last_result: Dict[str, Any] = {} # آخرین نتایج + last_runtime: Dict[str, Any] = ( + {} + ) # مثلا {"GP": {"effort": "medium", "limit": 20}} برای بقیه چیزها + stack: Dict[str, List] = {} # پشته برای داده های مرحله ای + + +class KeyboardItem(BaseModel): + text: str + + +class InlineKeyboardItem(BaseModel): + text: str + callback_data: str + + +class ReplyMarkup(BaseModel): + keyboard: List[List[Union[KeyboardItem, Dict]]] = Field( + default_factory=lambda: BUTTON_TEXT_TO_CALLBACK_LIST + ) + resize_keyboard: bool = True + one_time_keyboard: bool = False + inline_keyboard: List[List[Union[InlineKeyboardItem, Dict]]] | None = ( + None # دکمه ها ی داخال صفحه + ) + + +class BalePayload(BaseModel): + reply_markup: ReplyMarkup = ReplyMarkup() + chat_id: int + message_id: int = None + text: str + + +class Step(BaseModel): + name: str + level : int + data_input : str + data_output : str + bot_output : List + diff --git a/utils/config.py b/utils/config.py new file mode 100644 index 0000000..ac0c7d6 --- /dev/null +++ b/utils/config.py @@ -0,0 +1,146 @@ +from utils.base_model import StateDetail + + + + +BUSY_TEXT = ("""⏳ تا اتمام پردازش قبلی منتظر بمانید ⏳""",) + + +class StateRegistry: + def __init__(self, states): + self._states = {s.state: s for s in states} + + def get(self, key, default=None): + return self._states.get(key, default) + + def all(self): + return list(self._states.values()) + + + +STATE = [ + StateDetail( + state="search_in_law", + button_text="جستجو معنایی در قوانین 🔎", + end_buttons=[], + message="""متن حقوقی برای جستجو در قوانین را وارد نمایید""", + handler="handle_search_in_law", + ), + StateDetail( + state="chat_in_law", + button_text="گفتگو طبق قوانین کشور", + end_buttons=[], + message="""💬 با احترام، آماده گفتگو هستم""", + handler="handle_chat_in_law", + ), + StateDetail( + state="logical_chat_in_law", + button_text="گفتگوی حقوقی دقیق تر 🧠💬", + end_buttons=[], + message="""💬 با عرض ادب و احترام، آماده گفتگو حقوقی دقیق هستم""", + handler="handle_logical_chat_in_law", + ), + StateDetail( + state="conflict_law_writing_policy", + button_text="بررسی مغایرت با سیاست های قانون گذاری 📜", + end_buttons=[], + message="""متن مورد نظر برای بررسی مغایرت با سیاست های قانون گذاری را وارد کنید :""", + handler="handle_conflict_law_writing_policy", + ), + StateDetail( + state="conflict_qanon_asasi", + button_text="بررسی مغایرت با اصول مهم قانون اساسی ⚖️", + end_buttons=[], + message="""متن مورد نظر برای بررسی مغایرت با اصول مهم قانون اساسی را وارد کنید :""", + handler="handle_conflict_qanon_asasi", + ), + StateDetail( + state="conflict_general_policy", + button_text="بررسی مغایرت با سیاست های کلی نظام 🏛️", + end_buttons=[], + message="""متن مورد نظر برای بررسی مغایرت با سیاست های کلی نظام را وارد کنید :""", + handler="handle_conflict_general_policy", + ), + StateDetail( + state="conflict_all_qavanin", + button_text="بررسی مغایرت در تمام قوانین", + end_buttons=[], + message="""متن مورد نظر برای بررسی مغایرت در تمام قوانین جمهوری اسلامی ایران را وارد کنید :""", + handler="handle_conflict_all_qavanin", + ), + StateDetail( + state="qanon_title_repeat", + button_text="بررسی تکرار عنوان قانون 🔁", + end_buttons=[], + handler="handle_qanon_title_repeat", + message="""لطفا عنوان قانون مورد نظر را برای بررسی ارسال نمایید:""", + ), + StateDetail( + state="rule_making", + handler="handle_rule_making", + button_text="استخراج اجزاء حقوقی متن", + end_buttons=[], + message="""متن مورد نظر برای استخراج اجزاء حقوقی را وارد کنید :""", + ), + StateDetail( + state="beta", + handler="handle_beta", + button_text="BETA-Mode", + end_buttons=[], + message="""این قسمت در دست توسعه قرار دارد ...""", + ), + StateDetail( + state="contact_us", + button_text="تماس با ما ☎️", + message="""لطفا برای ارتباط با ما از طریق مرکز فناوری مجلس شورای اسلامی ایران اقدام فرمایید""", + ), + StateDetail( + state="about_us", + button_text="درباره ما ⚡", + message="""من ربات گفتگوگر حقوقی هستم که روی قوانین رسمی جمهوری اسلامی ایران از *سامانه قانون‌یار مجلس شورای اسلامی* توسعه یافتم. +لذا به هر سوال و گفتگویی طبق همان منابع پاسخ می‌دهم +نشانی سامانه منبع در زیر آمده است +[qanonyar.parliran.ir](https://qanonyar.parliran.ir) + +*کارفرما : *مرکز فناوری مجلس شورای اسلامی ایران""", + ), +] + +STATE_REGISTERY = StateRegistry(STATE) +STATE_CONFIG = {i.state: i for i in STATE} + + +def build_buttons_form(button_form): + buttons = [] + + for row in button_form: + row_buttons = [] + for state_name in row: + state = STATE_REGISTERY.get(state_name) + if not state: + continue + + row_buttons.append({ + "text": state.button_text, + "callback_data": state.state + }) + + if row_buttons: + buttons.append(row_buttons) + + return buttons + +# Button-STYLE +main_button_form = [ + ["search_in_law"], + ["chat_in_law"], + ["logical_chat_in_law"], + ["conflict_law_writing_policy"], + ["conflict_qanon_asasi"], + ["conflict_general_policy"], + ["conflict_all_qavanin"], + ["qanon_title_repeat"], + ["rule_making"], + ["contact_us", "about_us", "beta"] +] +MAIN_BUTTON = build_buttons_form(main_button_form) \ No newline at end of file diff --git a/utils/main.py b/utils/main.py new file mode 100755 index 0000000..d1f9663 --- /dev/null +++ b/utils/main.py @@ -0,0 +1,1011 @@ +################# modularity +### import from external-package +from fastapi import FastAPI, Request, HTTPException +import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re +from dotenv import load_dotenv +from pathlib import Path +from time import sleep +from enum import Enum +import time +from typing import Dict, Optional +from dataclasses import dataclass + +### import from internal-file +from utils.base_model import * +from utils.bale_buttons import * +from utils.bale_massages import * +from utils.workflow import * +from utils.static import * +from utils.config import STATE_CONFIG, STATE, BUSY_TEXT,MAIN_BUTTON, STATE_REGISTERY, build_buttons_form +from typing import Iterable, Any, List + + +def extract_user_info(update: BaleUpdate) -> dict: + uc_id = encode_uc(update) + user_id, chat_id = decode_uc(uc_id) + if update.message: + u = update.message.from_user + return { + "uc_id": str(uc_id), + "chat_id": int(chat_id), + "user_id": user_id, + "username": u.username, + "first_name": u.first_name, + "last_name": u.last_name or "", + "is_bot": u.is_bot, + "update": update, + } + + if update.callback_query: + u = update.callback_query.from_user + return { + "uc_id": str(uc_id), + "chat_id": int(chat_id), + "user_id": user_id, + "username": u.username, + "first_name": u.first_name, + "last_name": "", + "is_bot": u.is_bot, + "update": update, + } + + raise ValueError("No user info in update") + + +class UserManager: + def __init__(self): + self.users: Dict[str, BaleUser] = {} + + def get_or_create(self, update: BaleUpdate) -> BaleUser: + user_data = extract_user_info(update) + + uc_id = user_data["uc_id"] + + if uc_id not in self.users: + self.users[uc_id] = BaleUser( + **user_data, + ) + user = self.users[uc_id] + user.update = update + return user + + +class BaleBot: + """ + input → set_state → render_user_state + """ + + def __init__( + self, + user_manager: UserManager, + es_index_name: str, + token: str, + es_helper: ElasticHelper, + formatter: Formatter, + back_end_url: str, + request_manager: RequestManager, + ): + self.formatter = formatter + self.request_manager = request_manager + self.es_helper = es_helper + self.token = token + self.es_index_name = es_index_name + self.user_manager = user_manager + self.max_limit = 100 + self.back_end_url = back_end_url + self.update_message_url = ( + f"https://tapi.bale.ai/bot{self.token}/editMessageText" + ) + self.send_message_url = f"https://tapi.bale.ai/bot{self.token}/sendMessage" + # self.user_state = self.make_state() + + async def render_user_state(self, user: BaleUser, run_internal=None): + """Buse سیستم""" + try: + state_detail = user.state_detail + + if user.input_query == "": + await self.send_message_helper( + user=user, + text=state_detail.message, + end_buttons=state_detail.end_buttons, + ) + return {"ok": True} + + if run_internal == "subject_unities": + await self.handle_advanced_check_conflict(user) + + elif not run_internal and state_detail.handler: + handler = getattr(self, state_detail.handler) + await handler(user) + except: + await self.handle_main(user) + + return {"ok": True} + + async def update_message_to_bale( + self, + user: BaleUser, + text: str = None, + buttons: List = [], + reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, + ): + + payload = { + "chat_id": user.chat_id, + "message_id": user.last_message_id, + "text": text, + "reply_markup": { + # "keyboard": reply_markup, + "resize_keyboard": True, + "one_time_keyboard": False, + }, + } + + if buttons: + payload["reply_markup"]["inline_keyboard"] = buttons + + try: + r = requests.post(self.update_message_url, json=payload) + + if not r.ok: + print("Bale API Error:", r.status_code, r.text) + + r_ = r.json() + print(f"Send:", r.status_code) + # user.last_message_id = int(r_["result"]["message_id"]) + + except Exception: + print("ERROR in update_message_to_bale:") + traceback.print_exc() + + async def send_message_helper( + self, + user: BaleUser, + update_message: bool = False, + text: str = None, + chunked_text: List[str] = None, + structed_output: List = None, + end_buttons: List = [], + reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, + ): + _i = 0 + + if text: + _i += 1 + if chunked_text: + _i += 1 + if structed_output: + _i += 1 + if _i != 1: + raise ValueError( + "In send_message_helper Only Send One Of {text, chunked_text, structed_output}" + ) + + if update_message == True: + if user.last_message_id == 0: + await self.send_message_helper( + user=user, + update_message=False, + text=text, + end_buttons=end_buttons, + chunked_text=chunked_text, + structed_output=structed_output, + reply_markup=reply_markup, + ) + else: + if text: + new_text = text + if chunked_text: + new_text = chunked_text[-1] + if structed_output: + new_text = structed_output[-1][0] + + await self.update_message_to_bale( + user=user, + text=new_text, + buttons=end_buttons, + reply_markup=reply_markup, + ) + else: + if structed_output: + for i, item in enumerate(structed_output, start=1): + + if i == len(structed_output) and len(end_buttons) > 0: # is_last + item[1] += end_buttons + + await self.send_message_to_bale( + user=user, + text=item[0], + buttons=item[1], + reply_markup=reply_markup, + ) + + if chunked_text: + _buttons = [] + for i, item in enumerate(chunked_text, start=1): + + if i == len(chunked_text) and len(end_buttons) > 0: # is_last + _buttons = end_buttons + + await self.send_message_to_bale( + user=user, + text=item, + buttons=_buttons, + reply_markup=reply_markup, + ) + + if text: + await self.send_message_to_bale( + user=user, + text=text, + buttons=end_buttons, + reply_markup=reply_markup, + ) + + async def send_message_to_bale( + self, + user: BaleUser, + text: str, + buttons: List = [], + reply_markup: List = BUTTON_TEXT_TO_CALLBACK_LIST, + ): + + print(f"send_message_to_bale--- {text}") + print(f"send_message_to_bale buttons--- {buttons}") + payload = { + "chat_id": user.chat_id, + "text": text, + "reply_markup": { + # "keyboard": reply_markup, + "resize_keyboard": True, + "one_time_keyboard": False, + }, + } + + if buttons: + payload["reply_markup"]["inline_keyboard"] = buttons + try: + r = requests.post(self.send_message_url, json=payload) + + if not r.ok: + print("Bale API Error:", r.status_code, r.text) + + r_ = r.json() + print(f"Send:", r.status_code) + user.last_message_id = int(r_["result"]["message_id"]) + + except Exception: + print("ERROR in send_message_to_bale:") + traceback.print_exc() + + async def render_update(self, update: BaleUpdate): + """ + مهمترین تابع ربات بله + ورودی شما یا به صورت متن ساده هست یا بصورت query + render_message : متن + render_callback : کوئری + """ + + # 2️⃣ ساخت یا بازیابی user + user = self.user_manager.get_or_create(update) + + print(f"render_update user.state_detail.state -> {user.state_detail}") + if user.is_processing_lock == False: + if update.message: + print(f"render_message") + user.input_query = str(user.update.message.text or "").strip() + # async def render_message(self, user: BaleUser): + + if user.input_query in ("/start", "main", "بازگشت به خانه"): + await self.handle_main(user) + user.input_query = "" + return {"ok": True} + + if user.input_query == "جستجو": + user.state_detail = STATE_CONFIG["search_in_law"] + user.input_query = "" + + if user.input_query == "گفتگو": + user.state_detail = STATE_CONFIG["chat_in_law"] + user.input_query = "" + + + # --- WorkFlow Logic by State + await self.render_user_state(user) + return {"ok": True} + + if update.callback_query: + """ + اینجا فقط باید مرحله و state عوض شود و داده ای وارد نمیشود!!! + برای سوال پرسیدن از کاربر و ... + """ + print("render_callback") + user.is_call_back_query = True + user.call_back_query = user.update.callback_query.data + # user.update.callback_query.data + run_internal = None + + if user.call_back_query == "main": + user.input_query = "" + await self.handle_main(user) + return {"ok": True} + + # Dynamic Change Options + if user.call_back_query == "more_limit": + user.limit += 10 + + # Dynamic Change Options + elif user.call_back_query == "more_effort": + user.effort = "medium" + + elif user.call_back_query.startswith("subject_unities:"): + run_internal = "subject_unities" + # subject_unities:qq:qq_983214 + + # Dynamic Change State + else: + user.state_detail = STATE_CONFIG[user.call_back_query] + + await self.render_user_state(user, run_internal) + return {"ok": True} + + else: + # beta-mode + await self.send_message_helper(user=user, text=BUSY_TEXT) + + return {"ok": True} + + async def talk(self, user: BaleUser): + + pass + + async def handle_main(self, user: BaleUser): + # ریست کردن پشته‌ها و نتایج + """ """ + ################################### SAVE2ELASTIC ################################### + user.last_query = "" + user.last_result = "" + user.stack.clear() + user.input_query = "" + user.sub_state = "" + user.limit = 10 + user.effort = "low" + user.last_message_id = 0 + + message = """👋 سلام دوست عزیز! 🤗 +به دستیار هوشمند قانون یار خوش آمدید! +فقط کافیه به من بگید چه کمکی از دستم برمیاد!""" + + await self.send_message_helper( + user=user, + text=message, # + end_buttons=MAIN_BUTTON, + ) + + async def handle_search_in_law(self, user: BaleUser): + + user.is_processing_lock = True + user.last_query = user.input_query + + try: + + result = await self.request_manager.get_result( + payload={ + "section_content": user.input_query, + "limit": user.limit, + "effort": user.effort, + }, + url="/semantic_search/run_semantic_search", + ) + + # print(f'result rag {result} {type(result["ss_answer"])}') + chunked_text_ = self.formatter.form_search_in_law( + title=user.input_query, + sections=result["answer"], + ) + + # print(f'chunked_text_ rag {chunked_text_}') + _buttons = [[HOME_BUTTON]] + if user.limit < self.max_limit: + _buttons.insert(0, [MORE_LIMIT_BUTTON]) + + await self.send_message_helper( + user=user, chunked_text=chunked_text_, end_buttons=_buttons + ) + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_rule_making(self, user: BaleUser): + + user.is_processing_lock = True + try: + result = await self.request_manager.get_result( + payload={ + "section_content": user.input_query, + "limit": user.limit, + "effort": user.effort, + }, + url="/rule_making", + ) + print(f"handle_rule_making {result}") + res_ = await self.formatter.form_rule_making(_input=result) + _buttons = [[HOME_BUTTON]] + if user.effort != "medium": + _buttons.append([MORE_EFFORT_BUTTON]) + await self.send_message_helper( + user=user, chunked_text=res_, end_buttons=_buttons + ) + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_qanon_title_repeat(self, user: BaleUser): + + user.is_processing_lock = True + try: + result_ = await title_repeated(qanontitle=user.input_query) + res_ = await self.formatter.form_title_repeated(data=result_) + _buttons = [[HOME_BUTTON]] + await self.send_message_helper( + user=user, chunked_text=res_, end_buttons=_buttons + ) + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_conflict_qanon_asasi(self, user: BaleUser): + + user.is_processing_lock = True + user.last_message_id = 0 + user.effort = "medium" + try: + print(f"effort=user.effort {user.effort}") + async for step_data in self.request_manager.stream_result( + payload={ + "section_content": user.input_query, + "effort": user.effort, + "limit": user.limit, + "mode_type": "bale", + }, + url="/conflict/constitution", + ): + step = step_data.get("step") + data = step_data.get("data") + is_first = step_data.get("is_first", False) + is_last = step_data.get("is_last", False) + print(f"is_first {is_first} is_last {is_last}") + # is_last = step_data.get("is_last") + _button = [[HOME_BUTTON]] + print(f"==== handle_constitution ====") + if step == "low": + print(f"low {user.effort}") + chunked_response = await self.formatter.form_constitution(data) + # await self.send_message_helper(user, chunked_text=response) + # if is_last: + if is_first is True: + await self.send_message_helper( + user=user, + update_message=False, + chunked_text=chunked_response, + ) + elif is_last is True: + _button.append([MORE_EFFORT_BUTTON]) + await self.send_message_helper( + user=user, + update_message=True, + chunked_text=chunked_response, + end_buttons=_button, + ) + else: + await self.send_message_helper( + user=user, + update_message=True, + chunked_text=chunked_response, + ) + elif step == "rule_making": + print(f"rule_making {user.effort}") + + chunked_response = await self.formatter.form_rule_making( + _input=data + ) + await self.send_message_helper( + user=user, + update_message=False, + chunked_text=chunked_response, + ) + + elif step == "semantich_search": + print(f"semantich_search {user.effort}") + _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" + response = await self.formatter.form_ss_rules(data, header=_header) + await self.send_message_helper(user, chunked_text=response) + + elif step == "subject_unities": + print(f"subject_unities {user.effort}") + _header = "نتایج اولیه مغایرت های احتمالی :\n" + print(f"data type {type(data)}") + try: + print(f"data -> RuleRelation") + _data = [RuleRelation.parse_obj(i) for i in data] + except: + print(f"data -> String") + _data = data + + chunked_text, _button, mapping_qs = ( + await self.formatter.form_subject_unity(_data, header=_header) + ) + await self.send_message_helper( + user=user, + chunked_text=chunked_text, + end_buttons=_button, + ) + user.subject_unities = mapping_qs + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + finally: + user.is_processing_lock = False + return {"ok": True} + + async def handle_conflict_law_writing_policy(self, user: BaleUser): + user.is_processing_lock = True + try: + user.last_query = user.input_query + + _result = await self.request_manager.get_result( + payload={ + "section_content": user.input_query, + "effort": user.effort, + }, + url="/conflict/law_writing_policy", + ) + + result = await self.formatter.from_law_writing_policy( + _input_dict=_result, header="نتیجه بررسی با سیاست های قانون گذاری" + ) + _buttons = [[HOME_BUTTON]] + if user.effort != "medium": + _buttons.insert(0, [MORE_EFFORT_BUTTON]) + + await self.send_message_helper( + user=user, chunked_text=result, end_buttons=_buttons + ) + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + + finally: + user.is_processing_lock = False + return {"ok": True} + + async def handle_conflict_all_qavanin(self, user: BaleUser): + user.is_processing_lock = True + try: + async for step_data in self.request_manager.stream_result( + payload={ + "section_content": user.input_query, + "effort": user.effort, + "limit": user.limit, + "mode_type": "bale", + }, + url="/conflict/all_qanon/qs_unity", + ): + step = step_data.get("step") + data = step_data.get("data") + is_first = step_data.get("is_first", False) + is_last = step_data.get("is_last", False) + _button = [[HOME_BUTTON]] + print(f"==== handle_conflict_general_policy ====") + if step == "rule_making": + print(f"rule_making {user.effort}") + + chunked_response = await self.formatter.form_rule_making( + _input=data + ) + await self.send_message_helper( + user=user, + update_message=False, + chunked_text=chunked_response, + ) + + elif step == "semantich_search": + print(f"semantich_search {user.effort}") + _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" + response = await self.formatter.form_ss_rules(data, header=_header) + await self.send_message_helper(user, chunked_text=response) + + elif step == "subject_unities": + print(f"subject_unities {user.effort}") + _header = "نتایج اولیه مغایرت های احتمالی :\n" + print(f"data type {type(data)}") + try: + print(f"data -> RuleRelation") + _data = [RuleRelation.parse_obj(i) for i in data] + except: + print(f"data -> String") + _data = data + + chunked_text, _button, mapping_qs = ( + await self.formatter.form_subject_unity(_data, header=_header) + ) + await self.send_message_helper( + user=user, + chunked_text=chunked_text, + end_buttons=_button, + ) + user.subject_unities = mapping_qs + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_conflict_general_policy(self, user: BaleUser): + user.is_processing_lock = True + try: + async for step_data in self.request_manager.stream_result( + payload={ + "section_content": user.input_query, + "effort": user.effort, + "limit": user.limit, + "mode_type": "bale", + }, + url="/conflict/general_policy/qs_unity", + ): + step = step_data.get("step") + data = step_data.get("data") + is_first = step_data.get("is_first", False) + is_last = step_data.get("is_last", False) + _button = [[HOME_BUTTON]] + print(f"==== handle_conflict_general_policy ====") + if step == "rule_making": + print(f"rule_making {user.effort}") + + chunked_response = await self.formatter.form_rule_making( + _input=data + ) + await self.send_message_helper( + user=user, + update_message=False, + chunked_text=chunked_response, + ) + + elif step == "semantich_search": + print(f"semantich_search {user.effort}") + _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" + response = await self.formatter.form_ss_rules(data, header=_header) + await self.send_message_helper(user, chunked_text=response) + + elif step == "subject_unities": + print(f"subject_unities {user.effort}") + _header = "نتایج اولیه مغایرت های احتمالی :\n" + print(f"data type {type(data)}") + try: + print(f"data -> RuleRelation") + _data = [RuleRelation.parse_obj(i) for i in data] + except: + print(f"data -> String") + _data = data + + chunked_text, _button, mapping_qs = ( + await self.formatter.form_subject_unity(_data, header=_header) + ) + await self.send_message_helper( + user=user, + chunked_text=chunked_text, + end_buttons=_button, + ) + user.subject_unities = mapping_qs + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_advanced_check_conflict(self, user: BaleUser): + user.is_processing_lock = True + print(f"handle_advanced_check_conflict======================================") + try: + step, _type, qq_title = user.call_back_query.split(":") + if _type == "qq": + groups = {} + for qanon_title, item in user.subject_unities.items(): + if qanon_title == qq_title: + for i in item: + groups[qanon_title] = i.db_rule.section_id + + if len(groups) > 1: + _button = [] + for i, (k, v) in enumerate(groups.items(), start=1): + _button.append( + [{"text": i, "callback_data": f"subject_unities:qs:{v}"}] + ) + await self.send_message_helper( + user=user, + text=f"برای ادامه یک مورد را از این قانون انتخاب کنید{k} انتخاب کنید", + end_buttons=_button, + ) + else: + user.call_back_query = f"subject_unities:qs:{groups[qq_title]}" + + elif _type == "qs": + content = None + for k, v in user.subject_unities.items(): + if v.db_rule.section_id == qq_title: + content = v.model_dump() + + print(f"content qs -> {content}") + async for step_data in self.request_manager.stream_result( + payload={ + "rule_relation": content, + "effort": user.effort, + "mode_type": "bale", + }, + url="/conflict/unity_eval", + ): + + print(f"qs {user.call_back_query}") + step = step_data.get("step") + data = step_data.get("data") + # is_last = step_data.get("is_last") + _button = [[HOME_BUTTON]] + print(f"==== handle_advanced_check_conflict ====") + if step == "step2": + print(f"*********************Step2 {user.effort}") + _header = "نتایج اولیه مغایرت های احتمالی :\n" + _data = RuleRelation.parse_obj(data) + chunked_text = await self.formatter.form_conflict_detection( + _data, header=_header + ) + await self.send_message_helper(user=user, text=chunked_text) + elif step == "step3": + print(f"*********************Step3") + _data = RuleRelation.parse_obj(data) + chunked_text = ( + await self.formatter.form_conflict_type_detection( + _data, header=_header + ) + ) + await self.send_message_helper(user=user, text=chunked_text) + elif step == "step4": + + print(f"*********************Step4") + _data = RuleRelation.parse_obj(data) + chunked_text = ( + await self.formatter.form_relation_identification( + _data, header=_header + ) + ) + await self.send_message_helper(user=user, text=chunked_text) + elif step == "step5": + print(f"*********************Step5") + _data = Evaluation.parse_obj(data) + chunked_text = await self.formatter.form_evaluation( + _data, header=_header + ) + await self.send_message_helper(user=user, text=chunked_text) + else: + print(f"eerror in uknown step --> {step}") + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_chat_in_law( + self, user: BaleUser + ) -> List[BalePayload]: # -> List[List[str, dict]] + + user.is_processing_lock = True + user.last_query = user.input_query + + # گرفتن آخرین runtime + effort = user.effort + try: + # اگر runtime تغییر کرده یا نتیجه قبلی وجود ندارد → درخواست جدید + result = await self.request_manager.get_result( + payload={ + "section_content": user.input_query, + "effort": effort, + "limit": user.limit, + "mode_type": "bale", + }, + url="/semantic_search/run_chat", + ) + print(f"===================={result}") + text_result = self.formatter.form_law_chat( + result["answer"] # , llm_answer["source"] + ) + _buttons = [[HOME_BUTTON]] + _b = [] + if user.limit < self.max_limit: + _b += [MORE_LIMIT_BUTTON] + + if effort != "medium": + _b += [MORE_EFFORT_BUTTON] + + if len(_b) > 0: + _buttons.insert(0, _b) + + await self.send_message_helper( + user=user, chunked_text=text_result, end_buttons=_buttons + ) + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + finally: + user.is_processing_lock = False + + async def handle_stream_chat(self, user: BaleUser): + """ + stream_talk() + ↓ + async generator → chunk + ↓ + handle_stream_chat() + ↓ + send first message + ↓ + update_message (throttled) + + """ + + full_text = "" + message_id = None + last_update = 0 + + async for chunk in stream_talk(user.input_query): + full_text += chunk + + now = time.time() + + # جلوگیری از اسپم API (مثلاً هر 0.6 ثانیه آپدیت) + if now - last_update < 0.6: + continue + + last_update = now + + if message_id is None: + # اولین پیام + message_id = await self.send_message_to_bale( + user=user, + text=full_text + " ▌", + ) + else: + payload = BalePayload( + chat_id=user.chat_id, message_id=message_id, text=full_text + " ▌" + ) + self.update_message_to_bale(user, payload) + + # آخر کار، بدون cursor + if message_id: + payload = BalePayload( + chat_id=user.chat_id, message_id=message_id, text=full_text + ) + self.update_message_to_bale(user, payload) + + async def handle_logical_chat_in_law(self, user: BaleUser): + user.is_processing_lock = True + + try: + async for step_data in self.request_manager.stream_result( + payload={ + "section_content": user.input_query, + "effort": user.effort, + }, + url="/stream/chat_logical", + ): + step = step_data.get("step") + data = step_data.get("data") + + if step == "rule_making": + _header = f"✅ مرحله استخراج اجزاء حقوقی متن انجام شد.\nتعداد جزء ها: {len(data)}\n اجزاء حقوقی:\n" + response = await self.formatter.form_rule_making( + data, header=_header + ) + await self.send_message_helper(user, chunked_text=response) + + elif step == "semantic_search": + _header = f"🔍 جستجوی معنایی انجام شد و مستندات مرتبط یافت شدند:\n" + response = await self.formatter.form_ss_rules(data, header=_header) + await self.send_message_helper(user, chunked_text=response) + + elif step == "llm_answer": + _button = [[HOME_BUTTON]] + if user.effort != "medium": + _button.insert(0, [MORE_EFFORT_BUTTON]) + _header = f"📝 پاسخ نهایی:\n" + response = await self.formatter.form_llm_answer_chat( + data, header=_header + ) + print(f"response {response}") + await self.send_message_helper( + user, chunked_text=response, end_buttons=_button + ) + + except Exception as e: + print("ERROR in handle_chat:", str(traceback.print_exc())) + await self.send_message_helper(user=user, text=ERROR_IN_PROCESS) + + finally: + user.is_processing_lock = False + + return {"ok": True} + + async def handle_beta(self, user: BaleUser): + """ + state: + user.is_processing_lock: + query: + time.time + first_name + last_name + """ + try: + text_1 = "سلام" + text_2 = "سلام، عرض ادب و احترام" + + await self.send_message_helper( + user=user, + structed_output=[ + ["این تست است ", [[{"text": "تستتت", "callback_data": "not_yet"}]]], + [ + "این تست اس2ت ", + [[{"text": "تستت2ت", "callback_data": "not_yet"}]], + ], + ], + end_buttons=[ + [ + {"text": "دکمه های نهایی", "callback_data": "not_yet"}, + {"text": "دکمه های ن2هایی", "callback_data": "not_yet"}, + {"text": "دکمه های ن23هایی", "callback_data": "not_yet"}, + ], + [{"text": "دکمه های آخر", "callback_data": "not_yet"}], + ], + ) + + except Exception as e: + print("ERROR in handl_beta:", str(traceback.print_exc())) + + async def save_to_es(self, data: QaChat): + # print("save_to_es data rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr") + try: + es_res = self.es_helper.update_index_doc( + is_update_state=False, + index_name_o=self.es_index_name, + eid=data.id, + data=data.model_dump(), + ) + # type_name, payload, request + print(f"Saved {es_res}") + except Exception as e: + print("save_to_es ", str(traceback.print_exc())) diff --git a/utils/static.py b/utils/static.py new file mode 100755 index 0000000..b7317ba --- /dev/null +++ b/utils/static.py @@ -0,0 +1,26 @@ +################# modularity +### import from external-package +from fastapi import FastAPI, Request, HTTPException +import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re +from dotenv import load_dotenv +from pathlib import Path +from time import sleep +from enum import Enum +from typing import Dict +from utils.base_model import RuleRelation + + + +################################################# +# پارامتر های کلی +################################################# +EFFORT = "low" +MAX_LIMIT_RAG = 100 # بیشینه تعدادی که rag برمی گرداند +STEP_RAG = 10 # مقداری که هر مرحله rag اضافه میکند با فشردن نمایش بیشتر more +TIME_OUT = 600 # هر درخواست با این مقدار time-out ارسال می شود +MAX_LEN = 4000 # کمی کمتر از حد پایه امن‌تر است برای بیشینه پیام در بله +QS_WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/" # آدرس صفحه qs ها +QQ_WEB_LINK = ( + "https://majles.tavasi.ir/entity/navigation/view/qasection/" # آدرس صفحه qq ها +) +REF_TEXT = "«منبع»" # برای نمایش منبع diff --git a/utils/workflow.py b/utils/workflow.py new file mode 100755 index 0000000..f50c4c8 --- /dev/null +++ b/utils/workflow.py @@ -0,0 +1,1437 @@ +################# modularity +### import from external-package +from fastapi import FastAPI, Request, HTTPException +import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re +from dotenv import load_dotenv +from pathlib import Path +from time import sleep +from enum import Enum +from typing import Dict, List, Tuple +from collections import defaultdict +from typing import Union, List +from elasticsearch import Elasticsearch, helpers +import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time +from pathlib import Path +from time import sleep +import re +import unicodedata +import httpx +import json + +### import from internal-file +from utils.base_model import * +from utils.bale_buttons import * +from utils.bale_massages import * +from utils.static import * + + +############## Global-Params + +DATA_DIR = os.path.join(".", "_data_json") +if not os.path.exists(DATA_DIR): + os.makedirs(DATA_DIR) + +PERSIAN_BOUNDARIES = set(" \n،.؟!؛:") + + +# f"https://YOUR_DOMAIN.com + +from pydantic import BaseModel + +class DbRule(BaseModel): + rule_id: str + rule_content: str + rule_type: str + section_id: str + section_content: str + section_full_path :str + qanon_id: str + qanon_etebar: str + qanon_title: str + state_etebar: str + +class InputRule(BaseModel): + rule_id: str + rule_content: str + rule_type: str + section_id: str + section_content: str + +class SemanticSearchP2P(BaseModel): + in_rule: InputRule + db_rule: DbRule + score: float = 0 + metadata: Dict + + +class Formatter: + """ + Formatting options + Bold : \s*TEXT*\s + Italic : \s_TEXT_\s + Link: [متن](آدرس‌لینک) + ```‌[متن]‌توضیحات‌``` + + answerCallbackQuery -> {callback_query_id:str, text:str, show_alert:bool} + setChatDescription -> {chat_id:str, description:str} + editMessageText -> {chat_id, message_id, text } + """ + + ########################################################### + # توابع برای رفتار کلاس + ########################################################### + def __init__(self, max_len: int = 4000): + self.max_len = max_len + self._number_map = { + "0": "0️⃣", + "1": "1️⃣", + "2": "2️⃣", + "3": "3️⃣", + "4": "4️⃣", + "5": "5️⃣", + "6": "6️⃣", + "7": "7️⃣", + "8": "8️⃣", + "9": "9️⃣", + } + + def __getattr__(self, name: str): + # فقط برای روش‌های مجاز (مثل bold, number) واکنش نشان بده + if name == "bold": + return self._bold + if name == "number": + return self._number + if name == "format_text": + return self._pretier1 + raise AttributeError( + f"'{self.__class__.__name__}' object has no attribute '{name}'" + ) + + ########################################################### + # توابع استایل دهی + ########################################################### + + def _bold(self, _string: str) -> str: + return f" *{_string}* " + + def _number(self, value: Union[int, str]) -> Union[str, int]: + """ + اگر int بود، تبدیل به str برای پردازش + اگر رشته‌ای بود که فقط از ارقام تشکیل شده → تبدیل + اگر رشته بود اما عدد نبود → خودش را برگردان + هر نوع دیگری → بدون تغییر برگردان + تبدیل هر رقم به ایموجی مربوطه + """ + + if isinstance(value, int): + num_str = str(value) + elif isinstance(value, str): + if value.isdigit(): + num_str = value + else: + return value + else: + return value + + return "".join( + self._number_map.get(d, d) for d in num_str[::-1] + ) # handle array of number + + def _pretier1(self, text: str) -> str: + """ + مشکل : + عدم تشخیص پاراگراف + عدم تشخیص اعداد پشت سر هم با - و فاصله + عدم تشخیص اعداد 1 0 - با یک فاصله از اول هستند + """ + pattern = r"(? List: + """ + خروجی به صورت چانک بدون دکمه هر خروجی لینک دارد + برش امن لینک ها و اده ها + """ + chunks = [] + current = f"برای پرسش: {title}\n\n" + + for i, data in enumerate(sections, start=1): + sec_text = data.get("content", "") + idx = data.get("id") + + # ساخت ref کامل + ref = self.__make_link_qs(src=idx) + # متن کامل آیتم + block = ( + f"{self.number(i)} {sec_text}\n{ref}\n\n" # self.format_text(sec_text) + ) + + # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید + if len(current) + len(block) > self.max_len: + chunks.append(current.rstrip()) + current = "" + + current += block + + # آخرین چانک را هم اضافه کن + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + def form_law_chat(self, answer_text: str): + """ + answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد + sources: مثل ['qs2117427'] + """ + + # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد + # مثلا: (qs123) یا (qs123, qs456, qs789) + pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" + + def replace_source(m): + content = m.group(1) + codes = [c.strip() for c in content.split(",")] # جداسازی چند کد + links = [make_link_qs(src=code) for code in codes] + full_match = m.group(0) + # if "منبع" in full_match: + # print(f'Found explicit source(s): {links}') + # else: + # print(f'Found implicit source(s): {links}') + return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان + + # جایگزینی در متن + answer_text = re.sub(pattern, replace_source, answer_text) + + # اگر طول کمتر از MAX_LEN بود → تمام + if len(answer_text) <= MAX_LEN: + return [answer_text] + + # تقسیم متن اگر طول زیاد شد + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > MAX_LEN: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + async def form_title_repeated(self, data: List[Dict[str, str]]): + if len(data) == 0: + return ["هیچ عنوان تکراری و یا حتی مشابه یافت نشد."] + + chunks = [] + current = "نزدیک‌ترین عناوین مشابه عنوان قانون موارد زیر می باشد:\n\n" + + for i, item in enumerate(data, start=1): + title = item.get("title", "") + sec_id = item.get("id", "") + score = item.get("score", "") + + if not title or not sec_id: + continue + + ref = self.__make_link_qq(src=sec_id) + + # بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم + # block = f"{i}. {title}(وزن {score})\n{ref}\n" + block = ( + f"{self.number(i)} {self.bold(title)}؛ میزان تشابه: %{score} ؛{ref}\n" + ) + + # اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن + if len(current) + len(block) > self.max_len and current.strip(): + chunks.append(current.rstrip()) + + current += block + + # ذخیره آخرین چانک + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + def replace_source(self, m): + content = m.group(1) + codes = [c.strip() for c in content.split(",")] # جداسازی چند کد + links = [self.__make_link_qs(src=code) for code in codes] + full_match = m.group(0) + # if "منبع" in full_match: + # print(f'Found explicit source(s): {links}') + # else: + # print(f'Found implicit source(s): {links}') + return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان + + async def form_chat(self, llm_text: str, header: str): + """ + answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد + """ + + # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد + # مثلا: (qs123) یا (qs123, qs456, qs789) + pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" + + # جایگزینی در متن + answer_text = re.sub(pattern, self.replace_source, llm_text) + + # اگر طول کمتر از MAX_LEN بود → تمام + if len(answer_text) <= self.max_len: + return [header + answer_text] + + # تقسیم متن اگر طول زیاد شد + chunks = [] + current = header + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > self.max_len: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + async def form_llm_answer_chat(self, _input, header): + if len(_input) > 0: + return await self.form_chat(llm_text=_input["text"], header=header) + # _input['source'] + return ["هیچ ماده مرتبطی یافت نشد!"] + + async def form_subject_unity(self, + _input:Union[List[RuleRelation], str], + header="نتایج اولیه مغایرت های احتمالی :\n" + ): + if isinstance(_input, str): + _input = self.form_law_chat(_input) + return _input, [], [] + else: + chunks = [] + buttons = [] + seen_qanon_titles = set() + groups = defaultdict(set) + + for item in _input: + title = item.db_rule.qanon_title + groups[title].add(item.db_rule.section_id) + + current = header + for idx, (qanon_title, section_ids) in enumerate(groups.items(), start=1): + block_lines = [f"{self.number(idx)} در قانون {self.bold(qanon_title)}"] + sample_items_by_section = {} + for item in _input: + if item.db_rule.qanon_title == qanon_title and item.db_rule.section_id in section_ids: + sid = item.db_rule.section_id + if sid not in sample_items_by_section: + sample_items_by_section[sid] = item + + for sub_idx, section_id in enumerate(sorted(section_ids), start=1): + item = sample_items_by_section[section_id] # representative item + link = self.__make_link_qs(src=section_id) + + unity = item.subject_unity + if not unity: + block_lines.append("\t\t—") + continue + + if unity.has_subject_unity == "yes": + block_lines.append(f"توضیح {sub_idx} بر اساس {link}:") + block_lines.append(f"\t{unity.reasoning or ''}") + + elif unity.has_subject_unity == "yes_under_assumptions": + block_lines.append(f"توضیح {sub_idx} بر اساس {link}:") + block_lines.append(f"\t{unity.reasoning or ''}") + block_lines.append("\tتوضیحات بیشتر (فرضیات لازم):") + block_lines.append(f"\t{unity.required_assumptions or ''}") + + if len(block_lines) > 2: + block = "\n".join(block_lines) + "\n" + else: + continue + + # Auto-chunk based on length + if len(current) + len(block) > MAX_LEN and current != header: + chunks.append(current.rstrip()) + current = header + + current += block + + # Button: add *once* per qanon_title + if qanon_title and qanon_title not in seen_qanon_titles: + seen_qanon_titles.add(qanon_title) + buttons.append([ + { + "text": f"بررسی مغایرت با {qanon_title}", + "callback_data": f"subject_unities:qq:{qanon_title}" + } + ]) + + # Final flush + if current.strip() and (len(chunks) == 0 or current.strip() != header.rstrip()): + chunks.append(current.rstrip()) + + input_dict = {item.db_rule.section_id : item for item in _input} + mapping_data = defaultdict(list) + for k, v in groups.items(): + for i in v: + mapping_data[k].append(input_dict[i]) + + return chunks, buttons, mapping_data + + + async def form_rule_making( + self, _input, header="گزاره های حقوقی زیر استخراج شد:\n\n" + ): + if len(_input) > 0: + chunks = [] + current = header + + for i, item in enumerate(_input, start=1): + block = f'{self.number(i)} {item["rule_content"]}\n' + if len(current) + len(block) > self.max_len and current.strip(): + chunks.append(current.rstrip()) + + current += block + + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + return ["هیچ گزاره حقوقی یافت و استخراج نشد!"] + + def get_asl(self, _in: str): + return _in.replace("qs_", "اصل ") + + def get_in_form_single(self, asl: str, _in_dict: Dict, _id: int) -> str: + f_list = [] + if _in_dict["is_conflict"]: + f_list += [f"{_id}. *{self.get_asl(asl)}*: ❌ دارای مغایرت ❌"] + f_list += [ + f"موضوع این اصل قانون اساسی {_in_dict['principle_subject']} می باشد." + ] + f_list += [f"موضوع متن ورودی شامل {_in_dict['text_subject']} است."] + if _in_dict["has_subject_relation"] == True: + unity_text = "می باشد" + if _in_dict["has_subject_relation"] == False: + unity_text = "نمی باشد" + f_list += [f"دارای وحدت در موضوع {unity_text}."] + if _in_dict["conflict_type"] != "": + f_list += [f"نوع مغایرت تشحیص داده شده: {_in_dict['conflict_type']}"] + else: + f_list += [f"{_id}. *{self.get_asl(asl)}*: ✅ عدم مغایرت ✅"] + + f_list += [f"توضیحات: {_in_dict['legal_reasoning']}"] + f_list += ["\n\n"] + return "\n".join(f_list) + + async def form_constitution(self, input: Dict): + """ """ + + chunks = [] + header = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n" + current = header + + _id = 1 + for k, v in input.items(): + block = self.get_in_form_single(asl=k, _in_dict=v, _id=_id) + + # اگر این بلاک جا نشد → چانک جدید + if len(current) + len(block) > self.max_len: + chunks.append(current.rstrip()) + current = header + block + else: + current += block + + _id += 1 + + # آخرین چانک + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + async def form_ss_rules(self, _input:List[Dict], header): + + if len(_input) > 1: + chunks = [] + current = header + _i = 0 + + # -------- 1. group by qanon_id / qanon_title + groups = defaultdict(set) + for item in _input: + key = item['db_rule']['qanon_title'] + groups[key].add(item['db_rule']['section_id']) + + for qanon_title, ids in groups.items(): + _i += 1 + links = "و ".join([self.__make_link_qs(id) for id in ids]) + block = f"{self.number(_i)} در قانون {self.bold(qanon_title)} تشابه با گزاره های حقوقی ماده:{links}\n\n" + + if len(current) + len(block) > self.max_len: + if current: + chunks.append(current) + + current = header + block + else: + current += block + + if current and current != header: + chunks.append(current) + + return chunks + + return ["هیچ ماده مرتبطی یافت نشد!"] + + + + async def form_conflict_detection(self, + _input:RuleRelation, header="نتیجه تشخیص مغایرت :\n" + ): + current = header + + # ساخت لینک + # _link = self.__make_link_qs(src=_input.db_rule.section_id) + current += f"به صورت خلاصه {_input.conflict_detection.has_confict}\n" + current += f"توضیحات : {_input.conflict_detection.explanation_of_conflict}\n" + + return current + + async def form_conflict_type_detection(self, + _input:RuleRelation, header="نتیجه تشخیص نوع مغایرت :\n" + ): + current = header + + # ساخت لینک + # _link = self.__make_link_qs(src=_input.db_rule.section_id) + current += f"به صورت خلاصه {_input.conflict_type_detection.conflict_type}\n" + current += f"توضیحات : {_input.conflict_type_detection.explanation_of_type}\n" + + return current + + async def form_relation_identification(self, + _input:RuleRelation, header="نتیجه رابطه مغایرت :\n" + ): + current = header + + # ساخت لینک + # _link = self.__make_link_qs(src=_input.db_rule.section_id) + current += f"به صورت خلاصه {_input.relation_identification.relation_type}\n" + current += f"توضیحات : {_input.relation_identification.reasoning}\n" + + return current + + async def form_evaluation(self, + _input:Evaluation, header="نتیجه نهایی بررسی مغایرت :\n" + ): + current = header + + # ساخت لینک + # _link = self.__make_link_qs(src=_input.db_rule.section_id) + current += f"1. آیا ارزیابی وحدت موضوع صحیح است؟ {_input.is_subject_unity_assessment_correct}\n" + current += f"2. آیا ارزیابی تشخیص نوع درست است ؟ {_input.is_conflict_detection_correct}\n" + current += f"3. آیا ارزیابی نوع درست است ؟ {_input.is_conflict_type_detection_correct}\n" + current += f"4. رابطه مغایرت چطور؟ {_input.is_relation_type_detection_correct}\n" + current += f"5. نوع رابطه ؟ {_input.valid_relation_type}\n" + current += f"6.توضیح بیشتر: {_input.comments}\n" + + return current + + async def from_law_writing_policy(self, _input_dict: Dict, header:str) -> List[str]: + f_list = [ + self.bold(header)] + _str = { + "analyze": "گزارش تحلیلی بندبه‌بند", + "strength": "بیان نقاط قوت", + "weakness": "بیان نقاط ضعف و ریسک‌های تقنینی", + "conclusion_score": "جمع‌بندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)", + "suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی", + } + for k, v in _input_dict.items(): + _title = _str[k] + _title = "*" + _title + "*" + f_list += [_title] + # f_list += ['\n'] + f_list += [v] + f_list += ['\n'] + + return ["\n".join(f_list)] + + + +""" +deleteMessage +message_id +chat_id +""" + +class RequestManager: + def __init__(self, + host_url:str, + url_time_out=120, + step_time_out=60, + ): + self.host_url = host_url + self.url_time_out = url_time_out + self.step_time_out = step_time_out + TASK_URL ={ + # stream + "":"/stream/chat_logical", + + # none-stream + "":"/conflict/general_policy/qs_unity", + "":"/conflict/all_qanon/qs_unity", + "":"/conflict/general_policy/unity_eval", + "":"/conflict/law_writing_policy", + "":"/conflict/constitution", + "":"/rule_making", + "":"/chat", + "":"/talk", + "":"/semantic_search/chat_logical", + "":"/semantic_search/run_chat", + "":"/semantic_search/run_semantic_search", + } + + + async def get_result( + self, + payload, + url :str, + section_id:str='qs_10001', + mode_type='bale', + ): + + _url = self.host_url+url + print( + f'get_result _url {_url}' + ) + try: + async with httpx.AsyncClient(timeout=self.url_time_out) as client: + response = await client.post( + url=_url, json=payload + ) + response.raise_for_status() + data = response.json() + result = data.get("result", "❌ پاسخی دریافت نشد") + + return result + + except Exception as e: + print(f"❌ خطای RAG:\n{str(e)}") + return "❌ ارتباط با سرور قطع می‌باشد" + + + async def stream_result( + self, + url :str, + payload : Dict, + ): + """ + هر مرحله شامل: + { + step : "اسم مرحله" + data : "داده در این مرحله" + } + """ + timeout = httpx.Timeout(self.step_time_out, read=self.url_time_out) + _url = self.host_url+url + + async with httpx.AsyncClient(timeout=timeout) as client: + # ارسال درخواست به صورت Stream + async with client.stream( + "POST", + url=_url, + json=payload + ) as r: + # بررسی وضعیت پاسخ + if r.status_code != 200: + print(f"Error: {r.status_code}") + return + + # خواندن خط به خط (هر خط یک JSON است که سرور Yield کرده) + async for line in r.aiter_lines(): + if line.strip(): # جلوگیری از پردازش خطوط خالی + try: + # تبدیل متن JSON به دیکشنری پایتون + step_data = json.loads(line) + yield step_data + except json.JSONDecodeError: + print(f"Failed to decode: {line}") + + +def unique_id(prefix="wai_") -> str: + return f"{prefix}{uuid.uuid4().hex[:16]}" + + +def load_orjson(path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + +def save_orjson(path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + +def split_text_chunks(text: str): + """Split a long text into safe chunks.""" + return [text[i : i + MAX_LEN] for i in range(0, len(text), MAX_LEN)] + + +class ElasticHelper: + """ + کلاس ElasticHelper: + نوع ورودی: بدون ورودی مستقیم در تعریف کلاس + نوع خروجی: شیء از نوع ElasticHelper + عملیات: + - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند + - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند + """ + + counter = 0 + total = 0 + id = "" + path_mappings = os.getcwd() + "/repo/_other/" + + def __init__( + self, + es_url="http://127.0.0.1:6900", + es_pass="", + es_user="elastic", + path_mappings="", + ): + """ + نوع ورودی: + - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" + - es_pass: رمز عبور (str) - پیش‌فرض خالی + - es_user: نام کاربری (str) - پیش‌فرض "elastic" + - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی + نوع خروجی: شیء ElasticHelper + عملیات: + - اتصال به Elasticsearch را برقرار می‌کند + - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند + - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) + - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود + """ + if path_mappings: + self.path_mappings = path_mappings + + if es_pass == "": + self.es = Elasticsearch(es_url) + else: + self.es = Elasticsearch( + es_url, + basic_auth=(es_user, es_pass), + verify_certs=False, + ) + # print(es_url) + # print(self.es) + + self.success_connect = False + for a in range(0, 10): + try: + if not self.es.ping(): + print("Elastic Connection Not ping, sleep 30 s : ", a) + sleep(5) + continue + else: + self.success_connect = True + break + + except Exception as e: + break + if not self.success_connect: + print("******", "not access to elastic service") + return + + self.counter = 0 + self.total = 0 + self.id = "" + + def search(self, **params): + try: + res = self.es.search(**params) + except: + return {"hits": {"hits": []}} + return res + + def get_document(self, index_name, id): + res = self.es.get(index=index_name, id=id) + return res + + def exist_document(self, index_name, id): + res = self.es.exists(index=index_name, id=id) + return res + + def update_index_doc(self, is_update_state, index_name_o, eid, data): + """ + نوع ورودی: + - is_update_state: تعیین عملیات (update یا index) (bool) + - index_name_o: نام اندیس (str) + - eid: شناسه سند (str) + - data: داده‌های سند (dict) + نوع خروجی: پاسخ Elasticsearch (dict) + عملیات: + - اگر is_update_state=True باشد: سند را آپدیت می‌کند + - در غیر این صورت: سند جدید ایجاد می‌کند + """ + if is_update_state: + resp = self.es.update(index=index_name_o, id=eid, doc=data) + # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) + else: + resp = self.es.index(index=index_name_o, id=eid, document=data) + return resp + + +def make_link_qq(src, ref_text=REF_TEXT): + return f"[{ref_text}]({QQ_WEB_LINK}{src})" + + +def make_link_qs(src, ref_text=REF_TEXT): + return f"[{ref_text}]({QS_WEB_LINK}{src})" + + +def encode_uc(update: BaleUpdate) -> str: + if update.message: + user = update.message.from_user + chat = update.message.chat + + elif update.callback_query: + user = update.callback_query.from_user + chat = update.callback_query.message.chat + + else: + return "unknown" + + username = user.username or user.id + chat_id = chat.id # ✅ فقط chat_id + + return f"{username}:{chat_id}" + + +def decode_uc(uc_id: str) -> dict: + """ + ورودی: 'username:chat_id' یا 'user_id:chat_id' + خروجی: {'username': ..., 'chat_id': ...} + """ + + try: + username, chat_id = uc_id.split(":", 1) + + return (username, int(chat_id) if chat_id.isdigit() else chat_id) + + except ValueError: + raise ValueError(f"decode_uc") + + +async def get_from_gpl(in_dict: Dict) -> List[str]: + f_list = [] + _str = { + "analyze": "گزارش تحلیلی بندبه‌بند", + "strength": "بیان نقاط قوت", + "weakness": "بیان نقاط ضعف و ریسک‌های تقنینی", + "conclusion_score": "جمع‌بندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)", + "suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی", + } + for k, v in in_dict.items(): + _title = _str[k] + _title = "*" + _title + "*" + f_list += [_title] + # f_list += ['\n'] + f_list += [v] + f_list += ["\n"] + + return ["\n".join(f_list)] + + +def cer(ref: str, hyp: str) -> float: + m, n = len(ref), len(hyp) + dp = list(range(n + 1)) + + for i in range(1, m + 1): + prev, dp[0] = dp[0], i + for j in range(1, n + 1): + cur = dp[j] + dp[j] = min( + dp[j] + 1, # deletion + dp[j - 1] + 1, # insertion + prev + (ref[i - 1] != hyp[j - 1]) # substitution + ) + prev = cur + + return (dp[n] / m) * 100 + +import nltk +from nltk.metrics import edit_distance + +def cer_ntlk(exist: str, new: str) -> float: + """ + این روش دقیق‌تر است، چون تعداد کاراکترهای اضافی یا کم در متن طولانی، + CER را به شکل اغراق‌آمیز کاهش نمی‌دهد، بلکه روی شباهت معنایی و واژه‌ای تمرکز می‌کند. + """ + # edit distance روی کلمات + return round(float(1 - edit_distance(new, exist) / len(exist)) * 100, 2) + + + +async def title_repeated( + qanontitle, search_range: int = 10, url=f"http://localhost:8010/v1/indices/qaqanon/search" +): + """ + - باید با سرویس از حاج آقا گرفته شود + Fetch similar titles from the custom Elasticsearch-like REST API. + """ + # "/majles/similar/title/qaqanon/0/10/none" + # headers["Authorization"]="GuestAccess" + headers = {"accept": "application/json", "Content-Type": "application/json"} + + body = { + "query": qanontitle, # + "from_": 0, + "size": search_range+10, + "track_total_hits": True, + } + + response = requests.request("POST", url, headers=headers, json=body, timeout=20) + + if response.status_code != 200: + print("ERROR:", response.status_code) + print(response.text) + else: + data = response.json() + ids = [] + # print('---------------------------------------> max_score', max_score) + # print(data["hits"]) + + for i in data["hits"]["hits"]: + title = i["_source"]["title"] + ids.append( + {"title": title, "id": i["_source"]["id"], "score" :cer_ntlk(exist=title, new=qanontitle)} + ) + + return sorted(ids, key=lambda x: x['score'], reverse=True)[:search_range] + + +def normalize_persian(text: str) -> str: + # حذف کنترل‌کاراکترها + text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C") + + # حذف فاصله بین حروف فارسی + text = re.sub(r"(?<=[آ-ی])\s+(?=[آ-ی])", "", text) + + # اصلاح فاصله قبل و بعد از علائم + text = re.sub(r"\s+([،؛:؟!])", r"\1", text) + text = re.sub(r"([،؛:؟!])\s*", r"\1 ", text) + + # فاصله‌های چندتایی + text = re.sub(r"\s{2,}", " ", text) + + return text.strip() + + +async def get_in_from_rule_making(_input): + + print(f"_input {_input}") + o_put = "گزاره های حقوقی زیر استخراج شد:\n" + for i, item in enumerate(_input, start=1): + o_put += f'{i}. {item["rule_content"]}\n' + + return o_put + + +async def get_in_from_title_repeated(data: List[Dict[str, str]]): + if len(data) == 0: + return ["هیچ عنوانی تکراری یافت نشد."] + + chunks = [] + current = "نزدیک‌ترین عناوین مشابه عنوان قانون موارد زیر می باشد::\n\n" + + for i, item in enumerate(data, start=1): + title = item.get("title", "").strip() + sec_id = item.get("id", "").strip() + score = item.get("score", "") + + if not title or not sec_id: + continue + + ref = make_link_qq(src=sec_id) + + # بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم + # block = f"{i}. {title}(وزن {score})\n{ref}\n" + block = f"{i}. {title}\n{ref}\n" + + # اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن + if len(current) + len(block) > 4000 and current.strip(): + chunks.append(current.rstrip()) + + current += block + + # ذخیره آخرین چانک + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + +async def get_multi_qs_advance_gp( + _inputs: List[RuleRelation], +): + chunks = ["لطفا یک شماره را جهت بررسی جزئی تر مغایرت انتخاب کنید: "] + buttons = [] + + for i, item in enumerate(_inputs, start=1): + unity = item.subject_unity + section_id = item.db_rule.section_id + if unity: + if unity.has_subject_unity != "no": + buttons.append( + [ + { + "text": f"{i} بررسی مورد ", + "callback_data": f"advanced_check_conflict_qsids:{section_id}", + } + ] + ) + + return chunks, buttons + + +async def get_form_gp_p1( + _inputs: Union[List[RuleRelation], List], +): + f_result = [] + if not _inputs or len(_inputs) <= 1: + f_result += ["قوانین مرتبط زیر از سیاستهای کلی نظام یافت شد : "] + for i, item in enumerate(_inputs, start=1): + link = make_link_qs(src=item.db_rule.section_id) + f_result += [f"{i}. {item.subject_unity.reasoning}. {link}"] + f_result += [ + "و بعد از بررسی گزاره های حقوقی هر یک با متن شما ، موضوعات مرتبط مستقیم یافت نشد" + ] + return ["\n".join(f_result)], [] + + # -------- 1. group by qanon_id / qanon_title + groups = defaultdict(list) + for item in _inputs: + key = item.db_rule.qanon_id or item.db_rule.qanon_title + groups[key].append(item) + + # -------- 2. build output per group + for qanon_key, items in groups.items(): + chunks = [] + buttons = [] + + qanon_title = items[0].db_rule.qanon_title or "قانون نامشخص" + + current = f"موضوعات مرتبط در قانون *{qanon_title}*:\n\n" + + for i, item in enumerate(items, start=1): + unity = item.subject_unity + link = make_link_qs(src=item.db_rule.section_id) + + lines = [] + + if unity: + if unity.has_subject_unity == "yes": + lines.append(f"{i}- " + unity.reasoning or "") + lines.append(link) + + elif unity.has_subject_unity == "yes_under_assumptions": + lines.append(f"{i}- " + unity.reasoning or "") + lines.append("مشروط به فرض زیر :") + lines.append("\t" + unity.required_assumptions or "") + lines.append(link) + + block = "\n".join(lines) + "\n\n" + + if len(current) + len(block) > MAX_LEN and current.strip(): + chunks.append(current.rstrip()) + current = "" + + current += block + + if current.strip(): + chunks.append(current.rstrip()) + + # -------- 3. one button per law + buttons.append( + [ + { + "text": f"بررسی وجود مغایرت", + "callback_data": f"advanced_check_conflict_qqids:{qanon_key}", + } + ] + ) + f_result.append([chunks, buttons]) + + return f_result + + +async def get_form_gp_old(_inputs: Union[List[RuleRelation], List]): + chunks = [] + _button = [] + + print(f"_inputs {_inputs}") + if len(_inputs) > 1: + current = "نتایج اولیه مغایرت های احتمالی :\n" + + for i, item in enumerate(_inputs, start=1): + + # ساخت لینک + _link = make_link_qs(src=item.db_rule.section_id) + + # ساخت بلوک متنی کامل مربوط به این item — بدون قطع شدن + lines = [f"{i}. {item.db_rule.qanon_title} \n{_link}"] + + unity = item.subject_unity + + print(f"unity.has_subject_unity {unity.has_subject_unity}") + _qs_title = item.db_rule.qanon_title + "-" + str(i) + if unity.has_subject_unity == "yes": + print(f"yes") + lines.append("توضیح:") + lines.append(unity.reasoning or "") + + elif unity.has_subject_unity == "yes_under_assumptions": + print(f"yes_under_assumptions") + lines.append("توضیح:") + lines.append(unity.reasoning or "") + lines.append("توضیحات بیشتر (فرضیات لازم):") + lines.append(unity.required_assumptions or "") + + block = "\n".join(lines) + "\n" + + if len(current) + len(block) > MAX_LEN and current.strip(): + # قبلی را ذخیره کن + chunks.append(current.rstrip()) + + current += block + + _button.append( + [{"text": f"بررسی {_qs_title}", "callback_data": f"not_yet"}] + ) + if current.strip(): + chunks.append(current.rstrip()) + + else: + chunks = ["هیچ مغایرتی یافت نشد."] + return chunks, _button + + +async def get_form_gp_advanced(_input: RuleRelation): + """ + ما در نظر میگیریم که subject_unity را داریم + """ + finall = ["نتیجه:\n"] + qs_id = _input.db_rule.section_id + button = [] + if _input.relation_identification: + pass + if _input.conflict_type_detection: + pass + if _input.conflict_detection: + print("conflict_detection----------------------------") + _end = "آیا میخواهید نتیجه بررسی تزاحم را ببینید؟" + if _input.conflict_detection.has_confict == True: + # finall.append( + # 'باهم تعارض دارند !' + # ) + button.append( + [ + { + "text": "بررسی نوع تعارض", + "callback_data": f"advanced_check_conflict_qsids:{qs_id}", + } + ] + ) + finall += ["توضیحات"] + finall += [_input.conflict_detection.explanation_of_conflict] + finall += [_end] + else: + # finall.append( + # 'باهم تعارض مستقیم ندارند' + # ) + finall += ["توضیحات"] + finall += [_input.conflict_detection.explanation_of_conflict] + finall = ["\n".join(finall)] + return finall, button + + if _input.subject_unity: + pass + # _input.subject_unity.has_subject_unity + # _input.subject_unity.required_assumptions + # _input.subject_unity.reasoning + + return _input.model_dump() + + +async def result_gp(text, url, effort="low") -> Dict: + + print( + f"text {type(text)}\n-> {text}", + ) + try: + async with httpx.AsyncClient(timeout=TIME_OUT) as client: + response = await client.post( + url, + json={ + "section_content": text, + "effort": "medium", + "mode_type": "bale", + }, + ) + response.raise_for_status() + response = response.json() + data = response.get("result", "❌ پاسخی دریافت نشد") + if isinstance(data, str): + return data + _output = [] + for item in data: + _output.append(RuleRelation.parse_obj(item)) + + # print('results_chat ',type(result)) + return _output + + except Exception as e: + print(f"❌ خطای RAG:\n{str(e)}") + return "❌ ارتباط با سرور قطع می‌باشد" + + +def extract_other_info(update: BaleUpdate) -> dict: + other_info = {} + + if update.message: + user = update.message.from_user + + elif update.callback_query: + user = update.callback_query.from_user + + else: + return other_info # خالی برگردان اگر هیچ‌کدام نبود + + # ایمن در برابر None + other_info["username"] = user.username or "" + other_info["first_name"] = user.first_name or "" + other_info["last_name"] = getattr(user, "last_name", "") or "" + + return other_info + + +def get_in_form(title: str, sections: list): + chunks = [] + current = f"برای پرسش: {title}\n\n" + + for i, data in enumerate(sections, start=1): + sec_text = data.get("content", "") + idx = data.get("id") + + # ساخت ref کامل + ref = make_link_qs(src=idx) + # متن کامل آیتم + block = f"{i}: {sec_text}\n{ref}\n\n" + + # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید + if len(current) + len(block) > MAX_LEN: + chunks.append(current.rstrip()) + current = "" + + current += block + + # آخرین چانک را هم اضافه کن + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + +def form_search_in_law(title: str, sections: List) -> List: + chunks = [] + current = f"برای پرسش: {title}\n\n" + + for i, data in enumerate(sections, start=1): + sec_text = data.get("content", "") + idx = data.get("id") + + # ساخت ref کامل + ref = make_link_qs(src=idx) + # متن کامل آیتم + block = f"{i}: {sec_text}\n{ref}\n\n" + + # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید + if len(current) + len(block) > MAX_LEN: + chunks.append(current.rstrip()) + current = "" + + current += block + + # آخرین چانک را هم اضافه کن + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + +def format_answer_bale(answer_text: str): + """ + answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد + sources: مثل ['qs2117427'] + """ + + # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد + # مثلا: (qs123) یا (qs123, qs456, qs789) + pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" + + def replace_source(m): + content = m.group(1) + codes = [c.strip() for c in content.split(",")] # جداسازی چند کد + links = [make_link_qs(src=code) for code in codes] + full_match = m.group(0) + # if "منبع" in full_match: + # print(f'Found explicit source(s): {links}') + # else: + # print(f'Found implicit source(s): {links}') + return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان + + # جایگزینی در متن + answer_text = re.sub(pattern, replace_source, answer_text) + + # اگر طول کمتر از MAX_LEN بود → تمام + if len(answer_text) <= MAX_LEN: + return [answer_text] + + # تقسیم متن اگر طول زیاد شد + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > MAX_LEN: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + +def form_answer_bale(answer_text: str): + """ + answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد + sources: مثل ['qs2117427'] + """ + + # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد + # مثلا: (qs123) یا (qs123, qs456, qs789) + pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" + + def replace_source(m): + content = m.group(1) + codes = [c.strip() for c in content.split(",")] # جداسازی چند کد + links = [make_link_qs(src=code) for code in codes] + full_match = m.group(0) + # if "منبع" in full_match: + # print(f'Found explicit source(s): {links}') + # else: + # print(f'Found implicit source(s): {links}') + return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان + + # جایگزینی در متن + answer_text = re.sub(pattern, replace_source, answer_text) + + # اگر طول کمتر از MAX_LEN بود → تمام + if len(answer_text) <= MAX_LEN: + return [answer_text] + + # تقسیم متن اگر طول زیاد شد + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > MAX_LEN: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + +def chunked_simple_text(answer_text): + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > MAX_LEN: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + +