mj_bale_chat/utils.py
2025-12-03 14:37:00 +00:00

132 lines
4.4 KiB
Python

from elasticsearch import Elasticsearch, helpers
import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time
from pathlib import Path
from time import sleep
# ------------------------ Global-params
def load_orjson(path: str | Path):
path = Path(path)
with path.open("rb") as f: # باید باینری باز بشه برای orjson
return orjson.loads(f.read())
def save_orjson(path, data):
with open(path, "wb") as f:
f.write(
orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
)
def split_text_chunks(text: str, max_len: int = 4000):
"""Split a long text into safe chunks."""
return [text[i : i + max_len] for i in range(0, len(text), max_len)]
class ElasticHelper:
"""
کلاس ElasticHelper:
نوع ورودی: بدون ورودی مستقیم در تعریف کلاس
نوع خروجی: شیء از نوع ElasticHelper
عملیات:
- متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند
- مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند
"""
counter = 0
total = 0
id = ""
path_mappings = os.getcwd() + "/repo/_other/"
def __init__(
self,
es_url="http://127.0.0.1:6900",
es_pass="",
es_user="elastic",
path_mappings="",
):
"""
نوع ورودی:
- es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900"
- es_pass: رمز عبور (str) - پیش‌فرض خالی
- es_user: نام کاربری (str) - پیش‌فرض "elastic"
- path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی
نوع خروجی: شیء ElasticHelper
عملیات:
- اتصال به Elasticsearch را برقرار می‌کند
- در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند
- تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار)
- در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود
"""
if path_mappings:
self.path_mappings = path_mappings
if es_pass == "":
self.es = Elasticsearch(es_url)
else:
self.es = Elasticsearch(
es_url,
basic_auth=(es_user, es_pass),
verify_certs=False,
)
# print(es_url)
# print(self.es)
self.success_connect = False
for a in range(0, 10):
try:
if not self.es.ping():
print("Elastic Connection Not ping, sleep 30 s : ", a)
sleep(5)
continue
else:
self.success_connect = True
break
except Exception as e:
break
if not self.success_connect:
print("******", "not access to elastic service")
return
self.counter = 0
self.total = 0
self.id = ""
def search(self, **params):
try:
res = self.es.search(**params)
except:
return {"hits": {"hits": []}}
return res
def get_document(self, index_name, id):
res = self.es.get(index=index_name, id=id)
return res
def exist_document(self, index_name, id):
res = self.es.exists(index=index_name, id=id)
return res
def update_index_doc(self, is_update_state, index_name_o, eid, data):
"""
نوع ورودی:
- is_update_state: تعیین عملیات (update یا index) (bool)
- index_name_o: نام اندیس (str)
- eid: شناسه سند (str)
- data: داده‌های سند (dict)
نوع خروجی: پاسخ Elasticsearch (dict)
عملیات:
- اگر is_update_state=True باشد: سند را آپدیت می‌کند
- در غیر این صورت: سند جدید ایجاد می‌کند
"""
if is_update_state:
resp = self.es.update(index=index_name_o, id=eid, doc=data)
# resp = self.es.update(index=index_name_o, id=eid, body={'doc':data})
else:
resp = self.es.index(index=index_name_o, id=eid, document=data)
return resp