#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Hybrid Retrieval + Reranker Pipeline (Debuggable Version)
---------------------------------------------------------
This version is set up for line-by-line execution and debugging (pdb or an IDE).

- Dense retriever: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
- Sparse retriever: TF-IDF
- Fusion: weighted sum
- Reranker: BAAI/bge-reranker-v2-m3

How to run under the debugger:
    python -m pdb hybrid_retrieval_reranker_debug.py
"""

import json
from typing import List, Optional, Tuple

import numpy as np
import torch
from sentence_transformers import SentenceTransformer, util
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# ---------------------
# Models and data path
# ---------------------
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
RERANKER_MODEL = "BAAI/bge-reranker-v2-m3"
DATA_PATH = "./output-speechs/speech-sentences-vector.json"

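# Note (assumption): the precomputed embeddings stored in DATA_PATH must have
# been produced with the same EMBED_MODEL; otherwise the cosine similarities
# in dense_retrieve() compare vectors from incompatible embedding spaces.

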
def load_dataset(path: str) -> Tuple[List[str], np.ndarray]:
    """Load sentences and embeddings (float32)."""
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    # If the file is a dict, convert it to a list of its values.
    if isinstance(raw, dict):
        raw = list(raw.values())

    sentences, emb_list = [], []
    for it in raw:
        sent = it.get("sentence")
        emb = it.get("embeddings")
        if sent and isinstance(emb, (list, tuple)):
            sentences.append(sent)
            emb_list.append(emb)

    if not sentences:
        raise ValueError("Dataset invalid. Needs 'sentence' + 'embeddings'.")

    emb_matrix = np.asarray(emb_list, dtype=np.float32)
    return sentences, emb_matrix

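# Illustrative only -- load_dataset() expects DATA_PATH to contain either a
# list of objects or a dict whose values are objects shaped like:
#   {"sentence": "<text>", "embeddings": [<float>, <float>, ...]}
# Items missing either key are silently skipped.

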
class HybridRetrieverReranker:
    def __init__(self, sentences: List[str], emb_matrix: np.ndarray,
                 dense_alpha: float = 0.6, device: Optional[str] = None):
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device

        self.sentences = sentences
        self.emb_matrix = emb_matrix
        self.N = len(sentences)

        # Dense: bi-encoder for query embeddings; corpus embeddings are precomputed.
        self.embedder = SentenceTransformer(EMBED_MODEL, device=self.device)
        self.embeddings_tensor = torch.from_numpy(self.emb_matrix).to(self.device)

        # Sparse: word 1-2 grams; the token pattern keeps Latin word characters
        # and Arabic-script (Persian) letters, minimum token length 2.
        self.vectorizer = TfidfVectorizer(
            analyzer="word",
            ngram_range=(1, 2),
            token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b"
        )
        self.tfidf_matrix = self.vectorizer.fit_transform(self.sentences)

        # Reranker: cross-encoder scoring (query, sentence) pairs.
        self.tokenizer = AutoTokenizer.from_pretrained(RERANKER_MODEL)
        self.reranker = AutoModelForSequenceClassification.from_pretrained(
            RERANKER_MODEL
        ).to(self.device)
        self.reranker.eval()  # inference only; disables dropout

        self.dense_alpha = float(dense_alpha)

    def dense_retrieve(self, query: str, top_k: int):
        q_emb = self.embedder.encode(query, convert_to_tensor=True).to(self.device)
        similars = util.cos_sim(q_emb, self.embeddings_tensor).squeeze(0)
        top_scores, top_idx = torch.topk(similars, k=min(top_k, self.N))
        return top_idx.tolist(), top_scores.detach().cpu().numpy()

    def sparse_retrieve(self, query: str, top_k: int):
        top_k = min(top_k, self.N)  # guard: argpartition requires kth < array size
        q_vec = self.vectorizer.transform([query])
        sims = cosine_similarity(q_vec, self.tfidf_matrix).ravel()
        idx = np.argpartition(-sims, kth=top_k - 1)[:top_k]
        idx = idx[np.argsort(-sims[idx])]
        return idx.tolist(), sims[idx]

    @staticmethod
    def _minmax_norm(arr: np.ndarray) -> np.ndarray:
        if arr.size == 0:
            return arr
        a_min, a_max = arr.min(), arr.max()
        if a_max - a_min < 1e-12:
            return np.zeros_like(arr)
        return (arr - a_min) / (a_max - a_min)

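    # Worked example: _minmax_norm([0.2, 0.5, 0.8]) -> [0.0, 0.5, 1.0],
    # since (0.5 - 0.2) / (0.8 - 0.2) = 0.5; near-constant inputs map to zeros.
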
    def fuse(self, d_idx, d_scores, s_idx, s_scores, pre_rerank_k: int):
        d_norm = self._minmax_norm(d_scores.astype(np.float32))
        s_norm = self._minmax_norm(s_scores.astype(np.float32))

        d_map = {i: d for i, d in zip(d_idx, d_norm)}
        s_map = {i: s for i, s in zip(s_idx, s_norm)}

        fused = []
        for i in set(d_idx) | set(s_idx):
            score = self.dense_alpha * d_map.get(i, 0.0) + (1 - self.dense_alpha) * s_map.get(i, 0.0)
            fused.append((i, score))

        fused.sort(key=lambda x: x[1], reverse=True)
        return [i for i, _ in fused[:pre_rerank_k]]

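    # Worked example with dense_alpha = 0.6: a sentence with normalized dense
    # score 1.0 and normalized sparse score 0.5 fuses to
    # 0.6 * 1.0 + 0.4 * 0.5 = 0.8; an index found by only one retriever gets
    # 0.0 for the missing side.
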
    def rerank(self, query: str, candidate_indices: List[int], final_k: int):
        pairs = [(query, self.sentences[i]) for i in candidate_indices]
        scores = []
        # Score candidates in batches of 16 to bound memory use.
        for batch in [pairs[i:i + 16] for i in range(0, len(pairs), 16)]:
            inputs = self.tokenizer(batch, padding=True, truncation=True,
                                    max_length=512, return_tensors="pt").to(self.device)
            with torch.no_grad():
                logits = self.reranker(**inputs).logits.view(-1)
            scores.extend(logits.cpu().tolist())

        items = sorted(zip(candidate_indices, scores), key=lambda x: x[1], reverse=True)
        return items[:final_k]

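    # Note: BAAI/bge-reranker-v2-m3 is a cross-encoder that emits a single
    # relevance logit per (query, passage) pair, so .logits.view(-1) above
    # yields one score per candidate; higher means more relevant.
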
    def search(self, query: str, topk_dense=50, topk_sparse=50,
               pre_rerank_k=50, final_k=5):
        d_idx, d_scores = self.dense_retrieve(query, topk_dense)
        # import pdb; pdb.set_trace()  # <- you can break here

        s_idx, s_scores = self.sparse_retrieve(query, topk_sparse)
        cand_idx = self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k)
        # import pdb; pdb.set_trace()  # <- break after fusion

        reranked = self.rerank(query, cand_idx, final_k)
        # import pdb; pdb.set_trace()  # <- break after rerank

        return [{"idx": i, "sentence": self.sentences[i], "rerank_score": score}
                for i, score in reranked]


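# Example debugger session (commands only, using the filename from the docstring):
#   python -m pdb hybrid_retrieval_reranker_debug.py
#   (Pdb) b HybridRetrieverReranker.fuse   # break inside fusion
#   (Pdb) c                                # continue to the breakpoint

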
def main():
    # Persian test query: "What characteristics do short people have?"
    query = "افراد کوتاه قد چه ویژگی هایی دارند؟"
    sentences, emb_matrix = load_dataset(DATA_PATH)

    pipe = HybridRetrieverReranker(sentences, emb_matrix, dense_alpha=0.6)
    results = pipe.search(query, topk_dense=30, topk_sparse=30, pre_rerank_k=30, final_k=5)

    print("\nTop results:")
    for i, r in enumerate(results, 1):
        print(f"{i}. [score={r['rerank_score']:.4f}] {r['sentence']}")
    print("--" * 100)


if __name__ == "__main__":
    import datetime

    # Time four back-to-back runs of the full pipeline (models are re-loaded
    # on every run, so each pass measures load + search end to end).
    prev = datetime.datetime.now()
    for run in range(1, 5):
        main()
        now = datetime.datetime.now()
        print(f"p{run}: {now - prev}")
        prev = now