""" سورس تشخیص موجودیت های نامدار برای هر جزء قانون این فرایند برای 285 هزار ماده کمتر از دو ساعت زمان می برد """ from flair.data import Sentence from flair.models import SequenceTagger import re import json import datetime from elastic_helper import ElasticHelper from normalizer import cleaning model = "./models/ner/2025-07-22--20-44-37--HooshvareLab--bert-fa-base-uncased-ner-peyma/final-model.pt" tagger = SequenceTagger.load(model) print('model read and tagger initialized') date = datetime.datetime.now() today = f'{date.year}-{date.month}-{date.day}-{date.hour}' def prepare_data(ner_obj_list): ner_data_list = [] for ner_obj in ner_obj_list: ner_data = { "key" :ner_obj['ner_key'], "value" :ner_obj['ner_value'], "begin" :ner_obj['ner_start_token'], "end" :ner_obj['ner_end_token'], "score" :ner_obj['ner_score'] } ner_data_list.append(ner_data) return ner_data_list def find_ner_values_in_text(text, ner_values): text_temp = text text_tokens = text.split() ner_obj = [] difference = 0 for raw_item in ner_values: raw_ner = raw_item['value'] ner = re.findall(r'"(.*?)"', raw_ner)[0] if ner == ')' or ner == '(' or ner == '/' or ner == 'قانون تغییر' or ner == 'قانون' or ner == '.' or ner == '': continue ner_parts = raw_ner.split(ner)[1] ner_parts = ner_parts.lstrip('"/') ner_type = ner_parts.strip() ner_score = raw_item['score'].strip() ner_type = ner_type.strip() ner_score = ner_score.strip() # پیدا کردن موجودیت نامدار بالا در متن matched_ner = [(m.start(), m.end()) for m in re.finditer(re.escape(ner), text_temp)] if matched_ner: matched_ner_start = matched_ner[0][0] matched_ner_end = matched_ner[0][1] before_ner_text = '' if matched_ner_start > 1: before_ner_text = text_temp[0:matched_ner_start-1] difference = len(before_ner_text.split()) ner_start_token = difference ner_end_token = len(ner.split()) + difference if ner_end_token > len(text_tokens): ner_start_token -= 1 ner_end_token -= 1 ner_tokens = [text_tokens[t] for t in range (ner_start_token,ner_end_token)] # برای جلوگیری از خطای منطقی در هنگامی که مقدار # ner # بیشتر از یکبار در متن وجود دارد، موجودیت بررسی شده را با کاراکتر های خنثی جایگزین می کنیم for t in range (ner_start_token,ner_end_token): text_tokens[t] = '#####' text_temp = ' '.join(text_tokens) text_temp = text_temp.strip() if matched_ner_start == 0: difference = len(ner.split()) ################################## # پیدا کردن شناسه متناظر با این قانون # law_id = find_related_law(ner.strip()) ################################## #endregion ner_obj.append({ 'ner_value' : ner.strip(), 'ner_start_token': ner_start_token, 'ner_end_token' : ner_end_token, 'ner_key' : ner_type.strip(), 'ner_score' : float(ner_score.strip()), #'ner_tokens' : ner_tokens, }) # if law_id != 0: # ner_obj[len(ner_obj)-1]['ner_law_id']= law_id return ner_obj def single_ner_recognizer(input_sentence): """ این متد، موجودیت های نامدار را از متن ورودی استخراج می کند **Args: input_sentence (str): متن سکشن **Returns: ner_obj_list, (obj): آبجکتی از موجودیت های نامدار تشخیص داده شده proccess_result(obj): بخش اول شامل یک کلید از نوع بولین است که موفقیت عملیات را بیان می کند. بخش دوم نیز توضیحی در مورد وضعیت نتیجه عملیات را در بر دارد """ try: proccess_result = True, '' result = [] # if len(input_sentence) > 511 : # sentence_parts = input_sentence.split('.') sentence_parts = split_sentence(input_sentence) for part in sentence_parts: sentence = Sentence(part) tagger.predict(sentence) for span in sentence.get_spans(): result.append(span) final_result = '' ner_values = [] if result: for item in result: value = item.annotation_layers['ner'][0].labeled_identifier score = round(item.score, 2) score = str(score) final_result = final_result + '\n' + value + ' /%/ ' + score ner_values.append({ 'value':value, 'score':score }) ner_obj_list = find_ner_values_in_text(input_sentence, ner_values) except Exception as error: proccess_result = False , str(error.args[0]) ner_obj_list = [] return ner_obj_list, proccess_result # تابع بازگشتی برای تقسیم متن به تکه های کوچکتر از 512 کاراکتر def split_sentence(input_sentence): # تعریف یک لیست داخلی برای نگهداری بخش‌های تقسیم شده parts = [] # کاراکترهایی که بر اساس آنها به ترتیب، یک متن را به زیرمتن های کوچک تر تبدیل می کنیم separators = ['\n', '.', ':', '،'] # تابع بازگشتی def recursive_split(sentence): # اگر تعداد توکن های متن پاس داده شده کمتر یا برابر با 511 کاراکتر باشد، آن را به لیست اضافه کن if len(sentence.split()) <= 256: if sentence != '': parts.append(sentence) return # تلاش برای استفاده از جداکننده‌های مختلف for separator in separators: if separator in sentence: # تقسیم رشته با استفاده از جداکننده‌ی فعلی split_parts = sentence.split(separator) new_sentence = [] for part in split_parts: new_sentence.append(part) # بررسی اینکه آیا همه بخش‌ها به اندازه کافی کوچک شده‌اند for part in new_sentence: # print(len(part)) if len(part.split()) <= 256: if part == '': continue parts.append(part) else: recursive_split(part) return # اگر هیچ جداکننده‌ای کار نکرد، رشته را به دو نیمه تقسیم کن # mid_point = len(sentence) // 2 # recursive_split(sentence[:mid_point]) # recursive_split(sentence[mid_point:]) # شروع تقسیم بازگشتی recursive_split(input_sentence) return parts def do_ner_recognize(sections): len_sections = len(sections) for index, id in enumerate(sections): content = sections[id]['content'] if not content == '': content = cleaning(content) ner_obj_list, ner_result = single_ner_recognizer(content) if not ner_result[0]: print(f'ner recognization error. id: {id}') error_content = f'id: {id} - error: {ner_result[1]}\n' with open(f'./data/ner/ner_errors_{today}.txt', 'a+', encoding='utf-8') as file: file.write(error_content) continue ner_data_list = prepare_data(ner_obj_list) sections[id]['ners_v2'] = ner_data_list print(f'ner process: {index+1}/{len_sections}') print(f'{len_sections} ner recognization finished!') return sections def get_sections(): sections_path = "/home/gpu/data_11/14040423/mj_qa_section.zip" eh_obj = ElasticHelper() sections = eh_obj.iterateJsonFile(sections_path, True) sections = convert_to_dict(sections) return sections def convert_to_dict(sections): sections_dict = {} for item in sections: id = item['id'] source = item['source'] sections_dict[id] = source return sections_dict if __name__ == '__main__': print(f'start: {datetime.datetime.now()}') sections = get_sections() # dictsections = {} # for item in sections: # if not item['id'] == 'qs2180272': # continue # dictsections[item['id']] = item['source'] # break # sections = dictsections sections = do_ner_recognize(sections) with open(f'./data/ner/sections_ner_{today}.json', 'w', encoding='utf-8') as output: data = json.dumps(sections, ensure_ascii=False, indent=4) output.write(data) print(f'end: {datetime.datetime.now()}') print('finished ner recognization!')