first-step

This commit is contained in:
init_mahdi 2025-11-30 09:49:41 +00:00
commit a8fca065f3
21 changed files with 2831 additions and 0 deletions

3
.dockerignore Executable file

@ -0,0 +1,3 @@
./qavanin-faiss
./llm-answer
./data

0
__init__.py Executable file

1
config.env Executable file

@ -0,0 +1 @@
LLM_URL="http://172.16.29.102:8001/v1/"

10
dockerfile Executable file

@ -0,0 +1,10 @@
FROM docker.tavasi.ir/tavasi/qachat_base:1.0.0
WORKDIR /src/app
COPY . /src/app
EXPOSE 80
CMD [ "uvicorn","main:app","--reload","--port","80","--host=0.0.0.0"]

5
dockerfile_base Executable file

@ -0,0 +1,5 @@
FROM docker.tavasi.ir/tavasi/qachat_base:1.0.0
RUN pip install "uvicorn[standard]"
RUN pip install FlagEmbedding
RUN pip install aiofiles
RUN pip install openai

58
main.py Executable file

@ -0,0 +1,58 @@
from fastapi import FastAPI
from routers.rag_base import router as rag_base
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
# --- Lifespan manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
# 🚀 Startup
print("🚀 Starting up RAG system...")
# create the OSS client here and store it on app.state
# --- Important: if the elastic_client should also be created at startup, add it here ---
# elastic_client = get_elastic_client()
# app.state.elastic_client = elastic_client
yield # the application runs at this point
# 🛑 Shutdown
print("🛑 Shutting down RAG system...")
# close any open connections
client = getattr(app.state, "elastic_client", None)
if client is not None:
await client.close()
# --- build the application ---
def create_app() -> FastAPI:
app = FastAPI(
title="qachat2 Backend",
version="0.1.0",
lifespan=lifespan, # ✅ attach the lifespan manager here
)
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def simple():
return "ai rag chat qanon OK"
@app.get("/ping")
async def ping():
return "ai rag chat qanon OK"
app.include_router(rag_base, prefix="")
return app
# ✅ final instantiation
app = create_app()

16
new_requirements.txt Executable file

@ -0,0 +1,16 @@
cleantext
elasticsearch7
faiss_cpu
fastapi
hazm
langchain_openai
numpy
openai
pandas
pydantic
scikit_learn
sentence_transformers
torch
transformers
orjson
FlagEmbedding==1.3.5

0
routers/__init__.py Executable file

423
routers/ai_data_parser.py Executable file

@ -0,0 +1,423 @@
from typing import List
from pathlib import Path
import os, orjson, time, json, re, asyncio, traceback
from openai import AsyncOpenAI
# ------------------------------ API processing ------------------------------
class AsyncCore:
def __init__(
self,
model_name,
task_name,
output_schema,
api_url,
data_path=None,
reasoning_effort="low",
top_p=1,
temperature=0.0,
max_token=128000,
output_path=None,
ai_code_version=None,
request_timeout=30, # seconds
api_key="EMPTY",
save_number=2,
semaphore_number=5,
):
self.save_number = save_number
# json file of data
self.data_path = data_path
self.semaphore_number = semaphore_number
self.task_name = task_name
if output_path is None:
output_path = f"./{task_name}"
self.output_path = Path(output_path)
self._temp_path = self.output_path / "batch_data"
self._temp_processed_id_path = self._temp_path / "processed_id.json"
# Create output directory and subdirectories if they don't exist
self.output_path.mkdir(parents=True, exist_ok=True)
self._temp_path.mkdir(parents=True, exist_ok=True)
# self._temp_processed_id_path.mkdir(parents=True, exist_ok=True)
self.request_timeout = request_timeout
self.model_name = model_name
self.api_key = api_key
self.output_schema = output_schema
self.api_url = api_url
self.reasoning_effort = reasoning_effort
self.top_p = top_p
self.temperature = temperature
self.max_token = max_token
if ai_code_version is None:
ai_code_version = f"{model_name}_{reasoning_effort}"
self.ai_code_version = ai_code_version
self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"}
if data_path is not None:
try:
self.data = self.__data_process()
print(f"📦 Loaded {len(self.data)} items")
except Exception as e:
raise ValueError(
f"Data loading/validation failed: {e}\n{traceback.format_exc()}"
)
def __validate_item(self, item, idx):
# Mandatory fields
for key in self.PRIMARY_KEY:
if key not in item:
raise ValueError(f"Missing mandatory key '{key}' in item #{idx}")
if not isinstance(item[key], str):
raise TypeError(
f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}"
)
# Optional field: assistant_prompt
if "assistant_prompt" not in item or item["assistant_prompt"] is None:
item["assistant_prompt"] = None
else:
if not isinstance(item["assistant_prompt"], str):
raise TypeError(
f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}"
)
return item # now normalized
def __data_process(self):
raw_data = self.__load_orjson(self.data_path)
if not isinstance(raw_data, list):
raise ValueError("Data must be a list of dictionaries.")
processed_data = []
for idx, item in enumerate(raw_data):
if not isinstance(item, dict):
raise ValueError(f"Item #{idx} is not a dictionary.")
validated_item = self.__validate_item(item, idx)
processed_data.append(validated_item)
return processed_data
def __get_max_number_file(self, directory):
# Pattern to match filenames like output_1.json, output_25.json, etc.
pattern = re.compile(r"output_(\d+)\.json$")
max_num = 0
for filename in os.listdir(directory):
match = pattern.match(filename)
if match:
num = int(match.group(1))
if num > max_num:
max_num = num
return max_num + 1
def __load_orjson(self, path: str | Path):
path = Path(path)
with path.open("rb") as f: # must be opened in binary mode for orjson
return orjson.loads(f.read())
def __save_orjson(self, path, data):
with open(path, "wb") as f:
f.write(
orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
)
def merge_json_dir(self, input_path, output_path):
directory = Path(input_path)
if not directory.is_dir():
raise ValueError(f"Not valid PATH: {input_path}")
seen_ids = set() # tracks ids that have already been seen (fast lookups)
unique_data = [] # unique records only
failed_files = []
json_files = list(directory.glob("*.json"))
if not json_files:
print("⚠️ NO JSON File Found In This PATH")
return
for json_file in json_files:
try:
data = self.__load_orjson(json_file)
if not data: # empty or None
failed_files.append(json_file.name)
continue
if isinstance(data, list) and isinstance(data[0], dict):
for item in data:
item_id = item.get("id")
if item_id is None:
# if there is no id, you can decide whether to keep or drop the item
# here we assume only items with a valid id matter
continue
if item_id not in seen_ids:
seen_ids.add(item_id)
unique_data.append(item)
else:
raise ValueError(f"no list available in this json -> {json_file}")
except (
json.JSONDecodeError,
ValueError,
OSError,
KeyError,
TypeError,
) as e:
# print(f"❌ Failed in process '{json_file.name}': {e}")
failed_files.append(json_file.name)
# report failures
if failed_files:
print("\n❌ Failed to process these files:")
for name in failed_files:
print(f" - {name}")
else:
print("\n✅ All JSON added")
# ذخیره خروجی
try:
self.__save_orjson(data=unique_data, path=output_path)
print(
f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})"
)
except Exception as e:
print(f"❌ Error in saving final file: {e}")
def make_new_proccessed_ids_from_file(self, json_in, out_path):
data = self.__load_orjson(json_in)
finall_data = []
for d in data:
if d["id"]:
finall_data.append(d["id"])
finall_data = set(finall_data)
finall_data = list(finall_data)
print(f"-- len ids {len(finall_data)}")
self.__save_orjson(data=finall_data, path=out_path)
# ------------------------------ Main ------------------------------
async def __process_item(self, client, item):
try:
# build messages in the conventional order: system, then user, then assistant
messages = []
if item.get("system_prompt"):
messages.append(
{"role": "system", "content": item["system_prompt"]}
)
messages.append({"role": "user", "content": item["user_prompt"]})
if item.get("assistant_prompt"):
messages.append(
{"role": "assistant", "content": item["assistant_prompt"]}
)
response = await client.chat.completions.parse(
model=self.model_name,
messages=messages,
temperature=self.temperature,
top_p=self.top_p,
reasoning_effort=self.reasoning_effort,
max_tokens=self.max_token,
stop=None,
response_format=self.output_schema,
)
parsed = (
response.choices[0].message.parsed
if response and response.choices and response.choices[0].message.parsed
else {"raw_text": str(response)}
)
parsed = self.output_schema.model_validate(parsed)
parsed = parsed.model_dump()
parsed = dict(parsed)
parsed["ai_code_version"] = self.ai_code_version
parsed["id"] = item["id"]
# parsed["item"] = item
return parsed, 200
except asyncio.TimeoutError:
print(f"⏳ Timeout on item {item['id']}")
return None, 408
except Exception as e:
print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}")
return None, 400
def async_eval(self, processed_id: List = []):
try:
asyncio.run(self.__async_eval(processed_id))
except KeyboardInterrupt:
print("\n🛑 Interrupted by user.")
traceback.print_exc()
async def __async_eval(self, processed_id: List):
"""
Main single-threaded async run that produces the final output.
"""
print("🔹 Starting async data processing...")
# ------------------ Step 1: recover previously processed ids ------------------
if not processed_id:
try:
processed_id = self.__load_orjson(self._temp_processed_id_path)
print(
f"📂 Loaded existing processed_id from {self._temp_processed_id_path}"
)
except Exception:
print("⚠️ No valid processed_id found. Starting fresh.")
processed_id = []
# ------------------ Step 2: prepare the data ------------------
all_processed_id = set(processed_id)
all_results = []
total_time = []
data = [item for item in self.data if item.get("id") not in all_processed_id]
print(
f" Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}"
)
# nothing left to process
if not data:
print("✅ Nothing new to process. All items are already done.")
return
# ------------------ Step 3: start processing ------------------
print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}")
async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
semaphore = asyncio.Semaphore(self.semaphore_number)
async def limited_process(item):
async with semaphore:
return await self.__process_item(client, item)
tasks = [asyncio.create_task(limited_process(item)) for item in data]
total_i = 0
# ✅ process results in completion order (not list order)
for i, task in enumerate(asyncio.as_completed(tasks), start=1):
start = time.time()
try:
parsed, status_code = await asyncio.wait_for(
task, timeout=self.request_timeout
) # ⏱ bounded by self.request_timeout
except asyncio.TimeoutError:
print(f"⏳ Task {i} timed out completely")
parsed, status_code = None, 408
total_time.append(time.time() - start)
if status_code == 200:
all_results.append(parsed)
all_processed_id.add(parsed.get("id"))
else:
print(f"⚠️ Skipped item (status={status_code})")
total_i += 1
# ✅ checkpoint every save_number items
if total_i >= self.save_number:
print(f"total_i {total_i}")
print(f"self.save_number {self.save_number}")
total_i = 0
self.__save_orjson(
data=list(all_processed_id),
path=self._temp_processed_id_path,
)
print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
number = self.__get_max_number_file(self._temp_path)
print(f"number {number}")
temp_output_path = self._temp_path / f"output_{number}.json"
self.__save_orjson(data=list(all_results), path=temp_output_path)
print(f"💾 Auto-saved partial data: {len(all_results)}")
all_results.clear()
# ✅ after all tasks finish, do a final save of the remaining data
if total_i > 0 or len(all_results) > 0:
print("💾 Final save of remaining data...")
self.__save_orjson(
data=list(all_processed_id),
path=self._temp_processed_id_path,
)
print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
number = self.__get_max_number_file(self._temp_path)
print(f"number {number}")
temp_output_path = self._temp_path / f"output_{number}.json"
self.__save_orjson(data=list(all_results), path=temp_output_path)
print(f"💾 Auto-saved partial data: {len(all_results)}")
all_results.clear()
# ------------------ Step 4: save the output ------------------
final_data_path = self.output_path / f"final_data_{self.task_name}.json"
processed_id_path = self.output_path / "processed_id.json"
self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path)
all_results = self.__load_orjson(final_data_path)
# make_new_proccessed_ids_from_file()
self.__save_orjson(data=list(all_processed_id), path=processed_id_path)
self.__save_orjson(data=all_results, path=final_data_path)
avg_time = (sum(total_time) / len(total_time)) if total_time else 0
print(
f"\n✅ Processing completed!\n"
f"📊 Total-Data: {len(data)} | "
f"⭕ Ignored-Data: {len(processed_id)} | "
f"📦 Proccessed-Data: {len(all_results)} | "
f"❌ Loss-Data: {len(data)-len(all_results)} | "
f"🕒 Avg Time: {avg_time:.2f}'s per item | "
f"🕒 Total Time: {sum(total_time):.4f}'s | "
f"💾 Results saved to: {final_data_path}"
)
async def single_simple_async_proccess_item(self, item):
async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
semaphore = asyncio.Semaphore(5)
async with semaphore:
try:
# build messages in the conventional order: system, then user, then assistant
messages = []
if item.get("system_prompt"):
messages.append(
{"role": "system", "content": item["system_prompt"]}
)
messages.append({"role": "user", "content": item["user_prompt"]})
if item.get("assistant_prompt"):
messages.append(
{"role": "assistant", "content": item["assistant_prompt"]}
)
response = await client.chat.completions.parse(
model=self.model_name,
messages=messages,
temperature=self.temperature,
top_p=self.top_p,
reasoning_effort=self.reasoning_effort,
max_tokens=self.max_token,
stop=None,
response_format=self.output_schema,
)
parsed = (
response.choices[0].message.parsed
if response and response.choices and response.choices[0].message.parsed
else {"raw_text": str(response)}
)
parsed = self.output_schema.model_validate(parsed)
parsed = parsed.model_dump()
parsed = dict(parsed)
parsed["ai_code_version"] = self.ai_code_version
return parsed, 200
except asyncio.TimeoutError:
print(f"⏳ Timeout on item {item}")
return None, 408
except Exception as e:
print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}")
return None, 400
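For orientation, a minimal usage sketch of the AsyncCore class above follows; the output schema, task name and data file are illustrative assumptions rather than values taken from this commit (the model name and endpoint mirror the defaults in routers/static.py).

# hypothetical usage sketch for routers/ai_data_parser.AsyncCore
from pydantic import BaseModel
from routers.ai_data_parser import AsyncCore

class Answer(BaseModel):  # assumed output schema for this sketch
    text: str

runner = AsyncCore(
    model_name="gpt-oss-20b",             # model served behind the API (assumed)
    task_name="demo-task",
    output_schema=Answer,
    api_url="http://localhost:8004/v1/",  # OpenAI-compatible endpoint (assumed)
    data_path="./demo_items.json",        # list of {"id", "system_prompt", "user_prompt"} items
    save_number=10,
)
runner.async_eval()  # checkpoints batches under ./demo-task/batch_data and merges them at the end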

28
routers/base_model.py Executable file

@ -0,0 +1,28 @@
from pydantic import BaseModel
from typing import List
class Title(BaseModel):
title: str
class Query(BaseModel):
query: str
class ChatObject(BaseModel):
title: str
user_query: str
model_key: str
retrived_passage: str
retrived_ref_ids: str
model_answer: str
status: str = "success"
prompt_type: str = "question-answer"
class LLMOutput(BaseModel):
text: str
source: List[str]
class LLMInput(BaseModel):
query: str
knowledge: List[dict]

355
routers/chatbot_handler.py Executable file

@ -0,0 +1,355 @@
import numpy as np
import torch, orjson, faiss, re
from typing import List
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from FlagEmbedding import FlagReranker
from pathlib import Path
# nlist = 2048
# quantizer = faiss.IndexFlatIP(dim)
# index = faiss.IndexIVFFlat(quantizer, dim, nlist)
# index.train(embeddings)
# index.add(embeddings)
class InitHybridRetrieverReranker:
def __init__(
self,
embeder_path,
reranker_path,
dict_content: List[dict],
faiss_index,
dense_alpha: float = 0.6,
device: str = None,
cache_dir="/src/MODELS",
batch_size=512,
):
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.dense_alpha = dense_alpha
# ===============================
# convert the input only once
# ===============================
self.content_list = [x["content"] for x in dict_content]
self.ids_list = [x["id"] for x in dict_content]
self.N = len(self.content_list)
self.faiss_index = faiss_index
# Dense embedder
self.embedder = SentenceTransformer(
local_files_only=True,
model_name_or_path=embeder_path,
cache_folder=cache_dir,
device=self.device,
similarity_fn_name="cosine",
)
# TF-IDF
self.vectorizer = TfidfVectorizer(
analyzer="word",
ngram_range=(1, 2),
token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b",
)
self.tfidf_matrix = self.vectorizer.fit_transform(self.content_list)
# Reranker
self.reranker = FlagReranker(
model_name_or_path=reranker_path,
local_files_only=True,
use_fp16=True,
devices=device,
cache_dir=cache_dir,
batch_size=batch_size,
normalize=True,
# max_length=1024,
# trust_remote_code=False,
# query_max_length=
)
print("RAG Ready — Retriever + Reranker Loaded")
# ================================
# Dense Search (FAISS)
# ================================
async def dense_retrieve(self, query: str, top_k: int):
if top_k <= 0:
return [], np.array([])
emb = self.embedder.encode(query, convert_to_numpy=True).astype(np.float32)
D, I = self.faiss_index.search(emb.reshape(1, -1), top_k)
return I[0], D[0]
# ================================
# Sparse Search (TF-IDF)
# ================================
async def sparse_retrieve(self, query: str, top_k: int):
if top_k <= 0:
return [], np.array([])
q_vec = self.vectorizer.transform([query])
sims = cosine_similarity(q_vec, self.tfidf_matrix)[0]
k = min(top_k, len(sims))
idx = np.argpartition(-sims, k - 1)[:k]
idx = idx[np.argsort(-sims[idx], kind="mergesort")]
return idx, sims[idx]
# ================================
# Reciprocal Rank Fusion
# ================================
async def fuse(self, d_idx, d_scores, s_idx, s_scores, top_k=50, k_rrf=60):
combined = {}
for rank, idx in enumerate(d_idx):
combined[idx] = combined.get(idx, 0) + 1.0 / (k_rrf + rank)
for rank, idx in enumerate(s_idx):
combined[idx] = combined.get(idx, 0) + 1.0 / (k_rrf + rank)
sorted_items = sorted(combined.items(), key=lambda x: x[1], reverse=True)
return [i[0] for i in sorted_items[:top_k]]
# ================================
# Rerank
# ================================
async def rerank(self, query: str, cand_idx: List[int], final_k: int = 10):
if not cand_idx:
return []
passages = [self.content_list[i] for i in cand_idx]
pairs = [[query, p] for p in passages]
scores = self.reranker.compute_score(pairs, normalize=True, max_length=512)
if isinstance(scores, float):
scores = [scores]
idx_score = list(zip(cand_idx, scores))
idx_score.sort(key=lambda x: x[1], reverse=True)
return idx_score[:final_k]
# ================================
# Main Search Function
# ================================
async def search_base(
self,
query: str,
topk_dense=50,
topk_sparse=50,
pre_rerank_k=50,
final_k=10,
):
d_idx, d_scores = await self.dense_retrieve(query, topk_dense)
s_idx, s_scores = await self.sparse_retrieve(query, topk_sparse)
cand_idx = await self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k)
final_rank = await self.rerank(query, cand_idx, final_k)
# ===============================
# fast, clean output
# ===============================
return [
{
"id": self.ids_list[idx],
"content": self.content_list[idx],
"score": score,
}
for idx, score in final_rank
]
def load_orjson(path: str | Path):
path = Path(path)
with path.open("rb") as f: # must be opened in binary mode for orjson
return orjson.loads(f.read())
def save_orjson(path, data):
with open(path, "wb") as f:
f.write(
orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
)
WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/"
# ref = f"[«{i}»](https://majles.tavasi.ir/entity/detail/view/qsection/{idx})"
def get_in_form(title: str, sections: list, max_len: int = 4000):
chunks = []
current = f"برای پرسش: {title}\n\n"
ref_text = "«منبع»"
for i, data in enumerate(sections, start=1):
sec_text = data.get("content", "")
idx = data.get("id")
# build the full reference link
ref = f"[{ref_text}]({WEB_LINK}{idx})"
# full text block for this item
block = f"{i}: {sec_text}\n{ref}\n\n"
# if adding this item would exceed the limit → start a new chunk
if len(current) + len(block) > max_len:
chunks.append(current.rstrip())
current = ""
current += block
# append the last chunk as well
if current.strip():
chunks.append(current.rstrip())
return chunks
def chunked_simple_text(answer_text, max_len=4000):
chunks = []
current = ""
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > max_len:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
return chunks
def format_answer_bale(answer_text: str, sources: list, max_len: int = 4000):
"""
answer_text: the model output text that contains markers such as (منبع: qs2117427)
sources: e.g. ['qs2117427']
"""
ref_text = "«منبع»"
def make_link(src):
return f"[{ref_text}]({WEB_LINK}{src})"
# pattern that matches any parenthesized group containing one or more source codes
# e.g. (qs123) or (qs123, qs456, qs789)
pattern = r"\((?:منبع[: ]+)?([a-zA-Z0-9_, ]+)\)"
def replace_source(m):
content = m.group(1)
codes = [c.strip() for c in content.split(",")] # split multiple codes
links = [make_link(code) for code in codes]
full_match = m.group(0)
# if "منبع" in full_match:
# print(f'Found explicit source(s): {links}')
# else:
# print(f'Found implicit source(s): {links}')
return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان
# جایگزینی در متن
answer_text = re.sub(pattern, replace_source, answer_text)
# اگر طول کمتر از max_len بود → تمام
if len(answer_text) <= max_len:
return [answer_text]
# تقسیم متن اگر طول زیاد شد
chunks = []
current = ""
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > max_len:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
return chunks
def get_user_prompt(query: str):
"""
Take a query and build a prompt for generating a title from it.
"""
title_prompt = f"برای متن {query} یک عنوان با معنا که بین 3 تا 6 کلمه داشته باشد، در قالب یک رشته متن ایجاد کن. سبک و لحن عنوان، حقوقی و کاملا رسمی باشد. عنوان تولید شده کاملا ساده و بدون هیچ مارک داون یا علائم افزوده ای باشد. غیر از عنوان، به هیچ وجه توضیح اضافه ای در قبل یا بعد آن اضافه نکن."
return title_prompt
def format_knowledge_block(knowledge):
lines = []
for item in knowledge:
_id = item.get("id", "unknown")
_content = item.get("content", "")
lines.append(f"- ({_id}) { _content }")
return "\n".join(lines)
def get_user_prompt2(obj):
query = obj.query
knowledge = obj.knowledge
prompt = f"""
شما باید تنها بر اساس اطلاعات ارائه شده پاسخ بدهید و هیچ دانشی خارج از آنها استفاده نکنید.
### پرسش:
{query}
### اسناد قابل استناد:
{format_knowledge_block(knowledge)}
### دستور تولید خروجی:
- پاسخی کاملاً دقیق، تحلیلی و مفهومی ایجاد کن
- لحن رسمی و حقوقی باشد
- اگر پاسخ نیاز به ترکیب چند سند دارد، آنها را ادغام کن
- اگر دادهها کافی نبود، این موضوع را شفاف اعلام کن اما اطلاعات مرتبط را همچنان ارائه بده
"""
return prompt
def get_user_prompt3(query, knowledge_json):
sys = f"""Answer the following based ONLY on the knowledge:
Query:
{query}
Knowledge:
{knowledge_json}"""
return sys
def load_faiss_index(index_path: str, metadata_path: str):
"""بارگذاری ایندکس FAISS و متادیتا (لیست جملات + عناوین)."""
index = faiss.read_index(index_path)
metadata = load_orjson(metadata_path)
metadata = [
{
"id": item["id"],
"content": item["content"],
"prefix": item["prefix"],
}
for item in metadata
]
return metadata, index
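To make the citation rewriting above concrete, here is a small illustrative call to format_answer_bale; the answer text and ids are made up for demonstration.

# hypothetical demo of format_answer_bale's citation rewriting
from routers.chatbot_handler import format_answer_bale

answer = "قرارداد اجاره باید مدت داشته باشد (منبع: qs2117427) و در غیر این صورت قابل استناد نیست (qs123, qs456)."
chunks = format_answer_bale(answer_text=answer, sources=["qs2117427", "qs123", "qs456"])
# every "(qs...)" group is replaced by a markdown link of the form
# [«منبع»](https://majles.tavasi.ir/entity/detail/view/qsection/qs2117427)
print(chunks[0])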

250
routers/rag_base.py Executable file
View File

@ -0,0 +1,250 @@
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
import time, os, traceback
from typing import List
from .base_model import Query, LLMOutput, LLMInput, Title
from .ai_data_parser import AsyncCore
from .chatbot_handler import (
InitHybridRetrieverReranker,
format_answer_bale,
get_user_prompt2,
get_user_prompt3,
load_faiss_index,
get_in_form,chunked_simple_text
)
from .static import (
EMBED_MODEL_PATH,
FAISS_INDEX_PATH,
FAISS_METADATA_PATH,
LLM_URL,
SYSTEM_PROMPT_FINALL,
RERANKER_MODEL_PATH,
LLM_ERROR,
MODEL_KEY,
MODEL_NAME,
OUTPUT_PATH_LLM,
REASONING_EFFORT,
TASK_NAME,
LLM_TIME_OUT,
MAX_TOKEN,
SYSTEM_PROPMT2,
)
# ################################################## Global-params
router = APIRouter(tags=["ragchat"])
# # settings= get_settings()
METADATA_DICT, FAISS_INDEX = load_faiss_index(
index_path=FAISS_INDEX_PATH, metadata_path=FAISS_METADATA_PATH
)
RAG = InitHybridRetrieverReranker(
embeder_path=EMBED_MODEL_PATH,
reranker_path=RERANKER_MODEL_PATH,
dict_content=METADATA_DICT,
faiss_index=FAISS_INDEX,
dense_alpha=0.6,
device="cuda",
)
RUNNER_PROMPT = AsyncCore(
model_name=MODEL_NAME,
api_url=LLM_URL,
output_path=OUTPUT_PATH_LLM,
task_name=TASK_NAME,
output_schema=LLMOutput,
reasoning_effort=REASONING_EFFORT,
ai_code_version=MODEL_KEY,
request_timeout=LLM_TIME_OUT,
max_token=MAX_TOKEN,
save_number=1,
)
from pydantic import BaseModel
SYS_PROMPT = """
شما یک ربات متخصص حقوقی و قانونی هستید به نام «قانونیار».
هویت شما: یک وکیل باتجربه، دقیق، مودب و مشتاق کمککردن به افراد در فهم مسائل حقوقی.
در این مرحله، وظیفه شما «تشخیص نوع پیام» است؛ اما باید یک دلیل کوتاه (answer) بدهید
که لحنش لحن یک وکیل کارکشته و حامی باشد.
برچسب خروجی فقط یکی از این موارد است:
- legal_question
- greeting
- other
تعریفها:
1. legal_question: هر پرسش یا متن مرتبط با قوانین، مقررات، دادگاه، وکالت، قرارداد، مالیات، ارث، نکاح و طلاق، شکایت، چک، سفته، ثبت شرکت، حقوق کار و هر موضوع مشابه.
2. greeting: پیامهای غیرحقوقی شامل سلام، معرفی، احوالپرسی، تشکر، خداحافظی.
3. other: پیامهایی که خارج از دو دسته بالا قرار میگیرند.
نکات مهم:
- answer باید بسیار کوتاه باشد (حداکثر دو خط).
- اگر پیام حتی کمی رنگوبوی حقوقی داشته باشد، answer را به سمت توضیح حقوقی ببرید.
- در حالت greeting با لحن رسمی و دوستانه یک وکیل پاسخ بدهید.
- در حالت other، در عین محترمانهگفتن اینکه پیام حقوقی نیست، سعی کنید یک زاویهٔ حقوقی احتمالی مطرح کنید تا راه کمک باز بماند.
خروجی شما حتماً باید شامل دو فیلد باشد:
label: یکی از سه برچسب
answer: توضیح کوتاه، دوستانه، مودب و با لحن یک وکیل حامی
خروجی فقط JSON باشد.
"""
ASSIST_PROMPT = """
### Example 1:
input:
"عین کلی"
output:
{
"label": "legal_question",
"answer": "موضوع شما به یکی از مفاهیم مهم حقوق مدنی مربوط می‌شود و با کمال میل راهنمایی‌تان می‌کنم."
}
### Example 2:
input:
"طرز تهیه ماست"
output:
{
"label":"other",
"answer":"موضوع شما ماهیت حقوقی ندارد، اما اگر دغدغه‌تان مرتبط با استاندارد یا مسئولیت تولید باشد میتوانم به شما کمک کنم، می شود سوال خود را بهتر و کامل تر بپرسید؟ میخواهم دقیق بدانم چه قسمتی از موارد حقوقی تولید ماست رو میخواهید بدونید"
}
### Example 3:
input:
"سلام، تو کی هستی؟"
output:
{
"label":"greeting",
"answer":"سلام و ارادت؛ من قانون‌یار هستم، وکیل همراه شما برای هر پرسش حقوقی."
}
"""
class ChatOutput(BaseModel):
label: str
answer: str | None = None
RUNNER_CHAT = AsyncCore(
model_name=MODEL_NAME,
api_url=LLM_URL,
output_path="/home2/rag_qa_chat2/data/_temp/",
task_name="type-of-chat",
output_schema=ChatOutput,
reasoning_effort="low",
ai_code_version=MODEL_KEY,
request_timeout=LLM_TIME_OUT,
max_token=256,
save_number=1,
top_p=0.8,
temperature=0.5
)
# legal_question
# greeting
# other
# label
# reasoning
async def chat_bot_run(query):
try:
s0 = time.time()
print(f'query {query}')
type_, _ = await RUNNER_CHAT.single_simple_async_proccess_item(
item={
"user_prompt": query,
"system_prompt": SYS_PROMPT,
"assistant_prompt":ASSIST_PROMPT
}
)
if not type_:
return chunked_simple_text(LLM_ERROR)
type_chat = type_.get('label', None)
answer = type_.get('answer', '')
s = time.time()
print(f"type_llm {type_} took {s - s0:.2f}s")
if type_chat != "legal_question":
return chunked_simple_text(answer)
sections_dict = await RAG.search_base(
query,
final_k=10,
topk_dense=100,
topk_sparse=100,
pre_rerank_k=100,
)
e = time.time()
# input_data = LLMInput(query=query, knowledge=sections_dict)
# prompt = get_user_prompt2(input_data)
prompt = get_user_prompt3(query=query, knowledge_json=sections_dict)
llm_answer, _ = await RUNNER_PROMPT.single_simple_async_proccess_item(
item={"user_prompt": prompt, "system_prompt": SYSTEM_PROPMT2},
)
ee = time.time()
finall = format_answer_bale(
answer_text=llm_answer["text"], sources=llm_answer["source"]
)
eee = time.time()
print(f"Rag = {e-s}", f"llm_answer = {ee-e}", f"Format = {eee-ee}", sep="\n")
return finall
except Exception:
traceback.print_exc()
return chunked_simple_text(LLM_ERROR)
class RagInput(BaseModel):
query: str
limit: int = 10
class RagOutput(BaseModel):
finall_form: str
duration: int
ids: List[str]
async def rag_run(input:RagInput):
try:
s = time.time()
sections_dict = await RAG.search_base(
query=input.query,
final_k=input.limit,
topk_dense=100,
topk_sparse=100,
pre_rerank_k=100,
)
e = time.time()
finall = get_in_form(title=input.query, sections=sections_dict)
ee = time.time()
print(f"Rag = {e-s}", f"Form = {ee-e}", sep="\n")
finall = RagOutput(
finall_form="\n".join(finall),  # join the formatted chunks into a single string
duration=int(ee - s),
ids=[sec["id"] for sec in sections_dict],
)
return finall.model_dump()
except Exception:
traceback.print_exc()
@router.post("/run_chat")
async def run_chat(payload: Query, request: Request):
s = time.time()
try:
answer = await chat_bot_run(payload.query)
except Exception:
print("chat_bot_run FAIL!")
answer = LLM_ERROR
e = time.time()
print(f"Total Time {e-s:.2f}'s")
return JSONResponse({"result": answer}, status_code=201)
@router.post("/run_rag")
async def run_rag(payload: Query, request: Request):
s = time.time()
try:
answer = await rag_run(RagInput(query=payload.query))
except Exception:
print("rag_run FAIL!")
answer = LLM_ERROR
e = time.time()
print(f"Total Time {e-s:.2f}'s")
return JSONResponse({"result": answer}, status_code=201)
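The two endpoints can be exercised once the service is up; a rough client-side sketch follows (the port matches run_env.bash, the question is invented, and requests is used for brevity even though it is not pinned in new_requirements.txt).

# hypothetical client call against the running service
import requests

resp = requests.post(
    "http://localhost:8009/run_chat",
    json={"query": "شرایط فسخ قرارداد اجاره چیست؟"},
    timeout=120,
)
print(resp.status_code)       # 201, per the JSONResponse above
print(resp.json()["result"])  # list of answer chunks with inline source links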

8
routers/readme.md Executable file

@ -0,0 +1,8 @@
# "gpt-4o", "gpt-4o-mini", "deepseek-chat" , "gemini-2.0-flash", gemini-2.5-flash-lite
# gpt-4o : 500
# gpt-4o-mini : 34
# deepseek-chat : 150
# gemini-2.0-flash : error
# cf.gemma-3-12b-it : 1
# gemini-2.5-flash-lite : 35 (very good)

160
routers/static.py Executable file

@ -0,0 +1,160 @@
from dotenv import load_dotenv
import os
LLM_URL = "http://localhost:8004/v1/" # "http://172.16.29.102:8001/v1/"
EMBED_MODEL_PATH = "/home2/MODELS/models--sentence-transformers--paraphrase-multilingual-MiniLM-L12-v2/snapshots/86741b4e3f5cb7765a600d3a3d55a0f6a6cb443d" # "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
RERANKER_MODEL_PATH = "/home2/MODELS/bge_reranker_m3_v2/bge-reranker-v2-m3" # "BAAI/bge-reranker-v2-m3"
FAISS_INDEX_PATH = "/home2/rag_qavanin_api/data/qavanin-faiss/faiss_index_qavanin_285k.index" # "/src/app/data/qavanin-faiss/faiss_index_qavanin_285k.index"
FAISS_METADATA_PATH = "/home2/rag_qavanin_api/data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json" # "/src/app/data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json"
PATH_LOG = "./data/llm-answer/"
load_dotenv()
RERANK_BATCH = int(os.environ.get("rerank_batch_size", 512))  # assumed fallback of 512 when the variable is unset
API_KEY = os.getenv("api_key")
LLM_ERROR = "با عرض پوزش؛ ❌ (in rag) متاسفانه خطایی رخ داده است. لطفا لحظاتی دیگر دوباره تلاش نمائید"
MODEL_KEY = "oss-120-hamava"
MODEL_NAME = "gpt-oss-20b"  # "gpt-oss-120b"
OUTPUT_PATH_LLM = "/home2/rag_qa_chat2/data/_temp"
TASK_NAME = "bale-chat"
REASONING_EFFORT = "low"
LLM_TIME_OUT = 30
MAX_TOKEN = 8192
SYSTEM_PROMPT_FINALL = """شما یک دستیار تحلیل‌گر حقوقی متخصص در استنتاج دقیق از اسناد قانونی هستید.
ورودی شما شامل:
- یک پرسش کاربر (query)
- مجموعهای از چند متن قانونی (knowledge)، که هر کدام شامل:
- id (شناسه سند)
- content (متن بند قانونی)
وظیفه شما:
1. پرسش را دقیق بخوانید و فقط بر اساس اطلاعات موجود در اسناد ارائه شده پاسخ دهید.
2. از خودتان هیچ اطلاعات جدید، تخمین، تفسیر شخصی، یا دانش خارج از اسناد وارد نکنید.
3. اگر یک پاسخ نیاز به ترکیب چند سند دارد، آنها را استخراج و در هم ادغام کنید و نتیجه را کاملاً روان و قابل فهم بنویسید.
4. پاسخ باید:
- تحلیلمحور
- شفاف
- فارسی استاندارد و حقوقی
- ساختاریافته و قابل ارائه باشد
5. هر جمله یا بند از پاسخ **حتماً باید به یک یا چند id سند مشخص وصل شود**.
- اگر برای جملهای منبعی پیدا نشد، صریحاً در متن ذکر کنید: "(هیچ منبع مرتبط موجود نیست)"
- از اضافه کردن idهای فرضی یا خارج از knowledge خودداری شود.
6. از تکرار مستقیم یا کپی کردن جملات خام اسناد اجتناب کنید. آنها را با بازنویسی تحلیلی به کار ببرید.
7. در پایان پاسخ:
- حتماً لیست تمام شناسههای سندهای استفادهشده را برگردانید.
- فقط id های اسنادی که واقعاً در پاسخ استفاده شدهاند ذکر شوند به صورت دقیقا: (qs2127)
- ترتیب اهمیت و ارتباط در لیست رعایت شود.
8. پاسخ نهایی باید دقیقاً در فرمت JSON زیر برگردد و هیچ متن دیگری خارج از آن اضافه نشود:
{
"text" : "متن کامل پاسخ تحلیلی و دقیق به پرسش، هر جمله یا بند با (id) سند مرتبط یا (هیچ منبع مرتبط موجود نیست) مشخص شود.",
"source": ["qs123", "qs545", ...]
}
ورودی نمونه:
{
query: "متن سوال",
knowledge: [
{"id": "qs01", "content": "..."},
{"id": "qs02", "content": "..."},
...
]
}
"""
SYSTEM_PROPMT2 = '''You are a legal reasoning model that MUST base the answer ONLY on the documents provided in `knowledge`.
STRICT RULES:
1. You have no knowledge outside the provided documents.
2. Before generating the answer you MUST:
A. Extract the list of all valid document IDs from `knowledge`.
B. Think through the answer sentence-by-sentence.
C. Every sentence MUST be directly supported by one or more document IDs.
3. Any sentence that is not directly supported by at least one `id` MUST be removed.
4. Document IDs must appear in the text as:
(qs123)
(qs1002)
etc.
5. The final answer MUST be returned strictly as:
{
"text": "...",
"source": ["qs001", "qs999"]
}
Where:
- `text` contains the final written response with citations inline.
- `source` contains ONLY the list of IDs actually used in the answer, no duplicates, order by relevance.
6. JSON MUST be valid. No comments, no trailing commas.
7. To the extent that there is even the slightest relevance to the question in the documentation, generate an answer from the documentation, indicating that a close answer to the user's question was not found.
8. Finally, if no document supports the question, return:
{
"text": "هیچ سند مرتبطی یافت نشد.",
"source": []
}
9. Length must NOT be shortened. Provide full analysis, fully detailed.
Before generating your answer:
Extract the list of VALID IDs from `knowledge`.
You MUST NOT invent IDs.
Any ID not in that list is forbidden.
'''
#############
"""
شما یک دستیار تحلیلگر حقوقی متخصص در استنتاج دقیق از اسناد قانونی هستید.
ورودی شما شامل:
- یک پرسش کاربر (query)
- مجموعهای از چند متن قانونی (knowledge)، که هر کدام شامل:
- id (شناسه سند)
- content (متن بند قانونی)
وظیفه شما:
1. پرسش را دقیق بخوانید و فقط بر اساس اطلاعات موجود در اسناد ارائه شده پاسخ دهید.
2. از خودتان هیچ اطلاعات جدید، تخمین، تفسیر شخصی، یا دانش خارج از اسناد وارد نکنید.
3. اگر یک پاسخ نیاز به ترکیب چند سند دارد، آنها را استخراج و در هم ادغام کنید و نتیجه را کاملاً روان و قابل فهم بنویسید.
4. پاسخ باید:
- تحلیلمحور
- شفاف
- فارسی استاندارد و حقوقی
- ساختاریافته و قابل ارائه باشد
5. از تکرار مستقیم یا کپی کردن جملات خام اسناد اجتناب کنید. آنها را با بازنویسی تحلیلی به کار ببرید.
6. اگر اطلاعات موجود برای پاسخ کامل کافی نبود:
- این موضوع را صریح اعلام کنید
- اما موارد مرتبط موجود را همچنان خلاصه و ارائه کنید
7. در پایان پاسخ:
- لیست شناسههای سندهای استفادهشده را برگردانید
- فقط id های اسنادی که واقعاً در پاسخ استفاده شدهاند ذکر شوند به صورت دقیقا : (qs2127)
- ترتیب اهمیت در لیست رعایت شود
8. پاسخ نهایی باید دقیقاً در فرمت زیر برگردد:
خروجی نمونه:
{
"text" : "متن کامل پاسخ تحلیلی و دقیق به پرسش",
"source": ["qs123", "qs545", ...]
}
بدون هیچ توضیح یا متن اضافه خارج از این قالب.
ورودی نمونه:
{
query: "متن سوال",
knowledge: [
{"id": "qs01", "content": "..."},
{"id": "qs02", "content": "..."},
...
]
}"""

3
run_docker.bash Executable file

@ -0,0 +1,3 @@
docker stop qachat2
docker rm qachat2
docker run --name qachat2 -p 8009:80 --net qachat_net --gpus=all -v ./:/src/app/ -v ./data/:/src/app/data/ -v ./../MODELS:/src/MODELS -v ./../cache:/root/.cache/huggingface/hub -it --restart unless-stopped docker.tavasi.ir/tavasi/qachat2:1.0.0

3
run_env.bash Executable file

@ -0,0 +1,3 @@
source /home2/.venv/bin/activate
uvicorn main:app --port=8009 --host=0.0.0.0


@ -0,0 +1,72 @@
import json
import numpy as np
import faiss
import os
def create_faiss_index_from_json(json_file_path, faiss_index_path, metadata_file_path):
print(f'try to read {json_file_path} ...')
# --- 1. Load data from JSON ---
with open(json_file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
print('file reading finished')
# each value in the JSON dict is expected to contain the following fields:
# {
# "id": "qs...",
# "content": "section text",
# "section-prefix": "...",
# "embeddings": [0.12, 0.34, ...]
# }
sentences = []
titles = []
embeddings_list = []
prefix_list = []
for k, item in data.items():
sentences.append(item['content'])
titles.append(item['id'])
embeddings_list.append(item['embeddings'])
prefix_list.append(item['section-prefix'])
embeddings = np.array(embeddings_list).astype('float32') # shape: (n, d)
dimension = embeddings.shape[1]
print(f"Loaded {len(embeddings)} embeddings with dimension {dimension}")
# --- 2. Create the FAISS index (GPU if available) ---
# With CPU only, faiss.IndexFlatL2 is used directly.
# With a GPU, the index is first created on CPU and then moved to the GPU.
cpu_index = faiss.IndexFlatL2(dimension) # L2 (Euclidean) distance metric
# move the index to the GPU
if faiss.get_num_gpus() > 0:
print("Using GPU for FAISS index...")
res = faiss.StandardGpuResources()
gpu_index = faiss.index_cpu_to_gpu(res, 0, cpu_index)
else:
print("GPU not available, using CPU.")
gpu_index = cpu_index
# --- 3. Add the vectors to the index ---
gpu_index.add(embeddings)
print(f"Total vectors indexed: {gpu_index.ntotal}")
# --- 4. Save the index to a file ---
# the index must be moved back to CPU before it can be saved
final_index = faiss.index_gpu_to_cpu(gpu_index) if isinstance(gpu_index, faiss.Index) and faiss.get_num_gpus() > 0 else gpu_index
os.makedirs(os.path.dirname(faiss_index_path), exist_ok=True)
faiss.write_index(final_index, faiss_index_path)
print(f"FAISS index saved to {faiss_index_path}")
# --- 5. Save metadata (for mapping search results back to text) ---
metadata = [{"id": id, "content": c, 'prefix': p} for id, c, p in zip(titles, sentences,prefix_list)]
with open(metadata_file_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
print(f"Metadata saved to {metadata_file_path}")
if __name__ == '__main__':
# example usage
json_file_path = '../majles-output/sections-vec-285k.json'
faiss_index_path = '../data/qavanin-faiss/faiss_index_qavanin_285k.index'
metadata_file_path = '../data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json'
create_faiss_index_from_json(json_file_path, faiss_index_path, metadata_file_path)
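A minimal sketch for sanity-checking the files written above, mirroring load_faiss_index and dense_retrieve in routers/chatbot_handler.py; the query is invented and the embedder is the public name of the model whose local snapshot is referenced in routers/static.py.

# hypothetical smoke test for the saved index and metadata
import json
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

index = faiss.read_index('../data/qavanin-faiss/faiss_index_qavanin_285k.index')
with open('../data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json', encoding='utf-8') as f:
    metadata = json.load(f)

model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
query_vec = model.encode("شرایط فسخ قرارداد", convert_to_numpy=True).astype(np.float32)
distances, indices = index.search(query_vec.reshape(1, -1), 5)  # top-5 nearest sections
for rank, idx in enumerate(indices[0], start=1):
    print(rank, metadata[idx]["id"], metadata[idx]["content"][:80])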

2
util/docker_build.bash Executable file

@ -0,0 +1,2 @@
sudo docker build -t docker.tavasi.ir/tavasi/qachat_base:1.0.0 -f dockerfile_base .
sudo docker build -t docker.tavasi.ir/tavasi/qachat2:1.0.0 .

677
util/elastic_helper.py Executable file

@ -0,0 +1,677 @@
import zipfile
import sys
import os
import json
from time import sleep
from elasticsearch7 import Elasticsearch,helpers
class ElasticHelper():
counter = 0
total = 0
id = ""
path_mappings = os.getcwd() + '/repo/_other/'
def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings = ""):
if path_mappings :
self.path_mappings = path_mappings
if es_pass == '' :
self.es = Elasticsearch(es_url)
else:
self.es = Elasticsearch(
es_url,
http_auth=(es_user, es_pass),
)
# print(es_url)
# print(self.es)
self.success_connect = False
for a in range(0,10):
try :
if not self.es.ping():
print('elastic not ping, sleep 30 s : ', a)
sleep(5)
continue
else:
self.success_connect = True
break
except Exception as e:
break
if not self.success_connect :
print('******','not access to elastic service')
return
self.counter = 0
self.total = 0
self.id = ""
def get_doctument(self, index_name, id):
res = self.es.get(index=index_name, id=id)
return res
def exist_doctument(self, index_name, id):
res = self.es.exists(index=index_name, id=id)
return res
def update_index_doc(self, is_update_state, index_name_o, eid, data):
if is_update_state:
resp = self.es.update(index=index_name_o, id=eid, doc=data)
# resp = self.es.update(index=index_name_o, id=eid, body={'doc':data})
else:
resp = self.es.index(index=index_name_o, id=eid, document=data)
return resp
def exportToJsonForAI(self, path_back, index_name, out_name= '', body={}, fields=[]) :
print('*' * 50, ' start backup -->', index_name)
self.counter = 0
sid = None
out = out_name
if out_name == '' :
out = index_name
fout = open( path_back + "/"+ out + '.json', 'a+' , encoding='utf-8')
s_res = self.es.search(
index=index_name,
scroll='5m',
size=1000,
body=body
)
self.total = s_res["hits"]["total"]['value']
print('start index = %s' % index_name)
print('total = %d' % self.total)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
file_count = 1
out_json = []
while scroll_size > 0:
"Scrolling..."
self.counter += scroll_size
print("progress -> %.2f %%" % ((self.counter / self.total)*100))
#############################
for item in s_res['hits']['hits']:
if fields :
item2={}
item2['id']=item['_id']
for kf in fields :
#print(kf)
if kf in item['_source'] :
# print(item['_source'][kf])
item2[kf] = item['_source'][kf]
#exit()
else :
item2=item
out_json.append(item2)
s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
sid = None
text = json.dumps(out_json, ensure_ascii=False)
fout.write(text)
##############################
def backupIndexToZipfile(self, path_back, index_name, out_name= '', body={}, byzip = True, fields=[], noFields=[]) :
print('*' * 50, ' start backup -->', index_name)
self.counter = 0
sid = None
out = out_name
if out_name == '' :
out = index_name
if body == {} :
s_res = self.es.search(
index=index_name,
scroll='5m',
size=1000
)
else:
s_res = self.es.search(
index=index_name,
scroll='5m',
size=1000,
body=body
)
self.total = s_res["hits"]["total"]['value']
if self.total == 0 :
print('total index_name by query = %d' % self.total)
return False
if byzip:
fout = zipfile.ZipFile(path_back + "/"+ out + '.zip', 'w')
else:
fout = open( path_back + "/"+ out + '.json', 'a+' , encoding='utf-8')
print('start index = %s' % index_name)
print('total = %d' % self.total)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
file_count = 1
while scroll_size > 0:
"Scrolling..."
self.counter += scroll_size
print("progress -> %.2f %%" % ((self.counter / self.total)*100))
#############################
out_json = []
for item in s_res['hits']['hits']:
if fields :
item2={}
item2['id']=item['_id']
item2['_source']={}
for kf in fields :
if kf in item['_source'] :
item2['_source'][kf] = item['_source'][kf]
else :
item2=item
if noFields :
for kf in noFields :
if kf in item2['_source']:
del item2['_source'][kf]
out_json.append(item2)
text = json.dumps(out_json, ensure_ascii=False)
out_json = []
if byzip:
filename = out + str(file_count) + '.json'
file_count +=1
fout.writestr(filename, text.encode('utf-8'), zipfile.ZIP_DEFLATED )
else:
fout.write(text)
##############################
s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
sid = s_res['_scroll_id']
scroll_size = len(s_res['hits']['hits'])
sid = None
fout.close()
def restorFileToElastic(self, path_back, index_name, app_key = '', queryDelete = True, map_name='') :
if not os.path.exists(path_back) :
print(' **** error *** path not exist: ', path_back)
return False
file_path = path_back + '/' + index_name + '.zip'
if not os.path.exists(file_path ) :
return False
if queryDelete :
# if the index exists, ask the user before deleting it
if self.deleteIndex(index_name) :
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
else : # if the index already exists, skip it and do nothing
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
def restorFileToElastic2(self, path_file, index_name, app_key = '', queryDelete = True, map_name='') :
if not os.path.exists(path_file) :
print(' **** error *** path not exist: ', path_file)
return False
file_path = path_file
if not os.path.exists(file_path ) :
return False
if queryDelete :
# if the index exists, ask the user before deleting it
if self.deleteIndex(index_name) :
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
else : # if the index already exists, skip it and do nothing
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
def renameElasticIndex(self, index_name_i, index_name_o, app_key = '', map_name='') :
if self.createIndex(index_name_o, app_key, map_name) :
res = self.es.reindex(
body={
"source": {"index": index_name_i},
"dest": {"index": index_name_o}
},
wait_for_completion=False)
print(type(res))
print(res)
taskid = res["task"] if res["task"] else ""
#tasks = client.TasksClient(self.es)
tasks = self.es.tasks
while True :
res = tasks.get(task_id = taskid)
if res["completed"] :
break
# print( res["task"])
print( '----', index_name_o, ' imported : ', res["task"]["status"]["total"] , ' / ', res["task"]["status"]["created"])
sleep(1)
print( '----', index_name_o, ' completed')
def deleteIndex(self, index_name) :
if not self.es.indices.exists(index=index_name) :
print(' ' * 10, " for delete NOT exist index :", index_name )
return True
question = 'Is DELETE elastic index (' + index_name +') ? '
if self.query_yes_no(question) :
self.es.indices.delete(index = index_name)
print('%' * 10 , " Finish DELETE index :", index_name )
return True
else :
return False
def query_yes_no(self, question, default="no"):
valid = { "yes": True, "y": True, "ye": True, "no": False, "n": False }
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
print('%'*10, ' question ', '%'*10 , '\n')
sys.stdout.write(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
sys.stdout.write("لطفا یکی از موارد روبرو را وارد کنید : 'yes' or 'no' " "(or 'y' or 'n').\n")
def createIndexIfNotExist(self, index_name_o, mapping_o=""):
try:
if not self.es.indices.exists(index=index_name_o):
response = self.es.indices.create(index=index_name_o, body=mapping_o)
# print out the response:
print("create index response:", response)
except:
print("....... index exist ! ... not created")
def createIndex(self, index_name, app_key='', map_name=''):
path_base = self.path_mappings
path_mapping1 = path_base + 'general/'
if app_key == '' :
app_key = 'tavasi'
path_mapping2 = path_base + app_key + '/'
if map_name == '':
map_name = index_name
if self.es.indices.exists(index=index_name) :
print("============== exist index :", index_name )
return True
if map_name == 'mj_rg_section' or map_name == 'semantic_search' :
map_name = 'mj_qa_section'
elif map_name[-3:] == '_ai':
map_name = map_name[:-3]
print(map_name)
mapping_file_path = path_mapping1 + map_name + '.json'
print("mapping_file_path : " , mapping_file_path)
if not os.path.isfile(mapping_file_path):
mapping_file_path = path_mapping2 + map_name + '.json'
print("mapping_file_path : " , mapping_file_path)
# Create Index With Mapping
if os.path.isfile(mapping_file_path):
mapping_file = open( mapping_file_path,'r', encoding='utf-8' )
mapping_file_read = mapping_file.read()
mapping_data = json.loads(mapping_file_read)
mapping_file.close()
if self.es.indices.exists(index=index_name) :
print("============== exist index :", index_name )
else :
self.es.indices.create(index = index_name , body = mapping_data)
return True
else:
print('*** error not find maping file elastic : *******', mapping_file_path)
return False
def updateBulkList(self, listData, index_name):
chunk_size=1000
raise_on_error=False
raise_on_exception=False
stats_only=True
yield_ok = False
actions=[]
for item in listData:
actions.append({
"_op_type": "update",
"_index": index_name,
"_id" : item['_id'],
"doc": item['_source']
}
)
helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok )
def importBulkList(self, listData, index_name):
chunk_size=100000
raise_on_error=False
raise_on_exception=False
stats_only=True
yield_ok = False
for item in listData:
actions = [{
"_op_type": "index",
"_index": index_name,
"_id" : item['_id'],
"_source": item['_source']
}
]
helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok )
def importJsonDataToElastic(self, jsonData, index_name, fields=[]):
chunk_size=1000
raise_on_error=False
raise_on_exception=False
stats_only=True
yield_ok = False
actions=[]
for item in jsonData:
id = item.get('_id') or item.get('id')
source = item['_source']
if fields :
source = {}
for col in fields :
if col in item['_source'] :
source[col] = item['_source'][col]
actions.append({
"_op_type": "index",
"_index": index_name,
"_id" : id,
"_source": source
})
helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok )
def fileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]):
if not os.path.exists(file_path):
print("file zip:" , file_path , " not exist")
return
print("index:" , index_name , '=>' , file_path )
self.counter = 0
with open(file_path) as file:
data = json.loads(file.read())
self.importJsonDataToElastic(data, index_name, fields)
self.es.indices.refresh(index=index_name)
print(self.es.cat.count(index=index_name, format="json"))
def zipFileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]):
if not os.path.exists(file_path):
print("file zip:" , file_path , " not exist for imort to elastic : ", index_name )
return
fileNo = 0
with zipfile.ZipFile(file_path, 'r') as zObject:
fileNo +=1
print("="*10, " zip fileNo: " , fileNo ," - ( ", index_name," ) | File Numbers:" ,len(zObject.namelist()) , "=" * 10)
packNo = 0
self.counter = 0
for filename in zObject.namelist():
packNo += 1
if limit_pack != -1 :
if packNo > limit_pack :
print('limit_data ', index_name, ' ', limit_pack)
break
print("index:" , index_name , '=>' , filename )
with zObject.open(filename) as file:
data = json.loads(file.read())
self.importJsonDataToElastic(data, index_name, fields)
self.es.indices.refresh(index=index_name)
print(self.es.cat.count(index=index_name, format="json"))
print(" END Of Import to elastic ", index_name ,"\n")
def iterateJsonFile(self, file_path, isZip=True, limit_pack = -1):
if not os.path.exists(file_path):
print("file zip:" , file_path , " not exist iterateJsonFile " )
return
if isZip :
fileNo = 0
with zipfile.ZipFile(file_path, 'r') as zObject:
fileNo +=1
print("="*10, " zip fileNo: " , fileNo ," iterateJsonFile - | File Numbers:" ,len(zObject.namelist()) , "=" * 10)
packNo = 0
self.counter = 0
for filename in zObject.namelist():
packNo += 1
if limit_pack != -1 :
if packNo > limit_pack :
print('limit_data iterateJsonFile ', limit_pack)
break
print("index iterateJsonFile :", '=>' , filename )
with zObject.open(filename) as file:
data = json.loads(file.read())
# Yield each entry
# yield data
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)
else :
with open(file_path, 'r', encoding='utf-8') as file:
data = json.loads(file.read())
# Yield each entry
# yield from (hit for hit in data)
#return data
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)
def es_iterate_all_documents(self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs):
"""
Helper to iterate ALL values from a single index
Yields all the documents.
"""
is_first = True
while True:
# Scroll next
if is_first: # Initialize scroll
# result = self.es.search(index=index, scroll="2m", **kwargs, body={
# "size": pagesize
# })
if body :
result = self.es.search(
index=index,
scroll=scroll_timeout,
**kwargs,
size=pagesize,
body=body
)
else :
result = self.es.search(
index=index,
scroll=scroll_timeout,
**kwargs,
size=pagesize
)
self.total = result["hits"]["total"]["value"]
if self.total > 0:
print("total = %d" % self.total)
is_first = False
else:
# result = es.scroll(body={
# "scroll_id": scroll_id,
# "scroll": scroll_timeout
# })
result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
scroll_id = result["_scroll_id"]
hits = result["hits"]["hits"]
self.counter += len(hits)
if self.total > 0 :
print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
# Stop after no more docs
if not hits:
break
# Yield each entry
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)
def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}):
try:
body = {}
list = []
try:
list = self.es_iterate_all_documents(index_name_i)
except Exception as e:
print(e)
count = 0
for mentry in list:
count += 1
entry = mentry["source"]
id = mentry["id"]
# print(id)
eid = id
if (count % 100) == 0 :
print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0))
data_filled = False
data = {}
for col in fields:
if '.' in col :
cols = col.split('.')
subsource = entry
for sub in cols :
dCol = subsource.get(sub, None)
if dCol :
subsource = dCol
else :
break
else :
dCol = entry.get(col, None)
if dCol is None:
continue
if col in renameFileds :
data[renameFileds[col]] = dCol
else:
data[col] = dCol
data_filled = True
if not data_filled :
continue
try:
resp = self.update_index_doc(True, index_name_o, eid, data)
except Exception as e:
print(e)
# save_error(id, e)
except Exception as e:
# print("1111")
print(e)
# save_error(id, e)
def mappingIndex(self, index_name_i):
# Mappings can only be changed through Kibana, not from Python.
# To change a mapping, create a new index with the desired mapping and reindex into it.
pass
def updateByQueryIndex(self, index_name_i, body):
## sample
# body = {
# "script": {
# "inline": "ctx._source.Device='Test'",
# "lang": "painless"
# },
# "query": {
# "match": {
# "Device": "Boiler"
# }
# }
# }
try:
self.es.update_by_query(body=body, index=index_name_i)
except Exception as e:
print(e)
# save_error(id, e)
def deleteByQueryIndex(self, index_name_i, body):
## sample
# body = {
# "query": {
# "match": {
# "Device": "Boiler"
# }
# }
# }
try:
self.es.delete_by_query(index=index_name_i, body=body )
except Exception as e:
print(e)
# save_error(id, e)
def delete_by_ids(self, index_name_i, ids):
try:
# ids = ['test1', 'test2', 'test3']
query = {"query": {"terms": {"_id": ids}}}
res = self.es.delete_by_query(index=index_name_i, body=query)
print(res)
except Exception as e:
print(e)
# save_error(id, e)
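A minimal usage sketch of the helper above; the host, credentials and backup directory are placeholders, and mj_qa_section is simply the mapping name already referenced in createIndex.

# hypothetical usage of util/elastic_helper.ElasticHelper
from util.elastic_helper import ElasticHelper

es_helper = ElasticHelper(
    es_url="http://127.0.0.1:6900",  # default baked into the class
    es_user="elastic",
    es_pass="",                      # empty password → unauthenticated client
)
if es_helper.success_connect:
    # dump the whole index into ./backup/mj_qa_section.zip (assumes ./backup exists),
    # scrolling 1000 documents per page
    es_helper.backupIndexToZipfile(path_back="./backup", index_name="mj_qa_section")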


@ -0,0 +1,681 @@
# !pip install hazm
# !pip install transformers==4.26.0
# !pip install --upgrade numpy
# !pip install --upgrade sentence-transformers
"""
Persian Sentence Processing and Vector Analysis
==============================================
This script processes Persian sentences from a JSON file and performs:
1. Word extraction and preprocessing
2. Vector representation using multilingual transformer
3. Similarity analysis for key words
4. Dimensionality reduction to 3D
5. 3D visualization with Persian labels
Author: NLP Expert Assistant
"""
import json
import re
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Set
from collections import Counter
import logging
from pathlib import Path
# NLP and ML libraries
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity
#from normalizer import cleaning
try:
from util.elastic_helper import ElasticHelper
except Exception as error:
eee = error
pass
# Visualization libraries
# import matplotlib.pyplot as plt
# import plotly.graph_objects as go
# import plotly.express as px
# from plotly.subplots import make_subplots
# Persian text processing
# import hazm
# from hazm import Normalizer, word_tokenize, POSTagger
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class PersianVectorAnalyzer:
"""
A comprehensive class for Persian text processing and vector analysis.
"""
def __init__(self, model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
"""
Initialize the analyzer with the specified model.
Args:
model_name: The sentence transformer model to use
"""
self.model_name = model_name
self.model = None
#self.normalizer = Normalizer()
self.stop_words = self._load_persian_stop_words()
self.key_words = [
"خدا", "بنده", "جهاد", "ولی", "زکات",
"نماز", "صبر", "عبادت", "ولایت", "خلافت","پیامبر"
]
logger.info(f"Initializing Persian Vector Analyzer with model: {model_name}")
def _load_persian_stop_words(self) -> Set[str]:
"""
Load Persian stop words.
Returns:
Set of Persian stop words
"""
# Common Persian stop words
stop_words = {
'و', 'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'برای', 'تا',
'را', 'هم', 'یا', 'اما', 'اگر', 'چون', 'چرا', 'چگونه', 'کجا',
'چه', 'کی', 'چند', 'چقدر', 'همه', 'هیچ', 'بعضی', 'هر', 'همه',
'خود', 'خویش', 'ما', 'شما', 'آنها', 'ایشان', 'اینها', 'آنها',
'من', 'تو', 'او', 'ما', 'شما', 'آنها', 'ایشان', 'اینها',
'است', 'هست', 'بود', 'شد', 'می', 'باید', 'خواهد', 'دارد',
'کرد', 'شد', 'بود', 'هست', 'است', 'می‌شود', 'می‌کند',
'یک', 'دو', 'سه', 'چهار', 'پنج', 'شش', 'هفت', 'هشت', 'نه', 'ده',
'اول', 'دوم', 'سوم', 'چهارم', 'پنجم', 'ششم', 'هفتم', 'هشتم', 'نهم', 'دهم',
'سال', 'ماه', 'روز', 'هفته', 'ساعت', 'دقیقه', 'ثانیه', 'پس',
'بله', 'نه', 'آری', 'خیر', 'بلی', 'نخیر',
'حالا', 'الان', 'امروز', 'دیروز', 'فردا', 'هفته', 'ماه', 'سال',
'بالا', 'پایین', 'چپ', 'راست', 'جلو', 'عقب', 'داخل', 'خارج',
'بزرگ', 'کوچک', 'بلند', 'کوتاه', 'پهن', 'باریک', 'ضخیم', 'نازک',
}
return stop_words
def load_model(self):
"""
Load the sentence transformer model.
"""
try:
logger.info("Loading sentence transformer model...")
self.model = SentenceTransformer(self.model_name)
logger.info("Model loaded successfully!")
except Exception as e:
logger.error(f"Error loading model: {e}")
raise
def split_sentence(self, sentence:str):
sentences = []
sentence_len = len(self.tokenize_sentence(sentence))
if sentence_len < 512:
sentences.append(sentence)
else:
temp_sentences = str(sentence).split('.')
for sent in temp_sentences:
sent_len = len(self.tokenize_sentence(sent))
if sent_len > 512:
temp_sentences_2 = str(sent).split('،')
for snt in temp_sentences_2:
sentences.append(snt)
else:
sentences.append(sent)
return sentences
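## sample usage (hypothetical `analyzer` and `long_text`): pieces are produced by splitting on '.'
## first and, for parts still longer than the 512-token limit, on '،' as well
# chunks = analyzer.split_sentence(long_text)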
def load_json_data(self, file_path: str) -> List[str]:
"""
Load Persian sentences from JSON file.
Args:
file_path: Path to the JSON file
Returns:
List of Persian sentences
"""
try:
logger.info(f"Loading data from {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# convert dict-of-dicts to a list of dicts
if isinstance(data, dict):
data = list(data.values())
sentences = []
if isinstance(data, list):
for index, item in enumerate(data):
print(f'split sentence {index}')
if isinstance(item, dict):
if item['content'] == '':
continue
sentences.append([item['id'],item['content'].strip()])
# for key in ['content']:
# if key in item and item[key]:
# # splited_sentences = self.split_sentence(item[key])
# # splited_sentences = item[key]
# sentences.append(item[key])
# # for sent in splited_sentences:
# # sentences.append(sent)
# else:
# print('fault '+item['sentence-number'])
elif isinstance(item, str):
# splited_sentences = self.split_sentence(item[key])
sentences.append(item)
# for sent in splited_sentences:
# sentences.append(sent)
elif isinstance(data, dict):
# If it's a single object, extract all string values
for value in data.values():
if isinstance(value, str):
sentences.append(value)
# splited_sentences = str(value).split('.')
# for sent in splited_sentences:
# sentences.append(sent)
sentences = [senten for senten in sentences if senten]
logger.info(f"Loaded {len(sentences)} sentences")
return sentences
except Exception as e:
logger.error(f"Error loading JSON data: {e}")
raise
def preprocess_text(self, text: str) -> str:
"""
Preprocess Persian text.
Args:
text: Raw Persian text
Returns:
Preprocessed text
"""
# Normalize text
#text = self.normalizer.normalize(text)
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text)
# Remove special characters but keep Persian characters
text = re.sub(r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s]', '', text)
return text.strip()
def tokenize_sentence(self, sentence:str):
try:
tokenizer = AutoTokenizer.from_pretrained(self.model_name)
# print(self.model_name)
tokens = tokenizer.tokenize(sentence)
return tokens
except Exception:
logger.error("An exception occurred in tokenizer: %s", self.model_name)
return []
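# A minimal caching sketch (assumption, not part of the original code): loading the tokenizer once
# and keeping it on the instance avoids re-initialising AutoTokenizer on every call:
# if not hasattr(self, "_tokenizer"):
#     self._tokenizer = AutoTokenizer.from_pretrained(self.model_name)
# return self._tokenizer.tokenize(sentence)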
def extract_words(self, sentences: List[str]) -> List[str]:
"""
Extract all words from sentences.
Args:
sentences: List of Persian sentences
Returns:
List of all words
"""
all_words = []
for sentence in sentences:
# Preprocess sentence
processed_sentence = self.preprocess_text(sentence)
# Tokenize
# hazm's word_tokenize is not imported (the hazm import above is commented out),
# so fall back to simple whitespace tokenization
words = processed_sentence.split()
# Filter out empty strings and very short words
words = [word for word in words if len(word) > 1]
all_words.extend(words)
logger.info(f"Extracted {len(all_words)} words from {len(sentences)} sentences")
return all_words
def remove_stop_words(self, words: List[str]) -> List[str]:
"""
Remove stop words from the word list.
Args:
words: List of words
Returns:
List of words without stop words
"""
filtered_words = [word for word in words if word not in self.stop_words]
logger.info(f"Removed {len(words) - len(filtered_words)} stop words")
return filtered_words
def get_unique_words(self, words: List[str]) -> List[str]:
"""
Get unique words from the list.
Args:
words: List of words
Returns:
List of unique words
"""
unique_words = list(set(words))
logger.info(f"Found {len(unique_words)} unique words from {len(words)} total words")
return unique_words
def compute_word_vectors(self, sentences: List[Dict]) -> Dict[str, Dict]:
"""
Compute vector representations for law sections.
Args:
sentences: List of section dicts (each with 'id', 'fullpath', 'qanon-title', 'section-prefix', 'content')
Returns:
Dictionary mapping section keys to their metadata plus an 'embeddings' vector
"""
if self.model is None:
self.load_model()
logger.info(f"Computing vectors for {len(sentences)} sections ...")
# print(sentences[0])
# create list of just sentences
just_sentences = [sent['content'] for sent in sentences]
# Compute embeddings
embeddings = self.model.encode(just_sentences, show_progress_bar=True)
# Create dictionary
sentences_vectors = {}
for i, sent in enumerate(sentences):
sentences_vectors[f'sentence-{sentences[i]["id"]}'] = {
'id': sentences[i]['id'],
'fullpath': sentences[i]['fullpath'],
'qanon-title': sentences[i]['qanon-title'],
'section-prefix': sentences[i]['section-prefix'],
'content': sentences[i]['content'],
'embeddings': embeddings[i].tolist()
}
print(f'section {i} embedded!')
logger.info("section vectors computed successfully!")
return sentences_vectors
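## shape of the returned dictionary (values are illustrative):
# {
#   "sentence-<id>": {
#     "id": "<id>", "fullpath": "...", "qanon-title": "...",
#     "section-prefix": "...", "content": "...", "embeddings": [0.013, -0.121, ...]
#   },
#   ...
# }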
def find_closest_words(self, word_vectors: Dict[str, List[float]],
key_words: List[str], top_k: int = 20) -> Dict[str, List[str]]:
"""
Find the closest words to each key word.
Args:
word_vectors: Dictionary of word vectors
key_words: List of key words to find neighbors for
top_k: Number of closest words to find
Returns:
Dictionary mapping key words to their closest neighbors
"""
logger.info(f"Finding {top_k} closest words for {len(key_words)} key words...")
# Convert to numpy arrays for faster computation
words = list(word_vectors.keys())
vectors = np.array(list(word_vectors.values()))
closest_words = {}
for key_word in key_words:
if key_word in word_vectors:
# Get the key word vector
key_vector = np.array(word_vectors[key_word]).reshape(1, -1)
# Compute cosine similarities
similarities = cosine_similarity(key_vector, vectors)[0]
# Get indices of top k similar words (excluding the key word itself)
word_indices = np.argsort(similarities)[::-1]
# Filter out the key word itself and get top k
closest_indices = []
for idx in word_indices:
if words[idx] != key_word and len(closest_indices) < top_k:
closest_indices.append(idx)
# Get the closest words
closest_words[key_word] = [words[idx] for idx in closest_indices]
logger.info(f"Found {len(closest_words[key_word])} closest words for '{key_word}'")
else:
logger.warning(f"Key word '{key_word}' not found in word vectors")
closest_words[key_word] = []
return closest_words
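## sample usage (hypothetical `word_vectors` and `analyzer`, for illustration only):
# word_vectors = {w: v.tolist() for w, v in zip(unique_words, analyzer.model.encode(unique_words))}
# closest = analyzer.find_closest_words(word_vectors, ["نماز", "صبر"], top_k=5)
# closest["نماز"]  # -> the 5 words with the highest cosine similarity to "نماز"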
def reduce_to_3d(self, word_vectors: Dict[str, List[float]],
method: str = 'tsne') -> Dict[str, List[float]]:
"""
Reduce word vectors to 3D coordinates.
Args:
word_vectors: Dictionary of word vectors
method: Dimensionality reduction method ('pca' or 'tsne')
Returns:
Dictionary mapping words to their 3D coordinates
"""
logger.info(f"Reducing dimensions to 3D using {method.upper()}...")
words = list(word_vectors.keys())
vectors = np.array(list(word_vectors.values()))
if method.lower() == 'pca':
reducer = PCA(n_components=3, random_state=42)
elif method.lower() == 'tsne':
reducer = TSNE(n_components=3, random_state=42, perplexity=min(30, len(vectors)-1))
else:
raise ValueError("Method must be 'pca' or 'tsne'")
# Reduce dimensions
reduced_vectors = reducer.fit_transform(vectors)
# Create dictionary
word_vectors_3d = {}
for i, word in enumerate(words):
word_vectors_3d[word] = reduced_vectors[i].tolist()
logger.info("Dimensionality reduction completed!")
return word_vectors_3d
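## sample usage (hypothetical, for illustration only): project the same vectors to 3D with PCA
# coords_3d = analyzer.reduce_to_3d(word_vectors, method="pca")
# coords_3d["نماز"]  # -> [x, y, z]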
def save_json(self, data: dict, file_path: str):
"""
Save data to JSON file.
Args:
data: Data to save
file_path: Output file path
"""
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"Data saved to {file_path}")
except Exception as e:
logger.error(f"Error saving to {file_path}: {e}")
raise
# def create_3d_visualization(self, word_vectors_3d: Dict[str, List[float]],
# selected_words: Dict[str, List[str]],
# output_path: str = "persian_words_3d.html"):
# """
# Create 3D visualization of words.
# Args:
# word_vectors_3d: Dictionary of 3D word coordinates
# selected_words: Dictionary of selected words for each key word
# output_path: Output file path for the visualization
# """
# logger.info("Creating 3D visualization...")
# # Prepare data for plotting
# words = list(word_vectors_3d.keys())
# coords = np.array(list(word_vectors_3d.values()))
# # Create color mapping for key words and their neighbors
# colors = []
# sizes = []
# hover_texts = []
# for word in words:
# # Check if word is a key word
# is_key_word = word in self.key_words
# # Check if word is in selected words
# in_selected = False
# key_word_group = None
# for key_word, selected_list in selected_words.items():
# if word in selected_list:
# in_selected = True
# key_word_group = key_word
# break
# if is_key_word:
# colors.append('red')
# sizes.append(15)
# hover_texts.append(f"کلیدواژه: {word}")
# elif in_selected:
# colors.append('blue')
# sizes.append(10)
# hover_texts.append(f"کلمه مرتبط با '{key_word_group}': {word}")
# else:
# colors.append('lightgray')
# sizes.append(5)
# hover_texts.append(f"کلمه: {word}")
# # Create 3D scatter plot
# fig = go.Figure()
# # Add scatter plot
# fig.add_trace(go.Scatter3d(
# x=coords[:, 0],
# y=coords[:, 1],
# z=coords[:, 2],
# mode='markers+text',
# marker=dict(
# size=sizes,
# color=colors,
# opacity=0.8
# ),
# text=words,
# textposition="middle center",
# hovertext=hover_texts,
# hoverinfo='text'
# ))
# # Update layout
# fig.update_layout(
# title={
# 'text': 'نمایش سه‌بعدی کلمات فارسی',
# 'x': 0.5,
# 'xanchor': 'center',
# 'font': {'size': 20}
# },
# scene=dict(
# xaxis_title='محور X',
# yaxis_title='محور Y',
# zaxis_title='محور Z',
# camera=dict(
# eye=dict(x=1.5, y=1.5, z=1.5)
# )
# ),
# width=1000,
# height=800,
# showlegend=False
# )
# # Save the plot
# fig.write_html(output_path)
# logger.info(f"3D visualization saved to {output_path}")
# return fig
def process_pipeline(self, input_file: str, output_dir: str = "output"):
"""
Run the complete processing pipeline.
Args:
input_file(str): Path to input JSON file
output_dir(str): Output directory for results
"""
# Create output directory
Path(output_dir).mkdir(exist_ok=True)
logger.info("Starting Persian Vector Analysis Pipeline...")
# Step 1: Load data
# sentences = self.load_json_data(input_file)
# ALL_SECTIONS is built in the __main__ block below from the Elastic dump of qanon sections
sentences = ALL_SECTIONS
# for s in sentences:
# s_len = len(self.tokenize_sentence(s))
# if s_len > 512:
# print(f'long: {s}')
# Step 2: Extract words
# all_words = self.extract_words(sentences)
# Step 3: Remove stop words
# filtered_words = self.remove_stop_words(all_words)
# filtered_words = all_words
# Step 4: Get unique words
# unique_words = self.get_unique_words(filtered_words)
# Step 5: Compute word vectors
sentences_vectors = self.compute_word_vectors(sentences)
# Step 6: Save word vectors
self.save_json(sentences_vectors, f"{output_dir}/sections-vec-285k.json")
# Step 7: Find closest words to key words
# selected_words = self.find_closest_words(word_vectors, self.key_words)
# Step 8: Save selected words
# self.save_json(selected_words, f"{output_dir}/selected_words.json")
# Step 9: Reduce to 3D
# word_vectors_3d = self.reduce_to_3d(word_vectors, method='tsne')
# Step 10: Save 3D vectors
# self.save_json(word_vectors_3d, f"{output_dir}/words_vector_3d.json")
# Step 11: Create visualization
# self.create_3d_visualization(word_vectors_3d, selected_words,
# f"{output_dir}/persian_words_3d.html")
logger.info("Pipeline completed successfully!")
# Print summary
print("\n" + "="*50)
print("PIPELINE SUMMARY")
print("="*50)
print(f"Input sentences: {len(sentences)}")
# print(f"Total words extracted: {len(all_words)}")
# print(f"Unique words after preprocessing: {len(unique_words)}")
# print(f"Word vectors computed: {len(word_vectors)}")
# print(f"Key words processed: {len(self.key_words)}")
print(f"Output files saved to: {output_dir}/")
print("="*50)
def full_path_text_maker(full_path):
"""
This function takes the tree path of a section and rebuilds a single text from its parts,
ordered from the most specific element back to the most general one, and returns it.
Args:
full_path(list): list of elements describing the tree path of this section
Returns:
full_path_text(str): the text reconstructed from the section's path
"""
full_path_text = ""
for i, path_item in enumerate(reversed(full_path)):
if i == len(full_path) - 1:
full_path_text += ''.join(f'{path_item}')
break
full_path_text += ''.join(f'{path_item} از ')
full_path_text = full_path_text.strip()
return full_path_text
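## worked example (illustrative values): the text is rebuilt from the most specific path element
## back to the most general one
# full_path_text_maker(["قانون بودجه", "ماده 1", "تبصره 2"])
# -> "تبصره 2 از ماده 1 از قانون بودجه"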
def main():
"""
Main function to run the Persian Vector Analysis.
"""
# Initialize analyzer
analyzer = PersianVectorAnalyzer()
# Define input and output paths
# input_file = "./output-speechs/nahj_speechs_sentences.json"
# output_dir = "output-speechs"
# input_file = "./majles/data/sections.json"
input_file = ""
output_dir = "./data/majles-output"
# Run the complete pipeline
analyzer.process_pipeline(input_file, output_dir)
if __name__ == "__main__":
eh_obj = ElasticHelper()
path = "/home/gpu/data_11/14040611/mj_qa_section.zip"
sections_elastic = eh_obj.iterateJsonFile(path, True)
all_count = 0
dont_cares = []
ALL_SECTIONS = []
for index, item in enumerate(sections_elastic):
all_count +=1
source = item['source']
section_path = source['other_info']['full_path']
id = item['id']
filtered_keys = ['فصل', 'موخره', 'امضاء', 'عنوان']
flag = False
if '>' in section_path:
path_parts = section_path.split('>')
for key in filtered_keys:
if key in path_parts[-1]:
dont_cares.append(id)
flag = True
break
if flag:
continue
else:
for key in filtered_keys:
if key in section_path:
dont_cares.append(id)
flag = True
break
if flag:
continue
qanon_title = source['qanon_title']
full_path_text = full_path_text_maker(section_path.split('>'))
section_prefix = f"محتوای {full_path_text} {cleaning(qanon_title)} عبارت است از: "
try:
content = cleaning(item['source']['content'])
# skip very short sections that effectively have no content
if len(content.split()) <= 10:
continue
except Exception as error:
print(error)
continue
data = {
'id': id,
'fullpath': section_path,
'qanon-title': qanon_title,
'section-prefix': section_prefix,
'content': content
}
ALL_SECTIONS.append(data)
print(f'all_count: {all_count}')
print(f'dont_cares: {len(dont_cares)}')
print(f'ALL_SECTIONS without dont-cares: {len(ALL_SECTIONS)}')
main()
"""
:: *** Important note *** ::
NOTE: after this process finishes, run convert_qavanin_json_to_faiss.py to create the FAISS index that is used in the RAG process.
"""

76
util/normalizer.py Executable file
View File

@ -0,0 +1,76 @@
#import hazm
from cleantext import clean
import re
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
#normalizer = hazm.Normalizer()
wierd_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
u"\U00002702-\U000027B0"
u"\U000024C2-\U0001F251"
u"\U0001f926-\U0001f937"
u'\U00010000-\U0010ffff'
u"\u200d"
u"\u2640-\u2642"
u"\u2600-\u2B55"
u"\u23cf"
u"\u23e9"
u"\u231a"
u"\u3030"
u"\ufe0f"
u"\u2069"
u"\u2066"
# u"\u200c"
u"\u2068"
u"\u2067"
"]+", flags=re.UNICODE)
def cleaning(text):
text = text.strip()
# regular cleaning
# text = clean(text,
# fix_unicode=True,
# to_ascii=False,
# lower=True,
# no_line_breaks=True,
# no_urls=True,
# no_emails=True,
# no_phone_numbers=True,
# no_numbers=False,
# no_digits=False,
# no_currency_symbols=True,
# no_punct=False,
# replace_with_url="",
# replace_with_email="",
# replace_with_phone_number="",
# replace_with_number="",
# replace_with_digit="0",
# replace_with_currency_symbol="",
# )
text = clean(text,
extra_spaces = True,
lowercase = True
)
# cleaning htmls
text = cleanhtml(text)
# normalizing
#text = normalizer.normalize(text)
# removing weird patterns (emoji and other unusual symbols)
text = wierd_pattern.sub(r'', text)
# removing extra spaces, hashtags
text = re.sub("#", "", text)
text = re.sub(r"\s+", " ", text)
return text
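## sample usage (illustrative input; exact spacing of the output may vary with the cleantext version):
# cleaning("  <p>متن    نمونه 😀</p>  ")
# -> roughly "متن نمونه": HTML tags stripped, emoji removed, whitespace collapsed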