Compare commits

..

No commits in common. "main" and "master" have entirely different histories.
main ... master

17 changed files with 4207 additions and 163 deletions

2
.dockerignore Normal file
View File

@ -0,0 +1,2 @@
./qavanin-faiss
./llm-answer

165
.gitignore vendored
View File

@ -1,162 +1,5 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] qavanin-faiss/faiss_index_qavanin_285k_metadata.json
*$py.class qavanin-faiss/faiss_index_qavanin_285k.index
.vscode
# C extensions .gitignore
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

View File

@ -1,3 +1,2 @@
# rag_qavanin_api # Qavanin Chatbot
a chatbot based on optimized rag and reranker approch on law sections

586
chatbot.py Normal file
View File

@ -0,0 +1,586 @@
import json
import os
import numpy as np
import torch
import faiss
from typing import List, Tuple
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import datetime
import re
import random
from fastapi.middleware.cors import CORSMiddleware
from embedder_sbert_qavanin_285k import PersianVectorAnalyzer
#from normalizer import cleaning
from fastapi import FastAPI ,Header
from pydantic import BaseModel
# LLM Libs
from openai import OpenAI
from langchain_openai import ChatOpenAI # pip install -U langchain_openai
import requests
today = f'{datetime.datetime.now().year}{datetime.datetime.now().month}{datetime.datetime.now().day}'
chatbot = FastAPI()
origins = ["*"]
chatbot.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# -------------------
# مدل‌ها و مسیر دادهsrc/app/qavanin-faiss/faiss_index_qavanin_285k_metadata.json
# -------------------/src/app/qavanin-faiss
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
RERANKER_MODEL = "BAAI/bge-reranker-v2-m3"
FAISS_INDEX_PATH = "/src/app/qavanin-faiss/faiss_index_qavanin_285k.index"
FAISS_METADATA_PATH = "/src/app/qavanin-faiss/faiss_index_qavanin_285k_metadata.json"
RERANK_BATCH = int(os.environ.get("RERANK_BATCH", 256))
# print(f'RERANK_BATCH: {RERANK_BATCH}')
def get_key():
key = 'aa-fdh9d847ANcBxQCBTZD5hrrAdl0UrPEnJOScYmOncrkagYPf'
return key
def load_faiss_index(index_path: str, metadata_path: str):
"""بارگذاری ایندکس FAISS و متادیتا (لیست جملات + عناوین)."""
index = faiss.read_index(index_path)
with open(metadata_path, "r", encoding="utf-8") as f:
metadata = json.load(f)
content_list, ids, prefix_list = [], [], []
for item in metadata:
content_list.append(item["content"])
ids.append(item["id"])
prefix_list.append(item["prefix"])
return content_list, ids, prefix_list, index
def get_client():
url = "https://api.avalai.ir/v1"
# key = 'aa-4tvAEazUBovEN1i7i7tdl1PR93OaWXs6hMflR4oQbIIA4K7Z'
client = OpenAI(
api_key= get_key(), # با کلید واقعی خود جایگزین کنید
base_url= url, # آدرس پایه
)
return client
def llm_base_request(query):
# model = 'cf.gemma-3-12b-it'
model = 'gpt-4o-mini'
prompt = f'برای متن {query} زیر، عنوانی کوتاه که بین 3 تا 6 کلمه داشته باشد، انتخاب کن. غیر از عنوان، به هیچ وجه توضیح اضافه ای در قبل یا بعد آن اضافه نکن.'
client = get_client()
try:
messages.append({"role": "user", "content": prompt})
response = client.chat.completions.create(
messages = messages,
model= model) # "gpt-4o", "gpt-4o-mini", "deepseek-chat" , "gemini-2.0-flash", gemini-2.5-flash-lite
# gpt-4o : 500
# gpt-4o-mini : 34
# deepseek-chat: : 150
# gemini-2.0-flash : error
# cf.gemma-3-12b-it : 1
# gemini-2.5-flash-lite : 35 خیلی خوب
answer = response.choices[0].message.content
# پاسخ را هم به سابقه اضافه می‌کنیم
messages.append({"role": "assistant", "content": answer})
except Exception as error:
with open('./llm-answer/error-in-llm.txt', mode='a+', encoding='utf-8') as file:
error_message = f'\n\nquery: {query.strip()}\nerror:{error} \n-------------------------------\n'
file.write(error_message)
return ''
return answer
def llm_request(query, model):
if query == '':
return 'لطفا متن سوال را وارد نمائید'
client = get_client()
determine_refrence = """شناسه هر ماده قانون در ابتدای آن و با فرمت "id: {idvalue}" آمده است که id-value همان شناسه ماده است. بازای هربخش از پاسخی که تولید می شود، ضروری است شناسه ماده ای که در تدوین پاسخ از آن استفاده شده در انتهای پاراگراف یا جمله مربوطه با فرمت {idvalue} اضافه شود. همیشه idvalue با رشته "qs" شروع می شود"""
try:
messages.append({"role": "user", "content": query})
messages.append({"role": "user", "content": determine_refrence})
response = client.chat.completions.create(
messages = messages,
model= model) # "gpt-4o", "gpt-4o-mini", "deepseek-chat" , "gemini-2.0-flash", gemini-2.5-flash-lite
# gpt-4o : 500
# gpt-4o-mini : 34
# deepseek-chat: : 150
# gemini-2.0-flash : error
# cf.gemma-3-12b-it : 1
# gemini-2.5-flash-lite : 35 خیلی خوب
answer = response.choices[0].message.content
# پاسخ را هم به سابقه اضافه می‌کنیم
messages.append({"role": "assistant", "content": answer})
except Exception as error:
with open('./llm-answer/error-in-llm.txt', mode='a+', encoding='utf-8') as file:
error_message = f'\n\nquery: {query.strip()}\nerror:{error} \n-------------------------------\n'
file.write(error_message)
return 'با عرض پوزش؛ متاسفانه خطایی رخ داده است. لطفا لحظاتی دیگر دوباره تلاش نمائید'
return answer
class HybridRetrieverReranker:
__slots__ = (
"device", "content_list", "ids", "prefix_list", "N", "embedder", "faiss_index",
"vectorizer", "tfidf_matrix", "tokenizer", "reranker", "dense_alpha"
)
def __init__(self, content_list: List[str],ids: List[str], prefix_list: List[str], faiss_index,
dense_alpha: float = 0.6, device: str = None):
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.content_list = content_list
self.ids = ids
self.prefix_list = prefix_list
self.faiss_index = faiss_index
self.N = len(content_list)
# Dense
self.embedder = SentenceTransformer(EMBED_MODEL,cache_folder='/src/MODELS', device=self.device)
#self.embedder = SentenceTransformer(EMBED_MODEL, device=self.device)
# Sparse (مثل قبل برای حفظ خروجی)
self.vectorizer = TfidfVectorizer(
analyzer="word",
ngram_range=(1, 2),
token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b",
)
self.tfidf_matrix = self.vectorizer.fit_transform(self.content_list)
# Reranker
self.tokenizer = AutoTokenizer.from_pretrained(RERANKER_MODEL,cache_dir='/src/MODELS', use_fast=True)
self.reranker = AutoModelForSequenceClassification.from_pretrained(
RERANKER_MODEL
).to(self.device)
# self.reranker = AutoModelForSeq2SeqLM.from_pretrained(RERANKER_MODEL).to(self.device)
# self.reranker.eval()
self.dense_alpha = float(dense_alpha)
# --- Dense (FAISS) ---
def dense_retrieve(self, query: str, top_k: int):
if top_k <= 0:
return [], np.array([], dtype=np.float32)
q_emb = self.embedder.encode(query, convert_to_numpy=True).astype(np.float32)
D, I = self.faiss_index.search(np.expand_dims(q_emb, axis=0), top_k)
return I[0].tolist(), D[0]
# --- Sparse ---
def sparse_retrieve(self, query: str, top_k: int):
if top_k <= 0:
return [], np.array([], dtype=np.float32)
k = min(top_k, self.N)
q_vec = self.vectorizer.transform([query])
sims = cosine_similarity(q_vec, self.tfidf_matrix).ravel()
idx = np.argpartition(-sims, kth=k-1)[:k]
idx = idx[np.argsort(-sims[idx], kind="mergesort")]
return idx.tolist(), sims[idx]
# --- Utils ---
@staticmethod
def _minmax_norm(arr: np.ndarray) -> np.ndarray:
if arr.size == 0:
return arr
a_min = arr.min()
a_max = arr.max()
rng = a_max - a_min
if rng < 1e-12:
return np.zeros_like(arr)
return (arr - a_min) / rng
def fuse(self, d_idx, d_scores, s_idx, s_scores, top_k=50, k_rrf=60):
"""
ادغام نتایج دو retriever (dense و sparse) با استفاده از Reciprocal Rank Fusion (RRF)
Args:
d_idx (list or np.ndarray): ایندکسهای نتایج dense retriever
d_scores (list or np.ndarray): نمرات dense retriever
s_idx (list or np.ndarray): ایندکسهای نتایج sparse retriever
s_scores (list or np.ndarray): نمرات sparse retriever
top_k (int): تعداد نتایج نهایی
k_rrf (int): ثابت در فرمول RRF برای کاهش تأثیر رتبههای پایینتر
Returns:
list: لیست ایندکسهای ادغامشده به ترتیب نمره
"""
combined = {}
# dense retriever
for rank, idx in enumerate(d_idx):
score = 1.0 / (k_rrf + rank)
combined[idx] = combined.get(idx, 0) + score
# sparse retriever
for rank, idx in enumerate(s_idx):
score = 1.0 / (k_rrf + rank)
combined[idx] = combined.get(idx, 0) + score
# مرتب‌سازی نهایی
sorted_items = sorted(combined.items(), key=lambda x: x[1], reverse=True)
cand_idx = [item[0] for item in sorted_items[:top_k]]
return cand_idx
def rerank(self, query: str, candidate_indices: List[int], passages: List[str], final_k: int) -> List[Tuple[int, float]]:
"""
Rerank candidate passages using a cross-encoder (e.g., MonoT5, MiniLM, etc.).
Args:
query (str): پرسش کاربر
candidate_indices (List[int]): ایندکسهای کاندیدا (از retriever)
passages (List[str]): کل جملات/پاراگرافها
final_k (int): تعداد نتایج نهایی
Returns:
List[Tuple[int, float]]: لیستی از (ایندکس، امتیاز) برای بهترین نتایج
"""
if final_k <= 0 or not candidate_indices:
return []
# آماده‌سازی جفت‌های (query, passage)
texts = [query] * len(candidate_indices)
pairs = passages
scores: List[float] = []
def _iter_batches(max_bs: int):
bs = max_bs
while bs >= 16: # حداقل batch_size
try:
with torch.inference_mode():
for start in range(0, len(pairs), bs):
batch_texts = texts[start:start + bs]
batch_pairs = pairs[start:start + bs]
inputs = self.tokenizer(
batch_texts,
batch_pairs,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
).to(self.device)
logits = self.reranker(**inputs).logits.view(-1)
scores.extend(logits.detach().cpu().tolist())
return True
except torch.cuda.OutOfMemoryError:
if torch.cuda.is_available():
torch.cuda.empty_cache()
bs //= 2
return False
# اجرای reranking
success = _iter_batches(max_bs=64)
if not success:
raise RuntimeError("Reranker failed due to CUDA OOM, even with small batch size.")
# مرتب‌سازی نتایج بر اساس نمره
reranked = sorted(
zip(candidate_indices, scores),
key=lambda x: x[1],
reverse=True
)[:final_k]
return reranked
def get_passages(self, cand_idx, content_list):
passages = []
for idx in cand_idx:
passages.append(content_list[idx])
return passages
# --- Search (بدون تغییر) ---
def search(self, query: str, content_list, topk_dense=50, topk_sparse=50,
pre_rerank_k=50, final_k=10):
d_idx, d_scores = self.dense_retrieve(query, topk_dense)
s_idx, s_scores = self.sparse_retrieve(query, topk_sparse)
cand_idx = self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k)
passages = self.get_passages(cand_idx, content_list)
reranked = self.rerank(query, cand_idx, passages, final_k)
return [{"idx": i, "content": self.content_list[i],"prefix": self.prefix_list[i], "rerank_score": score}
for i, score in reranked]
def single_query(query: str):
# query = cleaning(query)
retrived_sections_ids = []
retrived_sections = pipe.search(query, content_list, topk_dense=30, topk_sparse=30, pre_rerank_k=30, final_k=10)
final_similars = ''
for i, row in enumerate(retrived_sections, 1):
id_value = '{' + str(ids[row['idx']]) + '}'
result = f"id: {id_value} \n{row['prefix']} {row['content']}\n"
retrived_sections_ids.append(ids[row['idx']])
final_similars += ''.join(result)
return final_similars, retrived_sections_ids
def find_refrences(llm_answer: str) -> List[str]:
"""
شناسایی شناسه هایی که مدل زبانی، برای تهیه پاسخ از آنها استفاده کرده است
Args:
llm_answer(str): متنی که مدل زبانی تولید کرده است
Returns:
refrence_ids(List[str]): لیستی از شناسه های تشخیص داده شده
"""
pattern = r"\{[^\}]+\}"
refrence_ids = re.findall(pattern, llm_answer)
new_refrences_ids = []
for itm in refrence_ids:
refrence = itm.lstrip('{')
refrence = refrence.lstrip('}')
new_refrences_ids.append(refrence)
# refrence_ids = [item.lstrip('{').rstrip('}') for item in refrence_ids]
return refrence_ids
def replace_refrences(llm_answer: str, refrences_list:List[str]) -> List[str]:
"""
شناسایی شناسه هایی که مدل زبانی، برای تهیه پاسخ از آنها استفاده کرده است
Args:
llm_answer(str): متنی که مدل زبانی تولید کرده است
refrences_list(List[str]): لیست شناسه ماده های مورد استفاده در پاسخ مدل زبانی
Returns:
llm_answer(str), : متن بازسازی شده پاسخ مدل زبانی که شناسه ماده های مورد استفاده در آن، اصلاح شده است
"""
refrences = ''
for index, ref in enumerate(refrences_list,1):
# breakpoint()
llm_answer = llm_answer.replace(ref, f'[{index}]')
# id = ref.lstrip('{')
# id = id.rstrip('}')
# refrences += ''.join(f'[{index}] https://majles.tavasi.ir/entity/detail/view/qsection/{id}\n')
# llm_answer = f'{llm_answer}\n\nمنابع پاسخ‌:\n{refrences.strip()}'
return llm_answer.strip()
# load basic items
content_list, ids, prefix_list, faiss_index = load_faiss_index(FAISS_INDEX_PATH, FAISS_METADATA_PATH)
pipe = HybridRetrieverReranker(content_list, ids, prefix_list, faiss_index, dense_alpha=0.6)
# query preprocess and normalize
normalizer_obj = PersianVectorAnalyzer()
messages = [
{"role": "system", "content": "تو یک دستیار خبره در زمینه حقوق و قوانین مرتبط به آن هستی و می توانی متون حقوقی را به صورت دقیق توضیح بدهی . پاسخ ها باید الزاما به زبان فارسی باشد. پاسخ ها فقط از متون قانونی که در پرامپت وجود دارد استخراج شود."},
]
models = ["gemini-2.5-flash-lite", "gpt-4o-mini"]
def save_result(chat_obj: object) -> bool:
# index result in elastic
pass
def run_chatbot(query:str, chat_id:str):
prompt_status = True
status_text = 'لطفا متن سوال را وارد نمائید'
if query == '':
prompt_status = False
start_time = (datetime.datetime.now())
# در صورتی که وضعیت پرامپت معتبر باشد، وارد فرایند شو
if prompt_status:
result_passages_text, result_passages_ids = single_query(query)
end_retrive = datetime.datetime.now()
print('-'*40)
retrive_duration = (end_retrive - start_time).total_seconds()
print(f'retrive duration: {str(retrive_duration)}')
prompt = f'برای پرسش "{query}" از میان مواد قانونی "{result_passages_text}" .پاسخ مناسب و دقیق را استخراج کن. درصورتی که مطلبی مرتبط با پرسش در متن پیدا نشد، فقط پاسخ بده: "متاسفانه در منابع، پاسخی پیدا نشد!"'
llm_model = ''
for model in models:
try:
llm_model = model
llm_answer = llm_request(prompt, model)
except Exception as error:
error = f'model: {model} \n{error}\n\n'
prompt_status = False
status_text = 'با عرض پوزش، سرویس موقتا در دسترس نیست. لطفا دقایقی دیگر دوباره تلاش نمائید!'
else:
chat_obj = {
'id' : chat_id, # str
'title' : '', # str
'user_id' : '',
'user_query' : query, # str
'model_key' : llm_model, # str
'retrived_passage' : result_passages_text, # str
'retrived_ref_ids' : result_passages_ids, # list[obj]
'prompt_type' : 'question-answer', # str
'retrived_duration' : retrive_duration, # str
'llm_duration' : '0', # str
'full_duration' : '0', # str
'time_create' : str(start_time), # str
'used_ref_ids' : [], # list[str]
'prompt_answer' : '', # str
'status_text' : status_text,
'status' : prompt_status, # or False # bool
}
# آبجکت ایجاد شده با بازگردان
return chat_obj, status_text
llm_answer_duration = (datetime.datetime.now() - end_retrive).total_seconds()
print(f'llm answer duration: {str(llm_answer_duration)}')
used_refrences_in_answer = find_refrences(llm_answer)
llm_answer = replace_refrences(llm_answer, used_refrences_in_answer)
full_prompt_duration = (datetime.datetime.now() - start_time).total_seconds()
print(f'full prompt duration: {full_prompt_duration}')
print('~'*40)
status_text ='پاسخ با موفقیت ایجاد شد'
title = llm_base_request(query)
if title == '':
title = query[0:15]
chat_obj = {
'id' : chat_id, # str
'title' : title, # str
'user_id' : '',
'user_query' : query, # str
'model_key' : llm_model, # str
'retrived_passage' : result_passages_text, # str
'retrived_ref_ids' : result_passages_ids, # list[obj]
'prompt_type' : 'question-answer', # str
'retrived_duration' : retrive_duration, # str
'llm_duration' : llm_answer_duration, # str
'full_duration' : full_prompt_duration, # str
'time_create' : str(start_time), # str
'used_ref_ids' : used_refrences_in_answer, # list[str]
'prompt_answer' : llm_answer, # str
'status_text' : status_text, # str
'status' : True, # or False # bool
}
prev_chat_data = []
with open('./llm-answer/chat-messages.json', mode='r', encoding='utf-8') as file:
prev_chat_data = json.load(file)
prev_chat_data.append(chat_obj)
with open('./llm-answer/chat-messages.json', mode='w', encoding='utf-8') as output:
json.dump(prev_chat_data, output, ensure_ascii=False, indent=2)
# save_result(chat_obj)
# ایجاد آبجکت بازگشتی به فرانت
# chat_obj.pop('retrived_passage')
# chat_obj.pop('prompt_type')
return chat_obj
@chatbot.post("/credit_refresh")
def credit_refresh():
"""
Returns remained credit
"""
url = "https://api.avalai.ir/user/credit"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {get_key()}"
}
remained_credit = requests.get(url, headers=headers)
with open('./llm-answer/credit.txt','w') as file:
file.write(str(remained_credit.json()['remaining_irt']))
return str(remained_credit.json()['remaining_irt'])
def create_chat_id():
date = str((datetime.datetime.now())).replace(' ','-').replace(':','').replace('.','-')
print('date ', date )
chat_id = f'{date}-{random.randint(100000, 999999)}'
print('chat_id ', chat_id )
return chat_id
print('#'*19)
print('-Chatbot is Ready!!!!!-')
print('#'*19)
# تعریف مدل داده‌ها برای درخواست‌های API
class Query(BaseModel):
query: str
# مسیر API برای اجرا کردن run_chatbot
@chatbot.post("/run_chatbot")
def run_chat(query: Query):
print('query ', query )
chat_id = create_chat_id()
print('query.query ', query.query )
answer = run_chatbot(query.query, chat_id)
credit_refresh()
return {"answer": answer}
# uvicorn src.app:app --reload
if __name__ == "__main__":
# query = 'در قانون حمایت از خانواده و جوانی جمعیت چه خدماتی در نظر گرفته شده است؟'
while True:
query = input('enter your qustion:')
if query == '':
print('لطفا متن سوال را وارد نمائید')
continue
start = (datetime.datetime.now())
# result = test_dataset()
result = single_query(query)
end_retrive = datetime.datetime.now()
print('-'*40)
print(f'retrive duration: {(end_retrive - start).total_seconds()}')
prompt = f'برای پرسش "{query}" از میان مواد قانونی "{result}" .پاسخ مناسب و دقیق را استخراج کن. درصورتی که مطلبی مرتبط با پرسش در متن پیدا نشد، فقط پاسخ بده: "متاسفانه در منابع، پاسخی پیدا نشد!"'
llm_answer = llm_request(prompt)
print('-'*40)
print(f'llm duration: {(datetime.datetime.now() - end_retrive).total_seconds()}')
refrences = ''
recognized_refrences = find_refrences(llm_answer)
llm_answer = replace_refrences(llm_answer, recognized_refrences)
with open('./llm-answer/result.txt', mode='a+', encoding='utf-8') as file:
result_message = f'متن پرامپت: {query.strip()}\n\nپاسخ: {llm_answer} \n----------------------------------------------------------\n'
file.write(result_message)
with open('./llm-answer/passages.txt', mode='a+', encoding='utf-8') as file:
result_message = f'متن پرامپت: {query.strip()}\n\مواد مشابه: {result} \n----------------------------------------------------------\n'
file.write(result_message)
print('----------------------------------------------------------')
print(f'full duration: {(datetime.datetime.now() - start).total_seconds()}')
print('----------------------------------------------------------')
print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')

