complete classify and ner processes

This commit is contained in:
ajokar 2025-08-07 16:45:10 +03:30
parent 0c822ffa61
commit 63673bc648
11 changed files with 527 additions and 5 deletions

View File

@ -1,6 +1,42 @@
import p1_classifier
import p2_keyword_extractor
import p3_ner_recognizer
import p4_simplifier
import p5_words_embedder
"""
سورس اجرای پردازش های مختلف روی اجزای قانونی
شامل: کلاسیفیکیشن، تشخیص موجودیت های نامدار، استخراج بردار کلمات، استخراج کلیدواژه ها و سادهسازی متن
"""
from p1_classifier import do_classify
from p2_ner_recognizer import do_ner_recognize
from p3_words_embedder import do_word_embedder
from p4_keyword_extractor import do_keyword_extract
from p5_simplifier import do_simplify
from elastic_helper import ElasticHelper
def get_sections():
sections_path = "/home/gpu/data_11/14040423/mj_qa_section.zip"
eh_obj = ElasticHelper()
sections = eh_obj.iterateJsonFile(sections_path, True)
return sections
def main():
# get sections to do nlp processes
sections = get_sections()
# 1. classify
sections = do_classify(sections)
# 2. ner_recognize
sections = do_ner_recognize(sections)
# 3. word embedder
sections = do_word_embedder(sections)
# 4. keyword extract
sections = do_keyword_extract(sections)
# 5. simpify
sections = do_simplify(sections)
print('all nlp processes finished successfully!')

89
normalizer.py Normal file
View File

@ -0,0 +1,89 @@
import hazm
from cleantext import clean
import re
def cleanhtml(raw_html):
cleanr = re.compile('<.*?>')
cleantext = re.sub(cleanr, '', raw_html)
return cleantext
normalizer = hazm.Normalizer()
wierd_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
u"\U00002702-\U000027B0"
u"\U000024C2-\U0001F251"
u"\U0001f926-\U0001f937"
u'\U00010000-\U0010ffff'
u"\u200d"
u"\u2640-\u2642"
u"\u2600-\u2B55"
u"\u23cf"
u"\u23e9"
u"\u231a"
u"\u3030"
u"\ufe0f"
u"\u2069"
u"\u2066"
# u"\u200c"
u"\u2068"
u"\u2067"
"]+", flags=re.UNICODE)
def cleaning(text):
text = text.strip()
# regular cleaning
# text = clean(text,
# fix_unicode=True,
# to_ascii=False,
# lower=True,
# no_line_breaks=True,
# no_urls=True,
# no_emails=True,
# no_phone_numbers=True,
# no_numbers=False,
# no_digits=False,
# no_currency_symbols=True,
# no_punct=False,
# replace_with_url="",
# replace_with_email="",
# replace_with_phone_number="",
# replace_with_number="",
# replace_with_digit="0",
# replace_with_currency_symbol="",
# )
text = clean(text,
extra_spaces = True,
lowercase = True
)
# cleaning htmls
text = cleanhtml(text)
# normalizing
text = normalizer.normalize(text)
# removing wierd patterns
text = wierd_pattern.sub(r'', text)
# removing extra spaces, hashtags
text = re.sub("#", "", text)
text = re.sub("\s+", " ", text)
return text
# with open('./ghavanins.txt', encoding="utf-8") as fp:
# current_content = fp.read()
# current_content = cleaning(current_content)
# with open('./ghavanins2.txt', 'wb') as f:
# f.write(current_content.encode('utf-8', 'ignore'))

View File

