data_processes/p2_ner_recognizer.py

188 lines
7.4 KiB
Python

"""
سورس تشخیص موجودیت های نامدار برای هر جزء قانون
"""
from flair.data import Sentence
from flair.models import SequenceTagger
import re
model = "./models/ner/2025-07-22--20-44-37--HooshvareLab--bert-fa-base-uncased-ner-peyma/final-model.pt"
tagger = SequenceTagger.load(model)
print('model read and tagger initialized')
def prepare_data(ner_obj_list):
ner_data_list = []
for ner_obj in ner_obj_list:
ner_data = {
"key" :ner_obj['ner_key'],
"value" :ner_obj['ner_value'],
"begin" :ner_obj['ner_start_token'],
"end" :ner_obj['ner_end_token'],
"score" :ner_obj['ner_score']
}
ner_data_list.append(ner_data)
return ner_data_list
def find_ner_values_in_text(text, ner_values):
text_temp = text
text_tokens = text.split()
ner_obj = []
difference = 0
for raw_item in ner_values:
raw_ner = raw_item['value']
ner = re.findall(r'"(.*?)"', raw_ner)[0]
if ner == ')' or ner == '(' or ner == '/' or ner == 'قانون تغییر' or ner == 'قانون' or ner == '.' or ner == '':
continue
ner_parts = raw_ner.split(ner)[1]
ner_parts = ner_parts.lstrip('"/')
ner_type = ner_parts.strip()
ner_score = raw_item['score'].strip()
ner_type = ner_type.strip()
ner_score = ner_score.strip()
# پیدا کردن موجودیت نامدار بالا در متن
matched_ner = [(m.start(), m.end()) for m in re.finditer(re.escape(ner), text_temp)]
if matched_ner:
matched_ner_start = matched_ner[0][0]
matched_ner_end = matched_ner[0][1]
before_ner_text = ''
if matched_ner_start > 1:
before_ner_text = text_temp[0:matched_ner_start-1]
difference = len(before_ner_text.split())
ner_start_token = difference
ner_end_token = len(ner.split()) + difference
if ner_end_token > len(text_tokens):
ner_start_token -= 1
ner_end_token -= 1
ner_tokens = [text_tokens[t] for t in range (ner_start_token,ner_end_token)]
# برای جلوگیری از خطای منطقی در هنگامی که مقدار
# ner
# بیشتر از یکبار در متن وجود دارد، موجودیت بررسی شده را با کاراکتر های خنثی جایگزین می کنیم
for t in range (ner_start_token,ner_end_token):
text_tokens[t] = '#####'
text_temp = ' '.join(text_tokens)
text_temp = text_temp.strip()
if matched_ner_start == 0:
difference = len(ner.split())
##################################
# پیدا کردن شناسه متناظر با این قانون
# law_id = find_related_law(ner.strip())
##################################
#endregion
ner_obj.append({
'ner_value' : ner.strip(),
'ner_start_token': ner_start_token,
'ner_end_token' : ner_end_token,
'ner_key' : ner_type.strip(),
'ner_score' : float(ner_score.strip()),
#'ner_tokens' : ner_tokens,
})
# if law_id != 0:
# ner_obj[len(ner_obj)-1]['ner_law_id']= law_id
return ner_obj
def inference_main(input_sentence):
try:
proccess_result = True, ''
result = []
# if len(input_sentence) > 511 :
# sentence_parts = input_sentence.split('.')
sentence_parts = split_sentence(input_sentence)
for part in sentence_parts:
sentence = Sentence(part)
tagger.predict(sentence)
for span in sentence.get_spans():
result.append(span)
final_result = ''
ner_values = []
if result:
for item in result:
value = item.annotation_layers['ner'][0].labeled_identifier
score = round(item.score, 2)
score = str(score)
final_result = final_result + '\n' + value + ' /%/ ' + score
ner_values.append({
'value':value,
'score':score
})
ner_obj_list = find_ner_values_in_text(input_sentence, ner_values)
except Exception as error:
proccess_result = False , error.args[0]
ner_obj_list = []
return ner_obj_list, input_sentence, proccess_result
# تابع بازگشتی برای تقسیم متن به تکه های کوچکتر از 512 کاراکتر
def split_sentence(input_sentence):
# تعریف یک لیست داخلی برای نگهداری بخش‌های تقسیم شده
parts = []
# کاراکترهایی که بر اساس آنها به ترتیب، یک متن را به زیرمتن های کوچک تر تبدیل می کنیم
separators = ['\n', '.', ':', '،']
# تابع بازگشتی
def recursive_split(sentence):
# اگر تعداد توکن های متن پاس داده شده کمتر یا برابر با 511 کاراکتر باشد، آن را به لیست اضافه کن
if len(sentence.split()) <= 256:
if sentence != '':
parts.append(sentence)
return
# تلاش برای استفاده از جداکننده‌های مختلف
for separator in separators:
if separator in sentence:
# تقسیم رشته با استفاده از جداکننده‌ی فعلی
split_parts = sentence.split(separator)
new_sentence = []
for part in split_parts:
new_sentence.append(part)
# بررسی اینکه آیا همه بخش‌ها به اندازه کافی کوچک شده‌اند
for part in new_sentence:
# print(len(part))
if len(part.split()) <= 256:
if part == '':
continue
parts.append(part)
else:
recursive_split(part)
return
# اگر هیچ جداکننده‌ای کار نکرد، رشته را به دو نیمه تقسیم کن
# mid_point = len(sentence) // 2
# recursive_split(sentence[:mid_point])
# recursive_split(sentence[mid_point:])
# شروع تقسیم بازگشتی
recursive_split(input_sentence)
return parts
def do_ner_recognize(sections):
len_sections = len(sections)
for index, section in enumerate(sections):
content = section['content']
ner_obj_list, content_ai, ner_result = inference_main(content)
ner_data_list = prepare_data(ner_obj_list)
section['ners_v2'] = ner_data_list
print(f'ner process: {section}/{len_sections}')
print(f'len_sections ner recognization finished!')
return sections