# RD_relation/sections_7K_add_metadata.py
"""
در این فایل، داده های مربوط به فایل سکشن های تمیز 7هزارتایی، پردازش می شود و متادیتاهای
لازم برای هر سکشن از الستیک خوانده می شود و در یک جیسون جدید ذخیره می شود
"""
from elasticsearch import Elasticsearch
from funcs import read_from_json, write_to_json
import datetime
import os
def es_iterate_all_documents(es, index, pagesize=250, scroll_timeout="25m", **kwargs):
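    """
    Scroll over every document of `index` that matches the fixed filter below
    (no `nlp_parser.type` field, non-zero `content_len`, `parse_state` not equal
    to 1 or 2) and yield {"source": hit["_source"], "id": hit["_id"]} per hit.
    Progress is reported through the module-level `counter` and `total` globals.
    """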
global counter
global total
is_first = True
while True:
        # Fetch the next page of results
        if is_first:  # Initialize the scroll context
result = es.search(
index=index,
scroll="2m",
**kwargs,
size=pagesize,
body={
"query": {
"bool": {
"must_not": [
{"exists": {"field": "nlp_parser.type"}},
{"match": {"content_len": 0}},
{"match": {"parse_state": 1}},
{"match": {"parse_state": 2}}
]
}
}
}
)
total = result["hits"]["total"]["value"]
print("total = %d" % total)
is_first = False
else:
result = es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
scroll_id = result["_scroll_id"]
hits = result["hits"]["hits"]
counter += len(hits)
print("progress -> %.2f %%" % ((counter / total) * 100))
# Stop after no more docs
if not hits:
break
# Yield each entry
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)
def es_iterate_some_documents(es, index, records, pagesize=250, scroll_timeout="25m", **kwargs):
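    """
    Scroll over the documents of `index` whose `_id` appears in `records`
    (a terms query on `_id`) and yield {"source": hit["_source"], "id": hit["_id"]}
    per hit. Progress is reported through the module-level `counter` and `total`
    globals.
    """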
global counter
global total
is_first = True
query = {
"query": {
"terms": {
"_id": records
}
}
}
while True:
        # Fetch the next page of results
        if is_first:  # Initialize the scroll context
result = es.search(
index=index,
scroll="2m",
**kwargs,
size=pagesize,
                body=query
)
total = result["hits"]["total"]["value"]
print("total = %d" % total)
is_first = False
else:
result = es.scroll(scroll_id=scroll_id, scroll=scroll_timeout)
scroll_id = result["_scroll_id"]
hits = result["hits"]["hits"]
counter += len(hits)
print("progress -> %.2f %%" % ((counter / total) * 100))
# Stop after no more docs
if not hits:
break
# Yield each entry
yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits)
def prepare_data(ner_obj_list):
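    """
    Convert a list of raw NER objects into the compact metadata form:
    each entry keeps the key, value, begin/end token offsets and score.
    """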
ner_data_list = []
for ner_obj in ner_obj_list:
        ner_data = {
            "key": ner_obj['ner_key'],
            "value": ner_obj['ner_value'],
            "begin": ner_obj['ner_start_token'],
            "end": ner_obj['ner_end_token'],
            "score": ner_obj['ner_score']
        }
ner_data_list.append(ner_data)
return ner_data_list
def relation_finder(all_orgs):
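    """
    For each section in `all_orgs`, collect the other sections that share at
    least one organization from its `ai_key` list and record them as relations
    of type ORG with weight 1. Each input item is expected to carry
    `section_id`, `qanon_id` and `ai_key`; the comparison is O(n^2) over the list.
    """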
new_orgs = []
for index, section in enumerate(all_orgs):
if index % 1000 == 0:
print(f"relation finder progress: {(index/len(all_orgs)) * 100:.2f} %")
# if index > 1000:
# break
related_sections = []
orgs = section['ai_key']
for org in orgs:
for compare_item in all_orgs:
compare_item_orgs = compare_item['ai_key']
                if section['section_id'] == compare_item['section_id']:  # prevent relating a section to itself
continue
if org in compare_item_orgs:
# related_sections.append(compare_item['id'])
related_sections.append({
'section_id': compare_item['section_id'],
'qanon_id': compare_item['qanon_id'],
'ai_key': org,
'type': 'ORG',
'weight': 1,
})
        new_orgs.append({
            'id': section['section_id'],
            'qanon_id': section['qanon_id'],
            'ai_key': orgs,  # the section's own organization keys (originally read as section['orgs'], which is not set on these items)
            'relations': related_sections,
        })
return new_orgs
def extract_weight(current_id, content_ai, relation_list):
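    """
    Collapse `relation_list` into at most one entry per related section id:
    `weight` counts how many times that section appears in the list and
    `ai_key` gathers the distinct labels taken from the matching items.
    Section ids are normalized from the "mj_qa_section_" prefix to "qs".
    """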
relation_list_temp = []
for rel_item in relation_list:
weight = 0
        # use a set to avoid inserting duplicate labels
        rel_labels = set()
        # normalize the section id format
        current_section_id = rel_item['section_id'].replace("mj_qa_section_", "qs")
for item in relation_list:
if item['section_id'].replace("mj_qa_section_","qs") == current_section_id:
weight += 1
for sub_item in item['ai_key']:
for sub_sub in sub_item:
rel_labels.add(sub_sub)
        # convert the set of labels to a list
        rel_labels = list(rel_labels)
        # add an entry for this related section only if it has not been recorded yet
        already_recorded = any(
            current_section_id == rel_item2['related_id'].replace("mj_qa_section_", "qs")
            for rel_item2 in relation_list_temp
        )
        if not already_recorded:
            relation_list_temp.append({
                "id": current_id,
                "related_id": current_section_id,
                "content_ai": content_ai,
                "qanon_id": rel_item['qanon_id'],
                "ai_key": rel_labels,
                "type": rel_item['type'],
                "weight": weight
            })
return relation_list_temp
# remove duplicate sections from the list
def remove_repitetive_sections(sections_group):
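    """
    Keep only the first occurrence of every (id, related_id) pair in
    `sections_group` and return the de-duplicated list.
    """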
    final_sections = []
    for index, item in enumerate(sections_group):
        print(" -------------- remove repeated item for section: " + str(index + 1))
        if len(final_sections) == 0:
            final_sections.append(item)
            continue
        section_id = item["id"]
        related_id = item["related_id"]
        # reset the flag for every item so one duplicate does not block all later inserts
        found = False
        for section in final_sections:
            if (section["id"] == section_id) and (section["related_id"] == related_id):
                found = True
                break
        # if this section has not been saved in a previous step, save it
        if not found:
            final_sections.append(item)
    return final_sections
print(datetime.datetime.now())
index_name_i = "ai_mj_qa_section-v03"
is_update_state = False
mapping_o = ""
es = Elasticsearch(
"http://127.0.0.1:6900",
basic_auth=("elastic", "SG*7eGwg+KG2_*-1_mMm")
)
try:
    if not es.indices.exists(index=index_name_i):
        response = es.indices.create(index=index_name_i, body=mapping_o)
        # the index did not exist, so it has just been created
        print("index not found; created " + index_name_i)
except Exception as e:
    print("elastic error: " + str(e))
counter = 0
total = 0
id = ""
novalid = -15000000000
all_orgs = []
all_orgs_text = ''
orgs_list = []
address = os.getcwd()
main_7k_sections = read_from_json("./data/main_sections_7K.json")
main_7k_sections_list = []
for section in main_7k_sections:
main_7k_sections_list.append(section["id"])
elastic_data = es_iterate_all_documents(es, index_name_i)
count = 0
all_sections = []
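# Walk over the documents returned from Elasticsearch and, for every id that
# belongs to the 7K clean-sections list, copy its content and metadata
# (content_ai, qanon_id, ners_v1, relations) into all_sections for the output JSON.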
for mentry1 in elastic_data:
count += 1
id1 = mentry1["id"].replace("mj_qa_section_","qs")
# if count > 500:
# break
print('add metadata for section: ' + str(count))
if count % 100 == 0:
print(f"add metadata progress: {(count / 7220) * 100:.2f} %")
if id1 in main_7k_sections_list:
entry1 = mentry1["source"]
content_ai1 = entry1["content_ai"]
qanon_id1 = entry1["qanon_id"]
ners1 = entry1['ners_v1']
relations = entry1['relations']
all_sections.append({
"id":id1,
"content":content_ai1,
"qanon_id":qanon_id1,
"ners":ners1,
"relations":relations,
})
eid = id1
write_to_json(all_sections, "./data/main_sections_7K_metadata.json")
print(datetime.datetime.now())
print(" # # # add metadata finished! # # # ")