@ -0,0 +1,197 @@
"""
این سورس برای خواندن کل سکشن های قانون و پیشنهاد تعدادی کلاس بر اساس مدل آموزش دیده جهت این ماموریت مورد استفاده قرار می گیرد
"""
from transformers import pipeline
from normalizer import cleaning
from elastic_helper import ElasticHelper
import transformers
import json
import datetime
import pandas as pd
from transformers import AutoTokenizer
print(transformers.__version__)
# finetuned model for classification path
model_checkpoint = './models/classifier/findtuned_classification_hoosh_with_path_v2__30/checkpoint-1680'
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
# window_size = tokenizer.model_max_length#512#200
window_size = 512
"""
(یعنی سایز پنجره) به این دلیل که تعداد توکن های ورودی به مدل، محدود به متغیر بالاست
متن هایی که سایز آنها بیشتر از این مقدار باشد را از طریق یک پنجره لغزان که روی کل متن حرکت می کند، به چند قسمت تقسیم می کنیم و برای هر پنجره، به صورت جداگانه، کلاس دریافت می کنیم.
سایز این قسمت ها را در step_size معین کرده ایم
"""
step_size = 350#100
# تعداد کلاس هایی که بازای هر سکشن از مدل درخواست می کنیم
Top_k = 4
classifier = pipeline("text-classification", model_checkpoint, framework="pt")
def get_class(sentences, top_k:int=4):
# sentences = cleaning(sentences)
out = classifier(sentences, top_k=top_k, truncation=True, max_length=window_size)
return out
def mean_classes(input_classes):
pass
all_classes = []
for cclass in input_classes:
for item in cclass:
all_classes.append({
'label': item['label'],
'score': item['score']
})
# sorted_classes = sorted(all_classes, key=lambda x: x['class'])
classes_df = pd.DataFrame(all_classes)
# گروه بندی بر اساس کلاس
grouped_df = classes_df.groupby("label").agg(
total_value=("score", "sum"), # مجموع امتیازها
count=("score", "count") # تعداد تکرار هر کلاس
).reset_index()
# تعریف فاکتور وزن بر اساس تعداد تکرار کلاس
grouped_df["weight"] = grouped_df["count"]
# بازسازی امتیاز با دخالت دادن وزن
grouped_df["score"] = grouped_df["total_value"] * grouped_df["weight"]
# حذف ستون‌های اضافی و ایجاد دیتافریم نهایی
final_df = grouped_df[["label", "count", "score"]]
# مرتب سازی دیتافریم نهایی بر اساس بالاترین امتیاز کلاسها
sorted_df = final_df.sort_values(by="score", ascending=False)
# تبدیل دیتافریم به دیکشنری
top_n_classes = sorted_df.head(Top_k).to_dict(orient="records")
for item in top_n_classes:
# تبدیل امتیاز در مبنای درصد
item['score'] = (item['score']*100)/sorted_df['score'].sum()
item.pop('count')
return top_n_classes
def get_window_classes(text):
text_classes = []
tokens = tokenizer(text)['input_ids'][1:-1]
#print(len(tokens))
if len(tokens) > window_size:
for i in range(0, len(tokens), step_size):#- window_size + 1
start_window_slice = tokens[0: i]
window_slice = tokens[i: i + window_size]
start_char = len(tokenizer.decode(start_window_slice).replace('[UNK]', ''))
char_len = len(tokenizer.decode(window_slice).replace('[UNK]', ''))
context_slice = text[start_char: start_char + char_len]
tokens_len = len(tokenizer(context_slice)['input_ids'][1:-1])
# print(f'i: {i},token-len: {tokens_len}', flush=True)
results = get_class(context_slice, Top_k)
text_classes.append(results)
text_classes = mean_classes(text_classes)
else:
text_classes = get_class(text, Top_k)
return text_classes
def full_path_text_maker(full_path):
"""
این متد مسیر یک سکشن را می گیرد و متنی را بر اساس ترتیب بخش های آن از جزء به کل بازسازی می کند و بر می گرداند
full_path_text متن بازسازی شده از مسیر یک سکشن
"""
full_path_text = ""
for i, path_item in enumerate(reversed(full_path)):
if i == len(full_path) - 1:
full_path_text += ''.join(f'{path_item}')
break
full_path_text += ''.join(f'{path_item} از ')
full_path_text = full_path_text.strip()
return full_path_text
def do_classify(sections):
print(f'start classification: {datetime.datetime.now()}')
test_counter = 1
# all = تعداد سکشن های فایل قبلی 282671
all = 285839
# لیستی جهت ذخیره عناوین قانون ها
qanon_title_list = []
# دیکشنری برای ذخیره نتایج که شامل شناسه سکشن ها و 4 کلاس به ترتیب اولویت است
new_sections_dict = {}
for index, item in enumerate(sections):
id = item['id']
source = item['source']
# اگر نوع سکشن، عنوان یا موخره یا امضاء باشد، نیازی به فرایند کلاسبندی نیست و لیست کلاس های مربوط به این سکشن را خالی قرار می دهیم
if source['other_info']['full_path'] == 'عنوان' or source['other_info']['full_path'] == 'موخره' or source['other_info']['full_path'] == 'امضاء':
new_sections_dict[id] ={
"best-class":{},
"other-classes": []}
print(f'section: {all}/{index+1}/{id}', flush=True)
continue
content = source['content']
qanon_title = source['qanon_title']
# این متغیر ریشه سکشن اخیر تا رسیدن به قانون را مشخص می کند
# مثلا: ماده5>تبصره یک>بند الف
full_path = source['other_info']['full_path'].split(">")
# بازسازی متن مسیر سکشن از جزء به کل
full_path_text = full_path_text_maker(full_path)
"""
به دلیل اینکه متن برخی از سکشن ها به تنهایی معنادار نیستند، مسیر جزء به کل ماده و عنوان قانون را به اول همه سکشن ها اضافه می کنیم تا هم نقیصه معنادار نبودن موارد این چنینی را برطرف کنیم و هم برای کلاس بندی، انتخاب مدل را بر اساس عنوان قانون جهت دهی کنیم. به این صورت، مدل، عنوان قانون را هم در کلاس بندی دخیل خواهد کرد
"""
pre_content = f"محتوای {full_path_text} {cleaning(qanon_title)} متن زیر است."
try:
content = cleaning(content)
except Exception as e:
# شناسه ماده هایی که محتوای آنها خالی است در این مسیر ذخیره می شوند
with open('./data/empty_content_log.txt', 'a', encoding='utf-8') as output_file:
output_file.write(id + " >> " + str(e) + "\n")
continue
try:
# دریافت کلاس های مربوط به یک سکشن
section_classes = get_window_classes(f"{pre_content} {content}")
#region collect data for evaluation
""" این قسمت تا 7 خط بعدی، صرفا جهت جمع آوری و ذخیره تعدادی از سکشن ها و کلاس های پیش بینی شده برای آنها، به منظور ارزیابی عملکرد مدل توسط کاربر انسانی است و دخیل در فرایند کلاسبندی نیست"""
if (len(tokenizer(f"{pre_content} {content}")['input_ids'][1:-1]) < 1500) and not qanon_title in qanon_title_list:
with open('./data/classification/test_log_60e_hoosh_fp3.txt', 'a', encoding='utf-8') as output_file:
message = f"\n{test_counter}\n{id} : {pre_content} {content}\nclasses:\n"
for cls in section_classes:
message += f"{cls['label']} >> {cls['score']}\n"
output_file.write(message + "\n")
test_counter+=1
#endregion
except Exception as e:
error = e
with open('./data/classification/errors.txt', 'a', encoding='utf-8') as output_file:
output_file.write(f"{id} -- Error Content:{error}\n")
continue
# item['classes'] = section_classes
# ساماندهی کلاس های پیش بینی شده در عنوان بهترین کلاس و دیگر کلاسها بر اساس امتیاز تخمین مدل و ذخیره در دیکشنری
new_sections_dict[id] ={
"content" : content,
"best-class" : section_classes[0],
"other-classes" : section_classes[1:]
}
""" برای حالت تست که می خواهیم عملکرد مدل کلاسیفایر را ارزیابی کنیم، بدین جهت که تنوعی از قوانین مختلف را بررسی کنیم، عنوان قوانین را ذخیره می کنیم تا از تکرار بررسی سکشن های متعدد از یک قانون پرهیز شود"""
qanon_title_list.append(qanon_title)
print(f'section: {all}/{index+1}/{id}', flush=True)
# ذخیره دیکشنری شناسه های قانون و کلاس های تخمین زده شده در فایل جیسون
with open('./data/classification/all_sections_classes_new_140405.json', 'w', encoding='utf-8') as output_file:
json_data = json.dumps(new_sections_dict, indent=4, ensure_ascii=False)
output_file.write(json_data)
print(f'end: {datetime.datetime.now()}')
print('classification finished!')
classified_sections_dict = new_sections_dict
return classified_sections_dict

