data_processes/p2_ner_recognizer.py
2025-08-11 19:56:29 +03:30

253 lines
9.7 KiB
Python

"""
سورس تشخیص موجودیت های نامدار برای هر جزء قانون
این فرایند برای 285 هزار ماده کمتر از دو ساعت زمان می برد
"""
from flair.data import Sentence
from flair.models import SequenceTagger
import re
import json
import datetime
from elastic_helper import ElasticHelper
from normalizer import cleaning
model = "./models/ner/2025-07-22--20-44-37--HooshvareLab--bert-fa-base-uncased-ner-peyma/final-model.pt"
tagger = SequenceTagger.load(model)
print('model read and tagger initialized')
today = f'{datetime.datetime.year}-{datetime.datetime.month}-{datetime.datetime.day}-{datetime.datetime.hour}'
def prepare_data(ner_obj_list):
ner_data_list = []
for ner_obj in ner_obj_list:
ner_data = {
"key" :ner_obj['ner_key'],
"value" :ner_obj['ner_value'],
"begin" :ner_obj['ner_start_token'],
"end" :ner_obj['ner_end_token'],
"score" :ner_obj['ner_score']
}
ner_data_list.append(ner_data)
return ner_data_list
def find_ner_values_in_text(text, ner_values):
text_temp = text
text_tokens = text.split()
ner_obj = []
difference = 0
for raw_item in ner_values:
raw_ner = raw_item['value']
ner = re.findall(r'"(.*?)"', raw_ner)[0]
if ner == ')' or ner == '(' or ner == '/' or ner == 'قانون تغییر' or ner == 'قانون' or ner == '.' or ner == '':
continue
ner_parts = raw_ner.split(ner)[1]
ner_parts = ner_parts.lstrip('"/')
ner_type = ner_parts.strip()
ner_score = raw_item['score'].strip()
ner_type = ner_type.strip()
ner_score = ner_score.strip()
# پیدا کردن موجودیت نامدار بالا در متن
matched_ner = [(m.start(), m.end()) for m in re.finditer(re.escape(ner), text_temp)]
if matched_ner:
matched_ner_start = matched_ner[0][0]
matched_ner_end = matched_ner[0][1]
before_ner_text = ''
if matched_ner_start > 1:
before_ner_text = text_temp[0:matched_ner_start-1]
difference = len(before_ner_text.split())
ner_start_token = difference
ner_end_token = len(ner.split()) + difference
if ner_end_token > len(text_tokens):
ner_start_token -= 1
ner_end_token -= 1
ner_tokens = [text_tokens[t] for t in range (ner_start_token,ner_end_token)]
# برای جلوگیری از خطای منطقی در هنگامی که مقدار
# ner
# بیشتر از یکبار در متن وجود دارد، موجودیت بررسی شده را با کاراکتر های خنثی جایگزین می کنیم
for t in range (ner_start_token,ner_end_token):
text_tokens[t] = '#####'
text_temp = ' '.join(text_tokens)
text_temp = text_temp.strip()
if matched_ner_start == 0:
difference = len(ner.split())
##################################
# پیدا کردن شناسه متناظر با این قانون
# law_id = find_related_law(ner.strip())
##################################
#endregion
ner_obj.append({
'ner_value' : ner.strip(),
'ner_start_token': ner_start_token,
'ner_end_token' : ner_end_token,
'ner_key' : ner_type.strip(),
'ner_score' : float(ner_score.strip()),
#'ner_tokens' : ner_tokens,
})
# if law_id != 0:
# ner_obj[len(ner_obj)-1]['ner_law_id']= law_id
return ner_obj
def single_ner_recognizer(input_sentence):
"""
این متد، موجودیت های نامدار را از متن ورودی استخراج می کند
**Args:
input_sentence (str): متن سکشن
**Returns:
ner_obj_list, (obj): آبجکتی از موجودیت های نامدار تشخیص داده شده
proccess_result(obj): بخش اول شامل یک کلید از نوع بولین است که موفقیت عملیات را بیان می کند. بخش دوم نیز توضیحی در مورد وضعیت نتیجه عملیات را در بر دارد
"""
try:
proccess_result = True, ''
result = []
# if len(input_sentence) > 511 :
# sentence_parts = input_sentence.split('.')
sentence_parts = split_sentence(input_sentence)
for part in sentence_parts:
sentence = Sentence(part)
tagger.predict(sentence)
for span in sentence.get_spans():
result.append(span)
final_result = ''
ner_values = []
if result:
for item in result:
value = item.annotation_layers['ner'][0].labeled_identifier
score = round(item.score, 2)
score = str(score)
final_result = final_result + '\n' + value + ' /%/ ' + score
ner_values.append({
'value':value,
'score':score
})
ner_obj_list = find_ner_values_in_text(input_sentence, ner_values)
except Exception as error:
proccess_result = False , str(error.args[0])
ner_obj_list = []
return ner_obj_list, proccess_result
# تابع بازگشتی برای تقسیم متن به تکه های کوچکتر از 512 کاراکتر
def split_sentence(input_sentence):
# تعریف یک لیست داخلی برای نگهداری بخش‌های تقسیم شده
parts = []
# کاراکترهایی که بر اساس آنها به ترتیب، یک متن را به زیرمتن های کوچک تر تبدیل می کنیم
separators = ['\n', '.', ':', '،']
# تابع بازگشتی
def recursive_split(sentence):
# اگر تعداد توکن های متن پاس داده شده کمتر یا برابر با 511 کاراکتر باشد، آن را به لیست اضافه کن
if len(sentence.split()) <= 256:
if sentence != '':
parts.append(sentence)
return
# تلاش برای استفاده از جداکننده‌های مختلف
for separator in separators:
if separator in sentence:
# تقسیم رشته با استفاده از جداکننده‌ی فعلی
split_parts = sentence.split(separator)
new_sentence = []
for part in split_parts:
new_sentence.append(part)
# بررسی اینکه آیا همه بخش‌ها به اندازه کافی کوچک شده‌اند
for part in new_sentence:
# print(len(part))
if len(part.split()) <= 256:
if part == '':
continue
parts.append(part)
else:
recursive_split(part)
return
# اگر هیچ جداکننده‌ای کار نکرد، رشته را به دو نیمه تقسیم کن
# mid_point = len(sentence) // 2
# recursive_split(sentence[:mid_point])
# recursive_split(sentence[mid_point:])
# شروع تقسیم بازگشتی
recursive_split(input_sentence)
return parts
def do_ner_recognize(sections):
len_sections = len(sections)
for index, id in enumerate(sections):
content = sections[id]['content']
if not content == '':
content = cleaning(content)
ner_obj_list, ner_result = single_ner_recognizer(content)
if not ner_result[0]:
print(f'ner recognization error. id: {id}')
error_content = f'id: {id} - error: {ner_result[1]}\n'
with open(f'./data/ner/ner_errors_{today}.txt', 'a+', encoding='utf-8') as file:
file.write(error_content)
continue
ner_data_list = prepare_data(ner_obj_list)
sections[id]['ners_v2'] = ner_data_list
print(f'ner process: {index+1}/{len_sections}')
print(f'len_sections ner recognization finished!')
return sections
def get_sections():
sections_path = "/home/gpu/data_11/14040423/mj_qa_section.zip"
eh_obj = ElasticHelper()
sections = eh_obj.iterateJsonFile(sections_path, True)
sections = convert_to_dict(sections)
return sections
def convert_to_dict(sections):
sections_dict = {}
for item in sections:
id = item['id']
source = item['source']
sections_dict[id] = source
return sections_dict
if __name__ == '__main__':
print(f'start: {datetime.datetime.now()}')
sections = get_sections()
# dictsections = {}
# for item in sections:
# if not item['id'] == 'qs2180272':
# continue
# dictsections[item['id']] = item['source']
# break
# sections = dictsections
sections = do_ner_recognize(sections)
with open(f'./data/ner/sections_ner_{today}.json', 'w', encoding='utf-8') as output:
data = json.dumps(sections, ensure_ascii=False, indent=4)
output.write(data)
print(f'end: {datetime.datetime.now()}')
print('finished ner recognization!')