from elasticsearch import Elasticsearch, helpers import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time from pathlib import Path from time import sleep # ------------------------ Global-params def load_orjson(path: str | Path): path = Path(path) with path.open("rb") as f: # باید باینری باز بشه برای orjson return orjson.loads(f.read()) def save_orjson(path, data): with open(path, "wb") as f: f.write( orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) ) def split_text_chunks(text: str, max_len: int = 4000): """Split a long text into safe chunks.""" return [text[i : i + max_len] for i in range(0, len(text), max_len)] class ElasticHelper: """ کلاس ElasticHelper: نوع ورودی: بدون ورودی مستقیم در تعریف کلاس نوع خروجی: شیء از نوع ElasticHelper عملیات: - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند """ counter = 0 total = 0 id = "" path_mappings = os.getcwd() + "/repo/_other/" def __init__( self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings="", ): """ نوع ورودی: - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" - es_pass: رمز عبور (str) - پیش‌فرض خالی - es_user: نام کاربری (str) - پیش‌فرض "elastic" - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی نوع خروجی: شیء ElasticHelper عملیات: - اتصال به Elasticsearch را برقرار می‌کند - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود """ if path_mappings: self.path_mappings = path_mappings if es_pass == "": self.es = Elasticsearch(es_url) else: self.es = Elasticsearch( es_url, basic_auth=(es_user, es_pass), verify_certs=False, ) # print(es_url) # print(self.es) self.success_connect = False for a in range(0, 10): try: if not self.es.ping(): print("Elastic Connection Not ping, sleep 30 s : ", a) sleep(5) continue else: self.success_connect = True break except Exception as e: break if not self.success_connect: print("******", "not access to elastic service") return self.counter = 0 self.total = 0 self.id = "" def search(self, **params): try: res = self.es.search(**params) except: return {"hits": {"hits": []}} return res def get_document(self, index_name, id): res = self.es.get(index=index_name, id=id) return res def exist_document(self, index_name, id): res = self.es.exists(index=index_name, id=id) return res def update_index_doc(self, is_update_state, index_name_o, eid, data): """ نوع ورودی: - is_update_state: تعیین عملیات (update یا index) (bool) - index_name_o: نام اندیس (str) - eid: شناسه سند (str) - data: داده‌های سند (dict) نوع خروجی: پاسخ Elasticsearch (dict) عملیات: - اگر is_update_state=True باشد: سند را آپدیت می‌کند - در غیر این صورت: سند جدید ایجاد می‌کند """ if is_update_state: resp = self.es.update(index=index_name_o, id=eid, doc=data) # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) else: resp = self.es.index(index=index_name_o, id=eid, document=data) return resp