""" در این فایل، داده های مربوط به فایل سکشن های تمیز 7هزارتایی، پردازش می شود و متادیتاهای لازم برای هر سکشن از الستیک خوانده می شود و در یک جیسون جدید ذخیره می شود """ from elasticsearch import Elasticsearch from funcs import read_from_json, write_to_json import datetime import os def es_iterate_all_documents(es, index, pagesize=250, scroll_timeout="25m", **kwargs): global counter global total is_first = True while True: # Scroll next if is_first: # Initialize scroll # result = es.search(index=index, scroll="2m", **kwargs, body={ # "size": pagesize # }) result = es.search( index=index, scroll="2m", **kwargs, size=pagesize, body={ "query": { "bool": { "must_not": [ {"exists": {"field": "nlp_parser.type"}}, {"match": {"content_len": 0}}, {"match": {"parse_state": 1}}, {"match": {"parse_state": 2}} ] } } } ) total = result["hits"]["total"]["value"] print("total = %d" % total) is_first = False else: result = es.scroll(scroll_id=scroll_id, scroll=scroll_timeout) scroll_id = result["_scroll_id"] hits = result["hits"]["hits"] counter += len(hits) print("progress -> %.2f %%" % ((counter / total) * 100)) # Stop after no more docs if not hits: break # Yield each entry yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits) def es_iterate_some_documents(es, index, records, pagesize=250, scroll_timeout="25m", **kwargs): global counter global total is_first = True query = { "query": { "terms": { "_id": records } } } while True: # Scroll next if is_first: # Initialize scroll # result = es.search(index=index, scroll="2m", **kwargs, body={ # "size": pagesize # }) result = es.search( index=index, scroll="2m", **kwargs, size=pagesize, body= query ) total = result["hits"]["total"]["value"] print("total = %d" % total) is_first = False else: result = es.scroll(scroll_id=scroll_id, scroll=scroll_timeout) scroll_id = result["_scroll_id"] hits = result["hits"]["hits"] counter += len(hits) print("progress -> %.2f %%" % ((counter / total) * 100)) # Stop after no more docs if not hits: break # Yield each entry yield from ({"source": hit["_source"], "id": hit["_id"]} for hit in hits) def prepare_data(ner_obj_list): ner_data_list = [] for ner_obj in ner_obj_list: ner_data = { "key" :ner_obj['ner_key'], "value" :ner_obj['ner_value'], "begin" :ner_obj['ner_start_token'], "end" :ner_obj['ner_end_token'], "score" :ner_obj['ner_score'] } ner_data_list.append(ner_data) return ner_data_list def relation_finder(all_orgs): new_orgs = [] for index, section in enumerate(all_orgs): if index % 1000 == 0: print(f"relation finder progress: {(index/len(all_orgs)) * 100:.2f} %") # if index > 1000: # break related_sections = [] orgs = section['ai_key'] for org in orgs: for compare_item in all_orgs: compare_item_orgs = compare_item['ai_key'] if section['section_id'] == compare_item['section_id']:# جلوگیری از ارتباط یک مقرره با خودش continue if org in compare_item_orgs: # related_sections.append(compare_item['id']) related_sections.append({ 'section_id': compare_item['section_id'], 'qanon_id': compare_item['qanon_id'], 'ai_key': org, 'type': 'ORG', 'weight': 1, }) new_orgs.append({ 'id': section['section_id'], 'qanon_id': section['qanon_id'], 'ai_key': section['orgs'], # 'orgs_text': section['orgs_text'], 'relations': related_sections, }) return new_orgs def extract_weight(current_id, content_ai, relation_list): relation_list_temp = [] for rel_item in relation_list: weight = 0 # تعریف یک ست به منظور جلوگیری از درج مقدار تکراری rel_labels = set() # تصحیح فرمت شناسه سکشن current_section_id = 
rel_item['section_id'].replace("mj_qa_section_","qs") for item in relation_list: if item['section_id'].replace("mj_qa_section_","qs") == current_section_id: weight += 1 for sub_item in item['ai_key']: for sub_sub in sub_item: rel_labels.add(sub_sub) # # تبدیل به مجموعه برای حذف مقادیر تکراری # unique_items = set() # # مرور آرایه و افزودن آیتم‌ها به مجموعه # for row in rel_labels: # for item in row: # for sub_item in item: # unique_items.add(sub_item) # rel_labels = [[item] for item in unique_items] # تبدیل ست به لیست rel_labels = list(rel_labels) for rel_item2 in relation_list_temp: if current_section_id == rel_item2['related_id'].replace("mj_qa_section_","qs"): break else: relation_list_temp.append({ "id": current_id, "related_id": current_section_id, "content_ai": content_ai, "qanon_id": rel_item['qanon_id'], "ai_key": rel_labels, "type": rel_item['type'], "weight": weight }) break if len(relation_list_temp) == 0: relation_list_temp.append({ "id": current_id, "related_id": current_section_id, "content_ai": content_ai, "qanon_id": rel_item['qanon_id'], "ai_key": rel_labels, "type": rel_item['type'], "weight": weight }) return relation_list_temp # حذف سکشن های تکراری از دیکشنری def remove_repitetive_sections(sections_group): finall_sections = [] found = False for index, item in enumerate(sections_group): print(" -------------- remove repeated item for section: "+ str(index+1)) if len(finall_sections) == 0: finall_sections.append(item) continue section_id = item["id"] related_id = item["related_id"] for section in finall_sections: if (section["id"] == section_id) and (section["related_id"] == related_id): found = True break # اگر سکشن در مراحل قبلی ذخیره نشده، آن را ذخیره کن if not found: finall_sections.append(item) return finall_sections print(datetime.datetime.now()) index_name_i = "ai_mj_qa_section-v03" is_update_state = False mapping_o = "" es = Elasticsearch( "http://127.0.0.1:6900", basic_auth=("elastic", "SG*7eGwg+KG2_*-1_mMm") ) try: if not es.indices.exists(index=index_name_i): response = es.indices.create(index=index_name_i, body=mapping_o) # print out the response: print("not found!!!") except: print("elastic error") counter = 0 total = 0 id = "" novalid = -15000000000 all_orgs = [] all_orgs_text = '' orgs_list = [] address = os.getcwd() main_7k_sections = read_from_json("./data/main_sections_7K.json") main_7k_sections_list = [] for section in main_7k_sections: main_7k_sections_list.append(section["id"]) elastic_data = es_iterate_all_documents(es, index_name_i) count = 0 all_sections = [] for mentry1 in elastic_data: count += 1 id1 = mentry1["id"].replace("mj_qa_section_","qs") # if count > 500: # break print('add metadata for section: ' + str(count)) if count % 100 == 0: print(f"add metadata progress: {(count / 7220) * 100:.2f} %") if id1 in main_7k_sections_list: entry1 = mentry1["source"] content_ai1 = entry1["content_ai"] qanon_id1 = entry1["qanon_id"] ners1 = entry1['ners_v1'] relations = entry1['relations'] all_sections.append({ "id":id1, "content":content_ai1, "qanon_id":qanon_id1, "ners":ners1, "relations":relations, }) eid = id1 write_to_json(all_sections, "./data/main_sections_7K_metadata.json") print(datetime.datetime.now()) print(" # # # add metadata finished! # # # ")
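
# --------------------------------------------------------------------------
# Note: `read_from_json` and `write_to_json` are imported from the local
# `funcs` module, which is not shown in this file. The sketch below is only
# an assumption about their behavior (plain UTF-8 JSON load/dump that keeps
# Persian text readable); it is kept as a comment so this script's behavior
# is unchanged.
#
#   import json
#
#   def read_from_json(path):
#       # load a JSON file and return the parsed object
#       with open(path, "r", encoding="utf-8") as file:
#           return json.load(file)
#
#   def write_to_json(data, path):
#       # write `data` to `path` without escaping non-ASCII characters
#       with open(path, "w", encoding="utf-8") as file:
#           json.dump(data, file, ensure_ascii=False, indent=2)
# --------------------------------------------------------------------------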