DataCleaning/monir/es_helper.py
2025-11-27 20:31:12 +00:00

1135 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import zipfile
import sys
import os
import json
from time import sleep
from elasticsearch import Elasticsearch, helpers
class ElasticHelper:
"""
کلاس ElasticHelper:
نوع ورودی: بدون ورودی مستقیم در تعریف کلاس
نوع خروجی: شیء از نوع ElasticHelper
عملیات:
- متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند
- مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند
"""
counter = 0
total = 0
id = ""
path_mappings = os.getcwd() + "/repo/_other/"
def __init__(
self,
es_url="http://127.0.0.1:6900",
es_pass="",
es_user="elastic",
path_mappings="",
):
"""
نوع ورودی:
- es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900"
- es_pass: رمز عبور (str) - پیش‌فرض خالی
- es_user: نام کاربری (str) - پیش‌فرض "elastic"
- path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی
نوع خروجی: شیء ElasticHelper
عملیات:
- اتصال به Elasticsearch را برقرار می‌کند
- در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند
- تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار)
- در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود
"""
if path_mappings:
self.path_mappings = path_mappings
if es_pass == "":
self.es = Elasticsearch(es_url)
else:
self.es = Elasticsearch(
es_url,
basic_auth=(es_user, es_pass),
verify_certs=False,
)
# print(es_url)
# print(self.es)
self.success_connect = False
for a in range(0, 10):
try:
if not self.es.ping():
print("Elastic Connection Not ping, sleep 30 s : ", a)
sleep(5)
continue
else:
self.success_connect = True
break
except Exception as e:
break
if not self.success_connect:
print("******", "not access to elastic service")
return
self.counter = 0
self.total = 0
self.id = ""
def search(self, **params):
try:
res = self.es.search(**params)
except:
return {"hits": {"hits": []}}
return res
def get_document(self, index_name, id):
res = self.es.get(index=index_name, id=id)
return res
def exist_document(self, index_name, id):
res = self.es.exists(index=index_name, id=id)
return res
def update_index_doc(self, is_update_state, index_name_o, eid, data):
"""
نوع ورودی:
- is_update_state: تعیین عملیات (update یا index) (bool)
- index_name_o: نام اندیس (str)
- eid: شناسه سند (str)
- data: داده‌های سند (dict)
نوع خروجی: پاسخ Elasticsearch (dict)
عملیات:
- اگر is_update_state=True باشد: سند را آپدیت می‌کند
- در غیر این صورت: سند جدید ایجاد می‌کند
"""
if is_update_state:
resp = self.es.update(index=index_name_o, id=eid, doc=data)
# resp = self.es.update(index=index_name_o, id=eid, body={'doc':data})
else:
resp = self.es.index(index=index_name_o, id=eid, document=data)
return resp
# ----------------------------start---------------------------
# متد exportToJsonForAI:
# نوع ورودی:
# - path_back: مسیر ذخیره فایل (str)
# - index_name: نام اندیس (str)
# - out_name: نام فایل خروجی (str) - پیش‌فرض خالی
# - body: بدنه جستجو (dict) - پیش‌فرض خالی
# - fields: لیست فیلدهای مورد نیاز (list) - پیش‌فرض خالی
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - داده‌های اندیس را با استفاده از scroll API دریافت می‌کند
# - پیشرفت عملیات را نمایش می‌دهد
# - داده‌ها را در فایل JSON ذخیره می‌کند
# - اگر fields مشخص شده باشد، فقط فیلدهای مورد نظر را استخراج می‌کند
# -----------------------------end----------------------------
def exportToJsonForAI(
self,
path_back,
index_name,
out_name="",
body={},
fields=[],
mode="normal",
collapse_field="",
):
print("*" * 50, " start backup -->", index_name)
self.counter = 0
sid = None
out = out_name
if out_name == "":
out = index_name
s_res = self.es.search(index=index_name, scroll="5m", size=1000, body=body)
self.total = s_res["hits"]["total"]["value"]
print("start index = %s" % index_name)
print("total = %d" % self.total)
sid = s_res["_scroll_id"]
scroll_size = len(s_res["hits"]["hits"])
pack_count = 1
out_json = []
if mode == "dict" or collapse_field:
out_json = {}
prev_collapse_value = ""
pack_collapse = []
while scroll_size > 0:
"Scrolling..."
self.counter += scroll_size
print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
#### for test
# if pack_count > 2 :
# break
pack_count += 1
#############################
for item in s_res["hits"]["hits"]:
id = item["_id"]
item2 = None
if mode == "id": # فقط شناسه ها نیاز هست
item2 = id
elif fields:
item2 = {}
item2["id"] = id
for kf in fields:
# print(kf)
if kf in item["_source"]:
# print(item['_source'][kf])
item2[kf] = item["_source"][kf]
elif "." in kf:
cols = kf.split(".")
subsource = item["_source"]
for sub in cols:
if sub in subsource:
subsource = subsource[sub]
continue
else:
break
key = kf.replace(".", "__")
item2[key] = subsource
# exit()
else:
item2 = {}
item2 = item
if collapse_field and collapse_field in item["_source"]:
collapse_value = item["_source"][collapse_field]
if not prev_collapse_value:
prev_collapse_value = collapse_value
if not collapse_value == prev_collapse_value:
out_json[prev_collapse_value] = pack_collapse
pack_collapse = []
prev_collapse_value = collapse_value
pack_collapse.append(item2)
elif mode == "dict":
out_json[id] = item2
else:
out_json.append(item2)
s_res = self.es.scroll(scroll_id=sid, scroll="2m", request_timeout=100000)
sid = s_res["_scroll_id"]
scroll_size = len(s_res["hits"]["hits"])
sid = None
if collapse_field and prev_collapse_value and pack_collapse:
out_json[prev_collapse_value] = pack_collapse
with open(path_back + "/" + out, "w", encoding="utf-8") as fout:
json.dump(out_json, fout, ensure_ascii=False, indent=4)
##############################
# ----------------------------start---------------------------
# متد backupIndexToZipfile:
# نوع ورودی:
# - path_back: مسیر ذخیره فایل (str)
# - index_name: نام اندیس (str)
# - out_name: نام فایل خروجی (str) - پیش‌فرض خالی
# - body: بدنه جستجو (dict) - پیش‌فرض {"size":1000}
# - byzip: تعیین فرمت خروجی (bool) - پیش‌فرض True
# - fields: لیست فیلدهای مورد نیاز (list) - پیش‌فرض خالی
# - noFields: لیست فیلدهای مورد حذف (list) - پیش‌فرض خالی
# نوع خروجی: bool (True اگر داده وجود داشته باشد)
# عملیات:
# - داده‌های اندیس را با استفاده از scroll API دریافت می‌کند
# - پیشرفت عملیات را نمایش می‌دهد
# - داده‌ها را در فایل ZIP یا JSON ذخیره می‌کند
# - اگر fields مشخص شده باشد، فقط فیلدهای مورد نظر را استخراج می‌کند
# - اگر noFields مشخص شده باشد، فیلدهای مورد نظر را حذف می‌کند
# -----------------------------end----------------------------
def backupIndexToZipfile(
self,
path_back,
index_name,
file_name="",
body={"size": 1000},
byzip=True,
fields=[],
noFields=[],
):
print("*" * 50, " start backup -->", index_name)
self.counter = 0
sid = None
out = index_name
if file_name == "":
file_name = index_name
if body == {}:
s_res = self.es.search(index=index_name, scroll="5m", size=1000)
else:
s_res = self.es.search(index=index_name, scroll="5m", body=body)
self.total = s_res["hits"]["total"]["value"]
if self.total == 0:
print("total index_name by query = %d" % self.total)
return False
if byzip:
fout = zipfile.ZipFile(path_back + "/" + file_name + ".zip", "w")
else:
fout = open(path_back + "/" + file_name + ".json", "a+", encoding="utf-8")
print("start index = %s" % index_name)
print("total = %d" % self.total)
sid = s_res["_scroll_id"]
scroll_size = len(s_res["hits"]["hits"])
file_count = 1
prev_percent = 0
while scroll_size > 0:
"Scrolling..."
self.counter += scroll_size
percent = int((self.counter / self.total) * 100)
if percent != prev_percent:
print("progress -> %.2f %%" % percent)
prev_percent = percent
#############################
out_json = []
for item in s_res["hits"]["hits"]:
if fields:
item2 = {}
item2["id"] = item["_id"]
item2["_source"] = {}
for kf in fields:
if kf in item["_source"]:
item2["_source"][kf] = item["_source"][kf]
else:
item2 = item
if noFields:
for kf in noFields:
if kf in item2["_source"]:
del item2["_source"][kf]
out_json.append(item2)
text = json.dumps(out_json, ensure_ascii=False)
out_json = []
if byzip:
filename = out + str(file_count) + ".json"
file_count += 1
fout.writestr(filename, text.encode("utf-8"), zipfile.ZIP_DEFLATED)
else:
fout.write(text)
##############################
s_res = self.es.scroll(scroll_id=sid, scroll="2m", request_timeout=100000)
sid = s_res["_scroll_id"]
scroll_size = len(s_res["hits"]["hits"])
sid = None
fout.close()
# ----------------------------start---------------------------
# متد restorFileToElastic:
# نوع ورودی:
# - path_back: مسیر فایل (str)
# - index_name: نام اندیس (str)
# - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی
# - queryDelete: تعیین حذف اندیس قبل از بازیابی (bool) - پیش‌فرض True
# - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی
# نوع خروجی: bool
# عملیات:
# - وجود فایل ZIP را بررسی می‌کند
# - اگر queryDelete=True باشد، از کاربر تأیید حذف را می‌گیرد
# - اندیس را ایجاد می‌کند
# - داده‌ها را از فایل ZIP به Elasticsearch بازیابی می‌کند
# -----------------------------end----------------------------
def restorFileToElastic(
self, path_back, index_name, app_key="", queryDelete=True, map_name=""
):
if not os.path.exists(path_back):
print(" **** error *** path not exist: ", path_back)
return False
file_path = path_back + "/" + index_name + ".zip"
if not os.path.exists(file_path):
return False
if queryDelete:
# اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند
if self.deleteIndex(index_name):
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
else: # اگر وجود داشته باشد پرش می کند و کاری نمیکند
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
def restorFileToElastic2(
self, path_file, index_name, app_key="", queryDelete=True, map_name=""
):
if not os.path.exists(path_file):
print(" **** error *** path not exist: ", path_file)
return False
file_path = path_file
if not os.path.exists(file_path):
return False
if queryDelete:
# اگر وجود داشته باشد، از کاربر برای حذفش سوال میکند
if self.deleteIndex(index_name):
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
else: # اگر وجود داشته باشد پرش می کند و کاری نمیکند
self.createIndex(index_name, app_key, map_name)
self.zipFileToElastic(file_path, index_name)
# ----------------------------start---------------------------
# متد renameElasticIndex:
# نوع ورودی:
# - index_name_i: نام اندیس منبع (str)
# - index_name_o: نام اندیس مقصد (str)
# - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی
# - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - اندیس مقصد را ایجاد می‌کند
# - با استفاده از reindex API، داده‌ها را از اندیس منبع به مقصد منتقل می‌کند
# - پیشرفت عملیات را نمایش می‌دهد
# -----------------------------end----------------------------
def renameElasticIndex(self, index_name_i, index_name_o, app_key="", map_name=""):
if self.createIndex(index_name_o, app_key, map_name):
res = self.es.reindex(
body={
"source": {"index": index_name_i},
"dest": {"index": index_name_o},
},
wait_for_completion=False,
)
print(type(res))
print(res)
taskid = res["task"] if res["task"] else ""
# tasks = client.TasksClient(self.es)
tasks = self.es.tasks
while True:
res = tasks.get(task_id=taskid)
if res["completed"]:
break
# print( res["task"])
print(
"----",
index_name_o,
" imported : ",
res["task"]["status"]["total"],
" / ",
res["task"]["status"]["created"],
)
sleep(1)
print("----", index_name_o, " complated")
# ----------------------------start---------------------------
# متد deleteIndex:
# نوع ورودی:
# - index_name: نام اندیس (str)
# نوع خروجی: bool
# عملیات:
# - وجود اندیس را بررسی می‌کند
# - از کاربر تأیید حذف را می‌گیرد
# - در صورت تأیید، اندیس را حذف می‌کند
# -----------------------------end----------------------------
def deleteIndex(self, index_name):
if not self.es.indices.exists(index=index_name):
print(" " * 10, " for delete NOT exist index :", index_name)
return True
question = "Is DELETE elastic index (" + index_name + ") ? "
if self.query_yes_no(question):
self.es.indices.delete(index=index_name)
print("%" * 10, " Finish DELETE index :", index_name)
return True
else:
return False
# ----------------------------start---------------------------
# متد query_yes_no:
# نوع ورودی:
# - question: سوال نمایش داده شده (str)
# - default: پاسخ پیش‌فرض (str) - پیش‌فرض "no"
# نوع خروجی: bool
# عملیات:
# - سوال را به کاربر نمایش می‌دهد
# - پاسخ کاربر را دریافت و اعتبارسنجی می‌کند
# - True برای 'yes'/'y' و False برای 'no'/'n' برمی‌گرداند
# -----------------------------end----------------------------
def query_yes_no(self, question, default="no"):
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
print("%" * 10, " quistion ", "%" * 10, "\n")
sys.stdout.write(question + prompt)
choice = input().lower()
if default is not None and choice == "":
return valid[default]
elif choice in valid:
return valid[choice]
else:
sys.stdout.write(
"لطفا یکی از موارد روبرو را وارد کنید : 'yes' or 'no' "
"(or 'y' or 'n').\n"
)
def createIndexIfNotExist(self, index_name_o, mapping_o=""):
"""
# نوع ورودی:
# - index_name_o: نام اندیس (str)
# - mapping_o: مپینگ اندیس (str) - پیش‌فرض خالی
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - وجود اندیس را بررسی می‌کند
# - در صورت عدم وجود، اندیس را با مپینگ مشخص شده ایجاد می‌کند
"""
try:
if not self.es.indices.exists(index=index_name_o):
response = self.es.indices.create(index=index_name_o, body=mapping_o)
# print out the response:
print("create index response:", response)
except:
print("....... index exist ! ... not created")
# ----------------------------start---------------------------
# متد createIndex:
# نوع ورودی:
# - index_name: نام اندیس (str)
# - app_key: کلید برنامه برای مپینگ (str) - پیش‌فرض خالی
# - map_name: نام فایل مپینگ (str) - پیش‌فرض خالی
# نوع خروجی: bool
# عملیات:
# - وجود اندیس را بررسی می‌کند
# - مسیر فایل مپینگ را تعیین می‌کند
# - فایل مپینگ را خوانده و اندیس را ایجاد می‌کند
# - در صورت عدم یافت فایل مپینگ، خطا نمایش داده می‌شود
# -----------------------------end----------------------------
def createIndex(self, index_name, app_key="", map_name=""):
path_base = self.path_mappings
path_mapping1 = path_base + "general/"
if app_key == "":
app_key = "tavasi"
path_mapping2 = path_base + app_key + "/"
if map_name == "":
map_name = index_name
if self.es.indices.exists(index=index_name):
print("============== exist index :", index_name)
return True
if map_name == "mj_rg_section" or map_name == "semantic_search":
map_name = "mj_qa_section"
elif map_name[-3] == "_ai":
map_name = [0 - len(map_name) - 3]
print(map_name)
mapping_file_path = path_mapping1 + map_name + ".json"
print("mapping_file_path : ", mapping_file_path)
if not os.path.isfile(mapping_file_path):
if not os.path.isfile(mapping_file_path):
mapping_file_path = path_mapping2 + map_name + ".json"
print("mapping_file_path : ", mapping_file_path)
# Create Index With Mapping
if os.path.isfile(mapping_file_path):
mapping_file = open(mapping_file_path, "r", encoding="utf-8")
mapping_file_read = mapping_file.read()
mapping_data = json.loads(mapping_file_read)
mapping_file.close()
if self.es.indices.exists(index=index_name):
print("============== exist index :", index_name)
else:
self.es.indices.create(index=index_name, body=mapping_data)
return True
else:
print("*** error not find maping file elastic : *******", mapping_file_path)
return False
# ----------------------------start---------------------------
# متد updateBulkList:
# نوع ورودی:
# - listData: لیست داده‌ها (list)
# - index_name: نام اندیس (str)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - داده‌ها را به صورت bulk آپدیت می‌کند
# - از helpers.bulk Elasticsearch استفاده می‌کند
# -----------------------------end----------------------------
def updateBulkList(self, listData, index_name):
chunk_size = 100000
raise_on_error = False
raise_on_exception = False
stats_only = True
yield_ok = False
actions = []
for item in listData:
actions.append(
{
"_op_type": "update",
"_index": index_name,
"_id": item["_id"],
"doc": item["_source"],
}
)
helpers.bulk(
self.es,
actions,
chunk_size,
raise_on_error,
raise_on_exception,
stats_only,
yield_ok,
)
# ----------------------------start---------------------------
# متد importBulkList:
# نوع ورودی:
# - listData: لیست داده‌ها (list)
# - index_name: نام اندیس (str)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - داده‌ها را به صورت bulk وارد می‌کند
# - از helpers.bulk Elasticsearch استفاده می‌کند
# -----------------------------end----------------------------
def importBulkList(self, listData, index_name):
chunk_size = 100000
raise_on_error = False
raise_on_exception = False
stats_only = True
yield_ok = False
for item in listData:
actions = [
{
"_op_type": "index",
"_index": index_name,
"_id": item["_id"],
"_source": item["_source"],
}
]
helpers.bulk(
self.es,
actions,
chunk_size,
raise_on_error,
raise_on_exception,
stats_only,
yield_ok,
)
# ----------------------------start---------------------------
# متد importJsonDataToElastic:
# نوع ورودی:
# - jsonData: داده‌های JSON (list)
# - index_name: نام اندیس (str)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - داده‌ها را به صورت bulk وارد می‌کند
# - از helpers.bulk Elasticsearch استفاده می‌کند
# -----------------------------end----------------------------
def importJsonDataToElastic(self, jsonData, index_name, fields=[]):
chunk_size = 100000
raise_on_error = False
raise_on_exception = False
stats_only = True
yield_ok = False
actions = []
for item in jsonData:
id = item["_id"] if item["_id"] else item["id"]
source = item["_source"]
if fields:
source = {}
for col in fields:
if col in item["_source"]:
source[col] = item["_source"]
actions.append(
{
"_op_type": "index",
"_index": index_name,
"_id": id,
"_source": source,
}
)
helpers.bulk(
self.es,
actions,
chunk_size,
raise_on_error,
raise_on_exception,
stats_only,
yield_ok,
)
# ----------------------------start---------------------------
# متد fileToElastic:
# نوع ورودی:
# - file_path: مسیر فایل (str)
# - index_name: نام اندیس (str)
# - limit_pack: محدودیت تعداد بسته‌ها (int) - پیش‌فرض -1
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - فایل JSON را خوانده و داده‌ها را به Elasticsearch وارد می‌کند
# - اندیس را refresh می‌کند
# - تعداد اسناد را نمایش می‌دهد
# -----------------------------end----------------------------
def fileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]):
if not os.path.exists(file_path):
print("file zip:", file_path, " not exist")
return
print("index:", index_name, "=>", file_path)
self.counter = 0
with open(file_path) as file:
data = json.loads(file.read())
self.importJsonDataToElastic(data, index_name, fields)
self.es.indices.refresh(index=index_name)
print(self.es.cat.count(index=index_name, format="json"))
# ----------------------------start---------------------------
# متد zipFileToElastic:
# نوع ورودی:
# - file_path: مسیر فایل ZIP (str)
# - index_name: نام اندیس (str)
# - limit_pack: محدودیت تعداد فایل‌ها (int) - پیش‌فرض -1
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - فایل ZIP را باز کرده و هر فایل JSON داخل آن را پردازش می‌کند
# - داده‌ها را به Elasticsearch وارد می‌کند
# - اندیس را refresh می‌کند
# - تعداد اسناد را نمایش می‌دهد
# -----------------------------end----------------------------
def zipFileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]):
if not os.path.exists(file_path):
print(
"file zip:", file_path, " not exist for imort to elastic : ", index_name
)
return
fileNo = 0
with zipfile.ZipFile(file_path, "r") as zObject:
fileNo += 1
print(
"=" * 10,
" zip fileNo: ",
fileNo,
" - ( ",
index_name,
" ) | File Numbers:",
len(zObject.namelist()),
"=" * 10,
)
packNo = 0
self.counter = 0
for filename in zObject.namelist():
packNo += 1
if limit_pack != -1:
if packNo > limit_pack:
print("limit_data ", index_name, " ", limit_pack)
break
print("index:", index_name, "=>", filename)
with zObject.open(filename) as file:
data = json.loads(file.read())
self.importJsonDataToElastic(data, index_name, fields)
self.es.indices.refresh(index=index_name)
print(self.es.cat.count(index=index_name, format="json"))
print(" END Of Import to elastic ", index_name, "\n")
# ----------------------------start---------------------------
# متد iterateJsonFile:
# نوع ورودی:
# - file_path: مسیر فایل (str)
# - isZip: تعیین نوع فایل (ZIP یا JSON) (bool) - پیش‌فرض True
# - limit_pack: محدودیت تعداد فایل‌ها (int) - پیش‌فرض -1
# نوع خروجی: ژنراتور
# عملیات:
# - اگر isZip=True باشد: فایل ZIP را پردازش می‌کند
# - اگر isZip=False باشد: فایل JSON را پردازش می‌کند
# - داده‌ها را به صورت ژنراتور برمی‌گرداند
# -----------------------------end----------------------------
def iterateJsonFile(self, file_path, isZip=True, limit_pack=-1):
if not os.path.exists(file_path):
print("file zip:", file_path, " not exist iterateJsonFile ")
return
if isZip:
fileNo = 0
with zipfile.ZipFile(file_path, "r") as zObject:
fileNo += 1
print(
"=" * 10,
" zip fileNo: ",
fileNo,
" iterateJsonFile - | File Numbers:",
len(zObject.namelist()),
"=" * 10,
)
packNo = 0
self.counter = 0
for filename in zObject.namelist():
packNo += 1
if limit_pack != -1:
if packNo > limit_pack:
print("limit_data iterateJsonFile ", limit_pack)
break
print("index iterateJsonFile :", "=>", filename)
with zObject.open(filename) as file:
data = json.loads(file.read())
# Yield each entry
# yield data
yield from (
{"source": hit["_source"], "id": hit["_id"]} for hit in data
)
else:
with open(filename, "r", encoding="utf-8") as file:
data = json.loads(file.read())
# Yield each entry
# yield from (hit for hit in data)
# return data
yield from (
{"source": hit["_source"], "id": hit["_id"]} for hit in data
)
# ----------------------------start---------------------------
# متد es_iterate_all_documents:
# نوع ورودی:
# - index: نام اندیس (str)
# - body: بدنه جستجو (dict) - پیش‌فرض خالی
# - pagesize: اندازه صفحه (int) - پیش‌فرض 250
# - scroll_timeout: زمان اسکرول (str) - پیش‌فرض "25m"
# - **kwargs: پارامترهای اضافی
# نوع خروجی: ژنراتور
# عملیات:
# - تمام اسناد اندیس را با استفاده از scroll API دریافت می‌کند
# - پیشرفت عملیات را نمایش می‌دهد
# - داده‌ها را به صورت ژنراتور برمی‌گرداند
# -----------------------------end----------------------------
def es_iterate_all_documents(
self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs
):
"""
Helper to iterate ALL values from a single index
Yields all the documents.
"""
is_first = True
while True:
# Scroll next
if is_first: # Initialize scroll
# result = self.es.search(index=index, scroll="2m", **kwargs, body={
# "size": pagesize
# })
if body:
result = self.es.search(
index=index,
scroll=scroll_timeout,
**kwargs,
size=pagesize,
body=body
)
else:
result = self.es.search(
index=index, scroll=scroll_timeout, **kwargs, size=pagesize
)
self.total = result["hits"]["total"]["value"]
if self.total > 0:
print("total = %d" % self.total)
is_first = False
else:
# result = es.scroll(body={
# "scroll_id": scroll_id,
# "scroll": scroll_timeout
# })
result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
scroll_id = result["_scroll_id"]
hits = result["hits"]["hits"]
self.counter += len(hits)
if self.total > 0:
print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
# Stop after no more docs
if not hits:
break
# Yield each entry
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)
def removeCustomFileds(
self, index_name_i, fields=[], renameFileds={}, body={}, bulk_update_size=200
):
try:
_list = []
try:
_list = self.es_iterate_all_documents(index_name_i, body)
except Exception as e:
print(e)
bulk_data = []
count = 0
total_count = 0
for mentry in _list:
count += 1
entry = mentry["source"]
id = mentry["id"]
# print(id)
eid = id
# if (count % 100) == 0 :
# print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0))
data_filled = False
data = entry
for col in fields:
if col in data:
del data[col]
elif "." in col:
cols = col.split(".")
subsource = entry
for sub in cols:
dCol = subsource.get(sub, None)
if dCol:
subsource = dCol
else:
break
for col in renameFileds.items():
if col in data:
dCol = data[col]
data[renameFileds[col]] = dCol
del data[col]
bulk_data.append({"_id": eid, "_source": data})
# انتقال دسته جمعی کدها به الاستیک
if len(bulk_data) >= bulk_update_size:
total_count += len(bulk_data)
print(
"=" * 5,
" update bulk --> ",
"total=" + str(total_count),
str(count),
)
self.importBulkList(bulk_data, index_name_i)
bulk_data = []
if len(bulk_data) >= 0:
total_count += len(bulk_data)
print(
"=" * 5,
" update bulk --> ",
"total=" + str(total_count),
str(count),
)
self.importBulkList(bulk_data, index_name_i)
bulk_data = []
except Exception as e:
# print("1111")
print(e)
# save_error(id, e)
# ----------------------------start---------------------------
# متد moveCustomFileds:
# نوع ورودی:
# - index_name_i: نام اندیس منبع (str)
# - index_name_o: نام اندیس مقصد (str)
# - fields: لیست فیلدهای مورد انتقال (list) - پیش‌فرض خالی
# - renameFileds: دیکشنری تغییر نام فیلدها (dict) - پیش‌فرض خالی
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - تمام اسناد اندیس منبع را دریافت می‌کند
# - فیلدهای مشخص شده را استخراج و به اندیس مقصد انتقال می‌دهد
# - اگر renameFileds وجود داشته باشد، نام فیلدها را تغییر می‌دهد
# -----------------------------end----------------------------
def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}):
try:
body = {}
list = []
try:
list = self.es_iterate_all_documents(index_name_i)
except Exception as e:
print(e)
count = 0
for mentry in list:
count += 1
entry = mentry["source"]
id = mentry["id"]
# print(id)
eid = id
if (count % 100) == 0:
print(
"%s -> %.2f "
% (id, (count / self.total) if self.total > 0 else 0)
)
data_filled = False
data = {}
for col in fields:
if "." in col:
cols = col.split(".")
subsource = entry
for sub in cols:
dCol = subsource.get(sub, None)
if dCol:
subsource = dCol
else:
break
else:
dCol = entry.get(col, None)
if dCol is None:
continue
if col in renameFileds:
data[renameFileds[col]] = dCol
else:
data[col] = dCol
data_filled = True
if not data_filled:
continue
try:
resp = self.update_index_doc(True, index_name_o, eid, data)
except Exception as e:
print(e)
# save_error(id, e)
except Exception as e:
# print("1111")
print(e)
# save_error(id, e)
# ----------------------------start---------------------------
# متد mappingIndex:
# نوع ورودی:
# - index_name_i: نام اندیس (str)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - توضیح می‌دهد که تغییر مپینگ از طریق پایتون امکان‌پذیر نیست
# - باید اندیس جدیدی با مپینگ مطلوب ایجاد و رایندکس شود
# -----------------------------end----------------------------
def mappingIndex(self, index_name_i):
# فقط از طریق کیبانا میشه تغییر مپ داد
# با پایتون نمیشه
# باید ایندکس جدیدی با مپ مطلوب ایجاد کرد و رایندکس کرد
pass
# ----------------------------start---------------------------
# متد updateByQueryIndex:
# نوع ورودی:
# - index_name_i: نام اندیس (str)
# - body: بدنه آپدیت (dict)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - اسناد را با استفاده از update_by_query API آپدیت می‌کند
# - در صورت خطا، پیام خطا نمایش داده می‌شود
# -----------------------------end----------------------------
def updateByQueryIndex(self, index_name_i, body):
## sample
# body = {
# "script": {
# "inline": "ctx._source.Device='Test'",
# "lang": "painless"
# },
# "query": {
# "match": {
# "Device": "Boiler"
# }
# }
# }
try:
self.es.update_by_query(body=body, index=index_name_i)
except Exception as e:
print(e)
# save_error(id, e)
# ----------------------------start---------------------------
# متد deleteByQueryIndex:
# نوع ورودی:
# - index_name_i: نام اندیس (str)
# - body: بدنه حذف (dict)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - اسناد را با استفاده از delete_by_query API حذف می‌کند
# - در صورت خطا، پیام خطا نمایش داده می‌شود
# -----------------------------end----------------------------
def deleteByQueryIndex(self, index_name_i, body):
## sample
# body = {
# "query": {
# "match": {
# "Device": "Boiler"
# }
# }
# }
try:
self.es.delete_by_query(index=index_name_i, body=body)
except Exception as e:
print(e)
# save_error(id, e)
# ----------------------------start---------------------------
# متد delete_by_ids:
# نوع ورودی:
# - index_name_i: نام اندیس (str)
# - ids: لیست شناسه‌ها (list)
# نوع خروجی: بدون خروجی مستقیم
# عملیات:
# - اسناد با شناسه‌های مشخص شده را حذف می‌کند
# - در صورت خطا، پیام خطا نمایش داده می‌شود
# -----------------------------end----------------------------
def delete_by_ids(self, index_name_i, ids):
try:
# ids = ['test1', 'test2', 'test3']
query = {"query": {"terms": {"_id": ids}}}
res = self.es.delete_by_query(index=index_name_i, body=query)
print(res)
except Exception as e:
print(e)
# save_error(id, e)