import zipfile import sys import os import json from time import sleep from elasticsearch import Elasticsearch, helpers class ElasticHelper: """ کلاس ElasticHelper: نوع ورودی: بدون ورودی مستقیم در تعریف کلاس نوع خروجی: شیء از نوع ElasticHelper عملیات: - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند """ counter = 0 total = 0 id = "" path_mappings = os.getcwd() + "/repo/_other/" def __init__( self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings="", ): """ نوع ورودی: - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" - es_pass: رمز عبور (str) - پیش‌فرض خالی - es_user: نام کاربری (str) - پیش‌فرض "elastic" - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی نوع خروجی: شیء ElasticHelper عملیات: - اتصال به Elasticsearch را برقرار می‌کند - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود """ if path_mappings: self.path_mappings = path_mappings if es_pass == "": self.es = Elasticsearch(es_url) else: self.es = Elasticsearch( es_url, basic_auth=(es_user, es_pass), verify_certs=False, ) # print(es_url) # print(self.es) self.success_connect = False for a in range(0, 10): try: if not self.es.ping(): print("Elastic Connection Not ping, sleep 30 s : ", a) sleep(5) continue else: self.success_connect = True break except Exception as e: break if not self.success_connect: print("******", "not access to elastic service") return self.counter = 0 self.total = 0 self.id = "" def search(self, **params): try: res = self.es.search(**params) except: return {"hits": {"hits": []}} return res def get_document(self, index_name, id): res = self.es.get(index=index_name, id=id) return res def exist_document(self, index_name, id): res = self.es.exists(index=index_name, id=id) return res def update_index_doc(self, is_update_state, index_name_o, eid, data): """ نوع ورودی: - is_update_state: تعیین عملیات (update یا index) (bool) - index_name_o: نام اندیس (str) - eid: شناسه سند (str) - data: داده‌های سند (dict) نوع خروجی: پاسخ Elasticsearch (dict) عملیات: - اگر is_update_state=True باشد: سند را آپدیت می‌کند - در غیر این صورت: سند جدید ایجاد می‌کند """ if is_update_state: resp = self.es.update(index=index_name_o, id=eid, doc=data) # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) else: resp = self.es.index(index=index_name_o, id=eid, document=data) return resp # ----------------------------start--------------------------- # متد exportToJsonForAI: # نوع ورودی: # - path_back: مسیر ذخیره فایل (str) # - index_name: نام اندیس (str) # - out_name: نام فایل خروجی (str) - پیش‌فرض خالی # - body: بدنه جستجو (dict) - پیش‌فرض خالی # - fields: لیست فیلدهای مورد نیاز (list) - پیش‌فرض خالی # نوع خروجی: بدون خروجی مستقیم # عملیات: # - داده‌های اندیس را با استفاده از scroll API دریافت می‌کند # - پیشرفت عملیات را نمایش می‌دهد # - داده‌ها را در فایل JSON ذخیره می‌کند # - اگر fields مشخص شده باشد، فقط فیلدهای مورد نظر را استخراج می‌کند # -----------------------------end---------------------------- def exportToJsonForAI( self, path_back, index_name, out_name="", body={}, fields=[], mode="normal", collapse_field="", ): print("*" * 50, " start backup -->", index_name) self.counter = 0 sid = None out = out_name if out_name == "": out = index_name s_res = self.es.search(index=index_name, scroll="5m", size=1000, body=body) self.total = s_res["hits"]["total"]["value"] print("start index = %s" % index_name) print("total = %d" % self.total) sid = s_res["_scroll_id"] scroll_size = len(s_res["hits"]["hits"]) pack_count = 1 out_json = [] if mode == "dict" or collapse_field: out_json = {} prev_collapse_value = "" pack_collapse = [] while scroll_size > 0: "Scrolling..." self.counter += scroll_size print("progress -> %.2f %%" % ((self.counter / self.total) * 100)) #### for test # if pack_count > 2 : # break pack_count += 1 ############################# for item in s_res["hits"]["hits"]: id = item["_id"] item2 = None if mode == "id": # فقط شناسه ها نیاز هست item2 = id elif fields: item2 = {} item2["id"] = id for kf in fields: # print(kf) if kf in item["_source"]: # print(item['_source'][kf]) item2[kf] = item["_source"][kf] elif "." in kf: cols = kf.split(".") subsource = item["_source"] for sub in cols: if sub in subsource: subsource = subsource[sub] continue else: break key = kf.replace(".", "__") item2[key] = subsource # exit() else: item2 = {} item2 = item if collapse_field and collapse_field in item["_source"]: collapse_value = item["_source"][collapse_field] if not prev_collapse_value: prev_collapse_value = collapse_value if not collapse_value == prev_collapse_value: out_json[prev_collapse_value] = pack_collapse pack_collapse = [] prev_collapse_value = collapse_value pack_collapse.append(item2) elif mode == "dict": out_json[id] = item2 else: out_json.append(item2) s_res = self.es.scroll(scroll_id=sid, scroll="2m", request_timeout=100000) sid = s_res["_scroll_id"] scroll_size = len(s_res["hits"]["hits"]) sid = None if collapse_field and prev_collapse_value and pack_collapse: out_json[prev_collapse_value] = pack_collapse with open(path_back + "/" + out, "w", encoding="utf-8") as fout: json.dump(out_json, fout, ensure_ascii=False, indent=4) ############################## # ----------------------------start--------------------------- # متد backupIndexToZipfile: # نوع ورودی: # - path_back: مسیر ذخیره فایل (str) # - index_name: نام اندیس (str) # - out_name: نام فایل خروجی (str) - پیش‌فرض خالی # - body: بدنه جستجو (dict) - پیش‌فرض {"size":1000} # - byzip: تعیین فرمت خروجی (bool) - پیش‌فرض True # - fields: لیست فیلدهای مورد نیاز (list) - پیش‌فرض خالی # - noFields: لیست فیلدهای مورد حذف (list) - پیش‌فرض خالی # نوع خروجی: bool (True اگر داده وجود داشته باشد) # عملیات: # - داده‌های اندیس را با استفاده از scroll API دریافت می‌کند # - پیشرفت عملیات را نمایش می‌دهد # - داده‌ها را در فایل ZIP یا JSON ذخیره می‌کند # - اگر fields مشخص شده باشد، فقط فیلدهای مورد نظر را استخراج می‌کند # - اگر noFields مشخص شده باشد، فیلدهای مورد نظر را حذف می‌کند # -----------------------------end---------------------------- def backupIndexToZipfile( self, path_back, index_name, file_name="", body={"size": 1000}, byzip=True, fields=[], noFields=[], ): print("*" * 50, " start backup -->", index_name) self.counter = 0 sid = None out = index_name if file_name == "": file_name = index_name if body == {}: s_res = self.es.search(index=index_name, scroll="5m", size=1000) else: s_res = self.es.search(index=index_name, scroll="5m", body=body) self.total = s_res["hits"]["total"]["value"] if self.total == 0: print("total index_name by query = %d" % self.total) return False if byzip: fout = zipfile.ZipFile(path_back + "/" + file_name + ".zip", "w") else: fout = open(path_back + "/" + file_name + ".json", "a+", encoding="utf-8") print("start index = %s" % index_name) print("total = %d" % self.total) sid = s_res["_scroll_id"] scroll_size = len(s_res["hits"]["hits"]) file_count = 1 prev_percent = 0 while scroll_size > 0: "Scrolling..." self.counter += scroll_size percent = int((self.counter / self.total) * 100) if percent != prev_percent: print("progress -> %.2f %%" % percent) prev_percent = percent ############################# out_json = [] for item in s_res["hits"]["hits"]: if fields: item2 = {} item2["id"] = item["_id"] item2["_source"] = {} for kf in fields: if kf in item["_source"]: item2["_source"][kf] = item["_source"][kf] else: item2 = item if noFields: for kf in noFields: if kf in item2["_source"]: del item2["_source"][kf] out_json.append(item2) text = json.dumps(out_json, ensure_ascii=False) out_json = [] if byzip: filename = out + str(file_count) + ".json" file_count += 1 fout.writestr(filename, text.encode("utf-8"), zipfile.ZIP_DEFLATED) else: fout.write(text) ############################## s_res = self.es.scroll(scroll_id=sid, scroll="2m", request_timeout=100000) sid = s_res["_scroll_id"] scroll_size = len(s_res["hits"]["hits"]) sid = None fout.close() # ----------------------------start--------------------------- # متد restorFileToElastic: # نوع ورودی: # - path_back: مسیر فایل (str) # - index_name: نام اندیس (str) # - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی # - queryDelete: تعیین حذف اندیس قبل از بازیابی (bool) - پیش‌فرض True # - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی # نوع خروجی: bool # عملیات: # - وجود فایل ZIP را بررسی می‌کند # - اگر queryDelete=True باشد، از کاربر تأیید حذف را می‌گیرد # - اندیس را ایجاد می‌کند # - داده‌ها را از فایل ZIP به Elasticsearch بازیابی می‌کند # -----------------------------end---------------------------- def restorFileToElastic( self, path_back, index_name, app_key="", queryDelete=True, map_name="" ): if not os.path.exists(path_back): print(" **** error *** path not exist: ", path_back) return False file_path = path_back + "/" + index_name + ".zip" if not os.path.exists(file_path): return False if queryDelete: # اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند if self.deleteIndex(index_name): self.createIndex(index_name, app_key, map_name) self.zipFileToElastic(file_path, index_name) else: # اگر وجود داشته باشد پرش می کند و کاری نمیکند self.createIndex(index_name, app_key, map_name) self.zipFileToElastic(file_path, index_name) def restorFileToElastic2( self, path_file, index_name, app_key="", queryDelete=True, map_name="" ): if not os.path.exists(path_file): print(" **** error *** path not exist: ", path_file) return False file_path = path_file if not os.path.exists(file_path): return False if queryDelete: # اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند if self.deleteIndex(index_name): self.createIndex(index_name, app_key, map_name) self.zipFileToElastic(file_path, index_name) else: # اگر وجود داشته باشد پرش می کند و کاری نمیکند self.createIndex(index_name, app_key, map_name) self.zipFileToElastic(file_path, index_name) # ----------------------------start--------------------------- # متد renameElasticIndex: # نوع ورودی: # - index_name_i: نام اندیس منبع (str) # - index_name_o: نام اندیس مقصد (str) # - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی # - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی # نوع خروجی: بدون خروجی مستقیم # عملیات: # - اندیس مقصد را ایجاد می‌کند # - با استفاده از reindex API، داده‌ها را از اندیس منبع به مقصد منتقل می‌کند # - پیشرفت عملیات را نمایش می‌دهد # -----------------------------end---------------------------- def renameElasticIndex(self, index_name_i, index_name_o, app_key="", map_name=""): if self.createIndex(index_name_o, app_key, map_name): res = self.es.reindex( body={ "source": {"index": index_name_i}, "dest": {"index": index_name_o}, }, wait_for_completion=False, ) print(type(res)) print(res) taskid = res["task"] if res["task"] else "" # tasks = client.TasksClient(self.es) tasks = self.es.tasks while True: res = tasks.get(task_id=taskid) if res["completed"]: break # print( res["task"]) print( "----", index_name_o, " imported : ", res["task"]["status"]["total"], " / ", res["task"]["status"]["created"], ) sleep(1) print("----", index_name_o, " complated") # ----------------------------start--------------------------- # متد deleteIndex: # نوع ورودی: # - index_name: نام اندیس (str) # نوع خروجی: bool # عملیات: # - وجود اندیس را بررسی می‌کند # - از کاربر تأیید حذف را می‌گیرد # - در صورت تأیید، اندیس را حذف می‌کند # -----------------------------end---------------------------- def deleteIndex(self, index_name): if not self.es.indices.exists(index=index_name): print(" " * 10, " for delete NOT exist index :", index_name) return True question = "Is DELETE elastic index (" + index_name + ") ? " if self.query_yes_no(question): self.es.indices.delete(index=index_name) print("%" * 10, " Finish DELETE index :", index_name) return True else: return False # ----------------------------start--------------------------- # متد query_yes_no: # نوع ورودی: # - question: سوال نمایش داده شده (str) # - default: پاسخ پیش‌فرض (str) - پیش‌فرض "no" # نوع خروجی: bool # عملیات: # - سوال را به کاربر نمایش می‌دهد # - پاسخ کاربر را دریافت و اعتبارسنجی می‌کند # - True برای 'yes'/'y' و False برای 'no'/'n' برمی‌گرداند # -----------------------------end---------------------------- def query_yes_no(self, question, default="no"): valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " elif default == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default) while True: print("%" * 10, " quistion ", "%" * 10, "\n") sys.stdout.write(question + prompt) choice = input().lower() if default is not None and choice == "": return valid[default] elif choice in valid: return valid[choice] else: sys.stdout.write( "لطفا یکی از موارد روبرو را وارد کنید : 'yes' or 'no' " "(or 'y' or 'n').\n" ) def createIndexIfNotExist(self, index_name_o, mapping_o=""): """ # نوع ورودی: # - index_name_o: نام اندیس (str) # - mapping_o: مپینگ اندیس (str) - پیش‌فرض خالی # نوع خروجی: بدون خروجی مستقیم # عملیات: # - وجود اندیس را بررسی می‌کند # - در صورت عدم وجود، اندیس را با مپینگ مشخص شده ایجاد می‌کند """ try: if not self.es.indices.exists(index=index_name_o): response = self.es.indices.create(index=index_name_o, body=mapping_o) # print out the response: print("create index response:", response) except: print("....... index exist ! ... not created") # ----------------------------start--------------------------- # متد createIndex: # نوع ورودی: # - index_name: نام اندیس (str) # - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی # - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی # نوع خروجی: bool # عملیات: # - وجود اندیس را بررسی می‌کند # - مسیر فایل مپینگ را تعیین می‌کند # - فایل مپینگ را خوانده و اندیس را ایجاد می‌کند # - در صورت عدم یافت فایل مپینگ، خطا نمایش داده می‌شود # -----------------------------end---------------------------- def createIndex(self, index_name, app_key="", map_name=""): path_base = self.path_mappings path_mapping1 = path_base + "general/" if app_key == "": app_key = "tavasi" path_mapping2 = path_base + app_key + "/" if map_name == "": map_name = index_name if self.es.indices.exists(index=index_name): print("============== exist index :", index_name) return True if map_name == "mj_rg_section" or map_name == "semantic_search": map_name = "mj_qa_section" elif map_name[-3] == "_ai": map_name = [0 - len(map_name) - 3] print(map_name) mapping_file_path = path_mapping1 + map_name + ".json" print("mapping_file_path : ", mapping_file_path) if not os.path.isfile(mapping_file_path): if not os.path.isfile(mapping_file_path): mapping_file_path = path_mapping2 + map_name + ".json" print("mapping_file_path : ", mapping_file_path) # Create Index With Mapping if os.path.isfile(mapping_file_path): mapping_file = open(mapping_file_path, "r", encoding="utf-8") mapping_file_read = mapping_file.read() mapping_data = json.loads(mapping_file_read) mapping_file.close() if self.es.indices.exists(index=index_name): print("============== exist index :", index_name) else: self.es.indices.create(index=index_name, body=mapping_data) return True else: print("*** error not find maping file elastic : *******", mapping_file_path) return False # ----------------------------start--------------------------- # متد updateBulkList: # نوع ورودی: # - listData: لیست داده‌ها (list) # - index_name: نام اندیس (str) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - داده‌ها را به صورت bulk آپدیت می‌کند # - از helpers.bulk Elasticsearch استفاده می‌کند # -----------------------------end---------------------------- def updateBulkList(self, listData, index_name): chunk_size = 100000 raise_on_error = False raise_on_exception = False stats_only = True yield_ok = False actions = [] for item in listData: actions.append( { "_op_type": "update", "_index": index_name, "_id": item["_id"], "doc": item["_source"], } ) helpers.bulk( self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok, ) # ----------------------------start--------------------------- # متد importBulkList: # نوع ورودی: # - listData: لیست داده‌ها (list) # - index_name: نام اندیس (str) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - داده‌ها را به صورت bulk وارد می‌کند # - از helpers.bulk Elasticsearch استفاده می‌کند # -----------------------------end---------------------------- def importBulkList(self, listData, index_name): chunk_size = 100000 raise_on_error = False raise_on_exception = False stats_only = True yield_ok = False for item in listData: actions = [ { "_op_type": "index", "_index": index_name, "_id": item["_id"], "_source": item["_source"], } ] helpers.bulk( self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok, ) # ----------------------------start--------------------------- # متد importJsonDataToElastic: # نوع ورودی: # - jsonData: داده‌های JSON (list) # - index_name: نام اندیس (str) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - داده‌ها را به صورت bulk وارد می‌کند # - از helpers.bulk Elasticsearch استفاده می‌کند # -----------------------------end---------------------------- def importJsonDataToElastic(self, jsonData, index_name, fields=[]): chunk_size = 100000 raise_on_error = False raise_on_exception = False stats_only = True yield_ok = False actions = [] for item in jsonData: id = item["_id"] if item["_id"] else item["id"] source = item["_source"] if fields: source = {} for col in fields: if col in item["_source"]: source[col] = item["_source"] actions.append( { "_op_type": "index", "_index": index_name, "_id": id, "_source": source, } ) helpers.bulk( self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok, ) # ----------------------------start--------------------------- # متد fileToElastic: # نوع ورودی: # - file_path: مسیر فایل (str) # - index_name: نام اندیس (str) # - limit_pack: محدودیت تعداد بسته‌ها (int) - پیش‌فرض -1 # نوع خروجی: بدون خروجی مستقیم # عملیات: # - فایل JSON را خوانده و داده‌ها را به Elasticsearch وارد می‌کند # - اندیس را refresh می‌کند # - تعداد اسناد را نمایش می‌دهد # -----------------------------end---------------------------- def fileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]): if not os.path.exists(file_path): print("file zip:", file_path, " not exist") return print("index:", index_name, "=>", file_path) self.counter = 0 with open(file_path) as file: data = json.loads(file.read()) self.importJsonDataToElastic(data, index_name, fields) self.es.indices.refresh(index=index_name) print(self.es.cat.count(index=index_name, format="json")) # ----------------------------start--------------------------- # متد zipFileToElastic: # نوع ورودی: # - file_path: مسیر فایل ZIP (str) # - index_name: نام اندیس (str) # - limit_pack: محدودیت تعداد فایل‌ها (int) - پیش‌فرض -1 # نوع خروجی: بدون خروجی مستقیم # عملیات: # - فایل ZIP را باز کرده و هر فایل JSON داخل آن را پردازش می‌کند # - داده‌ها را به Elasticsearch وارد می‌کند # - اندیس را refresh می‌کند # - تعداد اسناد را نمایش می‌دهد # -----------------------------end---------------------------- def zipFileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]): if not os.path.exists(file_path): print( "file zip:", file_path, " not exist for imort to elastic : ", index_name ) return fileNo = 0 with zipfile.ZipFile(file_path, "r") as zObject: fileNo += 1 print( "=" * 10, " zip fileNo: ", fileNo, " - ( ", index_name, " ) | File Numbers:", len(zObject.namelist()), "=" * 10, ) packNo = 0 self.counter = 0 for filename in zObject.namelist(): packNo += 1 if limit_pack != -1: if packNo > limit_pack: print("limit_data ", index_name, " ", limit_pack) break print("index:", index_name, "=>", filename) with zObject.open(filename) as file: data = json.loads(file.read()) self.importJsonDataToElastic(data, index_name, fields) self.es.indices.refresh(index=index_name) print(self.es.cat.count(index=index_name, format="json")) print(" END Of Import to elastic ", index_name, "\n") # ----------------------------start--------------------------- # متد iterateJsonFile: # نوع ورودی: # - file_path: مسیر فایل (str) # - isZip: تعیین نوع فایل (ZIP یا JSON) (bool) - پیش‌فرض True # - limit_pack: محدودیت تعداد فایل‌ها (int) - پیش‌فرض -1 # نوع خروجی: ژنراتور # عملیات: # - اگر isZip=True باشد: فایل ZIP را پردازش می‌کند # - اگر isZip=False باشد: فایل JSON را پردازش می‌کند # - داده‌ها را به صورت ژنراتور برمی‌گرداند # -----------------------------end---------------------------- def iterateJsonFile(self, file_path, isZip=True, limit_pack=-1): if not os.path.exists(file_path): print("file zip:", file_path, " not exist iterateJsonFile ") return if isZip: fileNo = 0 with zipfile.ZipFile(file_path, "r") as zObject: fileNo += 1 print( "=" * 10, " zip fileNo: ", fileNo, " iterateJsonFile - | File Numbers:", len(zObject.namelist()), "=" * 10, ) packNo = 0 self.counter = 0 for filename in zObject.namelist(): packNo += 1 if limit_pack != -1: if packNo > limit_pack: print("limit_data iterateJsonFile ", limit_pack) break print("index iterateJsonFile :", "=>", filename) with zObject.open(filename) as file: data = json.loads(file.read()) # Yield each entry # yield data yield from ( {"source": hit["_source"], "id": hit["_id"]} for hit in data ) else: with open(filename, "r", encoding="utf-8") as file: data = json.loads(file.read()) # Yield each entry # yield from (hit for hit in data) # return data yield from ( {"source": hit["_source"], "id": hit["_id"]} for hit in data ) # ----------------------------start--------------------------- # متد es_iterate_all_documents: # نوع ورودی: # - index: نام اندیس (str) # - body: بدنه جستجو (dict) - پیش‌فرض خالی # - pagesize: اندازه صفحه (int) - پیش‌فرض 250 # - scroll_timeout: زمان اسکرول (str) - پیش‌فرض "25m" # - **kwargs: پارامترهای اضافی # نوع خروجی: ژنراتور # عملیات: # - تمام اسناد اندیس را با استفاده از scroll API دریافت می‌کند # - پیشرفت عملیات را نمایش می‌دهد # - داده‌ها را به صورت ژنراتور برمی‌گرداند # -----------------------------end---------------------------- def es_iterate_all_documents( self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs ): """ Helper to iterate ALL values from a single index Yields all the documents. """ is_first = True while True: # Scroll next if is_first: # Initialize scroll # result = self.es.search(index=index, scroll="2m", **kwargs, body={ # "size": pagesize # }) if body: result = self.es.search( index=index, scroll=scroll_timeout, **kwargs, size=pagesize, body=body ) else: result = self.es.search( index=index, scroll=scroll_timeout, **kwargs, size=pagesize ) self.total = result["hits"]["total"]["value"] if self.total > 0: print("total = %d" % self.total) is_first = False else: # result = es.scroll(body={ # "scroll_id": scroll_id, # "scroll": scroll_timeout # }) result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout) scroll_id = result["_scroll_id"] hits = result["hits"]["hits"] self.counter += len(hits) if self.total > 0: print("progress -> %.2f %%" % ((self.counter / self.total) * 100)) # Stop after no more docs if not hits: break # Yield each entry yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits) def removeCustomFileds( self, index_name_i, fields=[], renameFileds={}, body={}, bulk_update_size=200 ): try: _list = [] try: _list = self.es_iterate_all_documents(index_name_i, body) except Exception as e: print(e) bulk_data = [] count = 0 total_count = 0 for mentry in _list: count += 1 entry = mentry["source"] id = mentry["id"] # print(id) eid = id # if (count % 100) == 0 : # print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0)) data_filled = False data = entry for col in fields: if col in data: del data[col] elif "." in col: cols = col.split(".") subsource = entry for sub in cols: dCol = subsource.get(sub, None) if dCol: subsource = dCol else: break for col in renameFileds.items(): if col in data: dCol = data[col] data[renameFileds[col]] = dCol del data[col] bulk_data.append({"_id": eid, "_source": data}) # انتقال دسته جمعی کدها به الاستیک if len(bulk_data) >= bulk_update_size: total_count += len(bulk_data) print( "=" * 5, " update bulk --> ", "total=" + str(total_count), str(count), ) self.importBulkList(bulk_data, index_name_i) bulk_data = [] if len(bulk_data) >= 0: total_count += len(bulk_data) print( "=" * 5, " update bulk --> ", "total=" + str(total_count), str(count), ) self.importBulkList(bulk_data, index_name_i) bulk_data = [] except Exception as e: # print("1111") print(e) # save_error(id, e) # ----------------------------start--------------------------- # متد moveCustomFileds: # نوع ورودی: # - index_name_i: نام اندیس منبع (str) # - index_name_o: نام اندیس مقصد (str) # - fields: لیست فیلدهای مورد انتقال (list) - پیش‌فرض خالی # - renameFileds: دیکشنری تغییر نام فیلدها (dict) - پیش‌فرض خالی # نوع خروجی: بدون خروجی مستقیم # عملیات: # - تمام اسناد اندیس منبع را دریافت می‌کند # - فیلدهای مشخص شده را استخراج و به اندیس مقصد انتقال می‌دهد # - اگر renameFileds وجود داشته باشد، نام فیلدها را تغییر می‌دهد # -----------------------------end---------------------------- def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}): try: body = {} list = [] try: list = self.es_iterate_all_documents(index_name_i) except Exception as e: print(e) count = 0 for mentry in list: count += 1 entry = mentry["source"] id = mentry["id"] # print(id) eid = id if (count % 100) == 0: print( "%s -> %.2f " % (id, (count / self.total) if self.total > 0 else 0) ) data_filled = False data = {} for col in fields: if "." in col: cols = col.split(".") subsource = entry for sub in cols: dCol = subsource.get(sub, None) if dCol: subsource = dCol else: break else: dCol = entry.get(col, None) if dCol is None: continue if col in renameFileds: data[renameFileds[col]] = dCol else: data[col] = dCol data_filled = True if not data_filled: continue try: resp = self.update_index_doc(True, index_name_o, eid, data) except Exception as e: print(e) # save_error(id, e) except Exception as e: # print("1111") print(e) # save_error(id, e) # ----------------------------start--------------------------- # متد mappingIndex: # نوع ورودی: # - index_name_i: نام اندیس (str) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - توضیح می‌دهد که تغییر مپینگ از طریق پایتون امکان‌پذیر نیست # - باید اندیس جدیدی با مپینگ مطلوب ایجاد و رایندکس شود # -----------------------------end---------------------------- def mappingIndex(self, index_name_i): # فقط از طریق کیبانا میشه تغییر مپ داد # با پایتون نمیشه # باید ایندکس جدیدی با مپ مطلوب ایجاد کرد و رایندکس کرد pass # ----------------------------start--------------------------- # متد updateByQueryIndex: # نوع ورودی: # - index_name_i: نام اندیس (str) # - body: بدنه آپدیت (dict) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - اسناد را با استفاده از update_by_query API آپدیت می‌کند # - در صورت خطا، پیام خطا نمایش داده می‌شود # -----------------------------end---------------------------- def updateByQueryIndex(self, index_name_i, body): ## sample # body = { # "script": { # "inline": "ctx._source.Device='Test'", # "lang": "painless" # }, # "query": { # "match": { # "Device": "Boiler" # } # } # } try: self.es.update_by_query(body=body, index=index_name_i) except Exception as e: print(e) # save_error(id, e) # ----------------------------start--------------------------- # متد deleteByQueryIndex: # نوع ورودی: # - index_name_i: نام اندیس (str) # - body: بدنه حذف (dict) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - اسناد را با استفاده از delete_by_query API حذف می‌کند # - در صورت خطا، پیام خطا نمایش داده می‌شود # -----------------------------end---------------------------- def deleteByQueryIndex(self, index_name_i, body): ## sample # body = { # "query": { # "match": { # "Device": "Boiler" # } # } # } try: self.es.delete_by_query(index=index_name_i, body=body) except Exception as e: print(e) # save_error(id, e) # ----------------------------start--------------------------- # متد delete_by_ids: # نوع ورودی: # - index_name_i: نام اندیس (str) # - ids: لیست شناسه‌ها (list) # نوع خروجی: بدون خروجی مستقیم # عملیات: # - اسناد با شناسه‌های مشخص شده را حذف می‌کند # - در صورت خطا، پیام خطا نمایش داده می‌شود # -----------------------------end---------------------------- def delete_by_ids(self, index_name_i, ids): try: # ids = ['test1', 'test2', 'test3'] query = {"query": {"terms": {"_id": ids}}} res = self.es.delete_by_query(index=index_name_i, body=query) print(res) except Exception as e: print(e) # save_error(id, e)