View File

@ -0,0 +1,72 @@
import json
import numpy as np
import faiss
import os
def create_faiss_index_from_json(json_file_path, faiss_index_path, metadata_file_path):
print(f'try to read {json_file_path} ...')
# --- 1. بارگذاری داده‌ها از JSON ---
with open(json_file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f'file reading finished')
# فرض بر این است که هر عنصر شامل فیلدهای زیر است:
# {
# "speech_title": "title",
# "sentence": "متن جمله",
# "embeddings": [0.12, 0.34, ...]
# }
sentences = []
titles = []
embeddings_list = []
prefix_list = []
for k, item in data.items():
sentences.append(item['content'])
titles.append(item['id'])
embeddings_list.append(item['embeddings'])
prefix_list.append(item['section-prefix'])
embeddings = np.array(embeddings_list).astype('float32') # ابعاد: (n, d)
dimension = embeddings.shape[1]
print(f"Loaded {len(embeddings)} embeddings with dimension {dimension}")
# --- 2. ایجاد ایندکس FAISS برای GPU ---
# اگر فقط CPU دارید، از faiss.IndexFlatL2 استفاده کنید.
# اگر GPU دارید، ابتدا ایندکس را روی CPU ایجاد و سپس به GPU انتقال دهید.
cpu_index = faiss.IndexFlatL2(dimension) # معیار فاصله L2 (Euclidean)
# انتقال ایندکس به GPU
if faiss.get_num_gpus() > 0:
print("Using GPU for FAISS index...")
res = faiss.StandardGpuResources()
gpu_index = faiss.index_cpu_to_gpu(res, 0, cpu_index)
else:
print("GPU not available, using CPU.")
gpu_index = cpu_index
# --- 3. افزودن داده‌ها به ایندکس ---
gpu_index.add(embeddings)
print(f"Total vectors indexed: {gpu_index.ntotal}")
# --- 4. ذخیره ایندکس به فایل ---
# برای ذخیره باید به CPU منتقل شود
final_index = faiss.index_gpu_to_cpu(gpu_index) if isinstance(gpu_index, faiss.Index) and faiss.get_num_gpus() > 0 else gpu_index
os.makedirs(os.path.dirname(faiss_index_path), exist_ok=True)
faiss.write_index(final_index, faiss_index_path)
print(f"FAISS index saved to {faiss_index_path}")
# --- 5. ذخیره متادیتا (برای نگاشت نتایج جستجو) ---
metadata = [{"id": id, "content": c, 'prefix': p} for id, c, p in zip(titles, sentences,prefix_list)]
with open(metadata_file_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
print(f"Metadata saved to {metadata_file_path}")
if __name__ == '__main__':
# استفاده از متد
json_file_path = './majles-output/sections-vec-285k.json'
faiss_index_path = './qavanin-faiss/faiss_index_qavanin_285k.index'
metadata_file_path = './qavanin-faiss/faiss_index_qavanin_285k_metadata.json'
create_faiss_index_from_json(json_file_path, faiss_index_path, metadata_file_path)

11
dockerfile Normal file
View File

@ -0,0 +1,11 @@
FROM qchat_base:1.0.0
RUN pip install uvicorn[standard]
WORKDIR /src/app
COPY . /src/app
EXPOSE 80
CMD [ "uvicorn","chatbot:chatbot","--reload","--port","80","--host=0.0.0.0"]

677
elastic_helper.py Normal file
View File

@ -0,0 +1,677 @@
import zipfile
import sys
import os
import json
from time import sleep
from elasticsearch7 import Elasticsearch,helpers
class ElasticHelper():
counter = 0
total = 0
id = ""
path_mappings = os.getcwd() + '/repo/_other/'
def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings = ""):
if path_mappings :
self.path_mappings = path_mappings
if es_pass == '' :
self.es = Elasticsearch(es_url)
else:
self.es = Elasticsearch(
es_url,
http_auth=(es_user, es_pass),
)
# print(es_url)
# print(self.es)
self.success_connect = False
for a in range(0,10):
try :
if not self.es.ping():
print('elastic not ping, sleep 30 s : ', a)
sleep(5)
continue
else:
self.success_connect = True
break
except Exception as e:
break
if not self.success_connect :
print('******','not access to elastic service')
return
self.counter = 0
self.total = 0
self.id = ""
def get_doctument(self, index_name, id):
res = self.es.get(index=index_name, id=id)
return res
def exist_doctument(self, index_name, id):
res = self.es.exists(index=index_name, id=id)
return res
def update_index_doc(self, is_update_state, index_name_o, eid, data):
if is_update_state:
resp = self.es.update(index=index_name_o, id=eid, doc=data)
# resp = self.es.update(index=index_name_o, id=eid, body={'doc':data})
else:
resp = self.es.index(index=index_name_o, id=eid, document=data)
return resp
def exportToJsonForAI(self, path_back, index_name, out_name= '', body={}, fields=[]) :
print('*' * 50, ' start backup -->', index_name)
self.counter = 0
sid = None
out = out_name
if out_name == '' :
out = index_name
fout = open( path_back + "/"+ out + '.json', 'a+' , encoding='utf-8')
s_res = self.es.search(
index=index_name,
scroll='5m',
size=1000,
body=body
)
self.total = s_res["hits"]["total"]['value']
print('start index = %s' % index_name)
print('total = %d' % self.total)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
file_count = 1
out_json = []
while scroll_size > 0:
"Scrolling..."
self.counter += scroll_size
print("progress -> %.2f %%" % ((self.counter / self.total)*100))
#############################
for item in s_res['hits']['hits']:
if fields :
item2={}
item2['id']=item['_id']
for kf in fields :
#print(kf)
if kf in item['_source'] :
# print(item['_source'][kf])
item2[kf] = item['_source'][kf]
#exit()
else :
item2=item
out_json.append(item2)
s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
sid = None
text = json.dumps(out_json, ensure_ascii=False)
fout.write(text)
##############################
def backupIndexToZipfile(self, path_back, index_name, out_name= '', body={}, byzip = True, fields=[], noFields=[]) :
print('*' * 50, ' start backup -->', index_name)
self.counter = 0
sid = None
out = out_name
if out_name == '' :
out = index_name
if body == {} :
s_res = self.es.search(
index=index_name,
scroll='5m',
size=1000
)
else:
s_res = self.es.search(
index=index_name,
scroll='5m',
size=1000,
body=body
)
self.total = s_res["hits"]["total"]['value']
if self.total == 0 :
print('total index_name by query = %d' % self.total)
return False
if byzip:
fout = zipfile.ZipFile(path_back + "/"+ out + '.zip', 'w')
else:
fout = open( path_back + "/"+ out + '.json', 'a+' , encoding='utf-8')
print('start index = %s' % index_name)
print('total = %d' % self.total)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
file_count = 1
while scroll_size > 0:
"Scrolling..."
self.counter += scroll_size
print("progress -> %.2f %%" % ((self.counter / self.total)*100))
#############################
out_json = []
for item in s_res['hits']['hits']:
if fields :
item2={}
item2['id']=item['_id']
item2['_source']={}
for kf in fields :
if kf in item['_source'] :
item2['_source'][kf] = item['_source'][kf]
else :
item2=item
if noFields :
for kf in noFields :
if kf in item2['_source']:
del item2['_source'][kf]
out_json.append(item2)
text = json.dumps(out_json, ensure_ascii=False)
out_json = []
if byzip:
filename = out + str(file_count) + '.json'
file_count +=1
fout.writestr(filename, text.encode('utf-8'), zipfile.ZIP_DEFLATED )
else:
fout.write(text)
##############################
s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
sid = None
fout.close()
def restorFileToElastic(self, path_back, index_name, app_key = '', queryDelete = True, map_name='') :
if not os.path.exists(path_back) :
print(' **** error *** path not exist: ', path_back)
return False
file_path = path_back + '/' + index_name + '.zip'
if not os.path.exists(file_path ) :
return False
if queryDelete :
# اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند
if self.deleteIndex(index_name) :
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
else : # اگر وجود داشته باشد پرش می کند و کاری نمیکند
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
def restorFileToElastic2(self, path_file, index_name, app_key = '', queryDelete = True, map_name='') :
if not os.path.exists(path_file) :
print(' **** error *** path not exist: ', path_file)
return False
file_path = path_file
if not os.path.exists(file_path ) :
return False
if queryDelete :
# اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند
if self.deleteIndex(index_name) :
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
else : # اگر وجود داشته باشد پرش می کند و کاری نمیکند
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
def renameElasticIndex(self, index_name_i, index_name_o, app_key = '', map_name='') :
if self.createIndex(index_name_o, app_key, map_name) :
res = self.es.reindex(
body={
"source": {"index": index_name_i},
"dest": {"index": index_name_o}
},
wait_for_completion=False)
print(type(res))
print(res)
taskid = res["task"] if res["task"] else ""
#tasks = client.TasksClient(self.es)
tasks = self.es.tasks
while True :
res = tasks.get(task_id = taskid)
if res["completed"] :
break
# print( res["task"])
print( '----', index_name_o, ' imported : ', res["task"]["status"]["total"] , ' / ', res["task"]["status"]["created"])
sleep(1)
print( '----', index_name_o, ' complated')
def deleteIndex(self, index_name) :
if not self.es.indices.exists(index=index_name) :
print(' ' * 10, " for delete NOT exist index :", index_name )
return True
question = 'Is DELETE elastic index (' + index_name +') ? '
if self.query_yes_no(question) :
self.es.indices.delete(index = index_name)
print('%' * 10 , " Finish DELETE index :", index_name )
return True
else :
return False
def query_yes_no(self, question, default="no"):
valid = { "yes": True, "y": True, "ye": True, "no": False, "n": False }
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
print('%'*10, ' quistion ', '%'*10 , '\n')
sys.stdout.write(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
sys.stdout.write("لطفا یکی از موارد روبرو را وارد کنید : 'yes' or 'no' " "(or 'y' or 'n').\n")
def createIndexIfNotExist(self, index_name_o, mapping_o=""):
try:
if not self.es.indices.exists(index=index_name_o):
response = self.es.indices.create(index=index_name_o, body=mapping_o)
# print out the response:
print("create index response:", response)
except:
print("....... index exist ! ... not created")
def createIndex(self, index_name, app_key='', map_name=''):
path_base = self.path_mappings
path_mapping1 = path_base + 'general/'
if app_key == '' :
app_key = 'tavasi'
path_mapping2 = path_base + app_key + '/'
if map_name == '':
map_name = index_name
if self.es.indices.exists(index=index_name) :
print("============== exist index :", index_name )
return True
if map_name == 'mj_rg_section' or map_name == 'semantic_search' :
map_name = 'mj_qa_section'
elif map_name[-3]=='_ai':
map_name=[0-len(map_name)-3]
print(map_name)
mapping_file_path = path_mapping1 + map_name + '.json'
print("mapping_file_path : " , mapping_file_path)
if not os.path.isfile(mapping_file_path):
if not os.path.isfile(mapping_file_path):
mapping_file_path = path_mapping2 + map_name + '.json'
print("mapping_file_path : " , mapping_file_path)
# Create Index With Mapping
if os.path.isfile(mapping_file_path):
mapping_file = open( mapping_file_path,'r', encoding='utf-8' )
mapping_file_read = mapping_file.read()
mapping_data = json.loads(mapping_file_read)
mapping_file.close()
if self.es.indices.exists(index=index_name) :
print("============== exist index :", index_name )
else :
self.es.indices.create(index = index_name , body = mapping_data)
return True
else:
print('*** error not find maping file elastic : *******', mapping_file_path)
return False
def updateBulkList(self, listData, index_name):
chunk_size=1000
raise_on_error=False
raise_on_exception=False
stats_only=True
yield_ok = False
actions=[]
for item in listData:
actions.append({
"_op_type": "update",
"_index": index_name,
"_id" : item['_id'],
"doc": item['_source']
}
)
helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok )
def importBulkList(self, listData, index_name):
chunk_size=100000
raise_on_error=False
raise_on_exception=False
stats_only=True
yield_ok = False
for item in listData:
actions = [{
"_op_type": "index",
"_index": index_name,
"_id" : item['_id'],
"_source": item['_source']
}
]
helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok )
def importJsonDataToElastic(self, jsonData, index_name, fields=[]):
chunk_size=1000
raise_on_error=False
raise_on_exception=False
stats_only=True
yield_ok = False
actions=[]
for item in jsonData:
id = item['_id'] if item['_id'] else item['id']
source = item['_source']
if fields :
source = {}
for col in fields :
if col in item['_source'] :
source[col] = item['_source']
actions.append({
"_op_type": "index",
"_index": index_name,
"_id" : id,
"_source": source
})
helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok )
def fileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]):
if not os.path.exists(file_path):
print("file zip:" , file_path , " not exist")
return
print("index:" , index_name , '=>' , file_path )
self.counter = 0
with open(file_path) as file:
data = json.loads(file.read())
self.importJsonDataToElastic(data, index_name, fields)
self.es.indices.refresh(index=index_name)
print(self.es.cat.count(index=index_name, format="json"))
def zipFileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]):
if not os.path.exists(file_path):
print("file zip:" , file_path , " not exist for imort to elastic : ", index_name )
return
fileNo = 0
with zipfile.ZipFile(file_path, 'r') as zObject:
fileNo +=1
print("="*10, " zip fileNo: " , fileNo ," - ( ", index_name," ) | File Numbers:" ,len(zObject.namelist()) , "=" * 10)
packNo = 0
self.counter = 0
for filename in zObject.namelist():
packNo += 1
if limit_pack != -1 :
if packNo > limit_pack :
print('limit_data ', index_name, ' ', limit_pack)
break
print("index:" , index_name , '=>' , filename )
with zObject.open(filename) as file:
data = json.loads(file.read())
self.importJsonDataToElastic(data, index_name, fields)
self.es.indices.refresh(index=index_name)
print(self.es.cat.count(index=index_name, format="json"))
print(" END Of Import to elastic ", index_name ,"\n")
def iterateJsonFile(self, file_path, isZip=True, limit_pack = -1):
if not os.path.exists(file_path):
print("file zip:" , file_path , " not exist iterateJsonFile " )
return
if isZip :
fileNo = 0
with zipfile.ZipFile(file_path, 'r') as zObject:
fileNo +=1
print("="*10, " zip fileNo: " , fileNo ," iterateJsonFile - | File Numbers:" ,len(zObject.namelist()) , "=" * 10)
packNo = 0
self.counter = 0
for filename in zObject.namelist():
packNo += 1
if limit_pack != -1 :
if packNo > limit_pack :
print('limit_data iterateJsonFile ', limit_pack)
break
print("index iterateJsonFile :", '=>' , filename )
with zObject.open(filename) as file:
data = json.loads(file.read())
# Yield each entry
# yield data
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)
else :
with open(filename, 'r', encoding='utf-8') as file:
data = json.loads(file.read())
# Yield each entry
# yield from (hit for hit in data)
#return data
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)
def es_iterate_all_documents(self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs):
"""
Helper to iterate ALL values from a single index
Yields all the documents.
"""
is_first = True
while True:
# Scroll next
if is_first: # Initialize scroll
# result = self.es.search(index=index, scroll="2m", **kwargs, body={
# "size": pagesize
# })
if body :
result = self.es.search(
index=index,
scroll=scroll_timeout,
**kwargs,
size=pagesize,
body=body
)
else :
result = self.es.search(
index=index,
scroll=scroll_timeout,
**kwargs,
size=pagesize
)
self.total = result["hits"]["total"]["value"]
if self.total > 0:
print("total = %d" % self.total)
is_first = False
else:
# result = es.scroll(body={
# "scroll_id": scroll_id,
# "scroll": scroll_timeout
# })
result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
scroll_id = result["_scroll_id"]
hits = result["hits"]["hits"]
self.counter += len(hits)
if self.total > 0 :
print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
# Stop after no more docs
if not hits:
break
# Yield each entry
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)
def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}):
try:
body = {}
list = []
try:
list = self.es_iterate_all_documents(index_name_i)
except Exception as e:
print(e)
count = 0
for mentry in list:
count += 1
entry = mentry["source"]
id = mentry["id"]
# print(id)
eid = id
if (count % 100) == 0 :
print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0))
data_filled = False
data = {}
for col in fields:
if '.' in col :
cols = col.split('.')
subsource = entry
for sub in cols :
dCol = subsource.get(sub, None)
if dCol :
subsource = dCol
else :
break
else :
dCol = entry.get(col, None)
if dCol is None:
continue
if col in renameFileds :
data[renameFileds[col]] = dCol
else:
data[col] = dCol
data_filled = True
if not data_filled :
continue
try:
resp = self.update_index_doc(True, index_name_o, eid, data)
except Exception as e:
print(e)
# save_error(id, e)
except Exception as e:
# print("1111")
print(e)
# save_error(id, e)
def mappingIndex(self, index_name_i):
# فقط از طریق کیبانا میشه تغییر مپ داد
# با پایتون نمیشه
# باید ایندکس جدیدی با مپ مطلوب ایجاد کرد و رایندکس کرد
pass
def updateByQueryIndex(self, index_name_i, body):
## sample
# body = {
# "script": {
# "inline": "ctx._source.Device='Test'",
# "lang": "painless"
# },
# "query": {
# "match": {
# "Device": "Boiler"
# }
# }
# }
try:
self.es.update_by_query(body=body, index=index_name_i)
except Exception as e:
print(e)
# save_error(id, e)
def deleteByQueryIndex(self, index_name_i, body):
## sample
# body = {
# "query": {
# "match": {
# "Device": "Boiler"
# }
# }
# }
try:
self.es.delete_by_query(index=index_name_i, body=body )
except Exception as e:
print(e)
# save_error(id, e)
def delete_by_ids(self, index_name_i, ids):
try:
# ids = ['test1', 'test2', 'test3']
query = {"query": {"terms": {"_id": ids}}}
res = self.es.delete_by_query(index=index_name_i, body=query)
print(res)
except Exception as e:
print(e)
# save_error(id, e)

