import zipfile
import sys
import os
import json
from time import sleep

from elasticsearch import Elasticsearch, helpers


class ElasticHelper():
    counter = 0
    total = 0
    id = ""
    path_mappings = os.getcwd() + '/repo/_other/'

    def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings=""):
        if path_mappings:
            self.path_mappings = path_mappings

        if es_pass == '':
            self.es = Elasticsearch(es_url)
        else:
            self.es = Elasticsearch(
                es_url,
                http_auth=(es_user, es_pass),
            )

        # print(es_url)
        # print(self.es)

        self.success_connect = False
        for a in range(0, 10):
            try:
                if not self.es.ping():
                    print('elastic not ping, sleep 5 s : ', a)
                    sleep(5)
                    continue
                else:
                    self.success_connect = True
                    break
            except Exception:
                break

        if not self.success_connect:
            print('******', 'not access to elastic service')
            return

        self.counter = 0
        self.total = 0
        self.id = ""

    def get_doctument(self, index_name, id):
        res = self.es.get(index=index_name, id=id)
        return res

    def exist_doctument(self, index_name, id):
        res = self.es.exists(index=index_name, id=id)
        return res

    def update_index_doc(self, is_update_state, index_name_o, eid, data):
        if is_update_state:
            resp = self.es.update(index=index_name_o, id=eid, doc=data)
            # resp = self.es.update(index=index_name_o, id=eid, body={'doc': data})
        else:
            resp = self.es.index(index=index_name_o, id=eid, document=data)
        return resp

    def exportToJsonForAI(self, path_back, index_name, out_name='', body={}, fields=[]):
        print('*' * 50, ' start backup -->', index_name)
        self.counter = 0
        sid = None

        out = out_name
        if out_name == '':
            out = index_name

        fout = open(path_back + "/" + out + '.json', 'a+', encoding='utf-8')

        s_res = self.es.search(
            index=index_name,
            scroll='5m',
            size=1000,
            body=body
        )
        self.total = s_res["hits"]["total"]['value']

        print('start index = %s' % index_name)
        print('total = %d' % self.total)

        sid = s_res['_scroll_id']
        scroll_size = len(s_res['hits']['hits'])
        out_json = []
        while scroll_size > 0:
            # Scrolling...
            self.counter += scroll_size
            print("progress -> %.2f %%" % ((self.counter / self.total) * 100))

            for item in s_res['hits']['hits']:
                if fields:
                    item2 = {}
                    item2['id'] = item['_id']
                    for kf in fields:
                        if kf in item['_source']:
                            item2[kf] = item['_source'][kf]
                else:
                    item2 = item

                out_json.append(item2)

            s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
            sid = s_res['_scroll_id']
            scroll_size = len(s_res['hits']['hits'])

        sid = None
        text = json.dumps(out_json, ensure_ascii=False)
        fout.write(text)

    def backupIndexToZipfile(self, path_back, index_name, out_name='', body={}, byzip=True, fields=[], noFields=[]):
        print('*' * 50, ' start backup -->', index_name)
        self.counter = 0
        sid = None

        out = out_name
        if out_name == '':
            out = index_name

        if body == {}:
            s_res = self.es.search(
                index=index_name,
                scroll='5m',
                size=1000
            )
        else:
            s_res = self.es.search(
                index=index_name,
                scroll='5m',
                size=1000,
                body=body
            )

        self.total = s_res["hits"]["total"]['value']
        if self.total == 0:
            print('total index_name by query = %d' % self.total)
            return False

        if byzip:
            fout = zipfile.ZipFile(path_back + "/" + out + '.zip', 'w')
        else:
            fout = open(path_back + "/" + out + '.json', 'a+', encoding='utf-8')

        print('start index = %s' % index_name)
        print('total = %d' % self.total)

        sid = s_res['_scroll_id']
        scroll_size = len(s_res['hits']['hits'])
        file_count = 1
        while scroll_size > 0:
            # Scrolling...
            self.counter += scroll_size
            print("progress -> %.2f %%" % ((self.counter / self.total) * 100))

            out_json = []
            for item in s_res['hits']['hits']:
                if fields:
                    item2 = {}
                    item2['id'] = item['_id']
                    item2['_source'] = {}
                    for kf in fields:
                        if kf in item['_source']:
                            item2['_source'][kf] = item['_source'][kf]
                else:
                    item2 = item

                if noFields:
                    for kf in noFields:
                        if kf in item2['_source']:
                            del item2['_source'][kf]

                out_json.append(item2)

            text = json.dumps(out_json, ensure_ascii=False)
            out_json = []
            if byzip:
                filename = out + str(file_count) + '.json'
                file_count += 1
                fout.writestr(filename, text.encode('utf-8'), zipfile.ZIP_DEFLATED)
            else:
                fout.write(text)

            s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
            sid = s_res['_scroll_id']
            scroll_size = len(s_res['hits']['hits'])

        sid = None
        fout.close()

    def restorFileToElastic(self, path_back, index_name, app_key='', queryDelete=True, map_name=''):
        if not os.path.exists(path_back):
            print(' **** error *** path not exist: ', path_back)
            return False

        file_path = path_back + '/' + index_name + '.zip'
        if not os.path.exists(file_path):
            return False

        if queryDelete:
            # If the index already exists, ask the user whether it should be deleted first.
            if self.deleteIndex(index_name):
                self.createIndex(index_name, app_key, map_name)
                self.zipFileToElastic(file_path, index_name)
        else:
            # If the index already exists, skip it and do nothing.
            self.createIndex(index_name, app_key, map_name)
            self.zipFileToElastic(file_path, index_name)

    def restorFileToElastic2(self, path_file, index_name, app_key='', queryDelete=True, map_name=''):
        if not os.path.exists(path_file):
            print(' **** error *** path not exist: ', path_file)
            return False

        file_path = path_file
        if queryDelete:
            # If the index already exists, ask the user whether it should be deleted first.
            if self.deleteIndex(index_name):
                self.createIndex(index_name, app_key, map_name)
                self.zipFileToElastic(file_path, index_name)
        else:
            # If the index already exists, skip it and do nothing.
            self.createIndex(index_name, app_key, map_name)
            self.zipFileToElastic(file_path, index_name)

    def renameElasticIndex(self, index_name_i, index_name_o, app_key='', map_name=''):
        if self.createIndex(index_name_o, app_key, map_name):
            res = self.es.reindex(
                body={
                    "source": {"index": index_name_i},
                    "dest": {"index": index_name_o}
                },
                wait_for_completion=False)

            print(type(res))
            print(res)
            taskid = res["task"] if res["task"] else ""
            # tasks = client.TasksClient(self.es)
            tasks = self.es.tasks
            while True:
                res = tasks.get(task_id=taskid)
                if res["completed"]:
                    break

                # print(res["task"])
                print('----', index_name_o, ' imported : ',
                      res["task"]["status"]["total"], ' / ',
                      res["task"]["status"]["created"])
                sleep(1)

            print('----', index_name_o, ' completed')

    def deleteIndex(self, index_name):
        if not self.es.indices.exists(index=index_name):
            print(' ' * 10, " for delete NOT exist index :", index_name)
            return True

        question = 'Is DELETE elastic index (' + index_name + ') ?'
        if self.query_yes_no(question):
            self.es.indices.delete(index=index_name)
            print('%' * 10, " Finish DELETE index :", index_name)
            return True
        else:
            return False

    def query_yes_no(self, question, default="no"):
        valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
        if default is None:
            prompt = " [y/n] "
        elif default == "yes":
            prompt = " [Y/n] "
        elif default == "no":
            prompt = " [y/N] "
        else:
            raise ValueError("invalid default answer: '%s'" % default)

        while True:
            print('%' * 10, ' question ', '%' * 10, '\n')
            sys.stdout.write(question + prompt)
            choice = input().lower()
            if default is not None and choice == "":
                return valid[default]
            elif choice in valid:
                return valid[choice]
            else:
                sys.stdout.write("Please respond with 'yes' or 'no' "
                                 "(or 'y' or 'n').\n")

    def createIndexIfNotExist(self, index_name_o, mapping_o=""):
        try:
            if not self.es.indices.exists(index=index_name_o):
                response = self.es.indices.create(index=index_name_o, body=mapping_o)
                # print out the response:
                print("create index response:", response)
        except Exception:
            print("....... index exist ! ... not created")

    def createIndex(self, index_name, app_key='', map_name=''):
        path_base = self.path_mappings
        path_mapping1 = path_base + 'general/'
        if app_key == '':
            app_key = 'tavasi'
        path_mapping2 = path_base + app_key + '/'

        if map_name == '':
            map_name = index_name

        if self.es.indices.exists(index=index_name):
            print("============== exist index :", index_name)
            return True

        if map_name == 'mj_rg_section' or map_name == 'semantic_search':
            map_name = 'mj_qa_section'
        elif map_name.endswith('_ai'):
            # Strip the '_ai' suffix so the base mapping file is reused.
            map_name = map_name[:-3]
            print(map_name)

        mapping_file_path = path_mapping1 + map_name + '.json'
        print("mapping_file_path : ", mapping_file_path)
        if not os.path.isfile(mapping_file_path):
            mapping_file_path = path_mapping2 + map_name + '.json'
            print("mapping_file_path : ", mapping_file_path)

        # Create Index With Mapping
        if os.path.isfile(mapping_file_path):
            mapping_file = open(mapping_file_path, 'r', encoding='utf-8')
            mapping_file_read = mapping_file.read()
            mapping_data = json.loads(mapping_file_read)
            mapping_file.close()

            if self.es.indices.exists(index=index_name):
                print("============== exist index :", index_name)
            else:
                self.es.indices.create(index=index_name, body=mapping_data)
            return True
        else:
            print('*** error not find mapping file elastic : *******', mapping_file_path)
            return False

    def updateBulkList(self, listData, index_name):
        chunk_size = 1000
        raise_on_error = False
        raise_on_exception = False
        stats_only = True
        yield_ok = False

        actions = []
        for item in listData:
            actions.append({
                "_op_type": "update",
                "_index": index_name,
                "_id": item['_id'],
                "doc": item['_source']
            })

        helpers.bulk(
            self.es,
            actions,
            chunk_size=chunk_size,
            raise_on_error=raise_on_error,
            raise_on_exception=raise_on_exception,
            stats_only=stats_only,
            yield_ok=yield_ok,
        )

    def importBulkList(self, listData, index_name):
        chunk_size = 100000
        raise_on_error = False
        raise_on_exception = False
        stats_only = True
        yield_ok = False

        actions = []
        for item in listData:
            actions.append({
                "_op_type": "index",
                "_index": index_name,
                "_id": item['_id'],
                "_source": item['_source']
            })

        helpers.bulk(
            self.es,
            actions,
            chunk_size=chunk_size,
            raise_on_error=raise_on_error,
            raise_on_exception=raise_on_exception,
            stats_only=stats_only,
            yield_ok=yield_ok,
        )

    def importJsonDataToElastic(self, jsonData, index_name, fields=[]):
        chunk_size = 1000
        raise_on_error = False
        raise_on_exception = False
        stats_only = True
        yield_ok = False

        actions = []
        for item in jsonData:
            id = item['_id'] if '_id' in item else item['id']
            source = item['_source']
            if fields:
                source = {}
                for col in fields:
                    if col in item['_source']:
                        source[col] = item['_source'][col]

            actions.append({
                "_op_type": "index",
                "_index": index_name,
                "_id": id,
                "_source": source
            })

        helpers.bulk(
            self.es,
            actions,
            chunk_size=chunk_size,
            raise_on_error=raise_on_error,
            raise_on_exception=raise_on_exception,
            stats_only=stats_only,
            yield_ok=yield_ok,
        )

    def fileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]):
        if not os.path.exists(file_path):
            print("file zip:", file_path, " not exist")
            return

        print("index:", index_name, '=>', file_path)
        self.counter = 0
        with open(file_path) as file:
            data = json.loads(file.read())
            self.importJsonDataToElastic(data, index_name, fields)

        self.es.indices.refresh(index=index_name)
        print(self.es.cat.count(index=index_name, format="json"))

    def zipFileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]):
        if not os.path.exists(file_path):
            print("file zip:", file_path, " not exist for import to elastic : ", index_name)
            return

        fileNo = 0
        with zipfile.ZipFile(file_path, 'r') as zObject:
            fileNo += 1
            print("=" * 10, " zip fileNo: ", fileNo, " - ( ", index_name, " ) | File Numbers:",
                  len(zObject.namelist()), "=" * 10)

            packNo = 0
            self.counter = 0
            for filename in zObject.namelist():
                packNo += 1
                if limit_pack != -1:
                    if packNo > limit_pack:
                        print('limit_data ', index_name, ' ', limit_pack)
                        break

                print("index:", index_name, '=>', filename)
                with zObject.open(filename) as file:
                    data = json.loads(file.read())
                    self.importJsonDataToElastic(data, index_name, fields)

        self.es.indices.refresh(index=index_name)
        print(self.es.cat.count(index=index_name, format="json"))
        print(" END Of Import to elastic ", index_name, "\n")

    def iterateJsonFile(self, file_path, isZip=True, limit_pack=-1):
        if not os.path.exists(file_path):
            print("file zip:", file_path, " not exist iterateJsonFile ")
            return

        if isZip:
            fileNo = 0
            with zipfile.ZipFile(file_path, 'r') as zObject:
                fileNo += 1
                print("=" * 10, " zip fileNo: ", fileNo, " iterateJsonFile - | File Numbers:",
                      len(zObject.namelist()), "=" * 10)

                packNo = 0
                self.counter = 0
                for filename in zObject.namelist():
                    packNo += 1
                    if limit_pack != -1:
                        if packNo > limit_pack:
                            print('limit_data iterateJsonFile ', limit_pack)
                            break

                    print("index iterateJsonFile :", '=>', filename)
                    with zObject.open(filename) as file:
                        data = json.loads(file.read())
                        # Yield each entry
                        yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)
        else:
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.loads(file.read())
                # Yield each entry
                yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)

    def es_iterate_all_documents(self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs):
        """
        Helper to iterate ALL values from a single index.
        Yields all the documents.
        """
        is_first = True
        while True:
            # Scroll next
            if is_first:
                # Initialize scroll
                if body:
                    result = self.es.search(
                        index=index,
                        scroll=scroll_timeout,
                        **kwargs,
                        size=pagesize,
                        body=body
                    )
                else:
                    result = self.es.search(
                        index=index,
                        scroll=scroll_timeout,
                        **kwargs,
                        size=pagesize
                    )

                self.total = result["hits"]["total"]["value"]
                if self.total > 0:
                    print("total = %d" % self.total)
                is_first = False
            else:
                result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)

            scroll_id = result["_scroll_id"]
            hits = result["hits"]["hits"]
            self.counter += len(hits)
            if self.total > 0:
                print("progress -> %.2f %%" % ((self.counter / self.total) * 100))

            # Stop after no more docs
            if not hits:
                break

            # Yield each entry
            yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)

    def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}):
        try:
            list = []
            try:
                list = self.es_iterate_all_documents(index_name_i)
            except Exception as e:
                print(e)

            count = 0
            for mentry in list:
                count += 1

                entry = mentry["source"]
                id = mentry["id"]
                eid = id
                if (count % 100) == 0:
                    print("%s -> %.2f " % (id, (count / self.total) if self.total > 0 else 0))

                data_filled = False
                data = {}
                for col in fields:
                    if '.' in col:
                        cols = col.split('.')
                        subsource = entry
                        for sub in cols:
                            dCol = subsource.get(sub, None)
                            if dCol:
                                subsource = dCol
                            else:
                                break
                    else:
                        dCol = entry.get(col, None)

                    if dCol is None:
                        continue

                    if col in renameFileds:
                        data[renameFileds[col]] = dCol
                    else:
                        data[col] = dCol

                    data_filled = True

                if not data_filled:
                    continue

                try:
                    resp = self.update_index_doc(True, index_name_o, eid, data)
                except Exception as e:
                    print(e)
                    # save_error(id, e)

        except Exception as e:
            print(e)
            # save_error(id, e)

    def mappingIndex(self, index_name_i):
        # A mapping can only be changed through Kibana, not from here in Python.
        # To change a mapping, create a new index with the desired mapping and reindex into it.
        pass

    def updateByQueryIndex(self, index_name_i, body):
        ## sample
        # body = {
        #     "script": {
        #         "inline": "ctx._source.Device='Test'",
        #         "lang": "painless"
        #     },
        #     "query": {
        #         "match": {
        #             "Device": "Boiler"
        #         }
        #     }
        # }
        try:
            self.es.update_by_query(body=body, index=index_name_i)
        except Exception as e:
            print(e)
            # save_error(id, e)

    def deleteByQueryIndex(self, index_name_i, body):
        ## sample
        # body = {
        #     "query": {
        #         "match": {
        #             "Device": "Boiler"
        #         }
        #     }
        # }
        try:
            self.es.delete_by_query(index=index_name_i, body=body)
        except Exception as e:
            print(e)
            # save_error(id, e)

    def delete_by_ids(self, index_name_i, ids):
        try:
            # ids = ['test1', 'test2', 'test3']
            query = {"query": {"terms": {"_id": ids}}}
            res = self.es.delete_by_query(index=index_name_i, body=query)
            print(res)
        except Exception as e:
            print(e)
            # save_error(id, e)