import json import os import numpy as np import torch import faiss from typing import List, Tuple from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer, AutoModelForSequenceClassification from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity import datetime import re import random from embedder_sbert_qavanin_285k import PersianVectorAnalyzer from normalizer import cleaning from fastapi import FastAPI from pydantic import BaseModel # LLM Libs from openai import OpenAI from langchain_openai import ChatOpenAI # pip install -U langchain_openai import requests today = f'{datetime.datetime.now().year}{datetime.datetime.now().month}{datetime.datetime.now().day}' chatbot = FastAPI() # ------------------- # مدل‌ها و مسیر داده # ------------------- EMBED_MODEL = "/home/sabr/MODLES/rag_chat/sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" RERANKER_MODEL = "/home/sabr/MODLES/rag_chat/BAAI/bge-reranker-v2-m3" FAISS_INDEX_PATH = "./qavanin-faiss/faiss_index_qavanin_285k.index" FAISS_METADATA_PATH = "./qavanin-faiss/faiss_index_qavanin_285k_metadata.json" RERANK_BATCH = int(os.environ.get("RERANK_BATCH", 256)) # print(f'RERANK_BATCH: {RERANK_BATCH}') def get_key(): key = 'aa-fdh9d847ANcBxQCBTZD5hrrAdl0UrPEnJOScYmOncrkagYPf' return key def load_faiss_index(index_path: str, metadata_path: str): """بارگذاری ایندکس FAISS و متادیتا (لیست جملات + عناوین).""" index = faiss.read_index(index_path) with open(metadata_path, "r", encoding="utf-8") as f: metadata = json.load(f) content_list, ids, prefix_list = [], [], [] for item in metadata: content_list.append(item["content"]) ids.append(item["id"]) prefix_list.append(item["prefix"]) return content_list, ids, prefix_list, index def get_client(): url = "https://api.avalai.ir/v1" # key = 'aa-4tvAEazUBovEN1i7i7tdl1PR93OaWXs6hMflR4oQbIIA4K7Z' client = OpenAI( api_key= get_key(), # با کلید واقعی خود جایگزین کنید base_url= url, # آدرس پایه ) return client def llm_request(query, model): if query == '': return 'لطفا متن سوال را وارد نمائید' client = get_client() determine_refrence = """شناسه هر ماده قانون در ابتدای آن و با فرمت "id: {idvalue}" آمده است که id-value همان شناسه ماده است. بازای هربخش از پاسخی که تولید می شود، ضروری است شناسه ماده ای که در تدوین پاسخ از آن استفاده شده در انتهای پاراگراف یا جمله مربوطه با فرمت {idvalue} اضافه شود. همیشه idvalue با رشته "qs" شروع می شود""" try: messages.append({"role": "user", "content": query}) messages.append({"role": "user", "content": determine_refrence}) response = client.chat.completions.create( messages = messages, model= model) # "gpt-4o", "gpt-4o-mini", "deepseek-chat" , "gemini-2.0-flash", gemini-2.5-flash-lite # gpt-4o : 500 # gpt-4o-mini : 34 # deepseek-chat: : 150 # gemini-2.0-flash : error # cf.gemma-3-12b-it : 1 # gemini-2.5-flash-lite : 35 خیلی خوب answer = response.choices[0].message.content # پاسخ را هم به سابقه اضافه می‌کنیم messages.append({"role": "assistant", "content": answer}) except Exception as error: with open('./llm-answer/error-in-llm.txt', mode='a+', encoding='utf-8') as file: error_message = f'\n\nquery: {query.strip()}\nerror:{error} \n-------------------------------\n' file.write(error_message) return 'با عرض پوزش؛ متاسفانه خطایی رخ داده است. لطفا لحظاتی دیگر دوباره تلاش نمائید' return answer class HybridRetrieverReranker: __slots__ = ( "device", "content_list", "ids", "prefix_list", "N", "embedder", "faiss_index", "vectorizer", "tfidf_matrix", "tokenizer", "reranker", "dense_alpha" ) def __init__(self, content_list: List[str],ids: List[str], prefix_list: List[str], faiss_index, dense_alpha: float = 0.6, device: str = None): if device is None: device = "cuda" if torch.cuda.is_available() else "cpu" self.device = device self.content_list = content_list self.ids = ids self.prefix_list = prefix_list self.faiss_index = faiss_index self.N = len(content_list) # Dense self.embedder = SentenceTransformer(EMBED_MODEL, device=self.device) # Sparse (مثل قبل برای حفظ خروجی) self.vectorizer = TfidfVectorizer( analyzer="word", ngram_range=(1, 2), token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b", ) self.tfidf_matrix = self.vectorizer.fit_transform(self.content_list) # Reranker self.tokenizer = AutoTokenizer.from_pretrained(RERANKER_MODEL, use_fast=True) self.reranker = AutoModelForSequenceClassification.from_pretrained( RERANKER_MODEL ).to(self.device) # self.reranker = AutoModelForSeq2SeqLM.from_pretrained(RERANKER_MODEL).to(self.device) # self.reranker.eval() self.dense_alpha = float(dense_alpha) # --- Dense (FAISS) --- def dense_retrieve(self, query: str, top_k: int): if top_k <= 0: return [], np.array([], dtype=np.float32) q_emb = self.embedder.encode(query, convert_to_numpy=True).astype(np.float32) D, I = self.faiss_index.search(np.expand_dims(q_emb, axis=0), top_k) return I[0].tolist(), D[0] # --- Sparse --- def sparse_retrieve(self, query: str, top_k: int): if top_k <= 0: return [], np.array([], dtype=np.float32) k = min(top_k, self.N) q_vec = self.vectorizer.transform([query]) sims = cosine_similarity(q_vec, self.tfidf_matrix).ravel() idx = np.argpartition(-sims, kth=k-1)[:k] idx = idx[np.argsort(-sims[idx], kind="mergesort")] return idx.tolist(), sims[idx] # --- Utils --- @staticmethod def _minmax_norm(arr: np.ndarray) -> np.ndarray: if arr.size == 0: return arr a_min = arr.min() a_max = arr.max() rng = a_max - a_min if rng < 1e-12: return np.zeros_like(arr) return (arr - a_min) / rng def fuse(self, d_idx, d_scores, s_idx, s_scores, top_k=50, k_rrf=60): """ ادغام نتایج دو retriever (dense و sparse) با استفاده از Reciprocal Rank Fusion (RRF) Args: d_idx (list or np.ndarray): ایندکس‌های نتایج dense retriever d_scores (list or np.ndarray): نمرات dense retriever s_idx (list or np.ndarray): ایندکس‌های نتایج sparse retriever s_scores (list or np.ndarray): نمرات sparse retriever top_k (int): تعداد نتایج نهایی k_rrf (int): ثابت در فرمول RRF برای کاهش تأثیر رتبه‌های پایین‌تر Returns: list: لیست ایندکس‌های ادغام‌شده به ترتیب نمره """ combined = {} # dense retriever for rank, idx in enumerate(d_idx): score = 1.0 / (k_rrf + rank) combined[idx] = combined.get(idx, 0) + score # sparse retriever for rank, idx in enumerate(s_idx): score = 1.0 / (k_rrf + rank) combined[idx] = combined.get(idx, 0) + score # مرتب‌سازی نهایی sorted_items = sorted(combined.items(), key=lambda x: x[1], reverse=True) cand_idx = [item[0] for item in sorted_items[:top_k]] return cand_idx def rerank(self, query: str, candidate_indices: List[int], passages: List[str], final_k: int) -> List[Tuple[int, float]]: """ Rerank candidate passages using a cross-encoder (e.g., MonoT5, MiniLM, etc.). Args: query (str): پرسش کاربر candidate_indices (List[int]): ایندکس‌های کاندیدا (از retriever) passages (List[str]): کل جملات/پاراگراف‌ها final_k (int): تعداد نتایج نهایی Returns: List[Tuple[int, float]]: لیستی از (ایندکس، امتیاز) برای بهترین نتایج """ if final_k <= 0 or not candidate_indices: return [] # آماده‌سازی جفت‌های (query, passage) texts = [query] * len(candidate_indices) pairs = passages scores: List[float] = [] def _iter_batches(max_bs: int): bs = max_bs while bs >= 16: # حداقل batch_size try: with torch.inference_mode(): for start in range(0, len(pairs), bs): batch_texts = texts[start:start + bs] batch_pairs = pairs[start:start + bs] inputs = self.tokenizer( batch_texts, batch_pairs, padding=True, truncation=True, max_length=512, return_tensors="pt", ).to(self.device) logits = self.reranker(**inputs).logits.view(-1) scores.extend(logits.detach().cpu().tolist()) return True except torch.cuda.OutOfMemoryError: if torch.cuda.is_available(): torch.cuda.empty_cache() bs //= 2 return False # اجرای reranking success = _iter_batches(max_bs=64) if not success: raise RuntimeError("Reranker failed due to CUDA OOM, even with small batch size.") # مرتب‌سازی نتایج بر اساس نمره reranked = sorted( zip(candidate_indices, scores), key=lambda x: x[1], reverse=True )[:final_k] return reranked def get_passages(self, cand_idx, content_list): passages = [] for idx in cand_idx: passages.append(content_list[idx]) return passages # --- Search (بدون تغییر) --- def search(self, query: str, content_list, topk_dense=50, topk_sparse=50, pre_rerank_k=50, final_k=10): d_idx, d_scores = self.dense_retrieve(query, topk_dense) s_idx, s_scores = self.sparse_retrieve(query, topk_sparse) cand_idx = self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k) passages = self.get_passages(cand_idx, content_list) reranked = self.rerank(query, cand_idx, passages, final_k) return [{"idx": i, "content": self.content_list[i],"prefix": self.prefix_list[i], "rerank_score": score} for i, score in reranked] def single_query(query: str): query = cleaning(query) retrived_sections = pipe.search(query, content_list, topk_dense=30, topk_sparse=30, pre_rerank_k=30, final_k=10) final_similars = '' for i, row in enumerate(retrived_sections, 1): id_value = '{' + str(ids[row['idx']]) + '}' result = f"id: {id_value} \n{row['prefix']} {row['content']}\n" final_similars += ''.join(result) return final_similars, retrived_sections def find_refrences(llm_answer: str) -> List[str]: """ شناسایی شناسه هایی که مدل زبانی، برای تهیه پاسخ از آنها استفاده کرده است Args: llm_answer(str): متنی که مدل زبانی تولید کرده است Returns: refrence_ids(List[str]): لیستی از شناسه های تشخیص داده شده """ pattern = r"\{[^\}]+\}" refrence_ids = re.findall(pattern, llm_answer) return refrence_ids def replace_refrences(llm_answer: str, refrences_list:List[str]) -> List[str]: """ شناسایی شناسه هایی که مدل زبانی، برای تهیه پاسخ از آنها استفاده کرده است Args: llm_answer(str): متنی که مدل زبانی تولید کرده است refrences_list(List[str]): لیست شناسه ماده های مورد استفاده در پاسخ مدل زبانی Returns: llm_answer(str), : متن بازسازی شده پاسخ مدل زبانی که شناسه ماده های مورد استفاده در آن، اصلاح شده است """ refrences = '' for index, ref in enumerate(refrences_list,1): # breakpoint() llm_answer = llm_answer.replace(ref, f'[{index}]') id = ref.lstrip('{') id = id.rstrip('}') refrences += ''.join(f'[{index}] https://majles.tavasi.ir/entity/detail/view/qsection/{id}\n') llm_answer = f'{llm_answer}\n\nمنابع پاسخ‌:\n{refrences.strip()}' return llm_answer # load basic items content_list, ids, prefix_list, faiss_index = load_faiss_index(FAISS_INDEX_PATH, FAISS_METADATA_PATH) pipe = HybridRetrieverReranker(content_list, ids, prefix_list, faiss_index, dense_alpha=0.6) # query preprocess and normalize normalizer_obj = PersianVectorAnalyzer() messages = [ {"role": "system", "content": "تو یک دستیار خبره در زمینه حقوق و قوانین مرتبط به آن هستی و می توانی متون حقوقی را به صورت دقیق توضیح بدهی . پاسخ ها باید الزاما به زبان فارسی باشد. پاسخ ها فقط از متون قانونی که در پرامپت وجود دارد استخراج شود."}, ] def run_chatbot(query:str, chat_id:str): if query == '': return 'لطفا متن سوال را وارد نمائید' start_time = (datetime.datetime.now()) result_passages_text, result_passages_ids = single_query(query) end_retrive = datetime.datetime.now() print('-'*40) retrive_duration = (end_retrive - start_time).total_seconds() print(f'retrive duration: {str(retrive_duration)}') prompt = f'برای پرسش "{query}" از میان مواد قانونی "{result_passages_text}" .پاسخ مناسب و دقیق را استخراج کن. درصورتی که مطلبی مرتبط با پرسش در متن پیدا نشد، فقط پاسخ بده: "متاسفانه در منابع، پاسخی پیدا نشد!"' try: model = "gemini-2.5-flash-lite" llm_answer = llm_request(prompt, model) except Exception as error: model = "gpt-4o-mini" llm_answer = llm_request(prompt, model) llm_answer_duration = (datetime.datetime.now() - end_retrive).total_seconds() print(f'llm answer duration: {str(llm_answer_duration)}') used_refrences_in_answer = find_refrences(llm_answer) llm_answer = replace_refrences(llm_answer, used_refrences_in_answer) full_prompt_duration = (datetime.datetime.now() - start_time).total_seconds() print(f'full prompt duration: {full_prompt_duration}') print('~'*40) chat_obj = { 'chat-id' : chat_id, # str 'chat-title' : '', # str 'user-id' : '', 'user-query' : query, # str 'model' : model, # str 'result-passages' : result_passages_text, # str 'retrived-passages-ids' : result_passages_ids, # list[obj] 'retrive-duration' : retrive_duration, # str 'llm-answer-duration' : llm_answer_duration, # str 'full-prompt-duration' : full_prompt_duration, # str 'chat-date' : str(start_time), # str 'used-refrences-in-answer' : used_refrences_in_answer, # list[str] 'llm-answer' : llm_answer, # str } # prev_chat_data = [] # with open('./llm-answer/chat-messages.json', mode='r', encoding='utf-8') as file: # prev_chat_data = json.load(file) # prev_chat_data.append(chat_obj) # with open('./llm-answer/chat-messages.json', mode='w', encoding='utf-8') as output: # json.dump(prev_chat_data, output, ensure_ascii=False, indent=2) return chat_obj @chatbot.post("/credit_refresh") def credit_refresh(): url = "https://api.avalai.ir/user/credit" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {get_key()}" } remained_credit = requests.get(url, headers=headers) with open('./llm-answer/credit.txt','w') as file: file.write(str(remained_credit.json()['remaining_irt'])) return str(remained_credit.json()['remaining_irt']) # تعریف مدل داده‌ها برای درخواست‌های API class Query(BaseModel): query: str date = str((datetime.datetime.now())).replace(' ','-').replace(':','').replace('.','-') chat_id = f'{date}-{random.randint(100000, 999999)}' print('#'*19) print('-Chatbot is Ready!-') print('#'*19) # مسیر API برای اجرا کردن run_chatbot @chatbot.post("/run_chatbot") def chat(query: Query): answer = run_chatbot(query.query, chat_id) credit_refresh() return {"answer": answer} # uvicorn src.app:app --reload if __name__ == "__main__": # query = 'در قانون حمایت از خانواده و جوانی جمعیت چه خدماتی در نظر گرفته شده است؟' while True: query = input('enter your qustion:') if query == '': print('لطفا متن سوال را وارد نمائید') continue start = (datetime.datetime.now()) # result = test_dataset() result = single_query(query) end_retrive = datetime.datetime.now() print('-'*40) print(f'retrive duration: {(end_retrive - start).total_seconds()}') prompt = f'برای پرسش "{query}" از میان مواد قانونی "{result}" .پاسخ مناسب و دقیق را استخراج کن. درصورتی که مطلبی مرتبط با پرسش در متن پیدا نشد، فقط پاسخ بده: "متاسفانه در منابع، پاسخی پیدا نشد!"' llm_answer = llm_request(prompt) print('-'*40) print(f'llm duration: {(datetime.datetime.now() - end_retrive).total_seconds()}') refrences = '' recognized_refrences = find_refrences(llm_answer) llm_answer = replace_refrences(llm_answer, recognized_refrences) with open('./llm-answer/result.txt', mode='a+', encoding='utf-8') as file: result_message = f'متن پرامپت: {query.strip()}\n\nپاسخ: {llm_answer} \n----------------------------------------------------------\n' file.write(result_message) with open('./llm-answer/passages.txt', mode='a+', encoding='utf-8') as file: result_message = f'متن پرامپت: {query.strip()}\n\مواد مشابه: {result} \n----------------------------------------------------------\n' file.write(result_message) print('----------------------------------------------------------') print(f'full duration: {(datetime.datetime.now() - start).total_seconds()}') print('----------------------------------------------------------') print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')