#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Hybrid Retrieval + Reranker Pipeline (Debuggable Version)
---------------------------------------------------------
This version is prepared for line-by-line execution and debugging (pdb or an IDE).

- Dense retriever: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
- Sparse retriever: TF-IDF
- Fusion: weighted sum
- Reranker: BAAI/bge-reranker-v2-m3

To run under the debugger:
    python -m pdb hybrid_retrieval_reranker_debug.py
"""

import json
from typing import List, Optional, Tuple

import numpy as np
import torch
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# -------------------
# Models and data path
# -------------------
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
RERANKER_MODEL = "BAAI/bge-reranker-v2-m3"
DATA_PATH = "./output-speechs/speech-sentences-vector.json"


def load_dataset(path: str) -> Tuple[List[str], np.ndarray]:
    """Load sentences and their precomputed embeddings (float32)."""
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    # If the file is a dict, convert it to a list of records.
    if isinstance(raw, dict):
        raw = list(raw.values())

    sentences, emb_list = [], []
    for it in raw:
        sent = it.get("sentence")
        emb = it.get("embeddings")
        if sent and isinstance(emb, (list, tuple)):
            sentences.append(sent)
            emb_list.append(emb)

    if not sentences:
        raise ValueError("Dataset invalid. Needs 'sentence' + 'embeddings'.")

    emb_matrix = np.asarray(emb_list, dtype=np.float32)
    return sentences, emb_matrix
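
# For reference, a minimal (hypothetical) example of the record shape that
# load_dataset() expects; the real file may carry extra fields, which are
# ignored:
#
#   [
#     {"sentence": "...", "embeddings": [0.12, -0.03, 0.45]},
#     {"sentence": "...", "embeddings": [0.07, 0.21, -0.18]}
#   ]
#
# Records missing "sentence", or whose "embeddings" is not a list/tuple,
# are skipped silently.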


class HybridRetrieverReranker:
    def __init__(self, sentences: List[str], emb_matrix: np.ndarray,
                 dense_alpha: float = 0.6, device: Optional[str] = None):
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device
        self.sentences = sentences
        self.emb_matrix = emb_matrix
        self.N = len(sentences)

        # Dense retriever: query encoder + precomputed corpus embeddings.
        self.embedder = SentenceTransformer(EMBED_MODEL, device=self.device)
        self.embeddings_tensor = torch.from_numpy(self.emb_matrix).to(self.device)

        # Sparse retriever: TF-IDF over unigrams and bigrams. The token
        # pattern also matches Arabic/Persian script (U+0600-U+06FF).
        self.vectorizer = TfidfVectorizer(
            analyzer="word",
            ngram_range=(1, 2),
            token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b",
        )
        self.tfidf_matrix = self.vectorizer.fit_transform(self.sentences)

        # Cross-encoder reranker (kept in eval mode for inference).
        self.tokenizer = AutoTokenizer.from_pretrained(RERANKER_MODEL)
        self.reranker = AutoModelForSequenceClassification.from_pretrained(
            RERANKER_MODEL
        ).to(self.device)
        self.reranker.eval()

        self.dense_alpha = float(dense_alpha)

    def dense_retrieve(self, query: str, top_k: int):
        """Cosine similarity between the encoded query and all corpus embeddings."""
        q_emb = self.embedder.encode(query, convert_to_tensor=True).to(self.device)
        sims = util.cos_sim(q_emb, self.embeddings_tensor).squeeze(0)
        top_scores, top_idx = torch.topk(sims, k=min(top_k, self.N))
        return top_idx.tolist(), top_scores.detach().cpu().numpy()

    def sparse_retrieve(self, query: str, top_k: int):
        """TF-IDF cosine similarity; returns the top_k indices and scores."""
        q_vec = self.vectorizer.transform([query])
        sims = cosine_similarity(q_vec, self.tfidf_matrix).ravel()
        top_k = min(top_k, sims.size)  # guard against top_k > corpus size
        idx = np.argpartition(-sims, kth=top_k - 1)[:top_k]
        idx = idx[np.argsort(-sims[idx])]
        return idx.tolist(), sims[idx]

    @staticmethod
    def _minmax_norm(arr: np.ndarray) -> np.ndarray:
        """Scale scores to [0, 1]; a constant array maps to all zeros."""
        if arr.size == 0:
            return arr
        a_min, a_max = arr.min(), arr.max()
        if a_max - a_min < 1e-12:
            return np.zeros_like(arr)
        return (arr - a_min) / (a_max - a_min)

    def fuse(self, d_idx, d_scores, s_idx, s_scores, pre_rerank_k: int):
        """Weighted-sum fusion of the min-max-normalized dense and sparse scores."""
        d_norm = self._minmax_norm(d_scores.astype(np.float32))
        s_norm = self._minmax_norm(s_scores.astype(np.float32))
        d_map = {i: d for i, d in zip(d_idx, d_norm)}
        s_map = {i: s for i, s in zip(s_idx, s_norm)}
        fused = []
        for i in set(d_idx) | set(s_idx):
            score = (self.dense_alpha * d_map.get(i, 0.0)
                     + (1 - self.dense_alpha) * s_map.get(i, 0.0))
            fused.append((i, score))
        fused.sort(key=lambda x: x[1], reverse=True)
        return [i for i, _ in fused[:pre_rerank_k]]

    def rerank(self, query: str, candidate_indices: List[int], final_k: int):
        """Score (query, sentence) pairs with the cross-encoder, in batches of 16."""
        pairs = [(query, self.sentences[i]) for i in candidate_indices]
        scores = []
        for batch in [pairs[i:i + 16] for i in range(0, len(pairs), 16)]:
            inputs = self.tokenizer(batch, padding=True, truncation=True,
                                    max_length=512, return_tensors="pt").to(self.device)
            with torch.no_grad():
                logits = self.reranker(**inputs).logits.view(-1)
            scores.extend(logits.cpu().tolist())
        items = sorted(zip(candidate_indices, scores), key=lambda x: x[1], reverse=True)
        return items[:final_k]

    def search(self, query: str, topk_dense=50, topk_sparse=50,
               pre_rerank_k=50, final_k=5):
        d_idx, d_scores = self.dense_retrieve(query, topk_dense)
        # import pdb; pdb.set_trace()  # <- you can break here
        s_idx, s_scores = self.sparse_retrieve(query, topk_sparse)
        cand_idx = self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k)
        # import pdb; pdb.set_trace()  # <- break after fusion
        reranked = self.rerank(query, cand_idx, final_k)
        # import pdb; pdb.set_trace()  # <- break after reranking
        return [{"idx": i, "sentence": self.sentences[i], "rerank_score": score}
                for i, score in reranked]


def main():
    # Persian test query: "What characteristics do short people have?"
    query = "افراد کوتاه قد چه ویژگی هایی دارند؟"
    sentences, emb_matrix = load_dataset(DATA_PATH)
    pipe = HybridRetrieverReranker(sentences, emb_matrix, dense_alpha=0.6)
    results = pipe.search(query, topk_dense=30, topk_sparse=30,
                          pre_rerank_k=30, final_k=5)
    print("\nTop results:")
    for i, r in enumerate(results, 1):
        print(f"{i}. [score={r['rerank_score']:.4f}] {r['sentence']}")
        print("--" * 100)


if __name__ == "__main__":
    import datetime

    # Run the full pipeline four times and time each pass. Every call to
    # main() rebuilds the models from scratch, so the first pass (p1)
    # includes the cold-start loading cost.
    start = datetime.datetime.now()
    for label in ("p1", "p2", "p3", "p4"):
        main()
        now = datetime.datetime.now()
        print(f"{label}: {now - start}")
        start = now
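
# A minimal sketch (hypothetical usage, not part of the benchmark above) of
# reusing one pipeline instance across several queries, which avoids
# reloading the embedder and reranker on every call:
#
#   sentences, emb_matrix = load_dataset(DATA_PATH)
#   pipe = HybridRetrieverReranker(sentences, emb_matrix, dense_alpha=0.6)
#   for q in queries:
#       for hit in pipe.search(q, final_k=5):
#           print(f"{hit['rerank_score']:.4f}  {hit['sentence']}")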