add embedder and advanced crawler for speeches

This commit is contained in:
init_mahdi 2025-08-19 15:35:41 +03:30
parent 7e456568e5
commit 9ee755bb39
12 changed files with 301682 additions and 1844599 deletions

3
.gitignore vendored
View File

@ -4,6 +4,8 @@ __pycache__/
 *.py[cod]
 *$py.class
+.gitignore
 # C extensions
 *.so
@ -160,3 +162,4 @@ cython_debug/
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 #.idea/
+caches

41
README-hybrid-pipeline.md Normal file
View File

@ -0,0 +1,41 @@
# Hybrid Retrieval + Reranker (Persian-friendly)
This pipeline is a hybrid approach to retrieving and re-ranking results:
- Dense retrieval with `sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2`
- Sparse retrieval with TF-IDF (a tokenization pattern compatible with Persian)
- Fusion based on score weighting (`--dense-alpha`)
- Re-ranking with the cross-encoder `BAAI/bge-reranker-v2-m3`
## Input data
The file `./data/sentences_vector.json` with the following structure:
```json
[
{"sentence": "متن جمله ۱", "embedding": [0.1, -0.2, ...]},
{"sentence": "متن جمله ۲", "embedding": [0.05, 0.33, ...]}
]
```
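A minimal sketch of how a file with this structure could be produced with the same embedding model; the sentence list and output path below are placeholders for your own corpus, not part of this repo:
```python
import json
from sentence_transformers import SentenceTransformer

# Assumed: the same multilingual MiniLM model used by the dense retriever.
model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

sentences = ["جمله اول", "جمله دوم"]  # placeholder: your own Persian sentences
records = [
    {"sentence": s, "embeddings": emb.tolist()}
    for s, emb in zip(sentences, model.encode(sentences, show_progress_bar=True))
]

with open("./data/sentences_vector.json", "w", encoding="utf-8") as f:
    json.dump(records, f, ensure_ascii=False, indent=2)
```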
## Installation
```bash
pip install -U sentence-transformers transformers scikit-learn numpy torch
```
## Running
```bash
python hybrid_retrieval_reranker.py --query "بهترین راه موفقیت چیست؟" --topk-dense 50 --topk-sparse 50 --pre-rerank-k 50 --final-k 5
```
### Arguments
- `--query` : the user's query
- `--data` : path to the data file (default: `./data/sentences_vector.json`)
- `--topk-dense`, `--topk-sparse` : number of initial results from each retriever
- `--pre-rerank-k` : number of candidates passed to the reranker
- `--final-k` : number of final results
- `--dense-alpha` : weight of the dense score in the fusion step, from 0 to 1 (see the sketch after this list)
- `--device` : `cuda` or `cpu` (auto-detected if not set)
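For reference, the fusion step min-max normalizes the two score lists and combines them with the `--dense-alpha` weight. The helper below is a simplified sketch of that idea, not the script's exact code:
```python
import numpy as np

def fuse_scores(dense: np.ndarray, sparse: np.ndarray, dense_alpha: float = 0.6) -> np.ndarray:
    """Min-max normalize each score array, then take a weighted sum.

    Assumes both arrays hold scores for the same candidate sentences, in the same order.
    """
    def minmax(a: np.ndarray) -> np.ndarray:
        rng = a.max() - a.min()
        return np.zeros_like(a) if rng < 1e-12 else (a - a.min()) / rng
    return dense_alpha * minmax(dense) + (1 - dense_alpha) * minmax(sparse)
```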
## Output
Results are printed to the terminal. To save them as JSON:
```bash
python hybrid_retrieval_reranker.py --query "..." --save-json output.json
```

199
crawler.py Normal file
View File

@ -0,0 +1,199 @@
import requests
from bs4 import BeautifulSoup
import json
import time
import os
def crawl_wisdoms():
data = []
base_url = "http://nahj.makarem.ir/wisdom/{}"
failed = []
for page in range(958, 1450):
url = base_url.format(page)
try:
            print(f'trying to crawl page {page} ...')
response = requests.get(url, timeout=10)
if response.status_code != 200:
print(f'page {page} response error ...')
with open('./data/failed-pages.txt', 'a+', encoding='utf-8') as f:
f.write(f'{page}\n')
time.sleep(2)
continue
soup = BeautifulSoup(response.text, "html.parser")
            # Wisdom title
            title_tag = soup.find("h2", class_="card-title py-4")
            title = title_tag.get_text(strip=True) if title_tag else ""
            # Arabic text
            arabic_tag = soup.find("p", class_="card-text arabic-text")
            arabic_text = arabic_tag.get_text(" ", strip=True) if arabic_tag else ""
            # Persian translation
            persian_tag = soup.find("p", class_="card-text translate-text")
            persian_translate = persian_tag.get_text(" ", strip=True) if persian_tag else ""
            # Persian interpretation
            interpretation_tag = soup.find("div", style=lambda s: s and "font-size:14pt;" in s)
            interpretation = interpretation_tag.get_text(" ", strip=True) if interpretation_tag else ""
            # Store the record
if any([title, arabic_text, persian_translate, interpretation]):
                # entries whose title does not contain 'حکمت' (wisdom) are sub-parts of a wisdom
                is_subpart = 'حکمت' not in title
data.append({
"title": title,
"url": url,
"arabic_text": arabic_text,
"persian_translate": persian_translate,
"Interpretation": interpretation,
"is-subpart": is_subpart
})
            time.sleep(1)  # avoid putting too much load on the server
except Exception as e:
print(f'error in crawling page: {page} . error : {e}')
continue
    # Save the results to a JSON file
output_file = "./data/wisdom_data.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
OUTPUT_FILE = "./output/speechs.json"
FAILED_FILE = "./data/failed-speech-pages.txt"
def crawl_speech_page(page):
"""کراول یک خطبه بر اساس شماره صفحه و برگرداندن لیست بخش‌ها"""
url = f"http://nahj.makarem.ir/speech/{page}"
response = requests.get(url, timeout=10)
if response.status_code != 200:
raise Exception(f"status code {response.status_code}")
soup = BeautifulSoup(response.text, "html.parser")
    # Sermon title
    title_tag = soup.find("h2", class_="text-center phrase-title")
    speech_title = title_tag.get_text(strip=True) if title_tag else ""
    # List of parts
parts = []
for idx, part_div in enumerate(soup.find_all("div", class_="phrase-text-container"), start=1):
        # Arabic text
        arabic_tag = part_div.find("p", class_="arabic-text show-off")
        arabic_text = arabic_tag.get_text(" ", strip=True) if arabic_tag else ""
        # Persian translation
        persian_tag = part_div.find("p", class_="translate-text")
        persian_translate = persian_tag.get_text(" ", strip=True) if persian_tag else ""
        # Interpretation link
interpretation_link = ""
ul_tag = part_div.find("ul", class_="tools")
if ul_tag:
first_li = ul_tag.find("li")
if first_li:
a_tag = first_li.find("a")
if a_tag and a_tag.has_attr("href"):
interpretation_link = a_tag["href"]
parts.append({
"speech_title": speech_title,
"part_order": idx,
"url": url,
"arabic_text": arabic_text,
"persian_translate": persian_translate,
"Interpretation_link": interpretation_link
})
return parts
def save_failed(pages):
with open(FAILED_FILE, "w", encoding="utf-8") as f:
for p in pages:
f.write(f"{p}\n")
def load_failed():
if not os.path.exists(FAILED_FILE):
return []
with open(FAILED_FILE, "r", encoding="utf-8") as f:
return [int(line.strip()) for line in f if line.strip().isdigit()]
def load_existing_data():
if not os.path.exists(OUTPUT_FILE):
return []
with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
return json.load(f)
def save_data(data):
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def crawl_all_speeches(start=1, end=758):
all_data = load_existing_data()
failed = []
for page in range(start, end + 1):
try:
print(f"Trying speech {page} ...")
parts = crawl_speech_page(page)
all_data.extend(parts)
time.sleep(1)
except Exception as e:
print(f"❌ Failed speech {page} | error: {e}")
failed.append(page)
save_data(all_data)
save_failed(failed)
def retry_until_success(start=1, end=758):
"""تا وقتی که هیچ صفحه‌ای ناموفق نباشد تکرار می‌کند"""
crawl_all_speeches(start, end)
while True:
failed_pages = load_failed()
if not failed_pages:
print("✅ All speeches crawled successfully.")
break
print(f"🔄 Retrying {len(failed_pages)} failed pages ...")
failed = []
all_data = load_existing_data()
for page in failed_pages:
try:
print(f"Retry speech {page} ...")
parts = crawl_speech_page(page)
all_data.extend(parts)
time.sleep(1)
except Exception as e:
print(f"❌ Still failed {page} | error: {e}")
failed.append(page)
save_data(all_data)
save_failed(failed)
if not failed:
print("✅ Finished. No failed pages remain.")
break
if __name__ == "__main__":
retry_until_success(1, 758)

View File

@ -136,15 +136,22 @@ class PersianVectorAnalyzer:
                 # Extract sentences from different possible keys
                 for key in ['persian_translate']:
                     if key in item and item[key]:
-                        sentences.append(str(item[key]))
+                        splited_sentences = str(item[key]).split('.')
+                        for sent in splited_sentences:
+                            sentences.append(sent)
             elif isinstance(item, str):
-                sentences.append(item)
+                splited_sentences = str(item).split('.')
+                for sent in splited_sentences:
+                    sentences.append(sent)
         elif isinstance(data, dict):
             # If it's a single object, extract all string values
             for value in data.values():
                 if isinstance(value, str):
-                    sentences.append(value)
+                    splited_sentences = str(value).split('.')
+                    for sent in splited_sentences:
+                        sentences.append(sent)
+        sentences = [senten for senten in sentences if senten]
         logger.info(f"Loaded {len(sentences)} sentences")
         return sentences
@ -229,31 +236,35 @@ class PersianVectorAnalyzer:
         logger.info(f"Found {len(unique_words)} unique words from {len(words)} total words")
         return unique_words
-    def compute_word_vectors(self, words: List[str]) -> Dict[str, List[float]]:
+    def compute_word_vectors(self, sentences: List[str]) -> Dict[str, List[float]]:
         """
         Compute vector representations for words.
         Args:
-            words: List of unique words
+            sentences: List of unique sentences
         Returns:
-            Dictionary mapping words to their vector representations
+            Dictionary mapping sentences to their vector representations
         """
         if self.model is None:
             self.load_model()
-        logger.info(f"Computing vectors for {len(words)} words...")
+        logger.info(f"Computing vectors for {len(sentences)} sentences...")
         # Compute embeddings
-        embeddings = self.model.encode(words, show_progress_bar=True)
+        embeddings = self.model.encode(sentences, show_progress_bar=True)
         # Create dictionary
-        word_vectors = {}
-        for i, word in enumerate(words):
-            word_vectors[word] = embeddings[i].tolist()
+        sentences_vectors = {}
+        for i, sent in enumerate(sentences):
+            sentences_vectors[f'sentence-{i+1}'] = {
+                'sentence': sent,
+                'embeddings': embeddings[i].tolist()
+            }
+            print(f'sentence {i} embedded!')
         logger.info("Word vectors computed successfully!")
-        return word_vectors
+        return sentences_vectors
     def find_closest_words(self, word_vectors: Dict[str, List[float]],
                           key_words: List[str], top_k: int = 20) -> Dict[str, List[str]]:
@ -465,32 +476,32 @@ class PersianVectorAnalyzer:
         sentences = self.load_json_data(input_file)
         # Step 2: Extract words
-        all_words = self.extract_words(sentences)
+        # all_words = self.extract_words(sentences)
         # Step 3: Remove stop words
         # filtered_words = self.remove_stop_words(all_words)
-        filtered_words = all_words
+        # filtered_words = all_words
         # Step 4: Get unique words
-        unique_words = self.get_unique_words(filtered_words)
+        # unique_words = self.get_unique_words(filtered_words)
         # Step 5: Compute word vectors
-        word_vectors = self.compute_word_vectors(unique_words)
+        sentences_vectors = self.compute_word_vectors(sentences)
         # Step 6: Save word vectors
-        self.save_json(word_vectors, f"{output_dir}/words_vector.json")
+        self.save_json(sentences_vectors, f"{output_dir}/sentences_vector.json")
         # Step 7: Find closest words to key words
-        selected_words = self.find_closest_words(word_vectors, self.key_words)
+        # selected_words = self.find_closest_words(word_vectors, self.key_words)
         # Step 8: Save selected words
-        self.save_json(selected_words, f"{output_dir}/selected_words.json")
+        # self.save_json(selected_words, f"{output_dir}/selected_words.json")
         # Step 9: Reduce to 3D
-        word_vectors_3d = self.reduce_to_3d(word_vectors, method='tsne')
+        # word_vectors_3d = self.reduce_to_3d(word_vectors, method='tsne')
         # Step 10: Save 3D vectors
-        self.save_json(word_vectors_3d, f"{output_dir}/words_vector_3d.json")
+        # self.save_json(word_vectors_3d, f"{output_dir}/words_vector_3d.json")
         # Step 11: Create visualization
         # self.create_3d_visualization(word_vectors_3d, selected_words,
@ -503,10 +514,10 @@ class PersianVectorAnalyzer:
         print("PIPELINE SUMMARY")
         print("="*50)
         print(f"Input sentences: {len(sentences)}")
-        print(f"Total words extracted: {len(all_words)}")
-        print(f"Unique words after preprocessing: {len(unique_words)}")
-        print(f"Word vectors computed: {len(word_vectors)}")
-        print(f"Key words processed: {len(self.key_words)}")
+        # print(f"Total words extracted: {len(all_words)}")
+        # print(f"Unique words after preprocessing: {len(unique_words)}")
+        # print(f"Word vectors computed: {len(word_vectors)}")
+        # print(f"Key words processed: {len(self.key_words)}")
         print(f"Output files saved to: {output_dir}/")
         print("="*50)

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Hybrid Retrieval + Reranker Pipeline (Debuggable Version)
---------------------------------------------------------
This version is set up for running and debugging line by line (with pdb or an IDE).
- Dense retriever: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
- Sparse retriever: TF-IDF
- Fusion: weighted sum
- Reranker: BAAI/bge-reranker-v2-m3
How to run under the debugger:
python -m pdb hybrid_retrieval_reranker_debug.py
"""
import json
import numpy as np
import torch
from typing import List, Tuple, Dict
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# -------------------
# Models and data path
# -------------------
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
RERANKER_MODEL = "BAAI/bge-reranker-v2-m3"
DATA_PATH = "./output/sentences_vector.json"
def load_dataset(path: str) -> Tuple[List[str], np.ndarray]:
"""Load sentences and embeddings (float32)."""
with open(path, "r", encoding="utf-8") as f:
raw = json.load(f)
    # if the file is a dict, convert it to a list of its values
if isinstance(raw, dict):
raw = list(raw.values())
sentences, emb_list = [], []
for it in raw:
sent = it.get("sentence")
emb = it.get("embeddings")
if sent and isinstance(emb, (list, tuple)):
sentences.append(sent)
emb_list.append(emb)
if not sentences:
raise ValueError("Dataset invalid. Needs 'sentence' + 'embeddings'.")
emb_matrix = np.asarray(emb_list, dtype=np.float32)
return sentences, emb_matrix
class HybridRetrieverReranker:
def __init__(self, sentences: List[str], emb_matrix: np.ndarray,
dense_alpha: float = 0.6, device: str = None):
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.sentences = sentences
self.emb_matrix = emb_matrix
self.N = len(sentences)
# Dense
self.embedder = SentenceTransformer(EMBED_MODEL, device=self.device)
self.embeddings_tensor = torch.from_numpy(self.emb_matrix).to(self.device)
# Sparse
self.vectorizer = TfidfVectorizer(
analyzer="word",
ngram_range=(1, 2),
token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b"
)
self.tfidf_matrix = self.vectorizer.fit_transform(self.sentences)
# Reranker
self.tokenizer = AutoTokenizer.from_pretrained(RERANKER_MODEL)
self.reranker = AutoModelForSequenceClassification.from_pretrained(
RERANKER_MODEL
).to(self.device)
self.dense_alpha = float(dense_alpha)
def dense_retrieve(self, query: str, top_k: int):
q_emb = self.embedder.encode(query, convert_to_tensor=True).to(self.device)
similars = util.cos_sim(q_emb, self.embeddings_tensor).squeeze(0)
top_scores, top_idx = torch.topk(similars, k=min(top_k, self.N))
return top_idx.tolist(), top_scores.detach().cpu().numpy()
    def sparse_retrieve(self, query: str, top_k: int):
        q_vec = self.vectorizer.transform([query])
        sims = cosine_similarity(q_vec, self.tfidf_matrix).ravel()
        k = min(top_k, sims.size)  # guard against top_k exceeding the corpus size
        idx = np.argpartition(-sims, kth=k - 1)[:k]
        idx = idx[np.argsort(-sims[idx])]
        return idx.tolist(), sims[idx]
@staticmethod
def _minmax_norm(arr: np.ndarray) -> np.ndarray:
if arr.size == 0:
return arr
a_min, a_max = arr.min(), arr.max()
if a_max - a_min < 1e-12:
return np.zeros_like(arr)
return (arr - a_min) / (a_max - a_min)
def fuse(self, d_idx, d_scores, s_idx, s_scores, pre_rerank_k: int):
d_norm = self._minmax_norm(d_scores.astype(np.float32))
s_norm = self._minmax_norm(s_scores.astype(np.float32))
d_map = {i: d for i, d in zip(d_idx, d_norm)}
s_map = {i: s for i, s in zip(s_idx, s_norm)}
fused = []
for i in set(d_idx) | set(s_idx):
score = self.dense_alpha * d_map.get(i, 0.0) + (1-self.dense_alpha) * s_map.get(i, 0.0)
fused.append((i, score))
fused.sort(key=lambda x: x[1], reverse=True)
return [i for i, _ in fused[:pre_rerank_k]]
def rerank(self, query: str, candidate_indices: List[int], final_k: int):
pairs = [(query, self.sentences[i]) for i in candidate_indices]
scores = []
for batch in [pairs[i:i+16] for i in range(0, len(pairs), 16)]:
inputs = self.tokenizer(batch, padding=True, truncation=True,
max_length=512, return_tensors="pt").to(self.device)
with torch.no_grad():
logits = self.reranker(**inputs).logits.view(-1)
scores.extend(logits.cpu().tolist())
items = sorted(zip(candidate_indices, scores), key=lambda x: x[1], reverse=True)
return items[:final_k]
def search(self, query: str, topk_dense=50, topk_sparse=50,
pre_rerank_k=50, final_k=5):
d_idx, d_scores = self.dense_retrieve(query, topk_dense)
        # import pdb; pdb.set_trace()  # ← you can break here
s_idx, s_scores = self.sparse_retrieve(query, topk_sparse)
cand_idx = self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k)
        # import pdb; pdb.set_trace()  # ← break after fusion
reranked = self.rerank(query, cand_idx, final_k)
        # import pdb; pdb.set_trace()  # ← break after reranking
return [{"idx": i, "sentence": self.sentences[i], "rerank_score": score}
for i, score in reranked]
def main():
query = "انسان در فتنه ها باید چگونه عملی کند؟"
sentences, emb_matrix = load_dataset(DATA_PATH)
pipe = HybridRetrieverReranker(sentences, emb_matrix, dense_alpha=0.6)
results = pipe.search(query, topk_dense=30, topk_sparse=30, pre_rerank_k=30, final_k=5)
print("\nTop results:")
for i, r in enumerate(results, 1):
print(f"{i}. [score={r['rerank_score']:.4f}] {r['sentence']}")
if __name__ == "__main__":
    import datetime
    # Run the pipeline a few times and print each run's elapsed time.
    for _ in range(4):
        start = datetime.datetime.now()
        main()
        print(datetime.datetime.now() - start)

View File

@ -1,223 +0,0 @@
{
"خدا": [
"بالله",
"خدای",
"خداوند",
"خدایی",
"الله",
"خدایا",
"الهی",
"لله",
"آله",
"خداییم",
"الرب",
"خداوندا",
"خدایش",
"حضرت",
"یاسر",
"آیه",
"بهشتش",
"تعالی",
"باطنم",
"وعید"
],
"بنده": [
"مالک",
"پیشگاه",
"قربانگاه",
"فرمانروایی",
"کوچ",
"مالکی",
"قربانگاههای",
"خزانهدار",
"پیشوای",
"جانشین",
"همنشین",
"مأمور",
"مستولی",
"منکرات",
"بندهاش",
"اختیار",
"منکری",
"حاکم",
"عبد",
"زمامداران"
],
"جهاد": [
"مجاهد",
"اسلام",
"مسلم",
"شامیان",
"علیهالسلام",
"مسلمانان",
"قرآن",
"طلبان",
"صلیالله",
"عبیدالله",
"امان",
"عبدالله",
"شامی",
"خلافت",
"پیغمبر",
"مسلمین",
"سپاه",
"سید",
"علی",
"پیامبر"
],
"ولی": [
"اما",
"مگر",
"وإنما",
"إلا",
"اگرچه",
"برخلاف",
"خلافی",
"درحالیکه",
"بلکه",
"إلیها",
"غیرش",
"لان",
"وگرنه",
"بخلاف",
"ورزند",
"چنانچه",
"وگروه",
"بس",
"وبالش",
"واگر"
],
"زکات": [
"گلوگاه",
"غنائمی",
"مینگرند",
"غبن",
"دراز",
"نزند",
"میافکند",
"گرچه",
"زبیر",
"تابی",
"طغیان",
"بلاغت",
"توفیق",
"ضبائی",
"قیمة",
"فریفتند",
"آمیز",
"پوشی",
"طویلة",
"سوگشان"
],
"نماز": [
"دعا",
"صلوات",
"دعای",
"دعایی",
"عبادت",
"مومنان",
"مؤمنان",
"ایمانی",
"مؤمنی",
"مؤمن",
"مومن",
"برکت",
"ایمان",
"المؤمنین",
"ایمانش",
"رحمت",
"مؤمنانم",
"دینی",
"ایمانتان",
"معنوی"
],
"صبر": [
"انتظار",
"یصبر",
"لایصبر",
"صبور",
"پروا",
"متکبر",
"تعویذ",
"دعائم",
"سکونت",
"رکاب",
"إرواد",
"ماند",
"پرخوری",
"دنبال",
"استهزاء",
"میپیچید",
"دوشید",
"بیندیشید",
"تقوای",
"نفرماید"
],
"عبادت": [
"دعایی",
"دعای",
"صلوات",
"نماز",
"دعا",
"خدای",
"مومن",
"خداوند",
"بالله",
"خدا",
"برکت",
"مؤمنانم",
"الهی",
"خدایا",
"الرب",
"لله",
"آله",
"ایمانی",
"الله",
"خدایی"
],
"ولایت": [],
"خلافت": [
"سپاه",
"حاکم",
"امت",
"فرمانروایی",
"لشکر",
"قصار",
"امان",
"برترند",
"نهاد",
"زمامداران",
"وحکمة",
"ستمگری",
"الإبل",
"بالإبل",
"مسلط",
"سرکش",
"اختیار",
"امانی",
"مأموریت",
"حکومتی"
],
"پیامبر": [
"پیغمبر",
"پیامبرش",
"پیامبران",
"پیامبرتان",
"قرآن",
"رسولالله",
"مجاهد",
"عبیدالله",
"الله",
"مسلم",
"ربانی",
"اسلام",
"خدای",
"ایمانی",
"یاسر",
"شهید",
"خدایی",
"بالله",
"صلیالله",
"خدا"
]
}

301088
output/sentences_vector.json Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

27
output2.json Normal file
View File

@ -0,0 +1,27 @@
[
{
"idx": 0,
"sentence": "امام (عليه السلام) فرمود: در فتنه ها همچون شتر کم سن و سال باش؛ نه پشت او قوى شده که سوارش شوند نه پستانى دارد که بدوشند!",
"rerank_score": -0.3798098564147949
},
{
"idx": 592,
"sentence": "امام (عليه السلام) فرمود: روزگارى بر مردم فرا مى رسد که در ميان آن ها از قرآن چيزى جز خطوطش و از اسلام جز نامش باقى نخواهد ماند، مساجد آن ها در آن زمان از جهت بنا آباد و محکم ولى ازجهت هدايت خراب و ويران است، ساکنان آن مساجد و آبادکنندگانش بدترين مردم روى زمين اند، فتنه و فساد از آن ها برمى خيزد و خطاها به آن ها بازمى گردد، آن کس که در آن زمان از فتنه ها کناره گيرى کند او را (به اجبار) به آن بازمى گردانند و هر کسى که از آن جا وامانده است به سوى آن سوقش مى دهند، خداوند سبحان مى فرمايد: به ذاتم سوگند خورده ام فتنه اى بر آنان مى فرستم که\nعاقل بردبار در آن حيران بماند (سپس امام (عليه السلام) افزود:) و هم اکنون اين کار انجام شده و ما از خداوند خواستاريم که از لغزش ها و غفلت هاى ما درگذرد",
"rerank_score": -3.0706212520599365
},
{
"idx": 632,
"sentence": "امام (عليه السلام) فرمود: انسان باايمان ساعات شبانه روز خود را به سه بخش تقسيم مى کند: قسمتى را صرف مناجات با پروردگارش مى کند و قسمت ديگرى را براى ترميم معاش و کسب و کار زندگى قرار مى دهد و قسمت سوم را براى بهره گيرى از لذات حلال و دلپسند مى گذارد و سزاوار نيست که انسان عاقل حرکتش جز در سه چيز باشد: مرمت معاش، گامى در راه معاد و لذت در غير حرام",
"rerank_score": -3.6406915187835693
},
{
"idx": 463,
"sentence": " (به دليل نهايت تواضع، بر حسب ظاهر) انسان ضعيفى بود و مردم نيز او را ضعيف مى شمردند؛ ولى هرگاه کار مهم و جدى به ميان مى آمد همچون شير بيشه مى خروشيد و مانند مار بيابانى به حرکت درمى آمد",
"rerank_score": -4.289053916931152
},
{
"idx": 41,
"sentence": " کسى که درست بينديشد به اعماق دانش آگاهى پيدا مى کند و کسى که به عمق علم و دانش برسد از سرچشمه احکام سيراب بازمى گردد و آن کس که حلم و بردبارى پيشه کند گرفتار تفريط و کوتاهى در امور خود نمى شود و در ميان مردم با آبرومندى زندگى خواهد کرد",
"rerank_score": -5.043278694152832
}
]

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
sentence-transformers
transformers
scikit-learn
numpy
torch

96
reranker.py Normal file
View File

@ -0,0 +1,96 @@
import json
import torch
from sentence_transformers import SentenceTransformer, util
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
RERANKER_MODEL = "BAAI/bge-reranker-v2-m3"
DATA_PATH = "./output/sentences_vector.json"
# -----------------------------
# data fetching
# -----------------------------
with open(DATA_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
sentences = []
emb_list = []
for item in data:
if "sentence" in data[item] and "embeddings" in data[item] and isinstance(data[item]["embeddings"], list):
sentences.append(data[item]["sentence"])
emb_list.append(data[item]["embeddings"])
if not sentences:
raise ValueError("هیچ جمله/امبدینگی در فایل یافت نشد.")
# به float32 تبدیل می‌کنیم تا با خروجی SentenceTransformer هم‌خوان باشد
emb_matrix = np.asarray(emb_list, dtype=np.float32)
# -----------------------------
# device configuration
# -----------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device_str = "cuda" if torch.cuda.is_available() else "cpu"
# -----------------------------
# loading models
# -----------------------------
embedder = SentenceTransformer(EMBED_MODEL, device=device_str)
tokenizer = AutoTokenizer.from_pretrained(RERANKER_MODEL)
reranker = AutoModelForSequenceClassification.from_pretrained(RERANKER_MODEL).to(device)
# the dataset's embedding tensor on the same device
embeddings_tensor = torch.from_numpy(emb_matrix).to(device) # (N, D) float32
# -----------------------------
# main function
# -----------------------------
def get_top_sentences(query: str, top_k: int = 20, final_k: int = 5):
# 1) embedding query
    query_emb = embedder.encode(query, convert_to_tensor=True)  # on the same device as the model
if query_emb.device != device:
query_emb = query_emb.to(device)
    # 2) cosine similarity (output shape (1, N) → squeezed to (N,))
sim_scores = util.cos_sim(query_emb, embeddings_tensor).squeeze(0) # (N,)
    # effective k
k = min(top_k, sim_scores.size(0))
    # 3) select the k nearest results
topk_scores, topk_indices = torch.topk(sim_scores, k=k, largest=True, sorted=True)
    idx_list = topk_indices.tolist()  # flat list of ints
# extract top-k nearest sentences
candidate_sentences = [sentences[i] for i in idx_list]
# 4) rerank on top-k sentences
pairs = [(query, sent) for sent in candidate_sentences]
inputs = tokenizer(pairs, padding=True, truncation=True, max_length=512, return_tensors="pt").to(device)
with torch.no_grad():
logits = reranker(**inputs).logits.view(-1)
# set final k sentences
final_k = min(final_k, len(candidate_sentences))
# 5) select final best nearer sentences based on rerank scores
best_idx = torch.topk(logits, k=final_k, largest=True, sorted=True).indices.tolist()
final_sentences = [candidate_sentences[i] for i in best_idx]
return final_sentences
# -----------------------------
# execution
# -----------------------------
if __name__ == "__main__":
q = "فرصت های خوب را نباید از دست داد"
q = "انسان در فتنه ها باید چگونه عملی کند؟"
results = get_top_sentences(q, top_k=20, final_k=5)
results_string = ''
for item in results:
results_string += "- " + item + '\n'
print(results_string)
print()