From a8fca065f31bba024dd998dda9d44462ad851e84 Mon Sep 17 00:00:00 2001 From: init_mahdi Date: Sun, 30 Nov 2025 09:49:41 +0000 Subject: [PATCH] first-step --- .dockerignore | 3 + __init__.py | 0 config.env | 1 + dockerfile | 10 + dockerfile_base | 5 + main.py | 58 +++ new_requirements.txt | 16 + routers/__init__.py | 0 routers/ai_data_parser.py | 423 ++++++++++++++++ routers/base_model.py | 28 ++ routers/chatbot_handler.py | 355 ++++++++++++++ routers/rag_base.py | 250 ++++++++++ routers/readme.md | 8 + routers/static.py | 160 ++++++ run_docker.bash | 3 + run_env.bash | 3 + util/convert_qavanin_json_to_faiss.py | 72 +++ util/docker_build.bash | 2 + util/elastic_helper.py | 677 +++++++++++++++++++++++++ util/embedder_sbert_qavanin_285k.py | 681 ++++++++++++++++++++++++++ util/normalizer.py | 76 +++ 21 files changed, 2831 insertions(+) create mode 100755 .dockerignore create mode 100755 __init__.py create mode 100755 config.env create mode 100755 dockerfile create mode 100755 dockerfile_base create mode 100755 main.py create mode 100755 new_requirements.txt create mode 100755 routers/__init__.py create mode 100755 routers/ai_data_parser.py create mode 100755 routers/base_model.py create mode 100755 routers/chatbot_handler.py create mode 100755 routers/rag_base.py create mode 100755 routers/readme.md create mode 100755 routers/static.py create mode 100755 run_docker.bash create mode 100755 run_env.bash create mode 100755 util/convert_qavanin_json_to_faiss.py create mode 100755 util/docker_build.bash create mode 100755 util/elastic_helper.py create mode 100755 util/embedder_sbert_qavanin_285k.py create mode 100755 util/normalizer.py diff --git a/.dockerignore b/.dockerignore new file mode 100755 index 0000000..96f182b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +./qavanin-faiss +./llm-answer +./data \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/config.env b/config.env new file mode 100755 index 0000000..bec511b --- /dev/null +++ b/config.env @@ -0,0 +1 @@ +LLM_URL="http://172.16.29.102:8001/v1/" \ No newline at end of file diff --git a/dockerfile b/dockerfile new file mode 100755 index 0000000..e68bf58 --- /dev/null +++ b/dockerfile @@ -0,0 +1,10 @@ +FROM docker.tavasi.ir/tavasi/qachat_base:1.0.0 + +WORKDIR /src/app + +COPY . 
/src/app + +EXPOSE 80 + +CMD [ "uvicorn","main:app","--reload","--port","80","--host=0.0.0.0"] + diff --git a/dockerfile_base b/dockerfile_base new file mode 100755 index 0000000..29d350a --- /dev/null +++ b/dockerfile_base @@ -0,0 +1,5 @@ +FROM docker.tavasi.ir/tavasi/qachat_base:1.0.0 +RUN pip install uvicorn[standard] +RUN pip install FlagEmbedding +RUN pip install aiofiles +RUN pip install openai \ No newline at end of file diff --git a/main.py b/main.py new file mode 100755 index 0000000..f28804d --- /dev/null +++ b/main.py @@ -0,0 +1,58 @@ +from fastapi import FastAPI +from routers.rag_base import router as rag_base +from contextlib import asynccontextmanager +from fastapi.middleware.cors import CORSMiddleware + + +# --- Lifespan manager --- +@asynccontextmanager +async def lifespan(app: FastAPI): + # 🚀 Startup + print("🚀 Starting up RAG system...") + # ایجاد OSS client و ذخیره در app.state + + # --- نکته مهم: اگر elastic_client هم می‌خوای توی startup درست کنی، اینجا اضافه کن --- + # elastic_client = get_elastic_client() + # app.state.elastic_client = elastic_client + + yield # برنامه در این حالت اجرا می‌شود + + # 🛑 Shutdown + print("🛑 Shutting down RAG system...") + # بستن اتصال‌های باز + client = getattr(app.state, "elastic_client", None) + if client is not None: + await client.close() + + +# --- ساخت اپلیکیشن --- +def create_app() -> FastAPI: + app = FastAPI( + title="qachat2 Backend", + version="0.1.0", + lifespan=lifespan, # ✅ اینجا lifespan رو متصل می‌کنیم + ) + + origins = ["*"] + app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + @app.get("/") + async def simple(): + return "ai rag chat qanon OK" + + @app.get("/ping") + async def ping(): + return "ai rag chat qanon OK" + + app.include_router(rag_base, prefix="") + return app + + +# ✅ نمونه‌سازی نهایی +app = create_app() diff --git a/new_requirements.txt b/new_requirements.txt new file mode 100755 index 0000000..053d268 --- /dev/null +++ b/new_requirements.txt @@ -0,0 +1,16 @@ +cleantext +elasticsearch7 +faiss_cpu +fastapi +hazm +langchain_openai +numpy +openai +pandas +pydantic +scikit_learn +sentence_transformers +torch +transformers +orjson +FlagEmbedding==1.3.5 \ No newline at end of file diff --git a/routers/__init__.py b/routers/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/routers/ai_data_parser.py b/routers/ai_data_parser.py new file mode 100755 index 0000000..cc9f724 --- /dev/null +++ b/routers/ai_data_parser.py @@ -0,0 +1,423 @@ +from typing import List +from pathlib import Path +import os, orjson, time, json, re, asyncio, traceback +from openai import AsyncOpenAI + + +# ------------------------------ پردازش API ------------------------------ +class AsyncCore: + def __init__( + self, + model_name, + task_name, + output_schema, + api_url, + data_path=None, + reasoning_effort="low", + top_p=1, + temperature=0.0, + max_token=128000, + output_path=None, + ai_code_version=None, + request_timeout=30, # ثانیه + api_key="EMPTY", + save_number=2, + semaphore_number=5, + ): + + self.save_number = save_number + # json file of data + self.data_path = data_path + self.semaphore_number = semaphore_number + + self.task_name = task_name + if output_path is None: + output_path = f"./{task_name}" + + self.output_path = Path(output_path) + self._temp_path = self.output_path / "batch_data" + self._temp_processed_id_path = self._temp_path / "processed_id.json" + + # Create output directory and subdirectories if they don't 
exist + self.output_path.mkdir(parents=True, exist_ok=True) + self._temp_path.mkdir(parents=True, exist_ok=True) + # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True) + + self.request_timeout = request_timeout + self.model_name = model_name + self.api_key = api_key + self.output_schema = output_schema + self.api_url = api_url + self.reasoning_effort = reasoning_effort + self.top_p = top_p + self.temperature = temperature + self.max_token = max_token + + if ai_code_version is None: + ai_code_version = f"{model_name}_{reasoning_effort}" + self.ai_code_version = ai_code_version + + self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"} + if data_path != None: + try: + self.data = self.__data_process() + print(f"📦 Loaded {len(self.data)} words") + except Exception as e: + raise ValueError( + f"Data loading/validation failed: {e}\n{traceback.format_exc()}" + ) + + def __validate_item(self, item, idx): + # Mandatory fields + for key in self.PRIMARY_KEY: + if key not in item: + raise ValueError(f"Missing mandatory key '{key}' in item #{idx}") + if not isinstance(item[key], str): + raise TypeError( + f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}" + ) + + # Optional field: assistant_prompt + if "assistant_prompt" not in item or item["assistant_prompt"] is None: + item["assistant_prompt"] = None + else: + if not isinstance(item["assistant_prompt"], str): + raise TypeError( + f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" + ) + + return item # now normalized + + def __data_process(self): + raw_data = self.__load_orjson(self.data_path) + if not isinstance(raw_data, list): + raise ValueError("Data must be a list of dictionaries.") + + processed_data = [] + for idx, item in enumerate(raw_data): + if not isinstance(item, dict): + raise ValueError(f"Item #{idx} is not a dictionary.") + validated_item = self.__validate_item(item, idx) + processed_data.append(validated_item) + + return processed_data + + def __get_max_number_file(self, directory): + # Pattern to match filenames like out_1.json, out_25.json, etc. + pattern = re.compile(r"output_(\d+)\.json$") + max_num = 0 + + for filename in os.listdir(directory): + match = pattern.match(filename) + if match: + num = int(match.group(1)) + if num > max_num: + max_num = num + return max_num + 1 + + def __load_orjson(self, path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + def __save_orjson(self, path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + def merge_json_dir(self, input_path, output_path): + directory = Path(input_path) + if not directory.is_dir(): + raise ValueError(f"Not valid PATH: {input_path}") + + seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) 
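+        # Descriptive note: merge_json_dir de-duplicates by the "id" field. Every *.json
+        # file in input_path is loaded in turn; files that are empty, unreadable, or not a
+        # list of dicts are collected in failed_files and reported at the end instead of
+        # aborting the merge. For valid files the first occurrence of each id wins, and
+        # items without an id are skipped.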
+ unique_data = [] # فقط داده‌های یکتا + failed_files = [] + + json_files = list(directory.glob("*.json")) + if not json_files: + print("⚠️ NO JSON File Found In This PATH") + return + + for json_file in json_files: + try: + data = self.__load_orjson(json_file) + if not data: # خالی یا None + failed_files.append(json_file.name) + continue + + if isinstance(data, list) and isinstance(data[0], dict): + for item in data: + item_id = item.get("id") + if item_id is None: + # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی + # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند + continue + if item_id not in seen_ids: + seen_ids.add(item_id) + unique_data.append(item) + else: + raise ValueError(f"no list available in this json -> {json_file}") + except ( + json.JSONDecodeError, + ValueError, + OSError, + KeyError, + TypeError, + ) as e: + # print(f"❌ Failed in process '{json_file.name}': {e}") + failed_files.append(json_file.name) + + # گزارش خطاها + if failed_files: + print("\n❌ We lose this file:") + for name in failed_files: + print(f" - {name}") + else: + print("\n✅ All JSON added") + + # ذخیره خروجی + try: + self.__save_orjson(data=unique_data, path=output_path) + print( + f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" + ) + except Exception as e: + print(f"❌ Error in saving final file: {e}") + + def make_new_proccessed_ids_from_file(self, json_in, out_path): + data = self.__load_orjson(json_in) + + finall_data = [] + for d in data: + if d["id"]: + finall_data.append(d["id"]) + finall_data = set(finall_data) + finall_data = list(finall_data) + print(f"-- len ids {len(finall_data)}") + + self.__save_orjson(data=finall_data, path=out_path) + + # ------------------------------ Main ------------------------------ + async def __process_item(self, client, item): + try: + messages = [ + {"role": "user", "content": item["user_prompt"]}, + ] + if item.get("system_prompt"): + messages.append( + {"role": "system", "content": item["system_prompt"]} + ) + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + + response = await client.chat.completions.parse( + model=self.model_name, + messages=messages, + temperature=self.temperature, + top_p=self.top_p, + reasoning_effort=self.reasoning_effort, + max_tokens=self.max_token, + stop=None, + response_format=self.output_schema, + ) + + parsed = ( + response.choices[0].message.parsed + if response and response.choices and response.choices[0].message.parsed + else {"raw_text": str(response)} + ) + + parsed = self.output_schema.model_validate(parsed) + parsed = parsed.model_dump() + parsed = dict(parsed) + parsed["ai_code_version"] = self.ai_code_version + parsed["id"] = item["id"] + # parsed["item"] = item + return parsed, 200 + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item['id']}") + return None, 408 + + except Exception as e: + print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") + return None, 400 + + def async_eval(self, processed_id: List = []): + try: + asyncio.run(self.__async_eval(processed_id)) + except KeyboardInterrupt: + print("\n🛑 Interrupted by user.") + traceback.print_exc() + + async def __async_eval(self, processed_id: List): + """ + اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. 
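+        (Main single-process async run that produces the final output.)
+
+        Flow: load previously processed ids from batch_data/processed_id.json, drop those
+        items from self.data, run the rest through a small pool of semaphore-limited async
+        tasks, checkpoint partial results every `save_number` completions, then merge the
+        batch files into final_data_<task_name>.json.
+
+        Minimal usage sketch (hypothetical values; the model name, URL and schema are
+        assumptions taken from elsewhere in this patch, e.g. the LLMOutput model in
+        routers/base_model.py, not a fixed configuration):
+
+            core = AsyncCore(
+                model_name="gpt-oss-20b",
+                task_name="bale-chat",
+                output_schema=LLMOutput,
+                api_url="http://localhost:8004/v1/",
+                data_path="./items.json",  # list of {"id", "user_prompt", "system_prompt"}
+            )
+            core.async_eval()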
+ """ + print("🔹 Starting async data processing...") + + # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ + if not processed_id: + try: + processed_id = self.__load_orjson(self._temp_processed_id_path) + print( + f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" + ) + except Exception: + print("⚠️ No valid processed_id found. Starting fresh.") + processed_id = [] + + # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ + all_processed_id = set(processed_id) + all_results = [] + total_time = [] + + data = [item for item in self.data if item.get("id") not in all_processed_id] + print( + f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" + ) + + # اگر چیزی برای پردازش نیست + if not data: + print("✅ Nothing new to process. All items are already done.") + return + + # ------------------ مرحله ۳: شروع پردازش ------------------ + print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") + async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: + semaphore = asyncio.Semaphore(5) + + async def limited_process(item): + async with semaphore: + return await self.__process_item(client, item) + + tasks = [asyncio.create_task(limited_process(item)) for item in data] + + total_i = 0 + # ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست) + for i, task in enumerate(asyncio.as_completed(tasks), start=1): + start = time.time() + try: + parsed, status_code = await asyncio.wait_for( + task, timeout=self.request_timeout + ) # ⏱ حداکثر 2 دقیقه + except asyncio.TimeoutError: + print(f"⏳ Task {i} timed out completely") + parsed, status_code = None, 408 + total_time.append(time.time() - start) + + if status_code == 200: + all_results.append(parsed) + all_processed_id.add(parsed.get("id")) + else: + print(f"⚠️ Skipped item (status={status_code})") + + total_i += 1 + # ✅ ذخیره‌ی موقت هر n مورد + if total_i >= self.save_number: + print(f"total_i {total_i}") + print(f"self.save_number {self.save_number}") + total_i = 0 + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + print(f"💾 Auto-saved partial data: {len(all_results)}") + all_results.clear() + + # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده + if total_i > 0 or len(all_results) > 0: + print("💾 Final save of remaining data...") + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + print(f"💾 Auto-saved partial data: {len(all_results)}") + all_results.clear() + + # ------------------ مرحله ۴: ذخیره خروجی ------------------ + final_data_path = self.output_path / f"final_data_{self.task_name}.json" + processed_id_path = self.output_path / "processed_id.json" + + self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) + all_results = self.__load_orjson(final_data_path) + # make_new_proccessed_ids_from_file() + self.__save_orjson(data=list(all_processed_id), path=processed_id_path) 
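+        # Descriptive note: merge_json_dir has already written final_data_path from the
+        # batch files; it was reloaded into all_results above so the summary print below
+        # counts the merged, de-duplicated results, and the next line re-saves that merged
+        # list unchanged alongside processed_id.json.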
+ self.__save_orjson(data=all_results, path=final_data_path) + + avg_time = (sum(total_time) / len(total_time)) if total_time else 0 + print( + f"\n✅ Processing completed!\n" + f"📊 Total-Data: {len(data)} | " + f"⭕ Ignored-Data: {len(processed_id)} | " + f"📦 Proccessed-Data: {len(all_results)} | " + f"❌ Loss-Data: {len(data)-len(all_results)} | " + f"🕒 Avg Time: {avg_time:.2f}'s per item | " + f"🕒 Total Time: {sum(total_time):.4f}'s | " + f"💾 Results saved to: {final_data_path}" + ) + + async def single_simple_async_proccess_item(self, item): + async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: + semaphore = asyncio.Semaphore(5) + async with semaphore: + try: + messages = [ + {"role": "user", "content": item["user_prompt"]}, + ] + if item.get("system_prompt"): + messages.append( + {"role": "system", "content": item["system_prompt"]} + ) + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + + response = await client.chat.completions.parse( + model=self.model_name, + messages=messages, + temperature=self.temperature, + top_p=self.top_p, + reasoning_effort=self.reasoning_effort, + max_tokens=self.max_token, + stop=None, + response_format=self.output_schema, + ) + + parsed = ( + response.choices[0].message.parsed + if response and response.choices and response.choices[0].message.parsed + else {"raw_text": str(response)} + ) + + parsed = self.output_schema.model_validate(parsed) + parsed = parsed.model_dump() + parsed = dict(parsed) + parsed["ai_code_version"] = self.ai_code_version + return parsed, 200 + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item}") + return None, 408 + + except Exception as e: + print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}") + return None, 400 + + diff --git a/routers/base_model.py b/routers/base_model.py new file mode 100755 index 0000000..527742a --- /dev/null +++ b/routers/base_model.py @@ -0,0 +1,28 @@ +from pydantic import BaseModel +from typing import List + + +class Title(BaseModel): + title: str + +class Query(BaseModel): + query: str + +class ChatObject(BaseModel): + title: str + user_query: str + model_key: str + retrived_passage: str + retrived_ref_ids: str + model_answer: str + status:str='success' + prompt_type: str= "question-answer" + + +class LLMOutput(BaseModel): + text : str + source : List[str] + +class LLMInput(BaseModel): + query : str + knowledge : List[dict] \ No newline at end of file diff --git a/routers/chatbot_handler.py b/routers/chatbot_handler.py new file mode 100755 index 0000000..13b9407 --- /dev/null +++ b/routers/chatbot_handler.py @@ -0,0 +1,355 @@ +import numpy as np +import torch, orjson, faiss, re +from typing import List +from sentence_transformers import SentenceTransformer +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn.metrics.pairwise import cosine_similarity +from FlagEmbedding import FlagReranker +from pathlib import Path + +# nlist = 2048 +# quantizer = faiss.IndexFlatIP(dim) +# index = faiss.IndexIVFFlat(quantizer, dim, nlist) +# index.train(embeddings) +# index.add(embeddings) + + +class InitHybridRetrieverReranker: + def __init__( + self, + embeder_path, + reranker_path, + dict_content: List[dict], + faiss_index, + dense_alpha: float = 0.6, + device: str = None, + cache_dir="/src/MODELS", + batch_size=512, + ): + + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + + self.device = device + self.dense_alpha = dense_alpha + + # 
=============================== + # تبدیل ورودی فقط یک بار + # =============================== + self.content_list = [x["content"] for x in dict_content] + self.ids_list = [x["id"] for x in dict_content] + self.N = len(self.content_list) + self.faiss_index = faiss_index + + # Dense embedder + self.embedder = SentenceTransformer( + local_files_only=True, + model_name_or_path=embeder_path, + cache_folder=cache_dir, + device=self.device, + similarity_fn_name="cosine", + ) + + # TF-IDF + self.vectorizer = TfidfVectorizer( + analyzer="word", + ngram_range=(1, 2), + token_pattern=r"(?u)\b[\w\u0600-\u06FF]{2,}\b", + ) + self.tfidf_matrix = self.vectorizer.fit_transform(self.content_list) + + # Reranker + self.reranker = FlagReranker( + model_name_or_path=reranker_path, + local_files_only=True, + use_fp16=True, + devices=device, + cache_dir=cache_dir, + batch_size=batch_size, + normalize=True, + # max_length=1024, + # trust_remote_code=False, + # query_max_length= + ) + + print("RAG Ready — Retriever + Reranker Loaded") + + # ================================ + # Dense Search (FAISS) + # ================================ + async def dense_retrieve(self, query: str, top_k: int): + if top_k <= 0: + return [], np.array([]) + + emb = self.embedder.encode(query, convert_to_numpy=True).astype(np.float32) + + D, I = self.faiss_index.search(emb.reshape(1, -1), top_k) + return I[0], D[0] + + # ================================ + # Sparse Search (TF-IDF) + # ================================ + async def sparse_retrieve(self, query: str, top_k: int): + if top_k <= 0: + return [], np.array([]) + + q_vec = self.vectorizer.transform([query]) + sims = cosine_similarity(q_vec, self.tfidf_matrix)[0] + + k = min(top_k, len(sims)) + idx = np.argpartition(-sims, k - 1)[:k] + idx = idx[np.argsort(-sims[idx], kind="mergesort")] + + return idx, sims[idx] + + # ================================ + # Reciprocal Rank Fusion + # ================================ + async def fuse(self, d_idx, d_scores, s_idx, s_scores, top_k=50, k_rrf=60): + combined = {} + + for rank, idx in enumerate(d_idx): + combined[idx] = combined.get(idx, 0) + 1.0 / (k_rrf + rank) + + for rank, idx in enumerate(s_idx): + combined[idx] = combined.get(idx, 0) + 1.0 / (k_rrf + rank) + + sorted_items = sorted(combined.items(), key=lambda x: x[1], reverse=True) + + return [i[0] for i in sorted_items[:top_k]] + + # ================================ + # Rerank + # ================================ + async def rerank(self, query: str, cand_idx: List[int], final_k: int = 10): + if not cand_idx: + return [] + + passages = [self.content_list[i] for i in cand_idx] + pairs = [[query, p] for p in passages] + + scores = self.reranker.compute_score(pairs, normalize=True, max_length=512) + + if isinstance(scores, float): + scores = [scores] + + idx_score = list(zip(cand_idx, scores)) + idx_score.sort(key=lambda x: x[1], reverse=True) + + return idx_score[:final_k] + + # ================================ + # Main Search Function + # ================================ + async def search_base( + self, + query: str, + topk_dense=50, + topk_sparse=50, + pre_rerank_k=50, + final_k=10, + ): + + d_idx, d_scores = await self.dense_retrieve(query, topk_dense) + s_idx, s_scores = await self.sparse_retrieve(query, topk_sparse) + + cand_idx = await self.fuse(d_idx, d_scores, s_idx, s_scores, pre_rerank_k) + final_rank = await self.rerank(query, cand_idx, final_k) + + # =============================== + # خروجی سریع و تمیز + # =============================== + return [ + { + "id": 
self.ids_list[idx], + "content": self.content_list[idx], + "score": score, + } + for idx, score in final_rank + ] + + +def load_orjson(path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + +def save_orjson(path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + +WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/" +# ref = f"[«{i}»](https://majles.tavasi.ir/entity/detail/view/qsection/{idx})" + + +def get_in_form(title: str, sections: list, max_len: int = 4000): + chunks = [] + current = f"برای پرسش: {title}\n\n" + ref_text = "«منبع»" + + for i, data in enumerate(sections, start=1): + sec_text = data.get("content", "") + idx = data.get("id") + + # ساخت ref کامل + ref = f"[{ref_text}]({WEB_LINK}{idx})" + # متن کامل آیتم + block = f"{i}: {sec_text}\n{ref}\n\n" + + # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید + if len(current) + len(block) > max_len: + chunks.append(current.rstrip()) + current = "" + + current += block + + # آخرین چانک را هم اضافه کن + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + +def chunked_simple_text(answer_text, max_len=4000): + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > max_len: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + +def format_answer_bale(answer_text: str, sources: list, max_len: int = 4000): + """ + answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد + sources: مثل ['qs2117427'] + """ + ref_text = "«منبع»" + + def make_link(src): + return f"[{ref_text}]({WEB_LINK}{src})" + + # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد + # مثلا: (qs123) یا (qs123, qs456, qs789) + pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" + + def replace_source(m): + content = m.group(1) + codes = [c.strip() for c in content.split(",")] # جداسازی چند کد + links = [make_link(code) for code in codes] + full_match = m.group(0) + # if "منبع" in full_match: + # print(f'Found explicit source(s): {links}') + # else: + # print(f'Found implicit source(s): {links}') + return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان + + # جایگزینی در متن + answer_text = re.sub(pattern, replace_source, answer_text) + + # اگر طول کمتر از max_len بود → تمام + if len(answer_text) <= max_len: + return [answer_text] + + # تقسیم متن اگر طول زیاد شد + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > max_len: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + +def get_user_prompt(query: str): + """ + get a query and prepare a prompt to generate title based on that + """ + title_prompt = f"برای متن {query} یک عنوان با معنا که بین 3 تا 6 کلمه داشته باشد، در قالب یک رشته متن ایجاد کن. سبک و لحن عنوان، حقوقی و کاملا رسمی باشد. عنوان تولید شده کاملا ساده و بدون هیچ مارک داون یا علائم افزوده ای باشد. غیر از عنوان، به هیچ وجه توضیح اضافه ای در قبل یا بعد آن اضافه نکن." 
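+    # The Persian prompt above asks the model for a meaningful 3-6 word title for `query`,
+    # written in a formal legal tone and returned as a plain string with no markdown and
+    # no extra explanation before or after it.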
+ return title_prompt + + +def format_knowledge_block(knowledge): + lines = [] + for item in knowledge: + _id = item.get("id", "unknown") + _content = item.get("content", "") + lines.append(f"- ({_id}) { _content }") + return "\n".join(lines) + + +def get_user_prompt2(obj): + query = obj.query + knowledge = obj.knowledge + + prompt = f""" +شما باید تنها بر اساس اطلاعات ارائه شده پاسخ بدهید و هیچ دانشی خارج از آنها استفاده نکنید. + +### پرسش: +{query} + +### اسناد قابل استناد: +{format_knowledge_block(knowledge)} + +### دستور تولید خروجی: +- پاسخی کاملاً دقیق، تحلیلی و مفهومی ایجاد کن +- لحن رسمی و حقوقی باشد +- اگر پاسخ نیاز به ترکیب چند سند دارد، آنها را ادغام کن +- اگر داده‌ها کافی نبود، این موضوع را شفاف اعلام کن اما اطلاعات مرتبط را همچنان ارائه بده +""" + return prompt + + +def get_user_prompt3(query, knowledge_json): + sys = f"""Answer the following based ONLY on the knowledge: + +Query: +{query} + +Knowledge: +{knowledge_json}""" + return sys + + +def load_faiss_index(index_path: str, metadata_path: str): + """بارگذاری ایندکس FAISS و متادیتا (لیست جملات + عناوین).""" + index = faiss.read_index(index_path) + + metadata = load_orjson(metadata_path) + + metadata = [ + { + "id": item["id"], + "content": item["content"], + "prefix": item["prefix"], + } + for item in metadata + ] + return metadata, index diff --git a/routers/rag_base.py b/routers/rag_base.py new file mode 100755 index 0000000..c30ff85 --- /dev/null +++ b/routers/rag_base.py @@ -0,0 +1,250 @@ +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse +import time, os, traceback +from .base_model import Query, LLMOutput, LLMInput, Title +from .ai_data_parser import AsyncCore +from .chatbot_handler import ( + InitHybridRetrieverReranker, + format_answer_bale, + get_user_prompt2, + get_user_prompt3, + load_faiss_index, + get_in_form,chunked_simple_text +) +from .static import ( + EMBED_MODEL_PATH, + FAISS_INDEX_PATH, + FAISS_METADATA_PATH, + LLM_URL, + SYSTEM_PROMPT_FINALL, + RERANKER_MODEL_PATH, + LLM_ERROR, + MODEL_KEY, + MODEL_NAME, + OUTPUT_PATH_LLM, + REASONING_EFFORT, + TASK_NAME, + LLM_TIME_OUT, + MAX_TOKEN, + SYSTEM_PROPMT2, +) + + +# ################################################## Global-params +router = APIRouter(tags=["ragchat"]) +# # settings= get_settings() + + +METADATA_DICT, FAISS_INDEX = load_faiss_index( + index_path=FAISS_INDEX_PATH, metadata_path=FAISS_METADATA_PATH +) + +RAG = InitHybridRetrieverReranker( + embeder_path=EMBED_MODEL_PATH, + reranker_path=RERANKER_MODEL_PATH, + dict_content=METADATA_DICT, + faiss_index=FAISS_INDEX, + dense_alpha=0.6, + device="cuda", +) + +RUNNER_PROMPT = AsyncCore( + model_name=MODEL_NAME, + api_url=LLM_URL, + output_path=OUTPUT_PATH_LLM, + task_name=TASK_NAME, + output_schema=LLMOutput, + reasoning_effort=REASONING_EFFORT, + ai_code_version=MODEL_KEY, + request_timeout=LLM_TIME_OUT, + max_token=MAX_TOKEN, + save_number=1, +) + +from pydantic import BaseModel + +SYS_PROMPT = """ +شما یک ربات متخصص حقوقی و قانونی هستید به نام «قانون‌یار». +هویت شما: یک وکیل باتجربه، دقیق، مودب و مشتاق کمک‌کردن به افراد در فهم مسائل حقوقی. + +در این مرحله، وظیفه شما «تشخیص نوع پیام» است؛ اما باید یک دلیل کوتاه (answer) بدهید +که لحنش لحن یک وکیل کارکشته و حامی باشد. + +برچسب خروجی فقط یکی از این موارد است: +- legal_question +- greeting +- other + +تعریف‌ها: +1. legal_question: هر پرسش یا متن مرتبط با قوانین، مقررات، دادگاه، وکالت، قرارداد، مالیات، ارث، نکاح و طلاق، شکایت، چک، سفته، ثبت شرکت، حقوق کار و هر موضوع مشابه. +2. 
greeting: پیام‌های غیرحقوقی شامل سلام، معرفی، احوال‌پرسی، تشکر، خداحافظی. +3. other: پیام‌هایی که خارج از دو دسته بالا قرار می‌گیرند. + +نکات مهم: +- answer باید بسیار کوتاه باشد (حداکثر دو خط). +- اگر پیام حتی کمی رنگ‌وبوی حقوقی داشته باشد، answer را به سمت توضیح حقوقی ببرید. +- در حالت greeting با لحن رسمی و دوستانه یک وکیل پاسخ بدهید. +- در حالت other، در عین محترمانه‌گفتن اینکه پیام حقوقی نیست، سعی کنید یک زاویهٔ حقوقی احتمالی مطرح کنید تا راه کمک باز بماند. + +خروجی شما حتماً باید شامل دو فیلد باشد: +label: یکی از سه برچسب +answer: توضیح کوتاه، دوستانه، مودب و با لحن یک وکیل حامی + +خروجی فقط JSON باشد. +""" + + +ASSIST_PROMPT = """ +### Example 1: +input: +"عین کلی" +output: +{ + "label": "legal_question", + "answer": "موضوع شما به یکی از مفاهیم مهم حقوق مدنی مربوط می‌شود و با کمال میل راهنمایی‌تان می‌کنم." +} + +### Example 2: +input: +"طرز تهیه ماست" +output: +{ + "label":"other", + "answer":"موضوع شما ماهیت حقوقی ندارد، اما اگر دغدغه‌تان مرتبط با استاندارد یا مسئولیت تولید باشد میتوانم به شما کمک کنم، می شود سوال خود را بهتر و کامل تر بپرسید؟ میخواهم دقیق بدانم چه قسمتی از موارد حقوقی تولید ماست رو میخواهید بدونید" +} + +### Example 3: +input: +"سلام، تو کی هستی؟" +output: +{ + "label":"greeting", + "answer":"سلام و ارادت؛ من قانون‌یار هستم، وکیل همراه شما برای هر پرسش حقوقی." +} +""" + +class ChatOutput(BaseModel): + label: str + answer: str | None = None + +RUNNER_CHAT = AsyncCore( + model_name=MODEL_NAME, + api_url=LLM_URL, + output_path="/home2/rag_qa_chat2/data/_temp/", + task_name="type-of-chat", + output_schema=ChatOutput, + reasoning_effort="low", + ai_code_version=MODEL_KEY, + request_timeout=LLM_TIME_OUT, + max_token=256, + save_number=1, + top_p=0.8, + temperature=0.5 +) + +# legal_question +# greeting +# other + +# label +# reasoning +async def chat_bot_run(query): + try: + s0 = time.time() + print(f'query {query}') + type_, _ = await RUNNER_CHAT.single_simple_async_proccess_item( + item={ + "user_prompt": query, + "system_prompt": SYS_PROMPT, + "assistant_prompt":ASSIST_PROMPT + } + ) + type_chat = type_.get('label', None) + answer = type_['answer'] + s = time.time() + print(f'type_llm {type_} {s-s0}\'s Toke') + if type_chat != "legal_question": + return chunked_simple_text(answer) + + sections_dict = await RAG.search_base( + query, + final_k=10, + topk_dense=100, + topk_sparse=100, + pre_rerank_k=100, + ) + e = time.time() + # input_data = LLMInput(query=query, knowledge=sections_dict) + # prompt = get_user_prompt2(input_data) + prompt = get_user_prompt3(query=query, knowledge_json=sections_dict) + + llm_answer, _ = await RUNNER_PROMPT.single_simple_async_proccess_item( + item={"user_prompt": prompt, "system_prompt": SYSTEM_PROPMT2}, + ) + ee = time.time() + finall = format_answer_bale( + answer_text=llm_answer["text"], sources=llm_answer["source"] + ) + eee = time.time() + print(f"Rag = {e-s}", f"llm_answer = {ee-e}", f"Format = {eee-ee}", sep="\n") + return finall + except: + traceback.print_exc() + + +class RagInput(BaseModel): + query: str + limit:int = 10 + +class RagOutput (BaseModel): + finall_form: str + duration: int + ids: List[str] + +async def rag_run(input:RagInput): + try: + + s = time.time() + sections_dict = await RAG.search_base( + query=input.query, + final_k=input.limit, + topk_dense=100, + topk_sparse=100, + pre_rerank_k=100, + ) + e = time.time() + finall = get_in_form(title=input.query, sections=sections_dict) + ee = time.time() + print(f"Rag = {e-s}", f"Form = {ee-e}", sep="\n") + finall = RagOutput( + + ) + return finall + except: + 
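+        # Descriptive note: any failure in the block above (including the RagOutput(...)
+        # constructed without its required finall_form/duration/ids fields) lands here;
+        # the traceback is printed and rag_run falls through, returning None.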
traceback.print_exc() + + +@router.post("/run_chat") +async def run_chat(payload: Query, request: Request): + s = time.time() + try: + answer = await chat_bot_run(payload.query) + except: + print(f"chat_bot_run FAIL!") + answer = LLM_ERROR + e = time.time() + print(f"Total Time {e-s:.2f}'s") + return JSONResponse({"result": answer}, status_code=201) + + +@router.post("/run_rag") +async def run_chat(payload: Query, request: Request): + s = time.time() + try: + answer = await rag_run(payload.query) + except: + print(f"chat_bot_run FAIL!") + answer = LLM_ERROR + e = time.time() + print(f"Total Time {e-s:.2f}'s") + return JSONResponse({"result": answer}, status_code=201) diff --git a/routers/readme.md b/routers/readme.md new file mode 100755 index 0000000..581ceef --- /dev/null +++ b/routers/readme.md @@ -0,0 +1,8 @@ +# "gpt-4o", "gpt-4o-mini", "deepseek-chat" , "gemini-2.0-flash", gemini-2.5-flash-lite +# gpt-4o : 500 +# gpt-4o-mini : 34 +# deepseek-chat: : 150 +# gemini-2.0-flash : error +# cf.gemma-3-12b-it : 1 +# gemini-2.5-flash-lite : 35 خیلی خوب + diff --git a/routers/static.py b/routers/static.py new file mode 100755 index 0000000..7478ae9 --- /dev/null +++ b/routers/static.py @@ -0,0 +1,160 @@ +from dotenv import load_dotenv +import os +LLM_URL = "http://localhost:8004/v1/" # "http://172.16.29.102:8001/v1/" +EMBED_MODEL_PATH = "/home2/MODELS/models--sentence-transformers--paraphrase-multilingual-MiniLM-L12-v2/snapshots/86741b4e3f5cb7765a600d3a3d55a0f6a6cb443d" # "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" +RERANKER_MODEL_PATH = "/home2/MODELS/bge_reranker_m3_v2/bge-reranker-v2-m3" # "BAAI/bge-reranker-v2-m3" + + +FAISS_INDEX_PATH = "/home2/rag_qavanin_api/data/qavanin-faiss/faiss_index_qavanin_285k.index" # "/src/app/data/qavanin-faiss/faiss_index_qavanin_285k.index" +FAISS_METADATA_PATH = "/home2/rag_qavanin_api/data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json" # "/src/app/data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json" + + +PATH_LOG = "./data/llm-answer/" +load_dotenv() +RERANK_BATCH = int(os.environ.get("rerank_batch_size")) +API_KEY = os.getenv("api_key") +LLM_ERROR = "با عرض پوزش؛ ❌in ragمتاسفانه خطایی رخ داده است. لطفا لحظاتی دیگر دوباره تلاش نمائید" +MODEL_KEY = "oss-120-hamava" +MODEL_NAME="gpt-oss-20b" # "gpt-oss-120b" +OUTPUT_PATH_LLM="/home2/rag_qa_chat2/data/_temp" +TASK_NAME="bale-chat" +REASONING_EFFORT="low" +LLM_TIME_OUT=30 +MAX_TOKEN=8192 + + + + +SYSTEM_PROMPT_FINALL = """شما یک دستیار تحلیل‌گر حقوقی متخصص در استنتاج دقیق از اسناد قانونی هستید. + +ورودی شما شامل: +- یک پرسش کاربر (query) +- مجموعه‌ای از چند متن قانونی (knowledge)، که هر کدام شامل: + - id (شناسه سند) + - content (متن بند قانونی) + +وظیفه شما: +1. پرسش را دقیق بخوانید و فقط بر اساس اطلاعات موجود در اسناد ارائه شده پاسخ دهید. +2. از خودتان هیچ اطلاعات جدید، تخمین، تفسیر شخصی، یا دانش خارج از اسناد وارد نکنید. +3. اگر یک پاسخ نیاز به ترکیب چند سند دارد، آن‌ها را استخراج و در هم ادغام کنید و نتیجه را کاملاً روان و قابل فهم بنویسید. +4. پاسخ باید: + - تحلیل‌محور + - شفاف + - فارسی استاندارد و حقوقی + - ساختاریافته و قابل ارائه باشد +5. هر جمله یا بند از پاسخ **حتماً باید به یک یا چند id سند مشخص وصل شود**. + - اگر برای جمله‌ای منبعی پیدا نشد، صریحاً در متن ذکر کنید: "(هیچ منبع مرتبط موجود نیست)" + - از اضافه کردن idهای فرضی یا خارج از knowledge خودداری شود. +6. از تکرار مستقیم یا کپی کردن جملات خام اسناد اجتناب کنید. آن‌ها را با بازنویسی تحلیلی به کار ببرید. +7. در پایان پاسخ: + - حتماً لیست تمام شناسه‌های سندهای استفاده‌شده را برگردانید. 
+ - فقط id های اسنادی که واقعاً در پاسخ استفاده شده‌اند ذکر شوند به صورت دقیقا: (qs2127) + - ترتیب اهمیت و ارتباط در لیست رعایت شود. +8. پاسخ نهایی باید دقیقاً در فرمت JSON زیر برگردد و هیچ متن دیگری خارج از آن اضافه نشود: + +{ + "text" : "متن کامل پاسخ تحلیلی و دقیق به پرسش، هر جمله یا بند با (id) سند مرتبط یا (هیچ منبع مرتبط موجود نیست) مشخص شود.", + "source": ["qs123", "qs545", ...] +} + +ورودی نمونه: +{ + query: "متن سوال", + knowledge: [ + {"id": "qs01", "content": "..."}, + {"id": "qs02", "content": "..."}, + ... + ] +} +""" + +SYSTEM_PROPMT2 = '''You are a legal reasoning model that MUST base the answer ONLY on the documents provided in `knowledge`. + +STRICT RULES: +1. You have no knowledge outside the provided documents. +2. Before generating the answer you MUST: + A. Extract the list of all valid document IDs from `knowledge`. + B. Think through the answer sentence-by-sentence. + C. Every sentence MUST be directly supported by one or more document IDs. + +3. Any sentence that is not directly supported by at least one `id` MUST be removed. + +4. Document IDs must appear in the text as: + (qs123) + (qs1002) + etc. + +5. The final answer MUST be returned strictly as: +{ + "text": "...", + "source": ["qs001", "qs999"] +} + +Where: +- `text` contains the final written response with citations inline. +- `source` contains ONLY the list of IDs actually used in the answer, no duplicates, order by relevance. + +6. JSON MUST be valid. No comments, no trailing commas. + +7. To the extent that there is even the slightest relevance to the question in the documentation, generate an answer from the documentation, indicating that a close answer to the user's question was not found. + +8. Finally, if no document supports the question, return: +{ + "text": "هیچ سند مرتبطی یافت نشد.", + "source": [] +} + +9. Length must NOT be shortened. Provide full analysis, fully detailed. +Before generating your answer: + +Extract the list of VALID IDs from `knowledge`. +You MUST NOT invent IDs. +Any ID not in that list is forbidden. + +''' +############# +""" +شما یک دستیار تحلیل‌گر حقوقی متخصص در استنتاج دقیق از اسناد قانونی هستید. + +ورودی شما شامل: +- یک پرسش کاربر (query) +- مجموعه‌ای از چند متن قانونی (knowledge)، که هر کدام شامل: + - id (شناسه سند) + - content (متن بند قانونی) + +وظیفه شما: +1. پرسش را دقیق بخوانید و فقط بر اساس اطلاعات موجود در اسناد ارائه شده پاسخ دهید. +2. از خودتان هیچ اطلاعات جدید، تخمین، تفسیر شخصی، یا دانش خارج از اسناد وارد نکنید. +3. اگر یک پاسخ نیاز به ترکیب چند سند دارد، آن‌ها را استخراج و در هم ادغام کنید و نتیجه را کاملاً روان و قابل فهم بنویسید. +4. پاسخ باید: + - تحلیل‌محور + - شفاف + - فارسی استاندارد و حقوقی + - ساختاریافته و قابل ارائه باشد +5. از تکرار مستقیم یا کپی کردن جملات خام اسناد اجتناب کنید. آن‌ها را با بازنویسی تحلیلی به کار ببرید. +6. اگر اطلاعات موجود برای پاسخ کامل کافی نبود: + - این موضوع را صریح اعلام کنید + - اما موارد مرتبط موجود را همچنان خلاصه و ارائه کنید +7. در پایان پاسخ: + - لیست شناسه‌های سندهای استفاده‌شده را برگردانید +- فقط id های اسنادی که واقعاً در پاسخ استفاده شده‌اند ذکر شوند به صورت دقیقا : (qs2127) + - ترتیب اهمیت در لیست رعایت شود +8. پاسخ نهایی باید دقیقاً در فرمت زیر برگردد: + +خروجی نمونه: +{ + "text" : "متن کامل پاسخ تحلیلی و دقیق به پرسش", + "source": ["qs123", "qs545", ...] +} + +بدون هیچ توضیح یا متن اضافه خارج از این قالب. + +ورودی نمونه: +{ + query: "متن سوال", + knowledge: [ + {"id": "qs01", "content": "..."}, + {"id": "qs02", "content": "..."}, + ... 
+ ] +}""" diff --git a/run_docker.bash b/run_docker.bash new file mode 100755 index 0000000..846e468 --- /dev/null +++ b/run_docker.bash @@ -0,0 +1,3 @@ +docker stop qachat2 +docker rm qachat2 +docker run --name qachat2 -p 8009:80 --net qachat_net --gpus=all -v ./:/src/app/ -v ./data/:/src/app/data/ -v ./../MODELS:/src/MODELS -v ./../cache:/root/.cache/huggingface/hub -it --restart unless-stopped docker.tavasi.ir/tavasi/qachat2:1.0.0 diff --git a/run_env.bash b/run_env.bash new file mode 100755 index 0000000..6930baa --- /dev/null +++ b/run_env.bash @@ -0,0 +1,3 @@ +source /home2/.venv/bin/activate + +uvicorn main:app --port=8009 --host=0.0.0.0 diff --git a/util/convert_qavanin_json_to_faiss.py b/util/convert_qavanin_json_to_faiss.py new file mode 100755 index 0000000..25df593 --- /dev/null +++ b/util/convert_qavanin_json_to_faiss.py @@ -0,0 +1,72 @@ +import json +import numpy as np +import faiss +import os + +def create_faiss_index_from_json(json_file_path, faiss_index_path, metadata_file_path): + print(f'try to read {json_file_path} ...') + # --- 1. بارگذاری داده‌ها از JSON --- + with open(json_file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f'file reading finished') + + # فرض بر این است که هر عنصر شامل فیلدهای زیر است: + # { + # "speech_title": "title", + # "sentence": "متن جمله", + # "embeddings": [0.12, 0.34, ...] + # } + + sentences = [] + titles = [] + embeddings_list = [] + prefix_list = [] + for k, item in data.items(): + sentences.append(item['content']) + titles.append(item['id']) + embeddings_list.append(item['embeddings']) + prefix_list.append(item['section-prefix']) + + embeddings = np.array(embeddings_list).astype('float32') # ابعاد: (n, d) + dimension = embeddings.shape[1] + + print(f"Loaded {len(embeddings)} embeddings with dimension {dimension}") + + # --- 2. ایجاد ایندکس FAISS برای GPU --- + # اگر فقط CPU دارید، از faiss.IndexFlatL2 استفاده کنید. + # اگر GPU دارید، ابتدا ایندکس را روی CPU ایجاد و سپس به GPU انتقال دهید. + cpu_index = faiss.IndexFlatL2(dimension) # معیار فاصله L2 (Euclidean) + + # انتقال ایندکس به GPU + if faiss.get_num_gpus() > 0: + print("Using GPU for FAISS index...") + res = faiss.StandardGpuResources() + gpu_index = faiss.index_cpu_to_gpu(res, 0, cpu_index) + else: + print("GPU not available, using CPU.") + gpu_index = cpu_index + + # --- 3. افزودن داده‌ها به ایندکس --- + gpu_index.add(embeddings) + print(f"Total vectors indexed: {gpu_index.ntotal}") + + # --- 4. ذخیره ایندکس به فایل --- + # برای ذخیره باید به CPU منتقل شود + final_index = faiss.index_gpu_to_cpu(gpu_index) if isinstance(gpu_index, faiss.Index) and faiss.get_num_gpus() > 0 else gpu_index + os.makedirs(os.path.dirname(faiss_index_path), exist_ok=True) + faiss.write_index(final_index, faiss_index_path) + print(f"FAISS index saved to {faiss_index_path}") + + # --- 5. 
ذخیره متادیتا (برای نگاشت نتایج جستجو) --- + metadata = [{"id": id, "content": c, 'prefix': p} for id, c, p in zip(titles, sentences,prefix_list)] + with open(metadata_file_path, 'w', encoding='utf-8') as f: + json.dump(metadata, f, ensure_ascii=False, indent=2) + print(f"Metadata saved to {metadata_file_path}") + +if __name__ == '__main__': + # استفاده از متد + json_file_path = '../majles-output/sections-vec-285k.json' + faiss_index_path = '../data/qavanin-faiss/faiss_index_qavanin_285k.index' + metadata_file_path = '../data/qavanin-faiss/faiss_index_qavanin_285k_metadata.json' + + create_faiss_index_from_json(json_file_path, faiss_index_path, metadata_file_path) \ No newline at end of file diff --git a/util/docker_build.bash b/util/docker_build.bash new file mode 100755 index 0000000..ae9db3a --- /dev/null +++ b/util/docker_build.bash @@ -0,0 +1,2 @@ +sudo docker build -t docker.tavasi.ir/tavasi/qachat_base:1.0.0 -f dockerfile_base . +sudo docker build -t docker.tavasi.ir/tavasi/qachat2:1.0.0 . diff --git a/util/elastic_helper.py b/util/elastic_helper.py new file mode 100755 index 0000000..57797bd --- /dev/null +++ b/util/elastic_helper.py @@ -0,0 +1,677 @@ +import zipfile +import sys +import os +import json +from time import sleep +from elasticsearch7 import Elasticsearch,helpers + +class ElasticHelper(): + + counter = 0 + total = 0 + id = "" + path_mappings = os.getcwd() + '/repo/_other/' + + def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings = ""): + + if path_mappings : + self.path_mappings = path_mappings + + if es_pass == '' : + self.es = Elasticsearch(es_url) + else: + self.es = Elasticsearch( + es_url, + http_auth=(es_user, es_pass), + ) + + # print(es_url) + # print(self.es) + + self.success_connect = False + for a in range(0,10): + try : + if not self.es.ping(): + print('elastic not ping, sleep 30 s : ', a) + sleep(5) + continue + else: + self.success_connect = True + break + + except Exception as e: + break + if not self.success_connect : + print('******','not access to elastic service') + return + + + self.counter = 0 + self.total = 0 + self.id = "" + + + def get_doctument(self, index_name, id): + res = self.es.get(index=index_name, id=id) + return res + + def exist_doctument(self, index_name, id): + res = self.es.exists(index=index_name, id=id) + return res + + def update_index_doc(self, is_update_state, index_name_o, eid, data): + if is_update_state: + resp = self.es.update(index=index_name_o, id=eid, doc=data) + # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) + else: + resp = self.es.index(index=index_name_o, id=eid, document=data) + return resp + + + def exportToJsonForAI(self, path_back, index_name, out_name= '', body={}, fields=[]) : + print('*' * 50, ' start backup -->', index_name) + self.counter = 0 + sid = None + + out = out_name + if out_name == '' : + out = index_name + + fout = open( path_back + "/"+ out + '.json', 'a+' , encoding='utf-8') + + s_res = self.es.search( + index=index_name, + scroll='5m', + size=1000, + body=body + ) + self.total = s_res["hits"]["total"]['value'] + + print('start index = %s' % index_name) + print('total = %d' % self.total) + + sid = s_res['_scroll_id'] + scroll_size = len(s_res['hits']['hits']) + file_count = 1 + out_json = [] + while scroll_size > 0: + "Scrolling..." 
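+            # Descriptive note: the bare "Scrolling..." string above is a no-op statement,
+            # not a print. Each pass appends the current scroll page to out_json (projected
+            # to `fields` when given, with the document _id kept as "id"), then fetches the
+            # next page via es.scroll() until no hits remain; the accumulated list is
+            # written to a single <out>.json file after the loop.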
+ self.counter += scroll_size + print("progress -> %.2f %%" % ((self.counter / self.total)*100)) + ############################# + for item in s_res['hits']['hits']: + + if fields : + item2={} + item2['id']=item['_id'] + for kf in fields : + #print(kf) + if kf in item['_source'] : + # print(item['_source'][kf]) + item2[kf] = item['_source'][kf] + #exit() + else : + item2=item + + out_json.append(item2) + + + s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000) + sid = s_res['_scroll_id'] + scroll_size = len(s_res['hits']['hits']) + + sid = None + text = json.dumps(out_json, ensure_ascii=False) + fout.write(text) + + ############################## + + def backupIndexToZipfile(self, path_back, index_name, out_name= '', body={}, byzip = True, fields=[], noFields=[]) : + print('*' * 50, ' start backup -->', index_name) + self.counter = 0 + sid = None + + out = out_name + if out_name == '' : + out = index_name + + + if body == {} : + s_res = self.es.search( + index=index_name, + scroll='5m', + size=1000 + ) + else: + s_res = self.es.search( + index=index_name, + scroll='5m', + size=1000, + body=body + ) + + self.total = s_res["hits"]["total"]['value'] + if self.total == 0 : + print('total index_name by query = %d' % self.total) + return False + + if byzip: + fout = zipfile.ZipFile(path_back + "/"+ out + '.zip', 'w') + else: + fout = open( path_back + "/"+ out + '.json', 'a+' , encoding='utf-8') + + + print('start index = %s' % index_name) + print('total = %d' % self.total) + + sid = s_res['_scroll_id'] + scroll_size = len(s_res['hits']['hits']) + file_count = 1 + while scroll_size > 0: + "Scrolling..." + self.counter += scroll_size + print("progress -> %.2f %%" % ((self.counter / self.total)*100)) + ############################# + out_json = [] + for item in s_res['hits']['hits']: + if fields : + item2={} + item2['id']=item['_id'] + item2['_source']={} + for kf in fields : + if kf in item['_source'] : + item2['_source'][kf] = item['_source'][kf] + else : + item2=item + + if noFields : + for kf in noFields : + if kf in item2['_source']: + del item2['_source'][kf] + + + out_json.append(item2) + + + text = json.dumps(out_json, ensure_ascii=False) + out_json = [] + if byzip: + filename = out + str(file_count) + '.json' + file_count +=1 + fout.writestr(filename, text.encode('utf-8'), zipfile.ZIP_DEFLATED ) + else: + fout.write(text) + + ############################## + s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000) + sid = s_res['_scroll_id'] + scroll_size = len(s_res['hits']['hits']) + sid = None + fout.close() + + + def restorFileToElastic(self, path_back, index_name, app_key = '', queryDelete = True, map_name='') : + if not os.path.exists(path_back) : + print(' **** error *** path not exist: ', path_back) + return False + + file_path = path_back + '/' + index_name + '.zip' + if not os.path.exists(file_path ) : + return False + + if queryDelete : + # اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند + if self.deleteIndex(index_name) : + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + else : # اگر وجود داشته باشد پرش می کند و کاری نمیکند + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + + def restorFileToElastic2(self, path_file, index_name, app_key = '', queryDelete = True, map_name='') : + if not os.path.exists(path_file) : + print(' **** error *** path not exist: ', path_file) + return False + + file_path = path_file + if not 
os.path.exists(file_path ) : + return False + + if queryDelete : + # اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند + if self.deleteIndex(index_name) : + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + else : # اگر وجود داشته باشد پرش می کند و کاری نمیکند + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + + + def renameElasticIndex(self, index_name_i, index_name_o, app_key = '', map_name='') : + + if self.createIndex(index_name_o, app_key, map_name) : + res = self.es.reindex( + body={ + "source": {"index": index_name_i}, + "dest": {"index": index_name_o} + }, + wait_for_completion=False) + + print(type(res)) + print(res) + + taskid = res["task"] if res["task"] else "" + #tasks = client.TasksClient(self.es) + tasks = self.es.tasks + while True : + res = tasks.get(task_id = taskid) + if res["completed"] : + break + + # print( res["task"]) + print( '----', index_name_o, ' imported : ', res["task"]["status"]["total"] , ' / ', res["task"]["status"]["created"]) + sleep(1) + print( '----', index_name_o, ' complated') + + + def deleteIndex(self, index_name) : + if not self.es.indices.exists(index=index_name) : + print(' ' * 10, " for delete NOT exist index :", index_name ) + return True + + question = 'Is DELETE elastic index (' + index_name +') ? ' + if self.query_yes_no(question) : + self.es.indices.delete(index = index_name) + print('%' * 10 , " Finish DELETE index :", index_name ) + return True + else : + return False + + def query_yes_no(self, question, default="no"): + valid = { "yes": True, "y": True, "ye": True, "no": False, "n": False } + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError("invalid default answer: '%s'" % default) + + while True: + print('%'*10, ' quistion ', '%'*10 , '\n') + sys.stdout.write(question + prompt) + choice = input().lower() + if default is not None and choice == "": + return valid[default] + elif choice in valid: + return valid[choice] + else: + sys.stdout.write("لطفا یکی از موارد روبرو را وارد کنید : 'yes' or 'no' " "(or 'y' or 'n').\n") + + def createIndexIfNotExist(self, index_name_o, mapping_o=""): + try: + if not self.es.indices.exists(index=index_name_o): + response = self.es.indices.create(index=index_name_o, body=mapping_o) + # print out the response: + print("create index response:", response) + except: + print("....... index exist ! ... 
not created") + + + def createIndex(self, index_name, app_key='', map_name=''): + + path_base = self.path_mappings + path_mapping1 = path_base + 'general/' + if app_key == '' : + app_key = 'tavasi' + path_mapping2 = path_base + app_key + '/' + + + if map_name == '': + map_name = index_name + + if self.es.indices.exists(index=index_name) : + print("============== exist index :", index_name ) + return True + + if map_name == 'mj_rg_section' or map_name == 'semantic_search' : + map_name = 'mj_qa_section' + elif map_name[-3]=='_ai': + map_name=[0-len(map_name)-3] + print(map_name) + + mapping_file_path = path_mapping1 + map_name + '.json' + print("mapping_file_path : " , mapping_file_path) + if not os.path.isfile(mapping_file_path): + if not os.path.isfile(mapping_file_path): + mapping_file_path = path_mapping2 + map_name + '.json' + + print("mapping_file_path : " , mapping_file_path) + + # Create Index With Mapping + if os.path.isfile(mapping_file_path): + mapping_file = open( mapping_file_path,'r', encoding='utf-8' ) + mapping_file_read = mapping_file.read() + mapping_data = json.loads(mapping_file_read) + mapping_file.close() + if self.es.indices.exists(index=index_name) : + print("============== exist index :", index_name ) + else : + self.es.indices.create(index = index_name , body = mapping_data) + return True + else: + print('*** error not find maping file elastic : *******', mapping_file_path) + return False + + + def updateBulkList(self, listData, index_name): + chunk_size=1000 + raise_on_error=False + raise_on_exception=False + stats_only=True + yield_ok = False + + actions=[] + for item in listData: + actions.append({ + "_op_type": "update", + "_index": index_name, + "_id" : item['_id'], + "doc": item['_source'] + } + ) + helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok ) + + def importBulkList(self, listData, index_name): + chunk_size=100000 + raise_on_error=False + raise_on_exception=False + stats_only=True + yield_ok = False + + for item in listData: + actions = [{ + "_op_type": "index", + "_index": index_name, + "_id" : item['_id'], + "_source": item['_source'] + } + ] + helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok ) + + + def importJsonDataToElastic(self, jsonData, index_name, fields=[]): + chunk_size=1000 + raise_on_error=False + raise_on_exception=False + stats_only=True + yield_ok = False + + actions=[] + + for item in jsonData: + id = item['_id'] if item['_id'] else item['id'] + source = item['_source'] + if fields : + source = {} + for col in fields : + if col in item['_source'] : + source[col] = item['_source'] + + + actions.append({ + "_op_type": "index", + "_index": index_name, + "_id" : id, + "_source": source + }) + helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok ) + + + def fileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]): + if not os.path.exists(file_path): + print("file zip:" , file_path , " not exist") + return + print("index:" , index_name , '=>' , file_path ) + self.counter = 0 + with open(file_path) as file: + data = json.loads(file.read()) + self.importJsonDataToElastic(data, index_name, fields) + + self.es.indices.refresh(index=index_name) + print(self.es.cat.count(index=index_name, format="json")) + + def zipFileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]): + if not os.path.exists(file_path): + print("file zip:" , file_path , " not exist for imort to elastic : 
", index_name ) + return + + fileNo = 0 + with zipfile.ZipFile(file_path, 'r') as zObject: + fileNo +=1 + print("="*10, " zip fileNo: " , fileNo ," - ( ", index_name," ) | File Numbers:" ,len(zObject.namelist()) , "=" * 10) + + packNo = 0 + self.counter = 0 + for filename in zObject.namelist(): + packNo += 1 + if limit_pack != -1 : + if packNo > limit_pack : + print('limit_data ', index_name, ' ', limit_pack) + break + + print("index:" , index_name , '=>' , filename ) + with zObject.open(filename) as file: + data = json.loads(file.read()) + self.importJsonDataToElastic(data, index_name, fields) + + self.es.indices.refresh(index=index_name) + print(self.es.cat.count(index=index_name, format="json")) + print(" END Of Import to elastic ", index_name ,"\n") + + + def iterateJsonFile(self, file_path, isZip=True, limit_pack = -1): + if not os.path.exists(file_path): + print("file zip:" , file_path , " not exist iterateJsonFile " ) + return + + if isZip : + fileNo = 0 + with zipfile.ZipFile(file_path, 'r') as zObject: + fileNo +=1 + print("="*10, " zip fileNo: " , fileNo ," iterateJsonFile - | File Numbers:" ,len(zObject.namelist()) , "=" * 10) + + packNo = 0 + self.counter = 0 + for filename in zObject.namelist(): + packNo += 1 + if limit_pack != -1 : + if packNo > limit_pack : + print('limit_data iterateJsonFile ', limit_pack) + break + + print("index iterateJsonFile :", '=>' , filename ) + with zObject.open(filename) as file: + data = json.loads(file.read()) + # Yield each entry + # yield data + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data) + else : + with open(filename, 'r', encoding='utf-8') as file: + data = json.loads(file.read()) + # Yield each entry + # yield from (hit for hit in data) + #return data + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data) + + + def es_iterate_all_documents(self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs): + """ + Helper to iterate ALL values from a single index + Yields all the documents. 
+ """ + is_first = True + while True: + # Scroll next + if is_first: # Initialize scroll + # result = self.es.search(index=index, scroll="2m", **kwargs, body={ + # "size": pagesize + # }) + if body : + result = self.es.search( + index=index, + scroll=scroll_timeout, + **kwargs, + size=pagesize, + body=body + ) + else : + result = self.es.search( + index=index, + scroll=scroll_timeout, + **kwargs, + size=pagesize + ) + + self.total = result["hits"]["total"]["value"] + if self.total > 0: + print("total = %d" % self.total) + is_first = False + else: + # result = es.scroll(body={ + # "scroll_id": scroll_id, + # "scroll": scroll_timeout + # }) + result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout) + + scroll_id = result["_scroll_id"] + hits = result["hits"]["hits"] + self.counter += len(hits) + if self.total > 0 : + print("progress -> %.2f %%" % ((self.counter / self.total) * 100)) + # Stop after no more docs + if not hits: + break + # Yield each entry + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits) + + + def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}): + try: + body = {} + list = [] + try: + list = self.es_iterate_all_documents(index_name_i) + except Exception as e: + print(e) + + count = 0 + for mentry in list: + count += 1 + + entry = mentry["source"] + id = mentry["id"] + # print(id) + eid = id + + if (count % 100) == 0 : + print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0)) + + data_filled = False + data = {} + for col in fields: + + if '.' in col : + cols = col.split('.') + subsource = entry + for sub in cols : + dCol = subsource.get(sub, None) + if dCol : + subsource = dCol + else : + break + else : + dCol = entry.get(col, None) + + if dCol is None: + continue + + if col in renameFileds : + data[renameFileds[col]] = dCol + else: + data[col] = dCol + + data_filled = True + + if not data_filled : + continue + + try: + resp = self.update_index_doc(True, index_name_o, eid, data) + except Exception as e: + print(e) + # save_error(id, e) + + except Exception as e: + # print("1111") + print(e) + + # save_error(id, e) + + def mappingIndex(self, index_name_i): + # فقط از طریق کیبانا میشه تغییر مپ داد + + # با پایتون نمیشه + # باید ایندکس جدیدی با مپ مطلوب ایجاد کرد و رایندکس کرد + pass + + def updateByQueryIndex(self, index_name_i, body): + ## sample + # body = { + # "script": { + # "inline": "ctx._source.Device='Test'", + # "lang": "painless" + # }, + # "query": { + # "match": { + # "Device": "Boiler" + # } + # } + # } + try: + self.es.update_by_query(body=body, index=index_name_i) + + except Exception as e: + print(e) + # save_error(id, e) + + + def deleteByQueryIndex(self, index_name_i, body): + ## sample + # body = { + # "query": { + # "match": { + # "Device": "Boiler" + # } + # } + # } + try: + self.es.delete_by_query(index=index_name_i, body=body ) + + except Exception as e: + print(e) + # save_error(id, e) + + def delete_by_ids(self, index_name_i, ids): + try: + # ids = ['test1', 'test2', 'test3'] + + query = {"query": {"terms": {"_id": ids}}} + res = self.es.delete_by_query(index=index_name_i, body=query) + print(res) + + except Exception as e: + print(e) + # save_error(id, e) + diff --git a/util/embedder_sbert_qavanin_285k.py b/util/embedder_sbert_qavanin_285k.py new file mode 100755 index 0000000..236b1a7 --- /dev/null +++ b/util/embedder_sbert_qavanin_285k.py @@ -0,0 +1,681 @@ +# !pip install hazm +# !pip install transformers==4.26.0 +# !pip install --upgrade numpy +# !pip install 
--upgrade sentence-transformers +""" +Persian Sentence Processing and Vector Analysis +============================================== + +This script processes Persian sentences from a JSON file and performs: +1. Word extraction and preprocessing +2. Vector representation using multilingual transformer +3. Similarity analysis for key words +4. Dimensionality reduction to 3D +5. 3D visualization with Persian labels + +Author: NLP Expert Assistant +""" +import json +import re +import numpy as np +import pandas as pd +from typing import List, Dict, Tuple, Set +from collections import Counter +import logging +from pathlib import Path + +# NLP and ML libraries +from sentence_transformers import SentenceTransformer +from transformers import AutoTokenizer +from sklearn.decomposition import PCA +from sklearn.manifold import TSNE +from sklearn.metrics.pairwise import cosine_similarity +#from normalizer import cleaning +try: + from util.elastic_helper import ElasticHelper +except Exception as error: + eee = error + pass +# Visualization libraries +# import matplotlib.pyplot as plt +# import plotly.graph_objects as go +# import plotly.express as px +# from plotly.subplots import make_subplots + +# Persian text processing +# import hazm +# from hazm import Normalizer, word_tokenize, POSTagger + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + + + +class PersianVectorAnalyzer: + """ + A comprehensive class for Persian text processing and vector analysis. + """ + + def __init__(self, model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"): + """ + Initialize the analyzer with the specified model. + + Args: + model_name: The sentence transformer model to use + """ + self.model_name = model_name + self.model = None + #self.normalizer = Normalizer() + self.stop_words = self._load_persian_stop_words() + self.key_words = [ + "خدا", "بنده", "جهاد", "ولی", "زکات", + "نماز", "صبر", "عبادت", "ولایت", "خلافت","پیامبر" + ] + + logger.info(f"Initializing Persian Vector Analyzer with model: {model_name}") + + def _load_persian_stop_words(self) -> Set[str]: + """ + Load Persian stop words. + + Returns: + Set of Persian stop words + """ + # Common Persian stop words + stop_words = { + 'و', 'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'برای', 'تا', + 'را', 'هم', 'یا', 'اما', 'اگر', 'چون', 'چرا', 'چگونه', 'کجا', + 'چه', 'کی', 'چند', 'چقدر', 'همه', 'هیچ', 'بعضی', 'هر', 'همه', + 'خود', 'خویش', 'ما', 'شما', 'آنها', 'ایشان', 'اینها', 'آنها', + 'من', 'تو', 'او', 'ما', 'شما', 'آنها', 'ایشان', 'اینها', + 'است', 'هست', 'بود', 'شد', 'می', 'باید', 'خواهد', 'دارد', + 'کرد', 'شد', 'بود', 'هست', 'است', 'می‌شود', 'می‌کند', + 'یک', 'دو', 'سه', 'چهار', 'پنج', 'شش', 'هفت', 'هشت', 'نه', 'ده', + 'اول', 'دوم', 'سوم', 'چهارم', 'پنجم', 'ششم', 'هفتم', 'هشتم', 'نهم', 'دهم', + 'سال', 'ماه', 'روز', 'هفته', 'ساعت', 'دقیقه', 'ثانیه','پس' + 'بله', 'نه', 'آری', 'خیر', 'بلی', 'نخیر', + 'حالا', 'الان', 'امروز', 'دیروز', 'فردا', 'هفته', 'ماه', 'سال', + 'بالا', 'پایین', 'چپ', 'راست', 'جلو', 'عقب', 'داخل', 'خارج', + 'بزرگ', 'کوچک', 'بلند', 'کوتاه', 'پهن', 'باریک', 'ضخیم', 'نازک', + + + + } + return stop_words + + def load_model(self): + """ + Load the sentence transformer model. 
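+
+        Illustrative call (assumes the multilingual MiniLM model named above can be
+        downloaded or is already cached locally):
+
+            analyzer = PersianVectorAnalyzer()
+            analyzer.load_model()
+            vectors = analyzer.model.encode(["نماز", "زکات"])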
+ """ + try: + logger.info("Loading sentence transformer model...") + self.model = SentenceTransformer(self.model_name) + logger.info("Model loaded successfully!") + except Exception as e: + logger.error(f"Error loading model: {e}") + raise + def split_sentence(self, sentence:str): + sentences = [] + sentence_len = len(self.tokenize_sentence(sentence)) + if sentence_len < 512: + sentences.append(sentence) + else: + temp_sentences = str(sentence).split('.') + for sent in temp_sentences: + sent_len = len(self.tokenize_sentence(sent)) + if sent_len > 512: + temp_sentences_2 = str(sent).split('،') + for snt in temp_sentences_2: + sentences.append(snt) + else: + sentences.append(sent) + + return sentences + + def load_json_data(self, file_path: str) -> List[str]: + """ + Load Persian sentences from JSON file. + + Args: + file_path: Path to the JSON file + + Returns: + List of Persian sentences + """ + try: + logger.info(f"Loading data from {file_path}") + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + # convert dict{dict} to list[dict] + if type(data) == dict: + temp_data = [] + for item in data.items(): + temp_data.append(item[1]) + data = temp_data + + sentences = [] + if isinstance(data, list): + for index, item in enumerate(data): + print(f'split sentence {index}') + if isinstance(item, dict): + if item['content'] == '': + continue + sentences.append([item['id'],item['content'].strip()]) + # for key in ['content']: + # if key in item and item[key]: + # # splited_sentences = self.split_sentence(item[key]) + # # splited_sentences = item[key] + # sentences.append(item[key]) + # # for sent in splited_sentences: + # # sentences.append(sent) + # else: + # print('fault '+item['sentence-number']) + elif isinstance(item, str): + # splited_sentences = self.split_sentence(item[key]) + sentences.append(item) + # for sent in splited_sentences: + # sentences.append(sent) + elif isinstance(data, dict): + # If it's a single object, extract all string values + for value in data.values(): + if isinstance(value, str): + sentences.append(value) + # splited_sentences = str(value).split('.') + # for sent in splited_sentences: + # sentences.append(sent) + + sentences = [senten for senten in sentences if senten] + logger.info(f"Loaded {len(sentences)} sentences") + return sentences + + except Exception as e: + logger.error(f"Error loading JSON data: {e}") + raise + + def preprocess_text(self, text: str) -> str: + """ + Preprocess Persian text. + + Args: + text: Raw Persian text + + Returns: + Preprocessed text + """ + + # Normalize text + #text = self.normalizer.normalize(text) + + # Remove extra whitespace + text = re.sub(r'\s+', ' ', text) + + # Remove special characters but keep Persian characters + text = re.sub(r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s]', '', text) + + return text.strip() + + def tokenize_sentence(self, sentence:str): + + try: + tokenizer = AutoTokenizer.from_pretrained(self.model_name) + # print(self.model_name) + tokens = tokenizer.tokenize(sentence) + return tokens + except: + error = "An exception occurred in tokenizer : " + self.model_name + #file.write( error + '\n' ) + return [] + + def extract_words(self, sentences: List[str]) -> List[str]: + """ + Extract all words from sentences. 
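+
+        Illustrative example (note: this method relies on a word-level tokenizer
+        such as hazm's word_tokenize being imported):
+
+            words = analyzer.extract_words(["این یک جمله آزمایشی است."])
+            # -> list of word tokens longer than one character, before stop-word removal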
+ + Args: + sentences: List of Persian sentences + + Returns: + List of all words + """ + all_words = [] + + for sentence in sentences: + # Preprocess sentence + processed_sentence = self.preprocess_text(sentence) + + # Tokenize + words = word_tokenize(processed_sentence) + # words = processed_sentence.split() + # Filter out empty strings and very short words + words = [word for word in words if len(word) > 1] + + all_words.extend(words) + + logger.info(f"Extracted {len(all_words)} words from {len(sentences)} sentences") + return all_words + + def remove_stop_words(self, words: List[str]) -> List[str]: + """ + Remove stop words from the word list. + + Args: + words: List of words + + Returns: + List of words without stop words + """ + filtered_words = [word for word in words if word not in self.stop_words] + logger.info(f"Removed {len(words) - len(filtered_words)} stop words") + return filtered_words + + def get_unique_words(self, words: List[str]) -> List[str]: + """ + Get unique words from the list. + + Args: + words: List of words + + Returns: + List of unique words + """ + unique_words = list(set(words)) + logger.info(f"Found {len(unique_words)} unique words from {len(words)} total words") + return unique_words + + def compute_word_vectors(self, sentences: List[str]) -> Dict[str, List[float]]: + """ + Compute vector representations for words. + + Args: + sentences: List of unique sentences + + Returns: + Dictionary mapping sentences to their vector representations + """ + if self.model is None: + self.load_model() + + logger.info(f"Computing vectors for {len(sentences)} sections ...") + # print(sentences[0]) + # create list of just sentences + just_sentences = [sent['content'] for sent in sentences] + # Compute embeddings + embeddings = self.model.encode(just_sentences, show_progress_bar=True) + + # Create dictionary + sentences_vectors = {} + for i, sent in enumerate(sentences): + sentences_vectors[f'sentence-{sentences[i]["id"]}'] = { + 'id': sentences[i]['id'], + 'fullpath': sentences[i]['fullpath'], + 'qanon-title': sentences[i]['qanon-title'], + 'section-prefix': sentences[i]['section-prefix'], + 'content': sentences[i]['content'], + 'embeddings': embeddings[i].tolist() + } + print(f'section {i} embedded!') + + logger.info("section vectors computed successfully!") + return sentences_vectors + + def find_closest_words(self, word_vectors: Dict[str, List[float]], + key_words: List[str], top_k: int = 20) -> Dict[str, List[str]]: + """ + Find the closest words to each key word. 
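+
+        Illustrative example (word_vectors maps each word to its embedding list):
+
+            closest = analyzer.find_closest_words(word_vectors, ["نماز", "زکات"], top_k=5)
+            # closest["نماز"] -> up to 5 most similar words by cosine similarity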
+ + Args: + word_vectors: Dictionary of word vectors + key_words: List of key words to find neighbors for + top_k: Number of closest words to find + + Returns: + Dictionary mapping key words to their closest neighbors + """ + logger.info(f"Finding {top_k} closest words for {len(key_words)} key words...") + + # Convert to numpy arrays for faster computation + words = list(word_vectors.keys()) + vectors = np.array(list(word_vectors.values())) + + closest_words = {} + + for key_word in key_words: + if key_word in word_vectors: + # Get the key word vector + key_vector = np.array(word_vectors[key_word]).reshape(1, -1) + + # Compute cosine similarities + similarities = cosine_similarity(key_vector, vectors)[0] + + # Get indices of top k similar words (excluding the key word itself) + word_indices = np.argsort(similarities)[::-1] + + # Filter out the key word itself and get top k + closest_indices = [] + for idx in word_indices: + if words[idx] != key_word and len(closest_indices) < top_k: + closest_indices.append(idx) + + # Get the closest words + closest_words[key_word] = [words[idx] for idx in closest_indices] + logger.info(f"Found {len(closest_words[key_word])} closest words for '{key_word}'") + else: + logger.warning(f"Key word '{key_word}' not found in word vectors") + closest_words[key_word] = [] + + return closest_words + + def reduce_to_3d(self, word_vectors: Dict[str, List[float]], + method: str = 'tsne') -> Dict[str, List[float]]: + """ + Reduce word vectors to 3D coordinates. + + Args: + word_vectors: Dictionary of word vectors + method: Dimensionality reduction method ('pca' or 'tsne') + + Returns: + Dictionary mapping words to their 3D coordinates + """ + logger.info(f"Reducing dimensions to 3D using {method.upper()}...") + + words = list(word_vectors.keys()) + vectors = np.array(list(word_vectors.values())) + + if method.lower() == 'pca': + reducer = PCA(n_components=3, random_state=42) + elif method.lower() == 'tsne': + reducer = TSNE(n_components=3, random_state=42, perplexity=min(30, len(vectors)-1)) + else: + raise ValueError("Method must be 'pca' or 'tsne'") + + # Reduce dimensions + reduced_vectors = reducer.fit_transform(vectors) + + # Create dictionary + word_vectors_3d = {} + for i, word in enumerate(words): + word_vectors_3d[word] = reduced_vectors[i].tolist() + + logger.info("Dimensionality reduction completed!") + return word_vectors_3d + + def save_json(self, data: dict, file_path: str): + """ + Save data to JSON file. + + Args: + data: Data to save + file_path: Output file path + """ + try: + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + logger.info(f"Data saved to {file_path}") + except Exception as e: + logger.error(f"Error saving to {file_path}: {e}") + raise + + # def create_3d_visualization(self, word_vectors_3d: Dict[str, List[float]], + # selected_words: Dict[str, List[str]], + # output_path: str = "persian_words_3d.html"): + # """ + # Create 3D visualization of words. 
+ + # Args: + # word_vectors_3d: Dictionary of 3D word coordinates + # selected_words: Dictionary of selected words for each key word + # output_path: Output file path for the visualization + # """ + # logger.info("Creating 3D visualization...") + + # # Prepare data for plotting + # words = list(word_vectors_3d.keys()) + # coords = np.array(list(word_vectors_3d.values())) + + # # Create color mapping for key words and their neighbors + # colors = [] + # sizes = [] + # hover_texts = [] + + # for word in words: + # # Check if word is a key word + # is_key_word = word in self.key_words + + # # Check if word is in selected words + # in_selected = False + # key_word_group = None + # for key_word, selected_list in selected_words.items(): + # if word in selected_list: + # in_selected = True + # key_word_group = key_word + # break + + # if is_key_word: + # colors.append('red') + # sizes.append(15) + # hover_texts.append(f"کلیدواژه: {word}") + # elif in_selected: + # colors.append('blue') + # sizes.append(10) + # hover_texts.append(f"کلمه مرتبط با '{key_word_group}': {word}") + # else: + # colors.append('lightgray') + # sizes.append(5) + # hover_texts.append(f"کلمه: {word}") + + # # Create 3D scatter plot + # fig = go.Figure() + + # # Add scatter plot + # fig.add_trace(go.Scatter3d( + # x=coords[:, 0], + # y=coords[:, 1], + # z=coords[:, 2], + # mode='markers+text', + # marker=dict( + # size=sizes, + # color=colors, + # opacity=0.8 + # ), + # text=words, + # textposition="middle center", + # hovertext=hover_texts, + # hoverinfo='text' + # )) + + # # Update layout + # fig.update_layout( + # title={ + # 'text': 'نمایش سه‌بعدی کلمات فارسی', + # 'x': 0.5, + # 'xanchor': 'center', + # 'font': {'size': 20} + # }, + # scene=dict( + # xaxis_title='محور X', + # yaxis_title='محور Y', + # zaxis_title='محور Z', + # camera=dict( + # eye=dict(x=1.5, y=1.5, z=1.5) + # ) + # ), + # width=1000, + # height=800, + # showlegend=False + # ) + + # # Save the plot + # fig.write_html(output_path) + # logger.info(f"3D visualization saved to {output_path}") + + # return fig + + def process_pipeline(self, input_file: str, output_dir: str = "output"): + """ + Run the complete processing pipeline. 
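+
+        Illustrative run (paths are placeholders; note that the current implementation
+        reads its sections from the module-level ALL_SECTIONS list, not from input_file):
+
+            analyzer = PersianVectorAnalyzer()
+            analyzer.process_pipeline("", "./data/majles-output")
+            # -> writes ./data/majles-output/sections-vec-285k.json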
+ + Args: + input_file(str): Path to input JSON file + output_dir(str): Output directory for results + """ + # Create output directory + Path(output_dir).mkdir(exist_ok=True) + + logger.info("Starting Persian Vector Analysis Pipeline...") + + # Step 1: Load data + # sentences = self.load_json_data(input_file) + sentences = ALL_SECTIONS + + # for s in sentences: + # s_len = len(self.tokenize_sentence(s)) + # if s_len > 512: + # print(f'long: {s}') + # Step 2: Extract words + # all_words = self.extract_words(sentences) + + # Step 3: Remove stop words + # filtered_words = self.remove_stop_words(all_words) + # filtered_words = all_words + + # Step 4: Get unique words + # unique_words = self.get_unique_words(filtered_words) + + # Step 5: Compute word vectors + sentences_vectors = self.compute_word_vectors(sentences) + + # Step 6: Save word vectors + self.save_json(sentences_vectors, f"{output_dir}/sections-vec-285k.json") + + # Step 7: Find closest words to key words + # selected_words = self.find_closest_words(word_vectors, self.key_words) + + # Step 8: Save selected words + # self.save_json(selected_words, f"{output_dir}/selected_words.json") + + # Step 9: Reduce to 3D + # word_vectors_3d = self.reduce_to_3d(word_vectors, method='tsne') + + # Step 10: Save 3D vectors + # self.save_json(word_vectors_3d, f"{output_dir}/words_vector_3d.json") + + # Step 11: Create visualization + # self.create_3d_visualization(word_vectors_3d, selected_words, + # f"{output_dir}/persian_words_3d.html") + + logger.info("Pipeline completed successfully!") + + # Print summary + print("\n" + "="*50) + print("PIPELINE SUMMARY") + print("="*50) + print(f"Input sentences: {len(sentences)}") + # print(f"Total words extracted: {len(all_words)}") + # print(f"Unique words after preprocessing: {len(unique_words)}") + # print(f"Word vectors computed: {len(word_vectors)}") + # print(f"Key words processed: {len(self.key_words)}") + print(f"Output files saved to: {output_dir}/") + print("="*50) + +def full_path_text_maker(full_path): + """ + این متد مسیر یک سکشن را می گیرد و متنی را بر اساس ترتیب بخش های آن از جزء به کل بازسازی می کند و بر می گرداند + + Args: + full_path(list): لیستی از عناصر مشخص کننده مسیر درختی این سکشن + Returns: + full_path_text(str): متن بازسازی شده از مسیر یک سکشن + """ + full_path_text = "" + for i, path_item in enumerate(reversed(full_path)): + if i == len(full_path) - 1: + full_path_text += ''.join(f'{path_item}') + break + full_path_text += ''.join(f'{path_item} از ') + full_path_text = full_path_text.strip() + return full_path_text + +def main(): + """ + Main function to run the Persian Vector Analysis. 
+ """ + # Initialize analyzer + analyzer = PersianVectorAnalyzer() + + # Define input and output paths + # input_file = "./output-speechs/nahj_speechs_sentences.json" + # output_dir = "output-speechs" + # input_file = "./majles/data/sections.json" + input_file = "" + output_dir = "./data/majles-output" + + # Run the complete pipeline + analyzer.process_pipeline(input_file, output_dir) + + +if __name__ == "__main__": + eh_obj = ElasticHelper() + path = "/home/gpu/data_11/14040611/mj_qa_section.zip" + sections_elastic = eh_obj.iterateJsonFile(path, True) + all_count = 0 + dont_cares = [] + ALL_SECTIONS = [] + for index, item in enumerate(sections_elastic): + all_count +=1 + source = item['source'] + section_path = source['other_info']['full_path'] + id = item['id'] + + filtered_keys = ['فصل','موخره','امضاء','عنوان'] + section_path = source['other_info']['full_path'] + flag = False + if '>' in section_path: + path_parts = section_path.split('>') + for key in filtered_keys: + if key in path_parts[-1]: + dont_cares.append(id) + flag = True + break + if flag: + continue + else: + for key in filtered_keys: + if key in section_path: + dont_cares.append(id) + flag = True + break + if flag: + continue + + qanon_title = source['qanon_title'] + full_path_text = full_path_text_maker(section_path.split('>')) + section_prefix = f"محتوای {full_path_text} {cleaning(qanon_title)} عبارت است از: " + + try: + content = cleaning(item['source']['content']) + # کنار گذاشتن سکشن های خیلی کوچک که عملا محتوا ندارند + if len(content.split()) <= 10: + continue + except Exception as error: + print(error) + continue + data = { + 'id': id, + 'fullpath': section_path, + 'qanon-title': qanon_title, + 'section-prefix': section_prefix, + 'content': content + } + ALL_SECTIONS.append(data) + print(f'all_count: {all_count}') + print(f'dont_cares: {len(dont_cares)}') + print(f'ALL_SECTIONS without dont-cares: {len(ALL_SECTIONS)}') + + main() + + """ + :: *** نکته مهم *** :: + NOTE !!! 
after this process finishes, run convert_qavanin_json_to_faiss.py to create the FAISS index that is used in the RAG process
+    """
\ No newline at end of file
diff --git a/util/normalizer.py b/util/normalizer.py
new file mode 100755
index 0000000..9d80129
--- /dev/null
+++ b/util/normalizer.py
@@ -0,0 +1,76 @@
+#import hazm
+from cleantext import clean
+import re
+
+def cleanhtml(raw_html):
+    cleanr = re.compile('<.*?>')
+    cleantext = re.sub(cleanr, '', raw_html)
+    return cleantext
+
+#normalizer = hazm.Normalizer()
+wierd_pattern = re.compile("["
+    u"\U0001F600-\U0001F64F"  # emoticons
+    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
+    u"\U0001F680-\U0001F6FF"  # transport & map symbols
+    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
+    u"\U00002702-\U000027B0"
+    u"\U000024C2-\U0001F251"
+    u"\U0001f926-\U0001f937"
+    u'\U00010000-\U0010ffff'
+    u"\u200d"
+    u"\u2640-\u2642"
+    u"\u2600-\u2B55"
+    u"\u23cf"
+    u"\u23e9"
+    u"\u231a"
+    u"\u3030"
+    u"\ufe0f"
+    u"\u2069"
+    u"\u2066"
+    # u"\u200c"
+    u"\u2068"
+    u"\u2067"
+    "]+", flags=re.UNICODE)
+
+def cleaning(text):
+    text = text.strip()
+
+    # regular cleaning
+    # text = clean(text,
+    #     fix_unicode=True,
+    #     to_ascii=False,
+    #     lower=True,
+    #     no_line_breaks=True,
+    #     no_urls=True,
+    #     no_emails=True,
+    #     no_phone_numbers=True,
+    #     no_numbers=False,
+    #     no_digits=False,
+    #     no_currency_symbols=True,
+    #     no_punct=False,
+    #     replace_with_url="",
+    #     replace_with_email="",
+    #     replace_with_phone_number="",
+    #     replace_with_number="",
+    #     replace_with_digit="0",
+    #     replace_with_currency_symbol="",
+    # )
+    text = clean(text,
+        extra_spaces = True,
+        lowercase = True
+    )
+
+    # cleaning htmls
+    text = cleanhtml(text)
+
+    # normalizing
+    #text = normalizer.normalize(text)
+
+    # removing emoji and other weird unicode patterns
+    text = wierd_pattern.sub(r'', text)
+
+    # removing extra spaces, hashtags
+    text = re.sub("#", "", text)
+    text = re.sub(r"\s+", " ", text)
+
+    return text
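
The note above points to util/convert_qavanin_json_to_faiss.py as the next step after embedding. For orientation only, the minimal sketch below shows one way the embeddings file written by embedder_sbert_qavanin_285k.py could be loaded into a FAISS index; it is an illustrative outline under assumed file names and output paths, not the contents of the actual conversion script.

    # Illustrative sketch only -- see util/convert_qavanin_json_to_faiss.py for the real conversion.
    import json
    import numpy as np
    import faiss

    # load the per-section embeddings written by the embedder pipeline
    with open("./data/majles-output/sections-vec-285k.json", "r", encoding="utf-8") as f:
        sections = json.load(f)

    keys = list(sections.keys())
    vectors = np.array([sections[k]["embeddings"] for k in keys], dtype="float32")

    # normalize so inner product equals cosine similarity, then build an exact-search index
    faiss.normalize_L2(vectors)
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(vectors)

    # persist the index and the key order so search results can be mapped back to sections
    faiss.write_index(index, "./qavanin-faiss/sections.index")   # output path is a placeholder
    with open("./qavanin-faiss/sections_ids.json", "w", encoding="utf-8") as f:
        json.dump(keys, f, ensure_ascii=False)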