188
p2_ner_recognizer.py Normal file
View File

@ -0,0 +1,188 @@
"""
سورس تشخیص موجودیت های نامدار برای هر جزء قانون
"""
from flair.data import Sentence
from flair.models import SequenceTagger
import re
model = "./models/ner/2025-07-22--20-44-37--HooshvareLab--bert-fa-base-uncased-ner-peyma/final-model.pt"
tagger = SequenceTagger.load(model)
print('model read and tagger initialized')
def prepare_data(ner_obj_list):
ner_data_list = []
for ner_obj in ner_obj_list:
ner_data = {
"key" :ner_obj['ner_key'],
"value" :ner_obj['ner_value'],
"begin" :ner_obj['ner_start_token'],
"end" :ner_obj['ner_end_token'],
"score" :ner_obj['ner_score']
}
ner_data_list.append(ner_data)
return ner_data_list
def find_ner_values_in_text(text, ner_values):
text_temp = text
text_tokens = text.split()
ner_obj = []
difference = 0
for raw_item in ner_values:
raw_ner = raw_item['value']
ner = re.findall(r'"(.*?)"', raw_ner)[0]
if ner == ')' or ner == '(' or ner == '/' or ner == 'قانون تغییر' or ner == 'قانون' or ner == '.' or ner == '':
continue
ner_parts = raw_ner.split(ner)[1]
ner_parts = ner_parts.lstrip('"/')
ner_type = ner_parts.strip()
ner_score = raw_item['score'].strip()
ner_type = ner_type.strip()
ner_score = ner_score.strip()
# پیدا کردن موجودیت نامدار بالا در متن
matched_ner = [(m.start(), m.end()) for m in re.finditer(re.escape(ner), text_temp)]
if matched_ner:
matched_ner_start = matched_ner[0][0]
matched_ner_end = matched_ner[0][1]
before_ner_text = ''
if matched_ner_start > 1:
before_ner_text = text_temp[0:matched_ner_start-1]
difference = len(before_ner_text.split())
ner_start_token = difference
ner_end_token = len(ner.split()) + difference
if ner_end_token > len(text_tokens):
ner_start_token -= 1
ner_end_token -= 1
ner_tokens = [text_tokens[t] for t in range (ner_start_token,ner_end_token)]
# برای جلوگیری از خطای منطقی در هنگامی که مقدار
# ner
# بیشتر از یکبار در متن وجود دارد، موجودیت بررسی شده را با کاراکتر های خنثی جایگزین می کنیم
for t in range (ner_start_token,ner_end_token):
text_tokens[t] = '#####'
text_temp = ' '.join(text_tokens)
text_temp = text_temp.strip()
if matched_ner_start == 0:
difference = len(ner.split())
##################################
# پیدا کردن شناسه متناظر با این قانون
# law_id = find_related_law(ner.strip())
##################################
#endregion
ner_obj.append({
'ner_value' : ner.strip(),
'ner_start_token': ner_start_token,
'ner_end_token' : ner_end_token,
'ner_key' : ner_type.strip(),
'ner_score' : float(ner_score.strip()),
#'ner_tokens' : ner_tokens,
})
# if law_id != 0:
# ner_obj[len(ner_obj)-1]['ner_law_id']= law_id
return ner_obj
def inference_main(input_sentence):
try:
proccess_result = True, ''
result = []
# if len(input_sentence) > 511 :
# sentence_parts = input_sentence.split('.')
sentence_parts = split_sentence(input_sentence)
for part in sentence_parts:
sentence = Sentence(part)
tagger.predict(sentence)
for span in sentence.get_spans():
result.append(span)
final_result = ''
ner_values = []
if result:
for item in result:
value = item.annotation_layers['ner'][0].labeled_identifier
score = round(item.score, 2)
score = str(score)
final_result = final_result + '\n' + value + ' /%/ ' + score
ner_values.append({
'value':value,
'score':score
})
ner_obj_list = find_ner_values_in_text(input_sentence, ner_values)
except Exception as error:
proccess_result = False , error.args[0]
ner_obj_list = []
return ner_obj_list, input_sentence, proccess_result
# تابع بازگشتی برای تقسیم متن به تکه های کوچکتر از 512 کاراکتر
def split_sentence(input_sentence):
# تعریف یک لیست داخلی برای نگهداری بخش‌های تقسیم شده
parts = []
# کاراکترهایی که بر اساس آنها به ترتیب، یک متن را به زیرمتن های کوچک تر تبدیل می کنیم
separators = ['\n', '.', ':', '،']
# تابع بازگشتی
def recursive_split(sentence):
# اگر تعداد توکن های متن پاس داده شده کمتر یا برابر با 511 کاراکتر باشد، آن را به لیست اضافه کن
if len(sentence.split()) <= 256:
if sentence != '':
parts.append(sentence)
return
# تلاش برای استفاده از جداکننده‌های مختلف
for separator in separators:
if separator in sentence:
# تقسیم رشته با استفاده از جداکننده‌ی فعلی
split_parts = sentence.split(separator)
new_sentence = []
for part in split_parts:
new_sentence.append(part)
# بررسی اینکه آیا همه بخش‌ها به اندازه کافی کوچک شده‌اند
for part in new_sentence:
# print(len(part))
if len(part.split()) <= 256:
if part == '':
continue
parts.append(part)
else:
recursive_split(part)
return
# اگر هیچ جداکننده‌ای کار نکرد، رشته را به دو نیمه تقسیم کن
# mid_point = len(sentence) // 2
# recursive_split(sentence[:mid_point])
# recursive_split(sentence[mid_point:])
# شروع تقسیم بازگشتی
recursive_split(input_sentence)
return parts
def do_ner_recognize(sections):
len_sections = len(sections)
for index, section in enumerate(sections):
content = section['content']
ner_obj_list, content_ai, ner_result = inference_main(content)
ner_data_list = prepare_data(ner_obj_list)
section['ners_v2'] = ner_data_list
print(f'ner process: {section}/{len_sections}')
print(f'len_sections ner recognization finished!')
return sections

View File

4
p3_words_embedder.py Normal file
View File

@ -0,0 +1,4 @@
def do_word_embedder(sections):
pass

4
p4_keyword_extractor.py Normal file
View File

@ -0,0 +1,4 @@
def do_keyword_extract(sections):
pass

View File

4
p5_simplifier.py Normal file
View File

@ -0,0 +1,4 @@
def do_simplify(sections):
pass

View File