first files
This commit is contained in: commit 393aace89c

677  elastic_helper.py  Normal file

@@ -0,0 +1,677 @@
import zipfile
import sys
import os
import json
from time import sleep
from elasticsearch import Elasticsearch, helpers


class ElasticHelper():

    counter = 0
    total = 0
    id = ""
    path_mappings = os.getcwd() + '/repo/_other/'

    def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings=""):

        if path_mappings:
            self.path_mappings = path_mappings

        if es_pass == '':
            self.es = Elasticsearch(es_url)
        else:
            self.es = Elasticsearch(
                es_url,
                http_auth=(es_user, es_pass),
            )

        # print(es_url)
        # print(self.es)

        self.success_connect = False
        for a in range(0, 10):
            try:
                if not self.es.ping():
                    print('elastic not responding to ping, sleep 5 s : ', a)
                    sleep(5)
                    continue
                else:
                    self.success_connect = True
                    break

            except Exception as e:
                break
        if not self.success_connect:
            print('******', 'no access to elastic service')
            return

        self.counter = 0
        self.total = 0
        self.id = ""

    def get_doctument(self, index_name, id):
        res = self.es.get(index=index_name, id=id)
        return res

    def exist_doctument(self, index_name, id):
        res = self.es.exists(index=index_name, id=id)
        return res

    def update_index_doc(self, is_update_state, index_name_o, eid, data):
        if is_update_state:
            resp = self.es.update(index=index_name_o, id=eid, doc=data)
            # resp = self.es.update(index=index_name_o, id=eid, body={'doc': data})
        else:
            resp = self.es.index(index=index_name_o, id=eid, document=data)
        return resp

    def exportToJsonForAI(self, path_back, index_name, out_name='', body={}, fields=[]):
        print('*' * 50, ' start backup -->', index_name)
        self.counter = 0
        sid = None

        out = out_name
        if out_name == '':
            out = index_name

        fout = open(path_back + "/" + out + '.json', 'a+', encoding='utf-8')

        s_res = self.es.search(
            index=index_name,
            scroll='5m',
            size=1000,
            body=body
        )
        self.total = s_res["hits"]["total"]['value']

        print('start index = %s' % index_name)
        print('total = %d' % self.total)

        sid = s_res['_scroll_id']
        scroll_size = len(s_res['hits']['hits'])
        file_count = 1
        out_json = []
        while scroll_size > 0:
            # Scrolling...
            self.counter += scroll_size
            print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
            #############################
            for item in s_res['hits']['hits']:

                if fields:
                    item2 = {}
                    item2['id'] = item['_id']
                    for kf in fields:
                        # print(kf)
                        if kf in item['_source']:
                            # print(item['_source'][kf])
                            item2[kf] = item['_source'][kf]
                            # exit()
                else:
                    item2 = item

                out_json.append(item2)

            s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
            sid = s_res['_scroll_id']
            scroll_size = len(s_res['hits']['hits'])

        sid = None
        text = json.dumps(out_json, ensure_ascii=False)
        fout.write(text)
        fout.close()

        ##############################

    def backupIndexToZipfile(self, path_back, index_name, out_name='', body={}, byzip=True, fields=[], noFields=[]):
        print('*' * 50, ' start backup -->', index_name)
        self.counter = 0
        sid = None

        out = out_name
        if out_name == '':
            out = index_name

        if body == {}:
            s_res = self.es.search(
                index=index_name,
                scroll='5m',
                size=1000
            )
        else:
            s_res = self.es.search(
                index=index_name,
                scroll='5m',
                size=1000,
                body=body
            )

        self.total = s_res["hits"]["total"]['value']
        if self.total == 0:
            print('total index_name by query = %d' % self.total)
            return False

        if byzip:
            fout = zipfile.ZipFile(path_back + "/" + out + '.zip', 'w')
        else:
            fout = open(path_back + "/" + out + '.json', 'a+', encoding='utf-8')

        print('start index = %s' % index_name)
        print('total = %d' % self.total)

        sid = s_res['_scroll_id']
        scroll_size = len(s_res['hits']['hits'])
        file_count = 1
        while scroll_size > 0:
            # Scrolling...
            self.counter += scroll_size
            print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
            #############################
            out_json = []
            for item in s_res['hits']['hits']:
                if fields:
                    item2 = {}
                    item2['id'] = item['_id']
                    item2['_source'] = {}
                    for kf in fields:
                        if kf in item['_source']:
                            item2['_source'][kf] = item['_source'][kf]
                else:
                    item2 = item

                if noFields:
                    for kf in noFields:
                        if kf in item2['_source']:
                            del item2['_source'][kf]

                out_json.append(item2)

            text = json.dumps(out_json, ensure_ascii=False)
            out_json = []
            if byzip:
                filename = out + str(file_count) + '.json'
                file_count += 1
                fout.writestr(filename, text.encode('utf-8'), zipfile.ZIP_DEFLATED)
            else:
                fout.write(text)

            ##############################
            s_res = self.es.scroll(scroll_id=sid, scroll='2m', request_timeout=100000)
            sid = s_res['_scroll_id']
            scroll_size = len(s_res['hits']['hits'])
        sid = None
        fout.close()

    def restorFileToElastic(self, path_back, index_name, app_key='', queryDelete=True, map_name=''):
        if not os.path.exists(path_back):
            print(' **** error *** path does not exist: ', path_back)
            return False

        file_path = path_back + '/' + index_name + '.zip'
        if not os.path.exists(file_path):
            return False

        if queryDelete:
            # If the index already exists, ask the user whether it should be deleted first
            if self.deleteIndex(index_name):
                self.createIndex(index_name, app_key, map_name)
                self.zipFileToElastic(file_path, index_name)
        else:  # If the index already exists, skip it and leave it untouched
            self.createIndex(index_name, app_key, map_name)
            self.zipFileToElastic(file_path, index_name)

    def restorFileToElastic2(self, path_file, index_name, app_key='', queryDelete=True, map_name=''):
        if not os.path.exists(path_file):
            print(' **** error *** path does not exist: ', path_file)
            return False

        file_path = path_file
        if not os.path.exists(file_path):
            return False

        if queryDelete:
            # If the index already exists, ask the user whether it should be deleted first
            if self.deleteIndex(index_name):
                self.createIndex(index_name, app_key, map_name)
                self.zipFileToElastic(file_path, index_name)
        else:  # If the index already exists, skip it and leave it untouched
            self.createIndex(index_name, app_key, map_name)
            self.zipFileToElastic(file_path, index_name)

    def renameElasticIndex(self, index_name_i, index_name_o, app_key='', map_name=''):

        if self.createIndex(index_name_o, app_key, map_name):
            res = self.es.reindex(
                body={
                    "source": {"index": index_name_i},
                    "dest": {"index": index_name_o}
                },
                wait_for_completion=False)

            print(type(res))
            print(res)

            taskid = res["task"] if res["task"] else ""
            # tasks = client.TasksClient(self.es)
            tasks = self.es.tasks
            while True:
                res = tasks.get(task_id=taskid)
                if res["completed"]:
                    break

                # print(res["task"])
                print('----', index_name_o, ' imported : ', res["task"]["status"]["total"], ' / ', res["task"]["status"]["created"])
                sleep(1)
            print('----', index_name_o, ' completed')

    def deleteIndex(self, index_name):
        if not self.es.indices.exists(index=index_name):
            print(' ' * 10, " index to delete does NOT exist :", index_name)
            return True

        question = 'DELETE elastic index (' + index_name + ') ? '
        if self.query_yes_no(question):
            self.es.indices.delete(index=index_name)
            print('%' * 10, " Finished DELETE of index :", index_name)
            return True
        else:
            return False

    def query_yes_no(self, question, default="no"):
        valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
        if default is None:
            prompt = " [y/n] "
        elif default == "yes":
            prompt = " [Y/n] "
        elif default == "no":
            prompt = " [y/N] "
        else:
            raise ValueError("invalid default answer: '%s'" % default)

        while True:
            print('%' * 10, ' question ', '%' * 10, '\n')
            sys.stdout.write(question + prompt)
            choice = input().lower()
            if default is not None and choice == "":
                return valid[default]
            elif choice in valid:
                return valid[choice]
            else:
                sys.stdout.write("Please answer with 'yes' or 'no' (or 'y' or 'n').\n")

    def createIndexIfNotExist(self, index_name_o, mapping_o=""):
        try:
            if not self.es.indices.exists(index=index_name_o):
                response = self.es.indices.create(index=index_name_o, body=mapping_o)
                # print out the response:
                print("create index response:", response)
        except Exception:
            print("....... index exists! ... not created")

    def createIndex(self, index_name, app_key='', map_name=''):

        path_base = self.path_mappings
        path_mapping1 = path_base + 'general/'
        if app_key == '':
            app_key = 'tavasi'
        path_mapping2 = path_base + app_key + '/'

        if map_name == '':
            map_name = index_name

        if self.es.indices.exists(index=index_name):
            print("============== index exists :", index_name)
            return True

        if map_name == 'mj_rg_section' or map_name == 'semantic_search':
            map_name = 'mj_qa_section'
        elif map_name[-3:] == '_ai':
            # strip the '_ai' suffix so the base mapping file is reused
            map_name = map_name[:-3]
            print(map_name)

        mapping_file_path = path_mapping1 + map_name + '.json'
        print("mapping_file_path : ", mapping_file_path)
        if not os.path.isfile(mapping_file_path):
            mapping_file_path = path_mapping2 + map_name + '.json'

        print("mapping_file_path : ", mapping_file_path)

        # Create Index With Mapping
        if os.path.isfile(mapping_file_path):
            mapping_file = open(mapping_file_path, 'r', encoding='utf-8')
            mapping_file_read = mapping_file.read()
            mapping_data = json.loads(mapping_file_read)
            mapping_file.close()
            if self.es.indices.exists(index=index_name):
                print("============== index exists :", index_name)
            else:
                self.es.indices.create(index=index_name, body=mapping_data)
            return True
        else:
            print('*** error: elastic mapping file not found : *******', mapping_file_path)
            return False

    def updateBulkList(self, listData, index_name):
        chunk_size = 1000
        raise_on_error = False
        raise_on_exception = False
        stats_only = True
        yield_ok = False

        actions = []
        for item in listData:
            actions.append({
                "_op_type": "update",
                "_index": index_name,
                "_id": item['_id'],
                "doc": item['_source']
            })
        helpers.bulk(self.es, actions, chunk_size=chunk_size, raise_on_error=raise_on_error,
                     raise_on_exception=raise_on_exception, stats_only=stats_only, yield_ok=yield_ok)

    def importBulkList(self, listData, index_name):
        chunk_size = 100000
        raise_on_error = False
        raise_on_exception = False
        stats_only = True
        yield_ok = False

        for item in listData:
            # one bulk call per document
            actions = [{
                "_op_type": "index",
                "_index": index_name,
                "_id": item['_id'],
                "_source": item['_source']
            }]
            helpers.bulk(self.es, actions, chunk_size=chunk_size, raise_on_error=raise_on_error,
                         raise_on_exception=raise_on_exception, stats_only=stats_only, yield_ok=yield_ok)

    def importJsonDataToElastic(self, jsonData, index_name, fields=[]):
        chunk_size = 1000
        raise_on_error = False
        raise_on_exception = False
        stats_only = True
        yield_ok = False

        actions = []

        for item in jsonData:
            id = item['_id'] if '_id' in item else item['id']
            source = item['_source']
            if fields:
                # keep only the requested fields
                source = {}
                for col in fields:
                    if col in item['_source']:
                        source[col] = item['_source'][col]

            actions.append({
                "_op_type": "index",
                "_index": index_name,
                "_id": id,
                "_source": source
            })
        helpers.bulk(self.es, actions, chunk_size=chunk_size, raise_on_error=raise_on_error,
                     raise_on_exception=raise_on_exception, stats_only=stats_only, yield_ok=yield_ok)

    def fileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]):
        if not os.path.exists(file_path):
            print("file:", file_path, " does not exist")
            return
        print("index:", index_name, '=>', file_path)
        self.counter = 0
        with open(file_path) as file:
            data = json.loads(file.read())
            self.importJsonDataToElastic(data, index_name, fields)

        self.es.indices.refresh(index=index_name)
        print(self.es.cat.count(index=index_name, format="json"))

    def zipFileToElastic(self, file_path, index_name, limit_pack=-1, fields=[]):
        if not os.path.exists(file_path):
            print("file zip:", file_path, " does not exist for import to elastic : ", index_name)
            return

        fileNo = 0
        with zipfile.ZipFile(file_path, 'r') as zObject:
            fileNo += 1
            print("=" * 10, " zip fileNo: ", fileNo, " - ( ", index_name, " ) | File Numbers:", len(zObject.namelist()), "=" * 10)

            packNo = 0
            self.counter = 0
            for filename in zObject.namelist():
                packNo += 1
                if limit_pack != -1:
                    if packNo > limit_pack:
                        print('limit_data ', index_name, ' ', limit_pack)
                        break

                print("index:", index_name, '=>', filename)
                with zObject.open(filename) as file:
                    data = json.loads(file.read())
                    self.importJsonDataToElastic(data, index_name, fields)

        self.es.indices.refresh(index=index_name)
        print(self.es.cat.count(index=index_name, format="json"))
        print(" END Of Import to elastic ", index_name, "\n")

    def iterateJsonFile(self, file_path, isZip=True, limit_pack=-1):
        if not os.path.exists(file_path):
            print("file zip:", file_path, " does not exist, iterateJsonFile")
            return

        if isZip:
            fileNo = 0
            with zipfile.ZipFile(file_path, 'r') as zObject:
                fileNo += 1
                print("=" * 10, " zip fileNo: ", fileNo, " iterateJsonFile - | File Numbers:", len(zObject.namelist()), "=" * 10)

                packNo = 0
                self.counter = 0
                for filename in zObject.namelist():
                    packNo += 1
                    if limit_pack != -1:
                        if packNo > limit_pack:
                            print('limit_data iterateJsonFile ', limit_pack)
                            break

                    print("index iterateJsonFile :", '=>', filename)
                    with zObject.open(filename) as file:
                        data = json.loads(file.read())
                        # Yield each entry
                        # yield data
                        yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)
        else:
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.loads(file.read())
                # Yield each entry
                # yield from (hit for hit in data)
                # return data
                yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in data)

    def es_iterate_all_documents(self, index, body="", pagesize=250, scroll_timeout="25m", **kwargs):
        """
        Helper to iterate ALL values from a single index.
        Yields all the documents.
        """
        is_first = True
        while True:
            # Scroll next
            if is_first:  # Initialize scroll
                # result = self.es.search(index=index, scroll="2m", **kwargs, body={
                #     "size": pagesize
                # })
                if body:
                    result = self.es.search(
                        index=index,
                        scroll=scroll_timeout,
                        **kwargs,
                        size=pagesize,
                        body=body
                    )
                else:
                    result = self.es.search(
                        index=index,
                        scroll=scroll_timeout,
                        **kwargs,
                        size=pagesize
                    )

                self.total = result["hits"]["total"]["value"]
                if self.total > 0:
                    print("total = %d" % self.total)
                is_first = False
            else:
                # result = es.scroll(body={
                #     "scroll_id": scroll_id,
                #     "scroll": scroll_timeout
                # })
                result = self.es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)

            scroll_id = result["_scroll_id"]
            hits = result["hits"]["hits"]
            self.counter += len(hits)
            if self.total > 0:
                print("progress -> %.2f %%" % ((self.counter / self.total) * 100))
            # Stop after no more docs
            if not hits:
                break
            # Yield each entry
            yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)

    def moveCustomFileds(self, index_name_i, index_name_o, fields=[], renameFileds={}):
        try:
            body = {}
            list = []
            try:
                list = self.es_iterate_all_documents(index_name_i)
            except Exception as e:
                print(e)

            count = 0
            for mentry in list:
                count += 1

                entry = mentry["source"]
                id = mentry["id"]
                # print(id)
                eid = id

                if (count % 100) == 0:
                    print("%s -> %.2f " % (id, (count / self.total) if self.total > 0 else 0))

                data_filled = False
                data = {}
                for col in fields:

                    if '.' in col:
                        cols = col.split('.')
                        subsource = entry
                        for sub in cols:
                            dCol = subsource.get(sub, None)
                            if dCol:
                                subsource = dCol
                            else:
                                break
                    else:
                        dCol = entry.get(col, None)

                    if dCol is None:
                        continue

                    if col in renameFileds:
                        data[renameFileds[col]] = dCol
                    else:
                        data[col] = dCol

                    data_filled = True

                if not data_filled:
                    continue

                try:
                    resp = self.update_index_doc(True, index_name_o, eid, data)
                except Exception as e:
                    print(e)
                    # save_error(id, e)

        except Exception as e:
            # print("1111")
            print(e)
            # save_error(id, e)

    def mappingIndex(self, index_name_i):
        # The mapping can only be changed through Kibana;
        # it cannot be done from Python.
        # A new index with the desired mapping has to be created and the data reindexed into it.
        pass

    def updateByQueryIndex(self, index_name_i, body):
        ## sample
        # body = {
        #     "script": {
        #         "inline": "ctx._source.Device='Test'",
        #         "lang": "painless"
        #     },
        #     "query": {
        #         "match": {
        #             "Device": "Boiler"
        #         }
        #     }
        # }
        try:
            self.es.update_by_query(body=body, index=index_name_i)

        except Exception as e:
            print(e)
            # save_error(id, e)

    def deleteByQueryIndex(self, index_name_i, body):
        ## sample
        # body = {
        #     "query": {
        #         "match": {
        #             "Device": "Boiler"
        #         }
        #     }
        # }
        try:
            self.es.delete_by_query(index=index_name_i, body=body)

        except Exception as e:
            print(e)
            # save_error(id, e)

    def delete_by_ids(self, index_name_i, ids):
        try:
            # ids = ['test1', 'test2', 'test3']

            query = {"query": {"terms": {"_id": ids}}}
            res = self.es.delete_by_query(index=index_name_i, body=query)
            print(res)

        except Exception as e:
            print(e)
            # save_error(id, e)

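For orientation, a minimal usage sketch of the helper above; the URL, backup path, and index name are placeholders rather than values taken from this commit:

from elastic_helper import ElasticHelper

# Connection parameters are illustrative; adjust them to the target cluster.
eh = ElasticHelper(es_url="http://127.0.0.1:6900")

# Dump an index into ./backup/my_index.zip (1000 documents per JSON file inside the zip).
eh.backupIndexToZipfile("./backup", "my_index")

# Re-create the index from that zip; deleteIndex() prompts before any deletion,
# and createIndex() expects a mapping JSON named my_index.json under path_mappings.
eh.restorFileToElastic("./backup", "my_index")
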
35  get_recent_laws.py  Normal file

@@ -0,0 +1,35 @@
from elastic_helper import ElasticHelper
import datetime
import json

eh_obj = ElasticHelper()
path = "/home/gpu/data_11/14040423/mj_qa_section.zip"
sections = eh_obj.iterateJsonFile(path, True)

# The date after which the data has not yet gone through the various processing steps
update_time = datetime.datetime(1403, 10, 5)


def get_data_from_date(date):
    recent_sections = {}
    for i, item in enumerate(sections):
        id = item['id']
        source = item['source']
        ts_date = source['ts_date']
        # ts_date is stored as 'YYYY/MM/DD'; convert the parts to int before building a datetime
        ts_date_standard = datetime.datetime(int(ts_date.split('/')[0]), int(ts_date.split('/')[1]), int(ts_date.split('/')[2]))

        if ts_date_standard > date:
            recent_sections[id] = source

    return recent_sections


if __name__ == '__main__':
    recent_sections = get_data_from_date(update_time)

    with open('./data/recent_sections.json', 'w', encoding='utf-8') as file:
        json.dump(recent_sections, file, ensure_ascii=False)

    print('finished!')
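A possible follow-up, not part of this commit: pushing the filtered sections back into an index with ElasticHelper.importJsonDataToElastic. The target index name below is hypothetical:

from elastic_helper import ElasticHelper
import json

eh = ElasticHelper()
with open('./data/recent_sections.json', encoding='utf-8') as f:
    recent = json.load(f)

# importJsonDataToElastic expects items carrying an id and a '_source' dict
docs = [{"_id": sid, "_source": src} for sid, src in recent.items()]
eh.importJsonDataToElastic(docs, "mj_qa_section_recent")  # hypothetical target index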