commit 1472bf0e9fcba1f120aa6dfd5932a96d9d180c74 Author: init_mahdi Date: Thu Nov 27 20:31:12 2025 +0000 first step diff --git a/monir/.env b/monir/.env new file mode 100644 index 0000000..8a81336 --- /dev/null +++ b/monir/.env @@ -0,0 +1,4 @@ +ES_URL = 'http://192.168.23.60/9200' +ES_USER_NAME = 'elastic' +ES_PASSWORD = '1234' +LLM_URL = 'http://2.188.15.102:8001/v1/' diff --git a/monir/__init__.py b/monir/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monir/__pycache__/base_model.cpython-310.pyc b/monir/__pycache__/base_model.cpython-310.pyc new file mode 100644 index 0000000..70bac45 Binary files /dev/null and b/monir/__pycache__/base_model.cpython-310.pyc differ diff --git a/monir/__pycache__/es_helper.cpython-310.pyc b/monir/__pycache__/es_helper.cpython-310.pyc new file mode 100644 index 0000000..f1f7922 Binary files /dev/null and b/monir/__pycache__/es_helper.cpython-310.pyc differ diff --git a/monir/__pycache__/llm_helper.cpython-310.pyc b/monir/__pycache__/llm_helper.cpython-310.pyc new file mode 100644 index 0000000..3b77563 Binary files /dev/null and b/monir/__pycache__/llm_helper.cpython-310.pyc differ diff --git a/monir/base_model.py b/monir/base_model.py new file mode 100644 index 0000000..995556b --- /dev/null +++ b/monir/base_model.py @@ -0,0 +1,339 @@ +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Union, Any + + +# فیلد اجباری: +# str = Field(..., description=) +# فیلد اختیاری: +# Optional[str] = Field("", description=) + + +class Properties(BaseModel): + """ + Standard Form Of Foreign Key To Other Index + """ + + id: str = Field(..., description="شناسه یکتا") + title: Optional[str] = Field("", description="عنوان") + author: Optional[str] = Field("", description="سازنده") + sub_type: Optional[str] = Field("", description="نوع دوم") + + +class TreeInfo(BaseModel): + title: str + parent_id: str = Field(..., description="") + child_order: int + level: int + full_path: Optional[str] = Field("", description="") + path_headings: str + + +class NlpParses(BaseModel): + main_type: str + nlp_type: str + begin: int + end: int + text: str + referes: str + dependency_infos: str # nested + + +class Embeddings(BaseModel): + type: Optional[str] = (Field("", description=""),) + dims: Optional[int] = (Field(1024, description=""),) + index: Optional[bool] = (Field(True, description=""),) + similarity: Optional[str] = Field("", description="") + + +class FileLinks(BaseModel): + title: str + link: str + type: str + description: str + + +class UserLogs(BaseModel): + id: str + user_id: int + username: str + time_edit: int + property: str + + +class MnSection(BaseModel): + id: str + old_id: str + main_type: str + html: str + content: str + meet_info: Properties + term_info: Properties + tree_info: TreeInfo + content_len: int + word_len: int + tags: str + keywords: str + sort_date_timestamp: int + nlp_parses: NlpParses + embeddings: Embeddings + file_links: FileLinks + time_edit: int + user_edit: int + user_logs: UserLogs + + +class MnTerm(BaseModel): + id: str + author: str + sub_type: Optional[str] = "" + title: Optional[str] = "" + begin_date: Optional[int] = "" + end_date: Optional[int] = "" + begin_year: Optional[int] = "" + main_type: Optional[str] = "" + content: Optional[str] = "" + tags: Optional[str] = "" + keywords: Optional[str] = "" + + +class MnMeetEntity(BaseModel): + id: str + main_type: str + sub_type: str + title: str + content: str + permit_tags: str + search_state: str + user_create: str + time_create: int + time_edit: int + file_links: FileLinks + meet_info: Properties + term_info: Properties + + +class Subjects(BaseModel): + id: int + title: str + + +class ReportInfo(BaseModel): + films: int + sounds: int + photos: int + + +class MnMeet(BaseModel): + """ + Monir Meet Standard fields foramt + """ + + id: str + sanad_id: str + main_type: str + sub_type: Optional[str] = "" + person_code: Optional[str] = "" + research_code: str + meet_code: str + old_meet_id: int + title: str + meet_no: int + author: str + term_info: Optional[Properties] = Field("", description="کلید اتصال به جدول ترم") + subtitle: str + subjects: Optional[List[Subjects]] = [] + allwords: str + tags: Optional[List[str]] = [] + keywords: str + verb: str + sanad_year: Optional[int] = "" + sanad_date: Optional[int] = "" + amplify: str + ralation: str + city: str + place: str + address: str + audience: str + attendees: str + report_info: ReportInfo + mindex: Optional[str] = Field("", description="mindex index فهرست") + mintro: Optional[str] = Field("", description="mintro خلاصه ") + content: str + completion: Optional[str] = Field("", description="type: completion") + sort_date_timestamp: int + permit_tags: str + resource_info: str + + +class MnSanadLink(BaseModel): + text: Optional[str] = "" + link: str + in_search: bool + title: str + + + +class Properties(BaseModel): + """ + Standard Form Of Foreign Key To Other Index + """ + + id: str = Field(..., description="شناسه یکتا") + title: Optional[str] = Field("", description="عنوان") + author: Optional[str] = Field("", description="سازنده") + sub_type: Optional[str] = Field("", description="نوع دوم") + +class TreeProperties(BaseModel): + parent_id :str + child_order: int + level: int + full_path: str + title: str + path_headings: str + +class NlpParses(BaseModel): + main_type: str + nlp_type: str + begin: int + end: int + text: str + referes: str + dependency_infos: Dict + +class UserLogs(BaseModel): + id:str + user_id:int + username:str + time_edit:int + property:str + +class MNSection(BaseModel): + main_type : str + id : str + html : str + content : str + meet_info: Properties + term_info: Properties + tree_info: TreeProperties + content_len: int + word_len: int + tags: str + keywords: str + sort_date_timestamp: int + nlp_parses: NlpParses + embeddings: Embeddings + file_links:FileLinks + time_edit: int + user_edit: int + user_logs: UserLogs + + +class MnSanadVersionInfo(BaseModel): + timestamp: int + number: int + title: str + + +class SoundLinks(BaseModel): + link: str + title: str + + +class MnSanad(BaseModel): + id: Optional[str] = "" + sort_date_timestamp: Optional[int] = "" + title: Optional[str] = "" + subtitle: Optional[str] = "" + research_code: Optional[int] = "" + content: Optional[str] = "" + version_info: Optional[MnSanadVersionInfo] = "" + meet_lid: Optional[int] = "" + meet_id: Optional[int] = "" + meet_no: Optional[int] = "" + meet_code: Optional[int] = "" + allwords: Optional[str] = "" + keywords: Optional[str] = "" + person_code: Optional[str] = "" + subject: Optional[List] = "" + city: Optional[str] = "" + author: str + begin_year: Optional[int] = "" + begin_date: Optional[int] = "" + end_date: Optional[int] = "" + branch: Optional[str] = "" + ralation: Optional[str] = "" + research_id: Optional[int] = "" + mintro: Optional[str] = "" + mindex: Optional[str] = "" + RowNum: Optional[int] = "" + resource_info: Optional[str] = "" + in_tadvin: Optional[bool] = "" + format: Optional[str] = "" + verb: Optional[str] = "" + address: Optional[str] = "" + attendees: Optional[str] = "" + amplify: Optional[str] = "" + audience: Optional[str] = "" + place: Optional[str] = "" + permit_tags: Optional[str] = "" + photos: Optional[int] = "" + tags: Optional[List[str]] = "" + films: Optional[int] = "" + sounds: Optional[int] = "" + file_links: Optional[List[MnSanadLink]] = "" + sound_links: Optional[List[SoundLinks]] = "" + video_links: Optional[List[SoundLinks]] = "" + photo_links: Optional[List[SoundLinks]] = "" + + model_config = {"exclude_" "": True} + + +class MnSanad2Meet(BaseModel): + id: str + sort_date_timestamp: Optional[int] = "" + title: str + subtitle: str + research_code: str + format: str + content: str + # version_info: Optional[List|dict] = "" + meet_lid: Optional[str] = "" + meet_id: Optional[int] = "" + meet_no: Optional[int] = "" + meet_code: Optional[str] = "" + allwords: Optional[str] = "" + keywords: Optional[str] = "" + person_code: Optional[str] = "" + subject: Optional[List] = [] + city: Optional[str] = "" + author: str + begin_year: Optional[int] = "" + begin_date: Optional[str] = "" + end_date: Optional[str] = "" + branch: str + ralation: Optional[str] = "" + research_id: int + mintro: Optional[str] = "" + mindex: Optional[str] = "" + # RowNum: Optional[int]= "" + resource_info: Optional[str] = "" + # in_tadvin: Optional[bool] = "" + verb: Optional[str] = "" + address: Optional[str] = "" + attendees: Optional[str] = "" + amplify: Optional[str] = "" + audience: Optional[str] = "" + place: Optional[str] = "" + permit_tags: Optional[str] = "" + photos: Optional[int] = 0 + tags: Optional[List[str]] = [] + films: Optional[int] = 0 + sounds: Optional[int] = 0 + file_links: Optional[List[MnSanadLink]] = "" + sound_links: Optional[List[SoundLinks]] = "" + video_links: Optional[List[SoundLinks]] = "" + photo_links: Optional[List[SoundLinks]] = "" + + # model_config = { + # "exclude_""": True + # } diff --git a/monir/doc_type.py b/monir/doc_type.py new file mode 100644 index 0000000..91eaa9a --- /dev/null +++ b/monir/doc_type.py @@ -0,0 +1,349 @@ +type_count = [ + { + "key": "جلسه علمی", + "doc_count": 7332 + }, + { + "key": "منبر", + "doc_count": 3970 + }, + { + "key": "درس خارج", + "doc_count": 2450 + }, + { + "key": "تدریس", + "doc_count": 1401 + }, + { + "key": "سخنرانی", + "doc_count": 1221 + }, + { + "key": "-", + "doc_count": 992 + }, + { + "key": "مشاوره", + "doc_count": 858 + }, + { + "key": "مدیریت", + "doc_count": 652 + }, + { + "key": "مصاحبه", + "doc_count": 628 + }, + { + "key": "مباحثه و گفتگو", + "doc_count": 587 + }, + { + "key": "جزوه", + "doc_count": 583 + }, + { + "key": "مناظره و گفتگو", + "doc_count": 496 + }, + { + "key": "گزارش", + "doc_count": 395 + }, + { + "key": "--", + "doc_count": 250 + }, + { + "key": "جدول", + "doc_count": 208 + }, + { + "key": "کتاب", + "doc_count": 173 + }, + { + "key": "صورت جلسه", + "doc_count": 126 + }, + { + "key": "فهرست", + "doc_count": 124 + }, + { + "key": "ارائه", + "doc_count": 90 + }, + { + "key": "بازدید (دیدار)", + "doc_count": 83 + }, + { + "key": "مقاله", + "doc_count": 81 + }, + { + "key": "طرح", + "doc_count": 74 + }, + { + "key": "کلیپ", + "doc_count": 68 + }, + { + "key": "گزارش رصد", + "doc_count": 64 + }, + { + "key": "نامه", + "doc_count": 48 + }, + { + "key": "نمودار", + "doc_count": 45 + }, + { + "key": "آیین نامه", + "doc_count": 36 + }, + { + "key": "تقریر", + "doc_count": 35 + }, + { + "key": "خلاصه", + "doc_count": 25 + }, + { + "key": "پیش نویس", + "doc_count": 25 + }, + { + "key": "نقد", + "doc_count": 23 + }, + { + "key": "چکیده", + "doc_count": 16 + }, + { + "key": "یادبود", + "doc_count": 13 + }, + { + "key": "فرم", + "doc_count": 12 + }, + { + "key": "فرم فیش", + "doc_count": 11 + }, + { + "key": "مناجات", + "doc_count": 10 + }, + { + "key": "دعا و مناجات", + "doc_count": 8 + }, + { + "key": "فیش", + "doc_count": 6 + }, + { + "key": "پرسش و پاسخ", + "doc_count": 6 + }, + { + "key": "بیانیه", + "doc_count": 5 + }, + { + "key": "اجلاسیه", + "doc_count": 4 + }, + { + "key": "پایان نامه", + "doc_count": 4 + }, + { + "key": "", + "doc_count": 3 + }, + { + "key": "آئین نامه", + "doc_count": 3 + }, + { + "key": "رزومه", + "doc_count": 3 + }, + { + "key": "قرارداد", + "doc_count": 3 + }, + { + "key": "مصوبه", + "doc_count": 3 + }, + { + "key": "نمونه سوال", + "doc_count": 3 + }, + { + "key": "همایش", + "doc_count": 3 + }, + { + "key": "پژوهش", + "doc_count": 3 + }, + { + "key": "چارت", + "doc_count": 3 + }, + { + "key": "کنفرانس", + "doc_count": 3 + }, + { + "key": "تلخیص", + "doc_count": 2 + }, + { + "key": "قرائت زیارت", + "doc_count": 2 + }, + { + "key": "لیست", + "doc_count": 2 + },{ + "key": "متن جلسه", + "doc_count": 2 + }, + { + "key": "مستند تلوزیونی", + "doc_count": 2 + }, + { + "key": "نشست علمی", + "doc_count": 2 + }, + { + "key": "کتاب داخلی", + "doc_count": 2 + }, + { + "key": "گزارش جلسه", + "doc_count": 2 + }, + { + "key": "برنامه", + "doc_count": 1 + }, + { + "key": "بروشور", + "doc_count": 1 + }, + { + "key": "بزرگداشت", + "doc_count": 1 + }, + { + "key": "جزو", + "doc_count": 1 + }, + { + "key": "خطبه عقد(دائم)", + "doc_count": 1 + }, + { + "key": "روضه", + "doc_count": 1 + }, + { + "key": "زندگی نامه", + "doc_count": 1 + }, + { + "key": "زیارتنامه", + "doc_count": 1 + }, + { + "key": "سائر", + "doc_count": 1 + }, + { + "key": "سالگرد", + "doc_count": 1 + }, + { + "key": "سایر", + "doc_count": 1 + }, + { + "key": "طرج", + "doc_count": 1 + }, + { + "key": "عقد", + "doc_count": 1 + }, + { + "key": "ماتریس", + "doc_count": 1 + }, + { + "key": "مدل", + "doc_count": 1 + }, + { + "key": "مراحل دستیابی و به کارگیری الگوی پیشرفت اسلامی ـ ", + "doc_count": 1 + }, + { + "key": "مقالات", + "doc_count": 1 + }, + { + "key": "مقاله و ارائه", + "doc_count": 1 + }, + { + "key": "نماه", + "doc_count": 1 + }, + { + "key": "نمایه", + "doc_count": 1 + }, + { + "key": "هرم", + "doc_count": 1 + }, + { + "key": "پرسشنامه", + "doc_count": 1 + }, + { + "key": "پروژه", + "doc_count": 1 + }, + { + "key": "پیش نشست", + "doc_count": 1 + }, + { + "key": "کاربرگ", + "doc_count": 1 + }, + { + "key": "کتاب سایت", + "doc_count": 1 + }, + { + "key": "کمیسیون خبرگان", + "doc_count": 1 + } + ] \ No newline at end of file diff --git a/monir/es_helper.py b/monir/es_helper.py new file mode 100644 index 0000000..7cb190b --- /dev/null +++ b/monir/es_helper.py @@ -0,0 +1,1134 @@ +import zipfile +import sys +import os +import json +from time import sleep +from elasticsearch import Elasticsearch, helpers + + +class ElasticHelper: + """ + کلاس ElasticHelper: + نوع ورودی: بدون ورودی مستقیم در تعریف کلاس + نوع خروجی: شیء از نوع ElasticHelper + عملیات: + - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند + - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند + """ + + counter = 0 + total = 0 + id = "" + path_mappings = os.getcwd() + "/repo/_other/" + + def __init__( + self, + es_url="http://127.0.0.1:6900", + es_pass="", + es_user="elastic", + path_mappings="", + ): + """ + نوع ورودی: + - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" + - es_pass: رمز عبور (str) - پیش‌فرض خالی + - es_user: نام کاربری (str) - پیش‌فرض "elastic" + - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی + نوع خروجی: شیء ElasticHelper + عملیات: + - اتصال به Elasticsearch را برقرار می‌کند + - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند + - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) + - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود + """ + if path_mappings: + self.path_mappings = path_mappings + + if es_pass == "": + self.es = Elasticsearch(es_url) + else: + self.es = Elasticsearch( + es_url, + basic_auth=(es_user, es_pass), + verify_certs=False, + ) + # print(es_url) + # print(self.es) + + self.success_connect = False + for a in range(0, 10): + try: + if not self.es.ping(): + print("Elastic Connection Not ping, sleep 30 s : ", a) + sleep(5) + continue + else: + self.success_connect = True + break + + except Exception as e: + break + if not self.success_connect: + print("******", "not access to elastic service") + return + + self.counter = 0 + self.total = 0 + self.id = "" + + def search(self, **params): + try: + res = self.es.search(**params) + except: + return {"hits": {"hits": []}} + return res + + def get_document(self, index_name, id): + res = self.es.get(index=index_name, id=id) + return res + + def exist_document(self, index_name, id): + res = self.es.exists(index=index_name, id=id) + return res + + + def update_index_doc(self, is_update_state, index_name_o, eid, data): + """ + نوع ورودی: + - is_update_state: تعیین عملیات (update یا index) (bool) + - index_name_o: نام اندیس (str) + - eid: شناسه سند (str) + - data: داده‌های سند (dict) + نوع خروجی: پاسخ Elasticsearch (dict) + عملیات: + - اگر is_update_state=True باشد: سند را آپدیت می‌کند + - در غیر این صورت: سند جدید ایجاد می‌کند + """ + if is_update_state: + resp = self.es.update(index=index_name_o, id=eid, doc=data) + # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) + else: + resp = self.es.index(index=index_name_o, id=eid, document=data) + return resp + + # ----------------------------start--------------------------- + # متد exportToJsonForAI: + # نوع ورودی: + # - path_back: مسیر ذخیره فایل (str) + # - index_name: نام اندیس (str) + # - out_name: نام فایل خروجی (str) - پیش‌فرض خالی + # - body: بدنه جستجو (dict) - پیش‌فرض خالی + # - fields: لیست فیلدهای مورد نیاز (list) - پیش‌فرض خالی + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - داده‌های اندیس را با استفاده از scroll API دریافت می‌کند + # - پیشرفت عملیات را نمایش می‌دهد + # - داده‌ها را در فایل JSON ذخیره می‌کند + # - اگر fields مشخص شده باشد، فقط فیلدهای مورد نظر را استخراج می‌کند + # -----------------------------end---------------------------- + def exportToJsonForAI( + self, + path_back, + index_name, + out_name="", + body={}, + fields=[], + mode="normal", + collapse_field="", + ): + print("*" * 50, " start backup -->", index_name) + self.counter = 0 + sid = None + + out = out_name + if out_name == "": + out = index_name + + s_res = self.es.search(index=index_name, scroll="5m", size=1000, body=body) + self.total = s_res["hits"]["total"]["value"] + + print("start index = %s" % index_name) + print("total = %d" % self.total) + + sid = s_res["_scroll_id"] + scroll_size = len(s_res["hits"]["hits"]) + pack_count = 1 + out_json = [] + if mode == "dict" or collapse_field: + out_json = {} + + prev_collapse_value = "" + pack_collapse = [] + while scroll_size > 0: + "Scrolling..." + self.counter += scroll_size + print("progress -> %.2f %%" % ((self.counter / self.total) * 100)) + + #### for test + # if pack_count > 2 : + # break + pack_count += 1 + ############################# + for item in s_res["hits"]["hits"]: + id = item["_id"] + item2 = None + if mode == "id": # فقط شناسه ها نیاز هست + item2 = id + elif fields: + item2 = {} + item2["id"] = id + for kf in fields: + # print(kf) + if kf in item["_source"]: + # print(item['_source'][kf]) + item2[kf] = item["_source"][kf] + elif "." in kf: + cols = kf.split(".") + subsource = item["_source"] + for sub in cols: + if sub in subsource: + subsource = subsource[sub] + continue + else: + break + key = kf.replace(".", "__") + item2[key] = subsource + + # exit() + else: + item2 = {} + item2 = item + + if collapse_field and collapse_field in item["_source"]: + collapse_value = item["_source"][collapse_field] + if not prev_collapse_value: + prev_collapse_value = collapse_value + + if not collapse_value == prev_collapse_value: + out_json[prev_collapse_value] = pack_collapse + pack_collapse = [] + prev_collapse_value = collapse_value + + pack_collapse.append(item2) + + elif mode == "dict": + out_json[id] = item2 + else: + out_json.append(item2) + + s_res = self.es.scroll(scroll_id=sid, scroll="2m", request_timeout=100000) + sid = s_res["_scroll_id"] + scroll_size = len(s_res["hits"]["hits"]) + + sid = None + + if collapse_field and prev_collapse_value and pack_collapse: + out_json[prev_collapse_value] = pack_collapse + + with open(path_back + "/" + out, "w", encoding="utf-8") as fout: + json.dump(out_json, fout, ensure_ascii=False, indent=4) + + ############################## + + # ----------------------------start--------------------------- + # متد backupIndexToZipfile: + # نوع ورودی: + # - path_back: مسیر ذخیره فایل (str) + # - index_name: نام اندیس (str) + # - out_name: نام فایل خروجی (str) - پیش‌فرض خالی + # - body: بدنه جستجو (dict) - پیش‌فرض {"size":1000} + # - byzip: تعیین فرمت خروجی (bool) - پیش‌فرض True + # - fields: لیست فیلدهای مورد نیاز (list) - پیش‌فرض خالی + # - noFields: لیست فیلدهای مورد حذف (list) - پیش‌فرض خالی + # نوع خروجی: bool (True اگر داده وجود داشته باشد) + # عملیات: + # - داده‌های اندیس را با استفاده از scroll API دریافت می‌کند + # - پیشرفت عملیات را نمایش می‌دهد + # - داده‌ها را در فایل ZIP یا JSON ذخیره می‌کند + # - اگر fields مشخص شده باشد، فقط فیلدهای مورد نظر را استخراج می‌کند + # - اگر noFields مشخص شده باشد، فیلدهای مورد نظر را حذف می‌کند + # -----------------------------end---------------------------- + def backupIndexToZipfile( + self, + path_back, + index_name, + file_name="", + body={"size": 1000}, + byzip=True, + fields=[], + noFields=[], + ): + print("*" * 50, " start backup -->", index_name) + self.counter = 0 + sid = None + + out = index_name + + if file_name == "": + file_name = index_name + + if body == {}: + s_res = self.es.search(index=index_name, scroll="5m", size=1000) + else: + s_res = self.es.search(index=index_name, scroll="5m", body=body) + self.total = s_res["hits"]["total"]["value"] + if self.total == 0: + print("total index_name by query = %d" % self.total) + return False + + if byzip: + fout = zipfile.ZipFile(path_back + "/" + file_name + ".zip", "w") + else: + fout = open(path_back + "/" + file_name + ".json", "a+", encoding="utf-8") + + print("start index = %s" % index_name) + print("total = %d" % self.total) + + sid = s_res["_scroll_id"] + scroll_size = len(s_res["hits"]["hits"]) + file_count = 1 + prev_percent = 0 + while scroll_size > 0: + "Scrolling..." + self.counter += scroll_size + percent = int((self.counter / self.total) * 100) + if percent != prev_percent: + print("progress -> %.2f %%" % percent) + prev_percent = percent + ############################# + out_json = [] + for item in s_res["hits"]["hits"]: + if fields: + item2 = {} + item2["id"] = item["_id"] + item2["_source"] = {} + for kf in fields: + if kf in item["_source"]: + item2["_source"][kf] = item["_source"][kf] + else: + item2 = item + + if noFields: + for kf in noFields: + if kf in item2["_source"]: + del item2["_source"][kf] + + out_json.append(item2) + + text = json.dumps(out_json, ensure_ascii=False) + out_json = [] + if byzip: + filename = out + str(file_count) + ".json" + file_count += 1 + fout.writestr(filename, text.encode("utf-8"), zipfile.ZIP_DEFLATED) + else: + fout.write(text) + + ############################## + s_res = self.es.scroll(scroll_id=sid, scroll="2m", request_timeout=100000) + sid = s_res["_scroll_id"] + scroll_size = len(s_res["hits"]["hits"]) + sid = None + fout.close() + + # ----------------------------start--------------------------- + # متد restorFileToElastic: + # نوع ورودی: + # - path_back: مسیر فایل (str) + # - index_name: نام اندیس (str) + # - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی + # - queryDelete: تعیین حذف اندیس قبل از بازیابی (bool) - پیش‌فرض True + # - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی + # نوع خروجی: bool + # عملیات: + # - وجود فایل ZIP را بررسی می‌کند + # - اگر queryDelete=True باشد، از کاربر تأیید حذف را می‌گیرد + # - اندیس را ایجاد می‌کند + # - داده‌ها را از فایل ZIP به Elasticsearch بازیابی می‌کند + # -----------------------------end---------------------------- + def restorFileToElastic( + self, path_back, index_name, app_key="", queryDelete=True, map_name="" + ): + if not os.path.exists(path_back): + print(" **** error *** path not exist: ", path_back) + return False + + file_path = path_back + "/" + index_name + ".zip" + if not os.path.exists(file_path): + return False + + if queryDelete: + # اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند + if self.deleteIndex(index_name): + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + else: # اگر وجود داشته باشد پرش می کند و کاری نمیکند + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + + def restorFileToElastic2( + self, path_file, index_name, app_key="", queryDelete=True, map_name="" + ): + if not os.path.exists(path_file): + print(" **** error *** path not exist: ", path_file) + return False + + file_path = path_file + if not os.path.exists(file_path): + return False + + if queryDelete: + # اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند + if self.deleteIndex(index_name): + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + else: # اگر وجود داشته باشد پرش می کند و کاری نمیکند + self.createIndex(index_name, app_key, map_name) + self.zipFileToElastic(file_path, index_name) + + # ----------------------------start--------------------------- + # متد renameElasticIndex: + # نوع ورودی: + # - index_name_i: نام اندیس منبع (str) + # - index_name_o: نام اندیس مقصد (str) + # - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی + # - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - اندیس مقصد را ایجاد می‌کند + # - با استفاده از reindex API، داده‌ها را از اندیس منبع به مقصد منتقل می‌کند + # - پیشرفت عملیات را نمایش می‌دهد + # -----------------------------end---------------------------- + def renameElasticIndex(self, index_name_i, index_name_o, app_key="", map_name=""): + + if self.createIndex(index_name_o, app_key, map_name): + res = self.es.reindex( + body={ + "source": {"index": index_name_i}, + "dest": {"index": index_name_o}, + }, + wait_for_completion=False, + ) + + print(type(res)) + print(res) + + taskid = res["task"] if res["task"] else "" + # tasks = client.TasksClient(self.es) + tasks = self.es.tasks + while True: + res = tasks.get(task_id=taskid) + if res["completed"]: + break + + # print( res["task"]) + print( + "----", + index_name_o, + " imported : ", + res["task"]["status"]["total"], + " / ", + res["task"]["status"]["created"], + ) + sleep(1) + print("----", index_name_o, " complated") + + # ----------------------------start--------------------------- + # متد deleteIndex: + # نوع ورودی: + # - index_name: نام اندیس (str) + # نوع خروجی: bool + # عملیات: + # - وجود اندیس را بررسی می‌کند + # - از کاربر تأیید حذف را می‌گیرد + # - در صورت تأیید، اندیس را حذف می‌کند + # -----------------------------end---------------------------- + def deleteIndex(self, index_name): + if not self.es.indices.exists(index=index_name): + print(" " * 10, " for delete NOT exist index :", index_name) + return True + + question = "Is DELETE elastic index (" + index_name + ") ? " + if self.query_yes_no(question): + self.es.indices.delete(index=index_name) + print("%" * 10, " Finish DELETE index :", index_name) + return True + else: + return False + + # ----------------------------start--------------------------- + # متد query_yes_no: + # نوع ورودی: + # - question: سوال نمایش داده شده (str) + # - default: پاسخ پیش‌فرض (str) - پیش‌فرض "no" + # نوع خروجی: bool + # عملیات: + # - سوال را به کاربر نمایش می‌دهد + # - پاسخ کاربر را دریافت و اعتبارسنجی می‌کند + # - True برای 'yes'/'y' و False برای 'no'/'n' برمی‌گرداند + # -----------------------------end---------------------------- + def query_yes_no(self, question, default="no"): + valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError("invalid default answer: '%s'" % default) + + while True: + print("%" * 10, " quistion ", "%" * 10, "\n") + sys.stdout.write(question + prompt) + choice = input().lower() + if default is not None and choice == "": + return valid[default] + elif choice in valid: + return valid[choice] + else: + sys.stdout.write( + "لطفا یکی از موارد روبرو را وارد کنید : 'yes' or 'no' " + "(or 'y' or 'n').\n" + ) + + + def createIndexIfNotExist(self, index_name_o, mapping_o=""): + """ + # نوع ورودی: + # - index_name_o: نام اندیس (str) + # - mapping_o: مپینگ اندیس (str) - پیش‌فرض خالی + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - وجود اندیس را بررسی می‌کند + # - در صورت عدم وجود، اندیس را با مپینگ مشخص شده ایجاد می‌کند + """ + try: + if not self.es.indices.exists(index=index_name_o): + response = self.es.indices.create(index=index_name_o, body=mapping_o) + # print out the response: + print("create index response:", response) + except: + print("....... index exist ! ... not created") + + # ----------------------------start--------------------------- + # متد createIndex: + # نوع ورودی: + # - index_name: نام اندیس (str) + # - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی + # - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی + # نوع خروجی: bool + # عملیات: + # - وجود اندیس را بررسی می‌کند + # - مسیر فایل مپینگ را تعیین می‌کند + # - فایل مپینگ را خوانده و اندیس را ایجاد می‌کند + # - در صورت عدم یافت فایل مپینگ، خطا نمایش داده می‌شود + # -----------------------------end---------------------------- + def createIndex(self, index_name, app_key="", map_name=""): + + path_base = self.path_mappings + path_mapping1 = path_base + "general/" + if app_key == "": + app_key = "tavasi" + path_mapping2 = path_base + app_key + "/" + + if map_name == "": + map_name = index_name + + if self.es.indices.exists(index=index_name): + print("============== exist index :", index_name) + return True + + if map_name == "mj_rg_section" or map_name == "semantic_search": + map_name = "mj_qa_section" + elif map_name[-3] == "_ai": + map_name = [0 - len(map_name) - 3] + print(map_name) + + mapping_file_path = path_mapping1 + map_name + ".json" + print("mapping_file_path : ", mapping_file_path) + if not os.path.isfile(mapping_file_path): + if not os.path.isfile(mapping_file_path): + mapping_file_path = path_mapping2 + map_name + ".json" + + print("mapping_file_path : ", mapping_file_path) + + # Create Index With Mapping + if os.path.isfile(mapping_file_path): + mapping_file = open(mapping_file_path, "r", encoding="utf-8") + mapping_file_read = mapping_file.read() + mapping_data = json.loads(mapping_file_read) + mapping_file.close() + if self.es.indices.exists(index=index_name): + print("============== exist index :", index_name) + else: + self.es.indices.create(index=index_name, body=mapping_data) + return True + else: + print("*** error not find maping file elastic : *******", mapping_file_path) + return False + + # ----------------------------start--------------------------- + # متد updateBulkList: + # نوع ورودی: + # - listData: لیست داده‌ها (list) + # - index_name: نام اندیس (str) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - داده‌ها را به صورت bulk آپدیت می‌کند + # - از helpers.bulk Elasticsearch استفاده می‌کند + # -----------------------------end---------------------------- + def updateBulkList(self, listData, index_name): + chunk_size = 100000 + raise_on_error = False + raise_on_exception = False + stats_only = True + yield_ok = False + + actions = [] + for item in listData: + actions.append( + { + "_op_type": "update", + "_index": index_name, + "_id": item["_id"], + "doc": item["_source"], + } + ) + helpers.bulk( + self.es, + actions, + chunk_size, + raise_on_error, + raise_on_exception, + stats_only, + yield_ok, + ) + + # ----------------------------start--------------------------- + # متد importBulkList: + # نوع ورودی: + # - listData: لیست داده‌ها (list) + # - index_name: نام اندیس (str) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - داده‌ها را به صورت bulk وارد می‌کند + # - از helpers.bulk Elasticsearch استفاده می‌کند + # -----------------------------end---------------------------- + def importBulkList(self, listData, index_name): + chunk_size = 100000 + raise_on_error = False + raise_on_exception = False + stats_only = True + yield_ok = False + + for item in listData: + actions = [ + { + "_op_type": "index", + "_index": index_name, + "_id": item["_id"], + "_source": item["_source"], + } + ] + helpers.bulk( + self.es, + actions, + chunk_size, + raise_on_error, + raise_on_exception, + stats_only, + yield_ok, + ) + + # ----------------------------start--------------------------- + # متد importJsonDataToElastic: + # نوع ورودی: + # - jsonData: داده‌های JSON (list) + # - index_name: نام اندیس (str) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - داده‌ها را به صورت bulk وارد می‌کند + # - از helpers.bulk Elasticsearch استفاده می‌کند + # -----------------------------end---------------------------- + def importJsonDataToElastic(self, jsonData, index_name, fields=[]): + chunk_size = 100000 + raise_on_error = False + raise_on_exception = False + stats_only = True + yield_ok = False + + actions = [] + + for item in jsonData: + id = item["_id"] if item["_id"] else item["id"] + source = item["_source"] + if fields: + source = {} + for col in fields: + if col in item["_source"]: + source[col] = item["_source"] + + actions.append( + { + "_op_type": "index", + "_index": index_name, + "_id": id, + "_source": source, + } + ) + helpers.bulk( + self.es, + actions, + chunk_size, + raise_on_error, + raise_on_exception, + stats_only, + yield_ok, + ) + + # ----------------------------start--------------------------- + # متد fileToElastic: + # نوع ورودی: + # - file_path: مسیر فایل (str) + # - index_name: نام اندیس (str) + # - limit_pack: محدودیت تعداد بسته‌ها (int) - پیش‌فرض -1 + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - فایل JSON را خوانده و داده‌ها را به Elasticsearch وارد می‌کند + # - اندیس را refresh می‌کند + # - تعداد اسناد را نمایش می‌دهد + # -----------------------------end---------------------------- + def fileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]): + if not os.path.exists(file_path): + print("file zip:", file_path, " not exist") + return + print("index:", index_name, "=>", file_path) + self.counter = 0 + with open(file_path) as file: + data = json.loads(file.read()) + self.importJsonDataToElastic(data, index_name, fields) + + self.es.indices.refresh(index=index_name) + print(self.es.cat.count(index=index_name, format="json")) + + # ----------------------------start--------------------------- + # متد zipFileToElastic: + # نوع ورودی: + # - file_path: مسیر فایل ZIP (str) + # - index_name: نام اندیس (str) + # - limit_pack: محدودیت تعداد فایل‌ها (int) - پیش‌فرض -1 + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - فایل ZIP را باز کرده و هر فایل JSON داخل آن را پردازش می‌کند + # - داده‌ها را به Elasticsearch وارد می‌کند + # - اندیس را refresh می‌کند + # - تعداد اسناد را نمایش می‌دهد + # -----------------------------end---------------------------- + def zipFileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]): + if not os.path.exists(file_path): + print( + "file zip:", file_path, " not exist for imort to elastic : ", index_name + ) + return + + fileNo = 0 + with zipfile.ZipFile(file_path, "r") as zObject: + fileNo += 1 + print( + "=" * 10, + " zip fileNo: ", + fileNo, + " - ( ", + index_name, + " ) | File Numbers:", + len(zObject.namelist()), + "=" * 10, + ) + + packNo = 0 + self.counter = 0 + for filename in zObject.namelist(): + packNo += 1 + if limit_pack != -1: + if packNo > limit_pack: + print("limit_data ", index_name, " ", limit_pack) + break + + print("index:", index_name, "=>", filename) + with zObject.open(filename) as file: + data = json.loads(file.read()) + self.importJsonDataToElastic(data, index_name, fields) + + self.es.indices.refresh(index=index_name) + print(self.es.cat.count(index=index_name, format="json")) + print(" END Of Import to elastic ", index_name, "\n") + + # ----------------------------start--------------------------- + # متد iterateJsonFile: + # نوع ورودی: + # - file_path: مسیر فایل (str) + # - isZip: تعیین نوع فایل (ZIP یا JSON) (bool) - پیش‌فرض True + # - limit_pack: محدودیت تعداد فایل‌ها (int) - پیش‌فرض -1 + # نوع خروجی: ژنراتور + # عملیات: + # - اگر isZip=True باشد: فایل ZIP را پردازش می‌کند + # - اگر isZip=False باشد: فایل JSON را پردازش می‌کند + # - داده‌ها را به صورت ژنراتور برمی‌گرداند + # -----------------------------end---------------------------- + def iterateJsonFile(self, file_path, isZip=True, limit_pack=-1): + if not os.path.exists(file_path): + print("file zip:", file_path, " not exist iterateJsonFile ") + return + + if isZip: + fileNo = 0 + with zipfile.ZipFile(file_path, "r") as zObject: + fileNo += 1 + print( + "=" * 10, + " zip fileNo: ", + fileNo, + " iterateJsonFile - | File Numbers:", + len(zObject.namelist()), + "=" * 10, + ) + + packNo = 0 + self.counter = 0 + for filename in zObject.namelist(): + packNo += 1 + if limit_pack != -1: + if packNo > limit_pack: + print("limit_data iterateJsonFile ", limit_pack) + break + + print("index iterateJsonFile :", "=>", filename) + with zObject.open(filename) as file: + data = json.loads(file.read()) + # Yield each entry + # yield data + yield from ( + {"source": hit["_source"], "id": hit["_id"]} for hit in data + ) + else: + with open(filename, "r", encoding="utf-8") as file: + data = json.loads(file.read()) + # Yield each entry + # yield from (hit for hit in data) + # return data + yield from ( + {"source": hit["_source"], "id": hit["_id"]} for hit in data + ) + + # ----------------------------start--------------------------- + # متد es_iterate_all_documents: + # نوع ورودی: + # - index: نام اندیس (str) + # - body: بدنه جستجو (dict) - پیش‌فرض خالی + # - pagesize: اندازه صفحه (int) - پیش‌فرض 250 + # - scroll_timeout: زمان اسکرول (str) - پیش‌فرض "25m" + # - **kwargs: پارامترهای اضافی + # نوع خروجی: ژنراتور + # عملیات: + # - تمام اسناد اندیس را با استفاده از scroll API دریافت می‌کند + # - پیشرفت عملیات را نمایش می‌دهد + # - داده‌ها را به صورت ژنراتور برمی‌گرداند + # -----------------------------end---------------------------- + def es_iterate_all_documents( + self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs + ): + """ + Helper to iterate ALL values from a single index + Yields all the documents. + """ + is_first = True + while True: + # Scroll next + if is_first: # Initialize scroll + # result = self.es.search(index=index, scroll="2m", **kwargs, body={ + # "size": pagesize + # }) + if body: + result = self.es.search( + index=index, + scroll=scroll_timeout, + **kwargs, + size=pagesize, + body=body + ) + else: + result = self.es.search( + index=index, scroll=scroll_timeout, **kwargs, size=pagesize + ) + + self.total = result["hits"]["total"]["value"] + if self.total > 0: + print("total = %d" % self.total) + is_first = False + else: + # result = es.scroll(body={ + # "scroll_id": scroll_id, + # "scroll": scroll_timeout + # }) + result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout) + + scroll_id = result["_scroll_id"] + hits = result["hits"]["hits"] + self.counter += len(hits) + if self.total > 0: + print("progress -> %.2f %%" % ((self.counter / self.total) * 100)) + # Stop after no more docs + if not hits: + break + # Yield each entry + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits) + + def removeCustomFileds( + self, index_name_i, fields=[], renameFileds={}, body={}, bulk_update_size=200 + ): + try: + _list = [] + try: + _list = self.es_iterate_all_documents(index_name_i, body) + except Exception as e: + print(e) + + bulk_data = [] + count = 0 + total_count = 0 + for mentry in _list: + count += 1 + + entry = mentry["source"] + id = mentry["id"] + # print(id) + eid = id + + # if (count % 100) == 0 : + # print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0)) + + data_filled = False + data = entry + for col in fields: + if col in data: + del data[col] + + elif "." in col: + cols = col.split(".") + subsource = entry + for sub in cols: + dCol = subsource.get(sub, None) + if dCol: + subsource = dCol + else: + break + + for col in renameFileds.items(): + if col in data: + dCol = data[col] + data[renameFileds[col]] = dCol + del data[col] + + bulk_data.append({"_id": eid, "_source": data}) + + # انتقال دسته جمعی کدها به الاستیک + if len(bulk_data) >= bulk_update_size: + total_count += len(bulk_data) + print( + "=" * 5, + " update bulk --> ", + "total=" + str(total_count), + str(count), + ) + self.importBulkList(bulk_data, index_name_i) + bulk_data = [] + + if len(bulk_data) >= 0: + total_count += len(bulk_data) + print( + "=" * 5, + " update bulk --> ", + "total=" + str(total_count), + str(count), + ) + self.importBulkList(bulk_data, index_name_i) + bulk_data = [] + + except Exception as e: + # print("1111") + print(e) + + # save_error(id, e) + + # ----------------------------start--------------------------- + # متد moveCustomFileds: + # نوع ورودی: + # - index_name_i: نام اندیس منبع (str) + # - index_name_o: نام اندیس مقصد (str) + # - fields: لیست فیلدهای مورد انتقال (list) - پیش‌فرض خالی + # - renameFileds: دیکشنری تغییر نام فیلدها (dict) - پیش‌فرض خالی + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - تمام اسناد اندیس منبع را دریافت می‌کند + # - فیلدهای مشخص شده را استخراج و به اندیس مقصد انتقال می‌دهد + # - اگر renameFileds وجود داشته باشد، نام فیلدها را تغییر می‌دهد + # -----------------------------end---------------------------- + def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}): + try: + body = {} + list = [] + try: + list = self.es_iterate_all_documents(index_name_i) + except Exception as e: + print(e) + + count = 0 + for mentry in list: + count += 1 + + entry = mentry["source"] + id = mentry["id"] + # print(id) + eid = id + + if (count % 100) == 0: + print( + "%s -> %.2f " + % (id, (count / self.total) if self.total > 0 else 0) + ) + + data_filled = False + data = {} + for col in fields: + + if "." in col: + cols = col.split(".") + subsource = entry + for sub in cols: + dCol = subsource.get(sub, None) + if dCol: + subsource = dCol + else: + break + else: + dCol = entry.get(col, None) + + if dCol is None: + continue + + if col in renameFileds: + data[renameFileds[col]] = dCol + else: + data[col] = dCol + + data_filled = True + + if not data_filled: + continue + + try: + resp = self.update_index_doc(True, index_name_o, eid, data) + except Exception as e: + print(e) + # save_error(id, e) + + except Exception as e: + # print("1111") + print(e) + + # save_error(id, e) + + # ----------------------------start--------------------------- + # متد mappingIndex: + # نوع ورودی: + # - index_name_i: نام اندیس (str) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - توضیح می‌دهد که تغییر مپینگ از طریق پایتون امکان‌پذیر نیست + # - باید اندیس جدیدی با مپینگ مطلوب ایجاد و رایندکس شود + # -----------------------------end---------------------------- + def mappingIndex(self, index_name_i): + # فقط از طریق کیبانا میشه تغییر مپ داد + + # با پایتون نمیشه + # باید ایندکس جدیدی با مپ مطلوب ایجاد کرد و رایندکس کرد + pass + + # ----------------------------start--------------------------- + # متد updateByQueryIndex: + # نوع ورودی: + # - index_name_i: نام اندیس (str) + # - body: بدنه آپدیت (dict) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - اسناد را با استفاده از update_by_query API آپدیت می‌کند + # - در صورت خطا، پیام خطا نمایش داده می‌شود + # -----------------------------end---------------------------- + def updateByQueryIndex(self, index_name_i, body): + ## sample + # body = { + # "script": { + # "inline": "ctx._source.Device='Test'", + # "lang": "painless" + # }, + # "query": { + # "match": { + # "Device": "Boiler" + # } + # } + # } + try: + self.es.update_by_query(body=body, index=index_name_i) + + except Exception as e: + print(e) + # save_error(id, e) + + # ----------------------------start--------------------------- + # متد deleteByQueryIndex: + # نوع ورودی: + # - index_name_i: نام اندیس (str) + # - body: بدنه حذف (dict) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - اسناد را با استفاده از delete_by_query API حذف می‌کند + # - در صورت خطا، پیام خطا نمایش داده می‌شود + # -----------------------------end---------------------------- + def deleteByQueryIndex(self, index_name_i, body): + ## sample + # body = { + # "query": { + # "match": { + # "Device": "Boiler" + # } + # } + # } + try: + self.es.delete_by_query(index=index_name_i, body=body) + + except Exception as e: + print(e) + # save_error(id, e) + + # ----------------------------start--------------------------- + # متد delete_by_ids: + # نوع ورودی: + # - index_name_i: نام اندیس (str) + # - ids: لیست شناسه‌ها (list) + # نوع خروجی: بدون خروجی مستقیم + # عملیات: + # - اسناد با شناسه‌های مشخص شده را حذف می‌کند + # - در صورت خطا، پیام خطا نمایش داده می‌شود + # -----------------------------end---------------------------- + def delete_by_ids(self, index_name_i, ids): + try: + # ids = ['test1', 'test2', 'test3'] + + query = {"query": {"terms": {"_id": ids}}} + res = self.es.delete_by_query(index=index_name_i, body=query) + print(res) + + except Exception as e: + print(e) + # save_error(id, e) diff --git a/monir/llm_helper.py b/monir/llm_helper.py new file mode 100644 index 0000000..60739a3 --- /dev/null +++ b/monir/llm_helper.py @@ -0,0 +1,368 @@ +from typing import List +from pathlib import Path +import os, orjson, time, json, re, asyncio, traceback +from openai import AsyncOpenAI + +# -------------------------------------------------------------------- + + +# ------------------------------ پردازش API ------------------------------ +class AsyncCore: + def __init__( + self, + model_name, + task_name, + data_path, + output_schema, + api_url, + reasoning_effort='low', + top_p=1, + temperature=0.0, + max_token=128000, + output_path=None, + ai_code_version=None, + request_timeout=30, # ثانیه + api_key="EMPTY", + save_number=2, + ): + + self.save_number = save_number + # json file of data + self.data_path = data_path + + self.task_name = task_name + if output_path is None: + output_path = f"./{task_name}" + + self.output_path = Path(output_path) + self._temp_path = self.output_path / "batch_data" + self._temp_processed_id_path = self._temp_path / "processed_id.json" + + # Create output directory and subdirectories if they don't exist + self.output_path.mkdir(parents=True, exist_ok=True) + self._temp_path.mkdir(parents=True, exist_ok=True) + # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True) + + self.request_timeout = request_timeout + self.model_name = model_name + self.api_key = api_key + self.output_schema = output_schema + self.api_url = api_url + self.reasoning_effort = reasoning_effort + self.top_p = top_p + self.temperature = temperature + self.max_token = max_token + + if ai_code_version is None: + ai_code_version = f"{model_name}_{reasoning_effort}" + self.ai_code_version = ai_code_version + + self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"} + + try: + self.data = self.__data_process() + print(f"📦 Loaded {len(self.data)} words") + except Exception as e: + raise ValueError( + f"Data loading/validation failed: {e}\n{traceback.format_exc()}" + ) + + def __validate_item(self, item, idx): + # Mandatory fields + for key in self.PRIMARY_KEY: + if key not in item: + raise ValueError(f"Missing mandatory key '{key}' in item #{idx}") + if not isinstance(item[key], str): + raise TypeError( + f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}" + ) + + # Optional field: assistant_prompt + if "assistant_prompt" not in item or item["assistant_prompt"] is None: + item["assistant_prompt"] = None + else: + if not isinstance(item["assistant_prompt"], str): + raise TypeError( + f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" + ) + + return item # now normalized + + def __data_process(self): + raw_data = self.__load_orjson(self.data_path) + if not isinstance(raw_data, list): + raise ValueError("Data must be a list of dictionaries.") + + processed_data = [] + for idx, item in enumerate(raw_data): + if not isinstance(item, dict): + raise ValueError(f"Item #{idx} is not a dictionary.") + validated_item = self.__validate_item(item, idx) + processed_data.append(validated_item) + + return processed_data + + def __get_max_number_file(self, directory): + # Pattern to match filenames like out_1.json, out_25.json, etc. + pattern = re.compile(r"output_(\d+)\.json$") + max_num = 0 + + for filename in os.listdir(directory): + match = pattern.match(filename) + if match: + num = int(match.group(1)) + if num > max_num: + max_num = num + return max_num + 1 + + def __load_orjson(self, path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + def __save_orjson(self, path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + def merge_json_dir(self, input_path, output_path): + directory = Path(input_path) + if not directory.is_dir(): + raise ValueError(f"Not valid PATH: {input_path}") + + seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) + unique_data = [] # فقط داده‌های یکتا + failed_files = [] + + json_files = list(directory.glob("*.json")) + if not json_files: + print("⚠️ NO JSON File Found In This PATH") + return + + for json_file in json_files: + try: + data = self.__load_orjson(json_file) + if not data: # خالی یا None + failed_files.append(json_file.name) + continue + + if isinstance(data, list) and isinstance(data[0], dict): + for item in data: + item_id = item.get("id") + if item_id is None: + # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی + # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند + continue + if item_id not in seen_ids: + seen_ids.add(item_id) + unique_data.append(item) + else: + raise ValueError(f"no list available in this json -> {json_file}") + except ( + json.JSONDecodeError, + ValueError, + OSError, + KeyError, + TypeError, + ) as e: + # print(f"❌ Failed in process '{json_file.name}': {e}") + failed_files.append(json_file.name) + + # گزارش خطاها + if failed_files: + print("\n❌ We lose this file:") + for name in failed_files: + print(f" - {name}") + else: + print("\n✅ All JSON added") + + # ذخیره خروجی + try: + self.__save_orjson(data=unique_data, path=output_path) + print( + f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" + ) + except Exception as e: + print(f"❌ Error in saving final file: {e}") + + def make_new_proccessed_ids_from_file(self, json_in, out_path): + data = self.__load_orjson(json_in) + + finall_data = [] + for d in data: + if d["id"]: + finall_data.append(d["id"]) + finall_data = set(finall_data) + finall_data = list(finall_data) + print(f"-- len ids {len(finall_data)}") + + self.__save_orjson(data=finall_data, path=out_path) + + # ------------------------------ Main ------------------------------ + async def __process_item(self, client, item): + try: + messages = [ + {"role": "system", "content": item["system_prompt"]}, + {"role": "user", "content": item["user_prompt"]}, + ] + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + + response = await client.chat.completions.parse( + model=self.model_name, + messages=messages, + temperature=self.temperature, + top_p=self.top_p, + reasoning_effort=self.reasoning_effort, + max_tokens=self.max_token, + stop=None, + response_format=self.output_schema, + ) + + parsed = ( + response.choices[0].message.parsed + if response and response.choices and response.choices[0].message.parsed + else {"raw_text": str(response)} + ) + + parsed = self.output_schema.model_validate(parsed) + parsed = dict(parsed) + parsed["ai_code_version"] = self.ai_code_version + parsed["id"] = item["id"] + return parsed, 200 + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item['id']}") + return None, 408 + + except Exception as e: + print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") + return None, 400 + + def async_eval(self, processed_id: List = []): + try: + asyncio.run(self.__async_eval(processed_id)) + except KeyboardInterrupt: + print("\n🛑 Interrupted by user.") + traceback.print_exc() + + async def __async_eval(self, processed_id: List): + """ + اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. + """ + print("🔹 Starting async data processing...") + + # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ + if not processed_id: + try: + processed_id = self.__load_orjson(self._temp_processed_id_path) + print( + f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" + ) + except Exception: + print("⚠️ No valid processed_id found. Starting fresh.") + processed_id = [] + + # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ + all_processed_id = set(processed_id) + all_results = [] + total_time = [] + + data = [item for item in self.data if item.get("id") not in all_processed_id] + print( + f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" + ) + + # اگر چیزی برای پردازش نیست + if not data: + print("✅ Nothing new to process. All items are already done.") + return + + # ------------------ مرحله ۳: شروع پردازش ------------------ + print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") + async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: + semaphore = asyncio.Semaphore(5) + + async def limited_process(item): + async with semaphore: + return await self.__process_item(client, item) + + tasks = [asyncio.create_task(limited_process(item)) for item in data] + + total_i = 0 + # ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست) + for i, task in enumerate(asyncio.as_completed(tasks), start=1): + start = time.time() + try: + parsed, status_code = await asyncio.wait_for( + task, timeout=self.request_timeout + ) # ⏱ حداکثر 2 دقیقه + except asyncio.TimeoutError: + print(f"⏳ Task {i} timed out completely") + parsed, status_code = None, 408 + total_time.append(time.time() - start) + + if status_code == 200: + all_results.append(parsed) + all_processed_id.add(parsed.get("id")) + else: + print(f"⚠️ Skipped item {parsed.get('id')} (status={status_code})") + + total_i += 1 + # ✅ ذخیره‌ی موقت هر n مورد + if total_i >= self.save_number: + print(f"total_i {total_i}") + print(f"self.save_number {self.save_number}") + total_i = 0 + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + print(f"💾 Auto-saved partial data: {len(all_results)}") + all_results.clear() + + # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده + if total_i > 0 or len(all_results) > 0: + print("💾 Final save of remaining data...") + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + print(f"💾 Auto-saved partial data: {len(all_results)}") + all_results.clear() + + # ------------------ مرحله ۴: ذخیره خروجی ------------------ + final_data_path = self.output_path / f"final_data_{self.task_name}.json" + processed_id_path = self.output_path / "processed_id.json" + + self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) + all_results = self.__load_orjson(final_data_path) + # make_new_proccessed_ids_from_file() + self.__save_orjson(data=list(all_processed_id), path=processed_id_path) + self.__save_orjson(data=all_results, path=final_data_path) + + avg_time = (sum(total_time) / len(total_time)) if total_time else 0 + print( + f"\n✅ Processing completed!\n" + f"📊 Total-Data: {len(data)} | " + f"⭕ Ignored-Data: {len(processed_id)} | " + f"📦 Proccessed-Data: {len(all_results)} | " + f"❌ Loss-Data: {len(data)-len(all_results)} | " + f"🕒 Avg Time: {avg_time:.2f}'s per item | " + f"🕒 Total Time: {sum(total_time):.4f}'s | " + f"💾 Results saved to: {final_data_path}" + ) diff --git a/monir/main.py b/monir/main.py new file mode 100644 index 0000000..8bed261 --- /dev/null +++ b/monir/main.py @@ -0,0 +1,88 @@ +from dotenv import load_dotenv +import os +from llm_helper import AsyncCore +from es_helper import ElasticHelper +from base_model import MnMeet +import time, traceback, uuid, orjson, re +from datetime import datetime, timezone +from elasticsearch.helpers import scan +from typing import Union +from pathlib import Path +from collections import defaultdict +from typing import List + +load_dotenv() +ES_URL = os.getenv("ES_URL") +ES_USER_NAME = os.getenv("ES_USER_NAME") +ES_PASSWORD = os.getenv("ES_PASSWORD") +LLM_URL = os.getenv("LLM_URL") + +def save_orjson(path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + +def load_orjson(path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + +# --------------------------- flow +term_index_name = "mn_term" +meet_index_name = "mn_meet" +ment_index_name = "mn_meet_entity" +sections_index_name = "" +dash = "-" * 25 + +es_helper = ElasticHelper( + es_url=ES_URL, + es_user=ES_USER_NAME, + es_pass=ES_PASSWORD, +) + +############ DELETE INDEXES +# es_helper.deleteIndex(index_name=term_index_name) +# es_helper.deleteIndex(index_name=meet_index_name) +# es_helper.deleteIndex(index_name=ment_index_name) + +############ CREATE INDEXES +# es_helper.createIndexIfNotExist(index_name_o=term_index_name) +# es_helper.createIndexIfNotExist(index_name_o=meet_index_name) +# es_helper.createIndexIfNotExist(index_name_o=ment_index_name) + + +es = es_helper.es +# fields = list(MnMeet.model_fields.keys()) +fields = [ + "id", + "sanad_id", + "main_type", + "title", + "author", + "content", +] +# old_data = es_helper.search( +# index=old_index_name, _source=fields, query={"match_all": {}}, size=3 +# ) +# old_data = old_data["hits"]["hits"] # don't use in scan +################### for all data +old_data = list( + scan( + es, + index=meet_index_name, + query={ + "_source": fields, + "query": + { "term": + { + "main_type": "جلسه علمی"}}, + # {"match_all": {}}, + }, + ) +) +print(f'--- old_data {len(old_data)}') +save_orjson( + data=old_data, + path='./data_content_1.json' +) \ No newline at end of file diff --git a/monir/requirements.txt b/monir/requirements.txt new file mode 100644 index 0000000..981f548 --- /dev/null +++ b/monir/requirements.txt @@ -0,0 +1,4 @@ +python-dotenv +openai +elasticsearch==8.13.0 +orjson \ No newline at end of file