From 393aace89c442a8f3747e2a21e707c2e7707c263 Mon Sep 17 00:00:00 2001
From: ajokar
Date: Wed, 30 Jul 2025 18:27:30 +0330
Subject: [PATCH] first files

---
 elastic_helper.py  | 677 +++++++++++++++++++++++++++++++++++++++++++++
 get_recent_laws.py |  35 +++
 2 files changed, 712 insertions(+)
 create mode 100644 elastic_helper.py
 create mode 100644 get_recent_laws.py

diff --git a/elastic_helper.py b/elastic_helper.py
new file mode 100644
index 0000000..df133f5
--- /dev/null
+++ b/elastic_helper.py
@@ -0,0 +1,677 @@
+import zipfile
+import sys
+import os
+import json
+from time import sleep
+from elasticsearch import Elasticsearch, helpers
+
+
+class ElasticHelper():
+
+    counter = 0
+    total = 0
+    id = ""
+    path_mappings = os.getcwd() + '/repo/_other/'
+
+    def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings=""):
+
+        if path_mappings:
+            self.path_mappings = path_mappings
+
+        if es_pass == '':
+            self.es = Elasticsearch(es_url)
+        else:
+            self.es = Elasticsearch(
+                es_url,
+                http_auth=(es_user, es_pass),
+            )
+
+        # print(es_url)
+        # print(self.es)
+
+        self.success_connect = False
+        for a in range(0, 10):
+            try:
+                if not self.es.ping():
+                    print('elastic did not ping, sleeping 5 s, attempt:', a)
+                    sleep(5)
+                    continue
+                else:
+                    self.success_connect = True
+                    break
+            except Exception:
+                # connection error: stop retrying
+                break
+        if not self.success_connect:
+            print('******', 'no access to the elastic service')
+            return
+
+        self.counter = 0
+        self.total = 0
+        self.id = ""
+
+    def get_doctument(self, index_name, id):
+        res = self.es.get(index=index_name, id=id)
+        return res
+
+    def exist_doctument(self, index_name, id):
+        res = self.es.exists(index=index_name, id=id)
+        return res
+
+    def update_index_doc(self, is_update_state, index_name_o, eid, data):
+        if is_update_state:
+            resp = self.es.update(index=index_name_o, id=eid, doc=data)
+            # resp = self.es.update(index=index_name_o, id=eid, body={'doc': data})
+        else:
+            resp = self.es.index(index=index_name_o, id=eid, document=data)
+        return resp
+
+    def exportToJsonForAI(self, path_back, index_name, out_name='', body={}, fields=[]):
+        print('*' * 50, ' start backup -->', index_name)
+        self.counter = 0
+        sid = None
+
+        out = out_name
+        if out_name == '':
+            out = index_name
+
+        fout = open(path_back + "/" + out + '.json', 'a+', encoding='utf-8')
+
+        s_res = self.es.search(
+            index=index_name,
+            scroll='5m',
+            size=1000,
+            body=body
+        )
+        self.total = s_res["hits"]["total"]['value']
+
+        print('start index = %s' % index_name)
+        print('total = %d' % self.total)
+
+        sid = s_res['_scroll_id']
+        scroll_size = len(s_res['hits']['hits'])
+        out_json = []
+        while scroll_size > 0:
+            # Scrolling...
+            self.counter += scroll_size
+            print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
+            #############################
+            for item in s_res['hits']['hits']:
+
+                if fields:
+                    item2 = {}
+                    item2['id'] = item['_id']
+                    for kf in fields:
+                        # print(kf)
+                        if kf in item['_source']:
+                            # print(item['_source'][kf])
+                            item2[kf] = item['_source'][kf]
+                    # exit()
+                else:
+                    item2 = item
+
+                out_json.append(item2)
+
+            s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
+            sid = s_res['_scroll_id']
+            scroll_size = len(s_res['hits']['hits'])
+
+        sid = None
+        text = json.dumps(out_json, ensure_ascii=False)
+        fout.write(text)
+
+        ##############################
+
+    def backupIndexToZipfile(self, path_back, index_name, out_name='', body={}, byzip=True, fields=[], noFields=[]):
+        print('*' * 50, ' start backup -->', index_name)
+        self.counter = 0
+        sid = None
+
+        out = out_name
+        if out_name == '':
+            out = index_name
+
+        if body == {}:
+            s_res = self.es.search(
+                index=index_name,
+                scroll='5m',
+                size=1000
+            )
+        else:
+            s_res = self.es.search(
+                index=index_name,
+                scroll='5m',
+                size=1000,
+                body=body
+            )
+
+        self.total = s_res["hits"]["total"]['value']
+        if self.total == 0:
+            print('total %s by query = %d' % (index_name, self.total))
+            return False
+
+        if byzip:
+            fout = zipfile.ZipFile(path_back + "/" + out + '.zip', 'w')
+        else:
+            fout = open(path_back + "/" + out + '.json', 'a+', encoding='utf-8')
+
+        print('start index = %s' % index_name)
+        print('total = %d' % self.total)
+
+        sid = s_res['_scroll_id']
+        scroll_size = len(s_res['hits']['hits'])
+        file_count = 1
+        while scroll_size > 0:
+            # Scrolling...
+            self.counter += scroll_size
+            print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
+            #############################
+            out_json = []
+            for item in s_res['hits']['hits']:
+                if fields:
+                    item2 = {}
+                    item2['id'] = item['_id']
+                    item2['_source'] = {}
+                    for kf in fields:
+                        if kf in item['_source']:
+                            item2['_source'][kf] = item['_source'][kf]
+                else:
+                    item2 = item
+
+                if noFields:
+                    for kf in noFields:
+                        if kf in item2['_source']:
+                            del item2['_source'][kf]
+
+                out_json.append(item2)
+
+            text = json.dumps(out_json, ensure_ascii=False)
+            out_json = []
+            if byzip:
+                filename = out + str(file_count) + '.json'
+                file_count += 1
+                fout.writestr(filename, text.encode('utf-8'), zipfile.ZIP_DEFLATED)
+            else:
+                fout.write(text)
+
+            ##############################
+            s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
+            sid = s_res['_scroll_id']
+            scroll_size = len(s_res['hits']['hits'])
+        sid = None
+        fout.close()
+
+    def restorFileToElastic(self, path_back, index_name, app_key='', queryDelete=True, map_name=''):
+        if not os.path.exists(path_back):
+            print(' **** error *** path does not exist: ', path_back)
+            return False
+
+        file_path = path_back + '/' + index_name + '.zip'
+        if not os.path.exists(file_path):
+            return False
+
+        if queryDelete:
+            # If the index already exists, ask the user whether to delete it first
+            if self.deleteIndex(index_name):
+                self.createIndex(index_name, app_key, map_name)
+                self.zipFileToElastic(file_path, index_name)
+        else:  # If the index already exists, skip the prompt and import as-is
+            self.createIndex(index_name, app_key, map_name)
+            self.zipFileToElastic(file_path, index_name)
+
+    def restorFileToElastic2(self, path_file, index_name, app_key='', queryDelete=True, map_name=''):
+        if not os.path.exists(path_file):
+            print(' **** error *** path does not exist: ', path_file)
+            return False
+
+        file_path = path_file
+        if not os.path.exists(file_path):
+            return False
+
+        if queryDelete:
+            # If the index already exists, ask the user whether to delete it first
+            if self.deleteIndex(index_name):
+                self.createIndex(index_name, app_key, map_name)
+                self.zipFileToElastic(file_path, index_name)
+        else:  # If the index already exists, skip the prompt and import as-is
+            self.createIndex(index_name, app_key, map_name)
+            self.zipFileToElastic(file_path, index_name)
+
+    def renameElasticIndex(self, index_name_i, index_name_o, app_key='', map_name=''):
+
+        if self.createIndex(index_name_o, app_key, map_name):
+            res = self.es.reindex(
+                body={
+                    "source": {"index": index_name_i},
+                    "dest": {"index": index_name_o}
+                },
+                wait_for_completion=False)
+
+            print(type(res))
+            print(res)
+
+            taskid = res["task"] if res["task"] else ""
+            # tasks = client.TasksClient(self.es)
+            tasks = self.es.tasks
+            while True:
+                res = tasks.get(task_id=taskid)
+                if res["completed"]:
+                    break
+
+                # print(res["task"])
+                print('----', index_name_o, ' imported : ', res["task"]["status"]["total"], ' / ', res["task"]["status"]["created"])
+                sleep(1)
+            print('----', index_name_o, ' completed')
+
+    def deleteIndex(self, index_name):
+        if not self.es.indices.exists(index=index_name):
+            print(' ' * 10, " index does not exist, nothing to delete :", index_name)
+            return True
+
+        question = 'DELETE the elastic index (' + index_name + ') ? '
+        if self.query_yes_no(question):
+            self.es.indices.delete(index=index_name)
+            print('%' * 10, " finished DELETE of index :", index_name)
+            return True
+        else:
+            return False
+
+    def query_yes_no(self, question, default="no"):
+        valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
+        if default is None:
+            prompt = " [y/n] "
+        elif default == "yes":
+            prompt = " [Y/n] "
+        elif default == "no":
+            prompt = " [y/N] "
+        else:
+            raise ValueError("invalid default answer: '%s'" % default)
+
+        while True:
+            print('%' * 10, ' question ', '%' * 10, '\n')
+            sys.stdout.write(question + prompt)
+            choice = input().lower()
+            if default is not None and choice == "":
+                return valid[default]
+            elif choice in valid:
+                return valid[choice]
+            else:
+                sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
+
+    def createIndexIfNotExist(self, index_name_o, mapping_o=""):
+        try:
+            if not self.es.indices.exists(index=index_name_o):
+                response = self.es.indices.create(index=index_name_o, body=mapping_o)
+                # print out the response:
+                print("create index response:", response)
+        except Exception:
+            print("....... index already exists! ... not created")
not created") + + + def createIndex(self, index_name, app_key='', map_name=''): + + path_base = self.path_mappings + path_mapping1 = path_base + 'general/' + if app_key == '' : + app_key = 'tavasi' + path_mapping2 = path_base + app_key + '/' + + + if map_name == '': + map_name = index_name + + if self.es.indices.exists(index=index_name) : + print("============== exist index :", index_name ) + return True + + if map_name == 'mj_rg_section' or map_name == 'semantic_search' : + map_name = 'mj_qa_section' + elif map_name[-3]=='_ai': + map_name=[0-len(map_name)-3] + print(map_name) + + mapping_file_path = path_mapping1 + map_name + '.json' + print("mapping_file_path : " , mapping_file_path) + if not os.path.isfile(mapping_file_path): + if not os.path.isfile(mapping_file_path): + mapping_file_path = path_mapping2 + map_name + '.json' + + print("mapping_file_path : " , mapping_file_path) + + # Create Index With Mapping + if os.path.isfile(mapping_file_path): + mapping_file = open( mapping_file_path,'r', encoding='utf-8' ) + mapping_file_read = mapping_file.read() + mapping_data = json.loads(mapping_file_read) + mapping_file.close() + if self.es.indices.exists(index=index_name) : + print("============== exist index :", index_name ) + else : + self.es.indices.create(index = index_name , body = mapping_data) + return True + else: + print('*** error not find maping file elastic : *******', mapping_file_path) + return False + + + def updateBulkList(self, listData, index_name): + chunk_size=1000 + raise_on_error=False + raise_on_exception=False + stats_only=True + yield_ok = False + + actions=[] + for item in listData: + actions.append({ + "_op_type": "update", + "_index": index_name, + "_id" : item['_id'], + "doc": item['_source'] + } + ) + helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok ) + + def importBulkList(self, listData, index_name): + chunk_size=100000 + raise_on_error=False + raise_on_exception=False + stats_only=True + yield_ok = False + + for item in listData: + actions = [{ + "_op_type": "index", + "_index": index_name, + "_id" : item['_id'], + "_source": item['_source'] + } + ] + helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok ) + + + def importJsonDataToElastic(self, jsonData, index_name, fields=[]): + chunk_size=1000 + raise_on_error=False + raise_on_exception=False + stats_only=True + yield_ok = False + + actions=[] + + for item in jsonData: + id = item['_id'] if item['_id'] else item['id'] + source = item['_source'] + if fields : + source = {} + for col in fields : + if col in item['_source'] : + source[col] = item['_source'] + + + actions.append({ + "_op_type": "index", + "_index": index_name, + "_id" : id, + "_source": source + }) + helpers.bulk(self.es, actions, chunk_size, raise_on_error, raise_on_exception, stats_only, yield_ok ) + + + def fileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]): + if not os.path.exists(file_path): + print("file zip:" , file_path , " not exist") + return + print("index:" , index_name , '=>' , file_path ) + self.counter = 0 + with open(file_path) as file: + data = json.loads(file.read()) + self.importJsonDataToElastic(data, index_name, fields) + + self.es.indices.refresh(index=index_name) + print(self.es.cat.count(index=index_name, format="json")) + + def zipFileToElastic(self, file_path, index_name, limit_pack = -1, fields=[]): + if not os.path.exists(file_path): + print("file zip:" , file_path , " not exist for imort to elastic : 
", index_name ) + return + + fileNo = 0 + with zipfile.ZipFile(file_path, 'r') as zObject: + fileNo +=1 + print("="*10, " zip fileNo: " , fileNo ," - ( ", index_name," ) | File Numbers:" ,len(zObject.namelist()) , "=" * 10) + + packNo = 0 + self.counter = 0 + for filename in zObject.namelist(): + packNo += 1 + if limit_pack != -1 : + if packNo > limit_pack : + print('limit_data ', index_name, ' ', limit_pack) + break + + print("index:" , index_name , '=>' , filename ) + with zObject.open(filename) as file: + data = json.loads(file.read()) + self.importJsonDataToElastic(data, index_name, fields) + + self.es.indices.refresh(index=index_name) + print(self.es.cat.count(index=index_name, format="json")) + print(" END Of Import to elastic ", index_name ,"\n") + + + def iterateJsonFile(self, file_path, isZip=True, limit_pack = -1): + if not os.path.exists(file_path): + print("file zip:" , file_path , " not exist iterateJsonFile " ) + return + + if isZip : + fileNo = 0 + with zipfile.ZipFile(file_path, 'r') as zObject: + fileNo +=1 + print("="*10, " zip fileNo: " , fileNo ," iterateJsonFile - | File Numbers:" ,len(zObject.namelist()) , "=" * 10) + + packNo = 0 + self.counter = 0 + for filename in zObject.namelist(): + packNo += 1 + if limit_pack != -1 : + if packNo > limit_pack : + print('limit_data iterateJsonFile ', limit_pack) + break + + print("index iterateJsonFile :", '=>' , filename ) + with zObject.open(filename) as file: + data = json.loads(file.read()) + # Yield each entry + # yield data + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data) + else : + with open(filename, 'r', encoding='utf-8') as file: + data = json.loads(file.read()) + # Yield each entry + # yield from (hit for hit in data) + #return data + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data) + + + def es_iterate_all_documents(self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs): + """ + Helper to iterate ALL values from a single index + Yields all the documents. 
+ """ + is_first = True + while True: + # Scroll next + if is_first: # Initialize scroll + # result = self.es.search(index=index, scroll="2m", **kwargs, body={ + # "size": pagesize + # }) + if body : + result = self.es.search( + index=index, + scroll=scroll_timeout, + **kwargs, + size=pagesize, + body=body + ) + else : + result = self.es.search( + index=index, + scroll=scroll_timeout, + **kwargs, + size=pagesize + ) + + self.total = result["hits"]["total"]["value"] + if self.total > 0: + print("total = %d" % self.total) + is_first = False + else: + # result = es.scroll(body={ + # "scroll_id": scroll_id, + # "scroll": scroll_timeout + # }) + result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout) + + scroll_id = result["_scroll_id"] + hits = result["hits"]["hits"] + self.counter += len(hits) + if self.total > 0 : + print("progress -> %.2f %%" % ((self.counter / self.total) * 100)) + # Stop after no more docs + if not hits: + break + # Yield each entry + yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits) + + + def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}): + try: + body = {} + list = [] + try: + list = self.es_iterate_all_documents(index_name_i) + except Exception as e: + print(e) + + count = 0 + for mentry in list: + count += 1 + + entry = mentry["source"] + id = mentry["id"] + # print(id) + eid = id + + if (count % 100) == 0 : + print("%s -> %.2f " % (id , (count / self.total) if self.total > 0 else 0)) + + data_filled = False + data = {} + for col in fields: + + if '.' in col : + cols = col.split('.') + subsource = entry + for sub in cols : + dCol = subsource.get(sub, None) + if dCol : + subsource = dCol + else : + break + else : + dCol = entry.get(col, None) + + if dCol is None: + continue + + if col in renameFileds : + data[renameFileds[col]] = dCol + else: + data[col] = dCol + + data_filled = True + + if not data_filled : + continue + + try: + resp = self.update_index_doc(True, index_name_o, eid, data) + except Exception as e: + print(e) + # save_error(id, e) + + except Exception as e: + # print("1111") + print(e) + + # save_error(id, e) + + def mappingIndex(self, index_name_i): + # فقط از طریق کیبانا میشه تغییر مپ داد + + # با پایتون نمیشه + # باید ایندکس جدیدی با مپ مطلوب ایجاد کرد و رایندکس کرد + pass + + def updateByQueryIndex(self, index_name_i, body): + ## sample + # body = { + # "script": { + # "inline": "ctx._source.Device='Test'", + # "lang": "painless" + # }, + # "query": { + # "match": { + # "Device": "Boiler" + # } + # } + # } + try: + self.es.update_by_query(body=body, index=index_name_i) + + except Exception as e: + print(e) + # save_error(id, e) + + + def deleteByQueryIndex(self, index_name_i, body): + ## sample + # body = { + # "query": { + # "match": { + # "Device": "Boiler" + # } + # } + # } + try: + self.es.delete_by_query(index=index_name_i, body=body ) + + except Exception as e: + print(e) + # save_error(id, e) + + def delete_by_ids(self, index_name_i, ids): + try: + # ids = ['test1', 'test2', 'test3'] + + query = {"query": {"terms": {"_id": ids}}} + res = self.es.delete_by_query(index=index_name_i, body=query) + print(res) + + except Exception as e: + print(e) + # save_error(id, e) + diff --git a/get_recent_laws.py b/get_recent_laws.py new file mode 100644 index 0000000..584f1a5 --- /dev/null +++ b/get_recent_laws.py @@ -0,0 +1,35 @@ +from elastic_helper import ElasticHelper +import datetime +import json + +eh_obj = ElasticHelper() +path = "/home/gpu/data_11/14040423/mj_qa_section.zip" 
+sections = eh_obj.iterateJsonFile(path, True)
+
+# Date after which the data has not yet been through the various processing steps
+update_time = datetime.datetime(1403, 10, 5)
+
+def get_data_from_date(date):
+    recent_sections = {}
+    for i, item in enumerate(sections):
+        id = item['id']
+        source = item['source']
+        # ts_date is a 'YYYY/MM/DD' string in the same calendar as update_time
+        ts_date = source['ts_date']
+        parts = ts_date.split('/')
+        ts_date_standard = datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2]))
+
+        if ts_date_standard > date:
+            recent_sections[id] = source
+
+    return recent_sections
+
+if __name__ == '__main__':
+    recent_sections = get_data_from_date(update_time)
+
+    with open('./data/recent_sections.json', 'w', encoding='utf-8') as file:
+        json.dump(recent_sections, file, ensure_ascii=False)
+
+    print('finished!')
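A minimal usage sketch for the helper added above, assuming a local Elasticsearch instance and the same backup zip; the connection settings, index name and paths below are illustrative assumptions, not values taken from this patch:

    from elastic_helper import ElasticHelper

    # hypothetical connection settings; adjust to the real cluster
    eh = ElasticHelper(es_url="http://127.0.0.1:6900")

    if eh.success_connect:
        # restore the backup zip into an index without prompting for deletion
        eh.restorFileToElastic2("/home/gpu/data_11/14040423/mj_qa_section.zip",
                                "mj_qa_section", queryDelete=False)

        # stream documents straight from the index instead of from the zip
        for doc in eh.es_iterate_all_documents("mj_qa_section", pagesize=500):
            print(doc["id"], doc["source"].get("ts_date"))
            break  # only show the first hit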