View File

@ -0,0 +1,681 @@
# !pip install hazm
# !pip install transformers==4.26.0
# !pip install --upgrade numpy
# !pip install --upgrade sentence-transformers
"""
Persian Sentence Processing and Vector Analysis
==============================================
This script processes Persian sentences from a JSON file and performs:
1. Word extraction and preprocessing
2. Vector representation using multilingual transformer
3. Similarity analysis for key words
4. Dimensionality reduction to 3D
5. 3D visualization with Persian labels
Author: NLP Expert Assistant
"""
import json
import re
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Set
from collections import Counter
import logging
from pathlib import Path
# NLP and ML libraries
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
#from normalizer import cleaning
try:
from elastic_helper import ElasticHelper
except Exception as error:
eee = error
pass
# Visualization libraries
# import matplotlib.pyplot as plt
# import plotly.graph_objects as go
# import plotly.express as px
# from plotly.subplots import make_subplots
# Persian text processing
# import hazm
# from hazm import Normalizer, word_tokenize, POSTagger
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PersianVectorAnalyzer:
"""
A comprehensive class for Persian text processing and vector analysis.
"""
def __init__(self, model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
"""
Initialize the analyzer with the specified model.
Args:
model_name: The sentence transformer model to use
"""
self.model_name = model_name
self.model = None
#self.normalizer = Normalizer()
self.stop_words = self._load_persian_stop_words()
self.key_words = [
"خدا", "بنده", "جهاد", "ولی", "زکات",
"نماز", "صبر", "عبادت", "ولایت", "خلافت","پیامبر"
]
logger.info(f"Initializing Persian Vector Analyzer with model: {model_name}")
def _load_persian_stop_words(self) -> Set[str]:
"""
Load Persian stop words.
Returns:
Set of Persian stop words
"""
# Common Persian stop words
stop_words = {
'و', 'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'برای', 'تا',
'را', 'هم', 'یا', 'اما', 'اگر', 'چون', 'چرا', 'چگونه', 'کجا',
'چه', 'کی', 'چند', 'چقدر', 'همه', 'هیچ', 'بعضی', 'هر', 'همه',
'خود', 'خویش', 'ما', 'شما', 'آنها', 'ایشان', 'اینها', 'آنها',
'من', 'تو', 'او', 'ما', 'شما', 'آنها', 'ایشان', 'اینها',
'است', 'هست', 'بود', 'شد', 'می', 'باید', 'خواهد', 'دارد',
'کرد', 'شد', 'بود', 'هست', 'است', 'می‌شود', 'می‌کند',
'یک', 'دو', 'سه', 'چهار', 'پنج', 'شش', 'هفت', 'هشت', 'نه', 'ده',
'اول', 'دوم', 'سوم', 'چهارم', 'پنجم', 'ششم', 'هفتم', 'هشتم', 'نهم', 'دهم',
'سال', 'ماه', 'روز', 'هفته', 'ساعت', 'دقیقه', 'ثانیه','پس'
'بله', 'نه', 'آری', 'خیر', 'بلی', 'نخیر',
'حالا', 'الان', 'امروز', 'دیروز', 'فردا', 'هفته', 'ماه', 'سال',
'بالا', 'پایین', 'چپ', 'راست', 'جلو', 'عقب', 'داخل', 'خارج',
'بزرگ', 'کوچک', 'بلند', 'کوتاه', 'پهن', 'باریک', 'ضخیم', 'نازک',
}
return stop_words
def load_model(self):
"""
Load the sentence transformer model.
"""
try:
logger.info("Loading sentence transformer model...")
self.model = SentenceTransformer(self.model_name)
logger.info("Model loaded successfully!")
except Exception as e:
logger.error(f"Error loading model: {e}")
raise
def split_sentence(self, sentence:str):
sentences = []
sentence_len = len(self.tokenize_sentence(sentence))
if sentence_len < 512:
sentences.append(sentence)
else:
temp_sentences = str(sentence).split('.')
for sent in temp_sentences:
sent_len = len(self.tokenize_sentence(sent))
if sent_len > 512:
temp_sentences_2 = str(sent).split('،')
for snt in temp_sentences_2:
sentences.append(snt)
else:
sentences.append(sent)
return sentences
def load_json_data(self, file_path: str) -> List[str]:
"""
Load Persian sentences from JSON file.
Args:
file_path: Path to the JSON file
Returns:
List of Persian sentences
"""
try:
logger.info(f"Loading data from {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# convert dict{dict} to list[dict]
if type(data) == dict:
temp_data = []
for item in data.items():
temp_data.append(item[1])
data = temp_data
sentences = []
if isinstance(data, list):
for index, item in enumerate(data):
print(f'split sentence {index}')
if isinstance(item, dict):
if item['content'] == '':
continue
sentences.append([item['id'],item['content'].strip()])
# for key in ['content']:
# if key in item and item[key]:
# # splited_sentences = self.split_sentence(item[key])
# # splited_sentences = item[key]
# sentences.append(item[key])
# # for sent in splited_sentences:
# # sentences.append(sent)
# else:
# print('fault '+item['sentence-number'])
elif isinstance(item, str):
# splited_sentences = self.split_sentence(item[key])
sentences.append(item)
# for sent in splited_sentences:
# sentences.append(sent)
elif isinstance(data, dict):
# If it's a single object, extract all string values
for value in data.values():
if isinstance(value, str):
sentences.append(value)
# splited_sentences = str(value).split('.')
# for sent in splited_sentences:
# sentences.append(sent)
sentences = [senten for senten in sentences if senten]
logger.info(f"Loaded {len(sentences)} sentences")
return sentences
except Exception as e:
logger.error(f"Error loading JSON data: {e}")
raise
def preprocess_text(self, text: str) -> str:
"""
Preprocess Persian text.
Args:
text: Raw Persian text
Returns:
Preprocessed text
"""
# Normalize text
#text = self.normalizer.normalize(text)
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters but keep Persian characters
text = re.sub(r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s]', '', text)
return text.strip()
def tokenize_sentence(self, sentence:str):
try:
tokenizer = AutoTokenizer.from_pretrained(self.model_name)
# print(self.model_name)
tokens = tokenizer.tokenize(sentence)
return tokens
except:
error = "An exception occurred in tokenizer : " + self.model_name
#file.write( error + '\n' )
return []
def extract_words(self, sentences: List[str]) -> List[str]:
"""
Extract all words from sentences.
Args:
sentences: List of Persian sentences
Returns:
List of all words
"""
all_words = []
for sentence in sentences:
# Preprocess sentence
processed_sentence = self.preprocess_text(sentence)
# Tokenize
words = word_tokenize(processed_sentence)
# words = processed_sentence.split()
# Filter out empty strings and very short words
words = [word for word in words if len(word) > 1]
all_words.extend(words)
logger.info(f"Extracted {len(all_words)} words from {len(sentences)} sentences")
return all_words
def remove_stop_words(self, words: List[str]) -> List[str]:
"""
Remove stop words from the word list.
Args:
words: List of words
Returns:
List of words without stop words
"""
filtered_words = [word for word in words if word not in self.stop_words]
logger.info(f"Removed {len(words) - len(filtered_words)} stop words")
return filtered_words
def get_unique_words(self, words: List[str]) -> List[str]:
"""
Get unique words from the list.
Args:
words: List of words
Returns:
List of unique words
"""
unique_words = list(set(words))
logger.info(f"Found {len(unique_words)} unique words from {len(words)} total words")
return unique_words
def compute_word_vectors(self, sentences: List[str]) -> Dict[str, List[float]]:
"""
Compute vector representations for words.
Args:
sentences: List of unique sentences
Returns:
Dictionary mapping sentences to their vector representations
"""
if self.model is None:
self.load_model()
logger.info(f"Computing vectors for {len(sentences)} sections ...")
# print(sentences[0])
# create list of just sentences
just_sentences = [sent['content'] for sent in sentences]
# Compute embeddings
embeddings = self.model.encode(just_sentences, show_progress_bar=True)
# Create dictionary
sentences_vectors = {}
for i, sent in enumerate(sentences):
sentences_vectors[f'sentence-{sentences[i]["id"]}'] = {
'id': sentences[i]['id'],
'fullpath': sentences[i]['fullpath'],
'qanon-title': sentences[i]['qanon-title'],
'section-prefix': sentences[i]['section-prefix'],
'content': sentences[i]['content'],
'embeddings': embeddings[i].tolist()
}
print(f'section {i} embedded!')
logger.info("section vectors computed successfully!")
return sentences_vectors
def find_closest_words(self, word_vectors: Dict[str, List[float]],
key_words: List[str], top_k: int = 20) -> Dict[str, List[str]]:
"""
Find the closest words to each key word.
Args:
word_vectors: Dictionary of word vectors
key_words: List of key words to find neighbors for
top_k: Number of closest words to find
Returns:
Dictionary mapping key words to their closest neighbors
"""
logger.info(f"Finding {top_k} closest words for {len(key_words)} key words...")
# Convert to numpy arrays for faster computation
words = list(word_vectors.keys())
vectors = np.array(list(word_vectors.values()))
closest_words = {}
for key_word in key_words:
if key_word in word_vectors:
# Get the key word vector
key_vector = np.array(word_vectors[key_word]).reshape(1, -1)
# Compute cosine similarities
similarities = cosine_similarity(key_vector, vectors)[0]
# Get indices of top k similar words (excluding the key word itself)
word_indices = np.argsort(similarities)[::-1]
# Filter out the key word itself and get top k
closest_indices = []
for idx in word_indices:
if words[idx] != key_word and len(closest_indices) < top_k:
closest_indices.append(idx)
# Get the closest words
closest_words[key_word] = [words[idx] for idx in closest_indices]
logger.info(f"Found {len(closest_words[key_word])} closest words for '{key_word}'")
else:
logger.warning(f"Key word '{key_word}' not found in word vectors")
closest_words[key_word] = []
return closest_words
def reduce_to_3d(self, word_vectors: Dict[str, List[float]],
method: str = 'tsne') -> Dict[str, List[float]]:
"""
Reduce word vectors to 3D coordinates.
Args:
word_vectors: Dictionary of word vectors
method: Dimensionality reduction method ('pca' or 'tsne')
Returns:
Dictionary mapping words to their 3D coordinates
"""
logger.info(f"Reducing dimensions to 3D using {method.upper()}...")
words = list(word_vectors.keys())
vectors = np.array(list(word_vectors.values()))
if method.lower() == 'pca':
reducer = PCA(n_components=3, random_state=42)
elif method.lower() == 'tsne':
reducer = TSNE(n_components=3, random_state=42, perplexity=min(30, len(vectors)-1))
else:
raise ValueError("Method must be 'pca' or 'tsne'")
# Reduce dimensions
reduced_vectors = reducer.fit_transform(vectors)
# Create dictionary
word_vectors_3d = {}
for i, word in enumerate(words):
word_vectors_3d[word] = reduced_vectors[i].tolist()
logger.info("Dimensionality reduction completed!")
return word_vectors_3d
def save_json(self, data: dict, file_path: str):
"""
Save data to JSON file.
Args:
data: Data to save
file_path: Output file path
"""
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"Data saved to {file_path}")
except Exception as e:
logger.error(f"Error saving to {file_path}: {e}")
raise
# def create_3d_visualization(self, word_vectors_3d: Dict[str, List[float]],
# selected_words: Dict[str, List[str]],
# output_path: str = "persian_words_3d.html"):
# """
# Create 3D visualization of words.
# Args:
# word_vectors_3d: Dictionary of 3D word coordinates
# selected_words: Dictionary of selected words for each key word
# output_path: Output file path for the visualization
# """
# logger.info("Creating 3D visualization...")
# # Prepare data for plotting
# words = list(word_vectors_3d.keys())
# coords = np.array(list(word_vectors_3d.values()))
# # Create color mapping for key words and their neighbors
# colors = []
# sizes = []
# hover_texts = []
# for word in words:
# # Check if word is a key word
# is_key_word = word in self.key_words
# # Check if word is in selected words
# in_selected = False
# key_word_group = None
# for key_word, selected_list in selected_words.items():
# if word in selected_list:
# in_selected = True
# key_word_group = key_word
# break
# if is_key_word:
# colors.append('red')
# sizes.append(15)
# hover_texts.append(f"کلیدواژه: {word}")
# elif in_selected:
# colors.append('blue')
# sizes.append(10)
# hover_texts.append(f"کلمه مرتبط با '{key_word_group}': {word}")
# else:
# colors.append('lightgray')
# sizes.append(5)
# hover_texts.append(f"کلمه: {word}")
# # Create 3D scatter plot
# fig = go.Figure()
# # Add scatter plot
# fig.add_trace(go.Scatter3d(
# x=coords[:, 0],
# y=coords[:, 1],
# z=coords[:, 2],
# mode='markers+text',
# marker=dict(
# size=sizes,
# color=colors,
# opacity=0.8
# ),
# text=words,
# textposition="middle center",
# hovertext=hover_texts,
# hoverinfo='text'
# ))
# # Update layout
# fig.update_layout(
# title={
# 'text': 'نمایش سه‌بعدی کلمات فارسی',
# 'x': 0.5,
# 'xanchor': 'center',
# 'font': {'size': 20}
# },
# scene=dict(
# xaxis_title='محور X',
# yaxis_title='محور Y',
# zaxis_title='محور Z',
# camera=dict(
# eye=dict(x=1.5, y=1.5, z=1.5)
# )
# ),
# width=1000,
# height=800,
# showlegend=False
# )
# # Save the plot
# fig.write_html(output_path)
# logger.info(f"3D visualization saved to {output_path}")
# return fig
def process_pipeline(self, input_file: str, output_dir: str = "output"):
"""
Run the complete processing pipeline.
Args:
input_file(str): Path to input JSON file
output_dir(str): Output directory for results
"""
# Create output directory
Path(output_dir).mkdir(exist_ok=True)
logger.info("Starting Persian Vector Analysis Pipeline...")
# Step 1: Load data
# sentences = self.load_json_data(input_file)
sentences = ALL_SECTIONS
# for s in sentences:
# s_len = len(self.tokenize_sentence(s))
# if s_len > 512:
# print(f'long: {s}')
# Step 2: Extract words
# all_words = self.extract_words(sentences)
# Step 3: Remove stop words
# filtered_words = self.remove_stop_words(all_words)
# filtered_words = all_words
# Step 4: Get unique words
# unique_words = self.get_unique_words(filtered_words)
# Step 5: Compute word vectors
sentences_vectors = self.compute_word_vectors(sentences)
# Step 6: Save word vectors
self.save_json(sentences_vectors, f"{output_dir}/sections-vec-285k.json")
# Step 7: Find closest words to key words
# selected_words = self.find_closest_words(word_vectors, self.key_words)
# Step 8: Save selected words
# self.save_json(selected_words, f"{output_dir}/selected_words.json")
# Step 9: Reduce to 3D
# word_vectors_3d = self.reduce_to_3d(word_vectors, method='tsne')
# Step 10: Save 3D vectors
# self.save_json(word_vectors_3d, f"{output_dir}/words_vector_3d.json")
# Step 11: Create visualization
# self.create_3d_visualization(word_vectors_3d, selected_words,
# f"{output_dir}/persian_words_3d.html")
logger.info("Pipeline completed successfully!")
# Print summary
print("\n" + "="*50)
print("PIPELINE SUMMARY")
print("="*50)
print(f"Input sentences: {len(sentences)}")
# print(f"Total words extracted: {len(all_words)}")
# print(f"Unique words after preprocessing: {len(unique_words)}")
# print(f"Word vectors computed: {len(word_vectors)}")
# print(f"Key words processed: {len(self.key_words)}")
print(f"Output files saved to: {output_dir}/")
print("="*50)
def full_path_text_maker(full_path):
"""
این متد مسیر یک سکشن را می گیرد و متنی را بر اساس ترتیب بخش های آن از جزء به کل بازسازی می کند و بر می گرداند
Args:
full_path(list): لیستی از عناصر مشخص کننده مسیر درختی این سکشن
Returns:
full_path_text(str): متن بازسازی شده از مسیر یک سکشن
"""
full_path_text = ""
for i, path_item in enumerate(reversed(full_path)):
if i == len(full_path) - 1:
full_path_text += ''.join(f'{path_item}')
break
full_path_text += ''.join(f'{path_item} از ')
full_path_text = full_path_text.strip()
return full_path_text
def main():
"""
Main function to run the Persian Vector Analysis.
"""
# Initialize analyzer
analyzer = PersianVectorAnalyzer()
# Define input and output paths
# input_file = "./output-speechs/nahj_speechs_sentences.json"
# output_dir = "output-speechs"
# input_file = "./majles/data/sections.json"
input_file = ""
output_dir = "majles-output"
# Run the complete pipeline
analyzer.process_pipeline(input_file, output_dir)
if __name__ == "__main__":
eh_obj = ElasticHelper()
path = "/home/gpu/data_11/14040611/mj_qa_section.zip"
sections_elastic = eh_obj.iterateJsonFile(path, True)
all_count = 0
dont_cares = []
ALL_SECTIONS = []
for index, item in enumerate(sections_elastic):
all_count +=1
source = item['source']
section_path = source['other_info']['full_path']
id = item['id']
filtered_keys = ['فصل','موخره','امضاء','عنوان']
section_path = source['other_info']['full_path']
flag = False
if '>' in section_path:
path_parts = section_path.split('>')
for key in filtered_keys:
if key in path_parts[-1]:
dont_cares.append(id)
flag = True
break
if flag:
continue
else:
for key in filtered_keys:
if key in section_path:
dont_cares.append(id)
flag = True
break
if flag:
continue
qanon_title = source['qanon_title']
full_path_text = full_path_text_maker(section_path.split('>'))
section_prefix = f"محتوای {full_path_text} {cleaning(qanon_title)} عبارت است از: "
try:
content = cleaning(item['source']['content'])
# کنار گذاشتن سکشن های خیلی کوچک که عملا محتوا ندارند
if len(content.split()) <= 10:
continue
except Exception as error:
print(error)
continue
data = {
'id': id,
'fullpath': section_path,
'qanon-title': qanon_title,
'section-prefix': section_prefix,
'content': content
}
ALL_SECTIONS.append(data)
print(f'all_count: {all_count}')
print(f'dont_cares: {len(dont_cares)}')
print(f'ALL_SECTIONS without dont-cares: {len(ALL_SECTIONS)}')
main()
"""
:: *** نکته مهم *** ::
NOTE !!! after this process run convert_qavanin_json_to_faiss.py due to create faiss index which is used in RAG process
"""

File diff suppressed because one or more lines are too long

0
llm-answer/chat-objs.txt Normal file
View File

1
llm-answer/credit.txt Normal file
View File

@ -0,0 +1 @@
194527.67

View File

@ -0,0 +1,10 @@
query: برای حمایت از ازدواج جوانان و تشکیل خانواده جهت افزایش جمعیت ، چه مواردی پیش بینی شده است؟
error:Error code: 400 - {'error': {'message': 'Developer instruction is not enabled for this model. Please use a different model that supports developer instructions. Please contact support at support@avalai.ir and include the request ID 01998c28-4ccb-7bc3-97a7-0403baa6ed35 in your email if you believe this is an error.', 'type': 'invalid_request', 'param': None, 'code': 'invalid_argument', 'request_id': '01998c28-4ccb-7bc3-97a7-0403baa6ed35'}}
-------------------------------
query: چرا خانواده مهم هست
error:Error code: 429 - {'error': {'message': 'Rate limit reached for requests. Check your quota at https://ava.al/limits or contact support at support@avalai.ir and include the request ID 01998fa2-1f98-7060-be1f-7488dd2fcb18 in your email if you believe this is an error.', 'type': 'rate_limit_exceeded', 'param': None, 'code': 'rate_limit_exceeded', 'solution': 'Pace your requests or upgrade to higher levels. Read the Rate limit guide at https://chat.avalai.ir/platform/limits or contact support at support@avalai.ir and include the request ID 01998fa2-1f98-7060-be1f-7488dd2fcb18 in your email if you believe this is an error.', 'request_id': '01998fa2-1f98-7060-be1f-7488dd2fcb18'}}
-------------------------------

0
llm-answer/passages.txt Normal file
View File

0
llm-answer/result.txt Normal file
View File

76
normalizer.py Normal file
View File

@ -0,0 +1,76 @@
#import hazm
from cleantext import clean
import re
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
#normalizer = hazm.Normalizer()
wierd_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
u"\U00002702-\U000027B0"
u"\U000024C2-\U0001F251"
u"\U0001f926-\U0001f937"
u'\U00010000-\U0010ffff'
u"\u200d"
u"\u2640-\u2642"
u"\u2600-\u2B55"
u"\u23cf"
u"\u23e9"
u"\u231a"
u"\u3030"
u"\ufe0f"
u"\u2069"
u"\u2066"
# u"\u200c"
u"\u2068"
u"\u2067"
"]+", flags=re.UNICODE)
def cleaning(text):
text = text.strip()
# regular cleaning
# text = clean(text,
# fix_unicode=True,
# to_ascii=False,
# lower=True,
# no_line_breaks=True,
# no_urls=True,
# no_emails=True,
# no_phone_numbers=True,
# no_numbers=False,
# no_digits=False,
# no_currency_symbols=True,
# no_punct=False,
# replace_with_url="",
# replace_with_email="",
# replace_with_phone_number="",
# replace_with_number="",
# replace_with_digit="0",
# replace_with_currency_symbol="",
# )
text = clean(text,
extra_spaces = True,
lowercase = True
)
# cleaning htmls
text = cleanhtml(text)
# normalizing
#text = normalizer.normalize(text)
# removing wierd patterns
text = wierd_pattern.sub(r'', text)
# removing extra spaces, hashtags
text = re.sub("#", "", text)
text = re.sub("\s+", " ", text)
return text

15
requirements.txt Normal file
View File

@ -0,0 +1,15 @@
cleantext==1.1.4
elasticsearch7==7.17.12
faiss_cpu==1.9.0
fastapi==0.117.1
hazm==0.10.0
langchain_openai==0.3.33
numpy==1.21.5
openai==1.108.1
pandas==2.3.2
pydantic==2.11.9
scikit_learn==1.7.2
sentence_transformers==2.5.1
torch==2.4.0
torch==2.1.2
transformers==4.55.1

1
run_docker.bash Normal file
View File

@ -0,0 +1 @@
sudo docker run --name qachat -p 80:80 -v /home/sabr/rag_qavanin_api/:/src/app/ -v /home/sabr/rag_qavanin_api/qavanin-faiss/:/src/app/qavanin-faiss/ -v /home/sabr/rag_qavanin_api/llm-answer/:/src/app/llm-answer/ -v /home/sabr/MODELS:/src/MODELS -it --restart unless-stopped qachat:1.0.0