From 4a56846008c15e31a82942857605d2eea988b3bc Mon Sep 17 00:00:00 2001 From: hsafaei Date: Wed, 3 Dec 2025 14:37:00 +0000 Subject: [PATCH] first-step --- .gitignore | 9 + bale_buttons.py | 42 ++++ bale_massages.py | 35 +++ base_model.py | 116 +++++++++ main.py | 632 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + samle_env | 13 + utils.py | 131 ++++++++++ 8 files changed, 979 insertions(+) create mode 100644 .gitignore create mode 100644 bale_buttons.py create mode 100644 bale_massages.py create mode 100644 base_model.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 samle_env create mode 100644 utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..67becb4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +__pycache__/ +.env +config.env +app/lib +.venv +.idea +_temp.py +_data_json/ +__pycache__/* diff --git a/bale_buttons.py b/bale_buttons.py new file mode 100644 index 0000000..83ec74e --- /dev/null +++ b/bale_buttons.py @@ -0,0 +1,42 @@ + +START_BUTTONS_INLINE = [ + [{"text": "گفتگو", "callback_data": "law_chat"}], + [{"text": "جستجو", "callback_data": "law_search"}], + [ + # {"text": "راهنما", "callback_data": "help"}, + {"text": "درباره ما", "callback_data": "about"}, + {"text": "تماس با ما", "callback_data": "contact"}, + ], +] + + +BUTTON_TEXT_TO_CALLBACK = { + "جستجو": "law_search", + "گفتگو": "law_chat", + # "راهنما": "help", + "درباره ما": "about", + "تماس با ما": "contact", +} +BUTTON_TEXT_TO_CALLBACK_LIST = [ + [ + "جستجو", + "گفتگو", + # "راهنما", + "درباره ما", + "تماس با ما", + ], +] + +MORE_BUTTON = [[{"text": "🔽 نمایش نتایج بیشتر", "callback_data": f"more"}]] + + +CHAT_EFFORT_BUTTONS = [ + [ + # {"text": "⚡ سریع", "callback_data": "chat_effort_low"}, + {"text": "🧠 بررسی عمیق تر", "callback_data": "chat_effort_medium"}, + # {"text": "⚖ نرمال", "callback_data": "chat_effort_medium"}, + ] + # [ + # {"text": "➕ ادامه پاسخ", "callback_data": "chat_more"} + # ] +] diff --git a/bale_massages.py b/bale_massages.py new file mode 100644 index 0000000..d1152c6 --- /dev/null +++ b/bale_massages.py @@ -0,0 +1,35 @@ +STARTMESSAGE = """👋 سلام دوست عزیز! 🤗 +به دستیار هوشمند قانون یار خوش آمدید! +فقط کافیه به من بگید چه کمکی از دستم برمیاد!""" + + + +ABOUT = """ +من ربات گفتگوگر حقوقی هستم که روی قوانین رسمی جمهوری اسلامی ایران از سامانه قانون‌یار مجلس شورای اسلامی توسعه یافتم. +لذا به هر سوال و گفتگویی طبق همان منابع پاسخ می‌دهم +نشانی سامانه منبع در زیر آمده است +[qanonyar.parliran.ir](https://qanonyar.parliran.ir) + +کارفرما : مرکز فناوری مجلس شورای اسلامی ایران + +""" +CONTACT_US = """لطفا برای ارتباط با ما از طریق مرکز فناوری مجلس شورای اسلامی ایران اقدام فرمایید""" + + + + + +JOST_OJO_TEXT = """📚✨ هر پرسش حقوقی که از قوانین کشور دارید یا هر متن حقوقی که براتون مهم هست، همینجا بفرستید تا با استناد به قوانین با شما گفتگو کنم.. + +📝 می‌تونید: +• موضوع کلی رو بگید +• سؤال بپرسید +• یا متن حقوقی که داری برام بنویسید + +🎯 هرچقدر دقیق‌تر و واضح‌تر توضیح بدید، بهتر می‌توانم راهنمایی‌ کنم. +""" +HOURGLASS = "⏳" # استیکر ساعت شنی +WAIT_TEXT = f"{HOURGLASS} لطفاً کمی صبر کنید...\nدر حال پردازش درخواست قبلی شما هستم." + + +GOFT_OGO_TEXT = "💬 آماده گفتگو هستم" \ No newline at end of file diff --git a/base_model.py b/base_model.py new file mode 100644 index 0000000..f6a0b69 --- /dev/null +++ b/base_model.py @@ -0,0 +1,116 @@ +from pydantic import BaseModel, Field +from typing import List, Optional + + +class BaleStartMessageForm(BaseModel): + id: int + is_bot: bool = False + first_name: str + last_name: Optional[str] = None + username: Optional[str] = None + + +class BaleStartMessageChat(BaseModel): + id: int + type: str + username: Optional[str] = None + first_name: Optional[str] = None + + +class BaleStartMessage(BaseModel): + message_id: int + from_user: BaleStartMessageForm = Field(..., alias="from") + date: int + chat: BaleStartMessageChat + text: str + entities: List[dict] = [] + + class Config: + populate_by_name = True + + +class BaleCallbackFrom(BaseModel): + id: int + is_bot: bool + first_name: str + username: Optional[str] = None + + +class BaleCallbackMessage(BaseModel): + message_id: int + chat: BaleStartMessageChat + text: Optional[str] + + +class BaleCallbackQuery(BaseModel): + id: str + from_user: BaleCallbackFrom = Field(..., alias="from") + message: BaleCallbackMessage + data: str + + class Config: + populate_by_name = True + + +class BaleUpdate(BaseModel): + update_id: int + message: Optional[BaleStartMessage] = None + callback_query: Optional[BaleCallbackQuery] = None + + class Config: + exclude_none = True + + +class QaChatSingle(BaseModel): + id: str + chat_id: int + user_query: str + model_key: str + model_effort: str + + retrived_passage: str + retrived_ref_ids: str + retrived_duration: Optional[int] = 0 + prompt_type: str = "question" + llm_duration: int + full_duration: Optional[int] = 0 + time_create: Optional[int] = 0 + used_ref_ids: Optional[str] = "" + status_text: Optional[str] = "" + status: Optional[int] = 0 + prompt_answer: str + other_info: dict | None + + +class QaChatBlock(BaseModel): + id: str + title: str + user_id: str + is_premium: bool + chat: QaChatSingle + total_token: int + is_end: bool + + +class QaChat(BaseModel): + id: str + chat_id: int + title: Optional[str] = "" + user_id: str + user_query: str + query_type: str = "question" # llm -> greeting, other, legal_question | rag -> question + full_duration: Optional[float] = 0 + other_info: Optional[dict] = "" + + ss_ref_ids: Optional[List[str]] = "" + ss_model_key: Optional[str] = "" + ss_duration: Optional[float] = 0 + ss_answer: Optional[str] = "" + + llm_ref_ids: Optional[List[str]] = [] + llm_model_key: Optional[str] = "" + llm_duration: Optional[float] = 0 + llm_answer: Optional[str] = "" + + status_text: Optional[str] = "" + status: Optional[int] = 0 diff --git a/main.py b/main.py new file mode 100644 index 0000000..a8c79ab --- /dev/null +++ b/main.py @@ -0,0 +1,632 @@ +################# +from fastapi import FastAPI, Request, HTTPException +import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re +from dotenv import load_dotenv +from pathlib import Path +from time import sleep +from enum import Enum + +################# +from base_model import BaleUpdate, QaChat +from bale_buttons import ( + START_BUTTONS_INLINE, + BUTTON_TEXT_TO_CALLBACK, + BUTTON_TEXT_TO_CALLBACK_LIST, + MORE_BUTTON, CHAT_EFFORT_BUTTONS +) +from bale_massages import ( + STARTMESSAGE, + ABOUT, + CONTACT_US, + JOST_OJO_TEXT, + HOURGLASS, + WAIT_TEXT,GOFT_OGO_TEXT +) +from utils import load_orjson, save_orjson, ElasticHelper, split_text_chunks + + +############## Create app +app = FastAPI() +############## + +############## Global-Params +load_dotenv() + +TOKEN = os.getenv("BALE_TOKEN") +ES_URL = os.getenv("ES_URL") +ES_PASSWORD = os.getenv("ES_PASSWORD") +ES_USER_NAME = os.getenv("ES_USER_NAME") +ES_INDEX_NAME = os.getenv("ES_INDEX_NAME") + +BASE_URL = f"https://tapi.bale.ai/bot{TOKEN}" +DATA_DIR = os.path.join(".", "_data_json") +if not os.path.exists(DATA_DIR): + os.makedirs(DATA_DIR) + +GLOBAL_DOMAIN = "https://bl.tavasi.ir" # f"https://YOUR_DOMAIN.com +WEBHOOK_URL = f"{GLOBAL_DOMAIN}/webhook/{TOKEN}" +SET_WEBHOOK_URL = f"https://tapi.bale.ai/bot{TOKEN}/setWebhook" + +MAX_LIMIT_RAG = 100 +STEP_RAG = 10 +USER_STATE = {} +USER_LAST_QUERY = {} +USER_PAGINATION = {} +USER_CHAT_EFFORT = {} +USER_CHAT_LIMIT = {} +USER_LAST_CHAT_QUERY = {} + +TIME_OUT = 60 +MAX_LEN = 4000 # کمی کمتر از حد پایه امن‌تر است + +ES_HELPER = ElasticHelper( + es_url=ES_URL, + es_pass=ES_PASSWORD, + es_user=ES_USER_NAME, +) + +class UserState(str, Enum): + MAIN = "main" + LAW_SEARCH = "law_search" + LAW_CHAT = "law_chat" + BUSY = "busy" + + +class SessionManager: + + def __init__(self): + self.states = {} + self.pagination = {} + + def init(self, chat_id: int): + if chat_id not in self.states: + self.states[chat_id] = UserState.MAIN + + def get_state(self, chat_id: int): + return self.states.get(chat_id, UserState.MAIN) + + def set_state(self, chat_id: int, state: UserState): + self.states[chat_id] = state + + def set_query(self, chat_id: int, query: str): + self.pagination[chat_id] = {"query": query, "limit": 10, "step": 10} + + def increase_limit(self, chat_id: int): + if chat_id in self.pagination: + self.pagination[chat_id]["limit"] += self.pagination[chat_id]["step"] + return self.pagination[chat_id] + return None + + def get_pagination(self, chat_id: int): + return self.pagination.get(chat_id) + + def clear(self, chat_id: int): + self.pagination.pop(chat_id, None) + self.states[chat_id] = UserState.MAIN + + +class BaleBot: + + def __init__(self, session: SessionManager): + self.session = session + + async def handle_update(self, update: BaleUpdate): + + if update.message: + return await self.handle_message(update) + + if update.callback_query: + return await self.handle_callback(update) + + async def handle_message(self, update: BaleUpdate): + chat_id = update.message.chat.id + text = (update.message.text or "").strip() + + self.session.init(chat_id) + + # /start + if text == "/start": + self.session.clear(chat_id) + send_message(chat_id, STARTMESSAGE, buttons=START_BUTTONS_INLINE) + return {"ok": True} + + if text == "تماس با ما": + send_message(chat_id, CONTACT_US) + return {"ok": True} + + if text == "درباره ما": + send_message(chat_id, ABOUT) + return {"ok": True} + + # ✅ اگر BUSY بود + if self.session.get_state(chat_id) == UserState.BUSY: + send_message(chat_id, WAIT_TEXT) + return {"ok": True} + + # ✅ شبیه‌سازی کلیک روی دکمه‌ها با متن + if text == "جستجو": + self.session.set_state(chat_id, UserState.LAW_SEARCH) + send_message(chat_id, JOST_OJO_TEXT) + return {"ok": True} + + if text == "گفتگو": + self.session.set_state(chat_id, UserState.LAW_CHAT) + send_message(chat_id, GOFT_OGO_TEXT) + return {"ok": True} + + # وضعیت فعلی کاربر + state = self.session.get_state(chat_id) + + if state == UserState.LAW_SEARCH: + return await self.handle_search(chat_id, update, text) + + elif state == UserState.LAW_CHAT: + return await self.handle_chat(chat_id, text) + + # اگر هیچکدوم نبود → پیش‌فرض بزن بره گفتگو + else: + self.session.set_state(chat_id, UserState.LAW_CHAT) + return await self.handle_chat(chat_id, text) + + + async def handle_callback(self, update: BaleUpdate): + chat_id = update.callback_query.message.chat.id + data = update.callback_query.data + + if data == "law_search": + self.session.set_state(chat_id, UserState.LAW_SEARCH) + send_message(chat_id, JOST_OJO_TEXT) + return {"ok": True} + + if data.startswith("chat_effort_"): + level = data.replace("chat_effort_", "") + USER_CHAT_EFFORT[chat_id] = level + + send_message(chat_id, f"✅ بله حتما، لطفا کمی منتظر بمانید ...") + + # اجرای مجدد آخرین پرسش + last_query = USER_LAST_CHAT_QUERY.get(chat_id) + if last_query: + return await self.handle_chat(chat_id, last_query, again=True) + + return {"ok": True} + + + if data == "chat_more": + pag = self.session.increase_limit(chat_id) + + if not pag: + send_message(chat_id, "❌ سوالی برای ادامه نیست") + return {"ok": True} + + last_query = USER_LAST_CHAT_QUERY.get(chat_id) + if last_query: + return await self.handle_chat(chat_id, last_query) + + return {"ok": True} + + + + if data == "law_chat": + self.session.set_state(chat_id, UserState.LAW_CHAT) + send_message(chat_id, GOFT_OGO_TEXT) + return {"ok": True} + + if data == "more": + return await self.handle_more(chat_id) + + if data == "stop": + self.session.clear(chat_id) + send_message(chat_id, "✅ پایان نمایش") + return {"ok": True} + + async def handle_search(self, chat_id, update, text): + + self.session.set_state(chat_id, UserState.BUSY) + + self.session.set_query(chat_id, text) + + send_message(chat_id, "⏳ در حال جستجو...") + + try: + pag = self.session.get_pagination(chat_id) + + result = await result_semantic_search( + text=pag["query"], + limit=pag["limit"] + ) + + # print(f'result rag {result} {type(result["ss_answer"])}') + chunked_text_ = get_in_form( + title=pag["query"], + sections=result["ss_answer"], + ) + # print(f'chunked_text_ rag {chunked_text_}') + + send_message( + chat_id, + chunked_text=chunked_text_, + buttons=MORE_BUTTON + ) + + except Exception as e: + send_message(chat_id, "❌ خطا در جستجو") + print(e) + + finally: + self.session.set_state(chat_id, UserState.MAIN) + + return {"ok": True} + + async def handle_more(self, chat_id): + + pag = self.session.increase_limit(chat_id) + + if not pag: + send_message(chat_id, "❌ درخواستی یافت نشد") + return {"ok": True} + + self.session.set_state(chat_id, UserState.BUSY) + + try: + result = await result_semantic_search( + text=pag["query"], + limit=pag["limit"] + ) + + chunked_text_ = get_in_form( + title=pag["query"], + sections=result["ss_answer"], + ) + send_message( + chat_id, + chunked_text=chunked_text_, + buttons=MORE_BUTTON + ) + + except Exception as e: + send_message(chat_id, "❌ خطا در افزایش نتایج") + + finally: + self.session.set_state(chat_id, UserState.MAIN) + + return {"ok": True} + + async def handle_chat(self, chat_id: int, text: str, again=False): + + # رفتن به حالت BUSY + self.session.set_state(chat_id, UserState.BUSY) + + send_message(chat_id, "⏳ در حال پردازش...") + + try: + # اگر اولین پیام است، آن را به عنوان query ذخیره کن + pag = self.session.get_pagination(chat_id) + if not pag: + self.session.set_query(chat_id, text) + pag = self.session.get_pagination(chat_id) + else: + # اگر query قبلا بوده ولی کاربر چیز جدیدی تایپ کرده + if text.strip(): + self.session.set_query(chat_id, text) + pag = self.session.get_pagination(chat_id) + + limit = pag["limit"] + + # effort را از حافظه بیرونی بگیر (یا پیش فرض) + effort = USER_CHAT_EFFORT.get(chat_id, "low") + + + result = await result_chat( + text=pag["query"], + limit=limit, + effort=effort, + again=again + ) + # print(f'result {result}') + text_ = result['llm_answer'] + query_type = result['query_type'] + if isinstance(text_, str): + text_ = format_answer_bale(answer_text=text_) + + # print(f'text_ {text_}') + # ذخیره آخرین سوال برای دکمه‌ها + USER_LAST_CHAT_QUERY[chat_id] = pag["query"] + + print ( '-='*10, effort, query_type) + if query_type == 'legal_question' and effort != 'medium' : + send_message( + chat_id=chat_id, + chunked_text=text_, + buttons=CHAT_EFFORT_BUTTONS + ) + else : + send_message( + chat_id=chat_id, + chunked_text=text_ + ) + + + + except Exception as e: + print("ERROR in handle_chat:", e) + send_message(chat_id, "❌ خطا در پردازش") + + finally: + # بازگشت به حالت چت + self.session.set_state(chat_id, UserState.LAW_CHAT) + + return {"ok": True} + + + +WEB_LINK = "https://majles.tavasi.ir/entity/detail/view/qsection/" + +def get_in_form(title: str, sections: list, max_len: int = 4000): + chunks = [] + current = f"برای پرسش: {title}\n\n" + ref_text = "«منبع»" + + for i, data in enumerate(sections, start=1): + sec_text = data.get("content", "") + idx = data.get("id") + + # ساخت ref کامل + ref = f"[{ref_text}]({WEB_LINK}{idx})" + # متن کامل آیتم + block = f"{i}: {sec_text}\n{ref}\n\n" + + # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید + if len(current) + len(block) > max_len: + chunks.append(current.rstrip()) + current = "" + + current += block + + # آخرین چانک را هم اضافه کن + if current.strip(): + chunks.append(current.rstrip()) + + return chunks + + + +def format_answer_bale(answer_text: str, max_len: int = 4000): + """ + answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد + sources: مثل ['qs2117427'] + """ + ref_text = "«منبع»" + + def make_link(src): + return f"[{ref_text}]({WEB_LINK}{src})" + + # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد + # مثلا: (qs123) یا (qs123, qs456, qs789) + pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" + + def replace_source(m): + content = m.group(1) + codes = [c.strip() for c in content.split(",")] # جداسازی چند کد + links = [make_link(code) for code in codes] + full_match = m.group(0) + # if "منبع" in full_match: + # print(f'Found explicit source(s): {links}') + # else: + # print(f'Found implicit source(s): {links}') + return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان + + # جایگزینی در متن + answer_text = re.sub(pattern, replace_source, answer_text) + + # اگر طول کمتر از max_len بود → تمام + if len(answer_text) <= max_len: + return [answer_text] + + # تقسیم متن اگر طول زیاد شد + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > max_len: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + + +def send_message( + chat_id: int, + text: str = None, + buttons=None, # inline buttons + chunked_text=None, +): + url = f"https://tapi.bale.ai/bot{TOKEN}/sendMessage" + + if chunked_text is None: + chunks = split_text_chunks(text) + else: + chunks = chunked_text + + reply_keyboard = [ + [{"text": btn} for btn in row] for row in BUTTON_TEXT_TO_CALLBACK_LIST + ] + + total = len(chunks) + + for i, chunk in enumerate(chunks): + is_last = (i == total - 1) + + payload = { + "chat_id": chat_id, + "text": chunk, + } + + # فقط برای پیام آخر کیبورد بفرست + if is_last: + + reply_markup = { + "keyboard": reply_keyboard, + "resize_keyboard": True, + "one_time_keyboard": False, + } + + if buttons: + reply_markup["inline_keyboard"] = buttons + + payload["reply_markup"] = reply_markup + + r = requests.post(url, json=payload) + + # try: + # r.json() + # print("Send:", r.status_code) + # except: + # print("Send:", r.status_code) + + +async def save_to_es(data: QaChat): + # print("save_to_es data rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr") + try: + + es_res = ES_HELPER.update_index_doc( + is_update_state=False, + index_name_o=ES_INDEX_NAME, + eid=data.id, + data=data.model_dump(), + ) + # type_name, payload, request + # print(f"Saved {es_res}") + except: + print("save_to_es - 000000000000000000000000000000000000000000000") + traceback.print_exc() + + +def initialize_webhook(): + """ + این تابع برای ربات اجباری است + به سرور بله مشخص میکند که به چه server ایی درخواست ها را با این توکن داده شده ارسال کند + """ + params = {"url": WEBHOOK_URL} + r = requests.get(SET_WEBHOOK_URL, params=params) + print("Webhook set status:", r.status_code) + print("Webhook set response:", r.json()) + + + +def chunked_simple_text(answer_text, max_len=4000): + chunks = [] + current = "" + + sentences = answer_text.split(". ") + for sentence in sentences: + st = sentence.strip() + if not st.endswith("."): + st += "." + + if len(current) + len(st) > max_len: + chunks.append(current.strip()) + current = "" + + current += st + " " + + if current.strip(): + chunks.append(current.strip()) + + return chunks + + +async def result_chat(text, limit=10, effort="low", again=False): + url = "http://2.188.15.101:8009/run_chat" + + try: + async with httpx.AsyncClient(timeout=TIME_OUT) as client: + response = await client.post( + url, + json={ + "query": text, + "limit": limit, + "effort": effort, + "again": again, + "mode_type":"bale" + + } + ) + response.raise_for_status() + data = response.json() + result = data.get("result", "❌ پاسخی دریافت نشد") + + # print('results_chat ',type(result)) + return result + + except Exception as e: + print(f"❌ خطای RAG:\n{str(e)}") + return "❌ ارتباط با سرور قطع می‌باشد" + +async def result_semantic_search(text, limit): + + url = "http://2.188.15.101:8009/run_semantic_search" + + try: + async with httpx.AsyncClient(timeout=TIME_OUT) as client: + response = await client.post(url, json={"query": text, "limit": limit}) + response.raise_for_status() + data = response.json() # ⚠️ اینجا response.json() فقط داده می‌دهد، شیء نیست + result = data.get("result") + # result = chunked_simple_text(result) + return result + + except Exception as e: + print(f"❌ خطای RAG:\n{str(e)}") + return "ارتباط با بالا قطع می باشد❌" + + + + +session = SessionManager() +bot = BaleBot(session) + + +@app.post(f"/webhook/{TOKEN}") +async def webhook(request: Request): + raw = await request.json() + + try: + update = BaleUpdate(**raw) + except Exception as e: + print("❌ Parse Error", e) + return {"ok": True} + + print(f'update {update}') + + return await bot.handle_update(update) + +if __name__ == "__main__": + print("\n====== Bot Status: ======") + initialize_webhook() + + print("\n====== Bot Webhook Server Running ======") + uvicorn.run("main:app", host="0.0.0.0", port=8005, reload=True) + + +# if __name__ == "__main__": +# # 1️⃣ ابتدا وبهوک را ست می‌کنیم +# print("\n====== Bot Status: ======") +# initialize_webhook() + +# # 2️⃣ بعد سرور FastAPI را اجرا می‌کنیم تا پیام‌ها را دریافت کند +# print("\n====== Bot Webhook Server Running ======") +# uvicorn.run("main:app", host="0.0.0.0", port=8005, reload=True) + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..83ebe60 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +elasticsearch==8.13.2 diff --git a/samle_env b/samle_env new file mode 100644 index 0000000..a3e9cfe --- /dev/null +++ b/samle_env @@ -0,0 +1,13 @@ +BALE_TOKEN ="" +BALE_ID="" +BALE_NAME="" + +# ELASTIC CONFIGURATION +ES_URL="http://:" +ES_PASSWORD="" +ES_USER_NAME="" +ES_INDEX_NAME="" +ELASTIC_cluster_name="" +ELASTIC_node_name="" +ELASTIC_LIMIT_IMPORT="" + diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..9f7aa24 --- /dev/null +++ b/utils.py @@ -0,0 +1,131 @@ +from elasticsearch import Elasticsearch, helpers +import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time + +from pathlib import Path + +from time import sleep + +# ------------------------ Global-params + +def load_orjson(path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + +def save_orjson(path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + +def split_text_chunks(text: str, max_len: int = 4000): + """Split a long text into safe chunks.""" + return [text[i : i + max_len] for i in range(0, len(text), max_len)] + + +class ElasticHelper: + """ + کلاس ElasticHelper: + نوع ورودی: بدون ورودی مستقیم در تعریف کلاس + نوع خروجی: شیء از نوع ElasticHelper + عملیات: + - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند + - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند + """ + + counter = 0 + total = 0 + id = "" + path_mappings = os.getcwd() + "/repo/_other/" + + def __init__( + self, + es_url="http://127.0.0.1:6900", + es_pass="", + es_user="elastic", + path_mappings="", + ): + """ + نوع ورودی: + - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" + - es_pass: رمز عبور (str) - پیش‌فرض خالی + - es_user: نام کاربری (str) - پیش‌فرض "elastic" + - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی + نوع خروجی: شیء ElasticHelper + عملیات: + - اتصال به Elasticsearch را برقرار می‌کند + - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند + - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) + - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود + """ + if path_mappings: + self.path_mappings = path_mappings + + if es_pass == "": + self.es = Elasticsearch(es_url) + else: + self.es = Elasticsearch( + es_url, + basic_auth=(es_user, es_pass), + verify_certs=False, + ) + # print(es_url) + # print(self.es) + + self.success_connect = False + for a in range(0, 10): + try: + if not self.es.ping(): + print("Elastic Connection Not ping, sleep 30 s : ", a) + sleep(5) + continue + else: + self.success_connect = True + break + + except Exception as e: + break + if not self.success_connect: + print("******", "not access to elastic service") + return + + self.counter = 0 + self.total = 0 + self.id = "" + + def search(self, **params): + try: + res = self.es.search(**params) + except: + return {"hits": {"hits": []}} + return res + + def get_document(self, index_name, id): + res = self.es.get(index=index_name, id=id) + return res + + def exist_document(self, index_name, id): + res = self.es.exists(index=index_name, id=id) + return res + + def update_index_doc(self, is_update_state, index_name_o, eid, data): + """ + نوع ورودی: + - is_update_state: تعیین عملیات (update یا index) (bool) + - index_name_o: نام اندیس (str) + - eid: شناسه سند (str) + - data: داده‌های سند (dict) + نوع خروجی: پاسخ Elasticsearch (dict) + عملیات: + - اگر is_update_state=True باشد: سند را آپدیت می‌کند + - در غیر این صورت: سند جدید ایجاد می‌کند + """ + if is_update_state: + resp = self.es.update(index=index_name_o, id=eid, doc=data) + # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) + else: + resp = self.es.index(index=index_name_o, id=eid, document=data) + return resp +