301 lines
15 KiB
Python
301 lines
15 KiB
Python
|
||
"""
|
||
این سورس برای خواندن کل سکشن های قانون و پیشنهاد تعدادی کلاس بر اساس مدل آموزش دیده جهت این ماموریت مورد استفاده قرار می گیرد
|
||
"""
|
||
from transformers import pipeline
|
||
from normalizer import cleaning
|
||
import transformers
|
||
import json
|
||
import datetime
|
||
import pandas as pd
|
||
from transformers import AutoTokenizer
|
||
print(f'transformers version: {transformers.__version__}')
|
||
from elastic_helper import ElasticHelper
|
||
|
||
date = datetime.datetime.now()
|
||
today = f'{date.year}-{date.month}-{date.day}-{date.hour}'
|
||
|
||
# finetuned model for classification path
|
||
model_checkpoint = './models/classifier/findtuned_classification_hoosh_with_path_v2__30/checkpoint-1680'
|
||
|
||
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
|
||
|
||
window_size = 512
|
||
"""
|
||
(یعنی سایز پنجره) به این دلیل که تعداد توکن های ورودی به مدل، محدود به متغیر بالاست
|
||
متن هایی که سایز آنها بیشتر از این مقدار باشد را از طریق یک پنجره لغزان که روی کل متن حرکت می کند، به چند قسمت تقسیم می کنیم و برای هر پنجره، به صورت جداگانه، کلاس دریافت می کنیم.
|
||
سایز این قسمت ها را در step_size معین کرده ایم
|
||
"""
|
||
step_size = 350#100
|
||
# تعداد کلاس هایی که بازای هر سکشن از مدل درخواست می کنیم
|
||
Top_k = 4
|
||
# set device = 0 => to use GPU
|
||
classifier = pipeline("text-classification", model_checkpoint, framework="pt", device=0)
|
||
print(f'Classification Model Loaded: {model_checkpoint}')
|
||
|
||
def get_sections():
|
||
"""
|
||
دریافت کل سکشن های قانونی در مسیر مشخص شده
|
||
"""
|
||
sections_path = "/home/gpu/data_11/14040423/mj_qa_section.zip"
|
||
eh_obj = ElasticHelper()
|
||
sections = eh_obj.iterateJsonFile(sections_path, True)
|
||
sections = convert_to_dict(sections)
|
||
return sections
|
||
|
||
def convert_to_dict(sections):
|
||
"""
|
||
تبدیل لیست سکشن های قانون به دیکشنری
|
||
"""
|
||
sections_dict = {}
|
||
for item in sections:
|
||
id = item['id']
|
||
source = item['source']
|
||
sections_dict[id] = source
|
||
|
||
return sections_dict
|
||
|
||
def get_class(sentence, top_k:int=4):
|
||
"""
|
||
متدی برای تعیین تعدادی از بهترین کلاس های متناسب با متن ورودی
|
||
|
||
Args:
|
||
sentence(str): متنی که قرار است مدل، کلاس های آن را تشخیص دهد
|
||
top_k(int): تعداد کلاس هایی که بر اساس اولویت امتیاز، مدل باید تشخیص دهد
|
||
|
||
Returns:
|
||
recognized_class(list[obj]): لیستی از آبجکت کلاس ها شامل عنوان و امتیاز هر کلاس
|
||
"""
|
||
# sentences = cleaning(sentences)
|
||
recognized_class = classifier(sentence, top_k=top_k, truncation=True, max_length=window_size)
|
||
return recognized_class
|
||
|
||
def mean_classes(input_classes):
|
||
"""
|
||
محاسبه کلاس بر اساس میانگین امتیازات کلاس های مختلفی که در پنجره شناور برای یک متن به دست آمده است
|
||
|
||
Args:
|
||
input_classes(list[obj]): لیستی از آبجکت کلاس ها شامل عنوان کلاس و امتیاز مربوط به آن که مدل تشخیص داده است
|
||
|
||
Returns:
|
||
top_class(obj): تک آبجکتی شامل عنوان و امتیاز بهترین کلاس تشخیص داده شده برای این متن
|
||
"""
|
||
pass
|
||
all_classes = []
|
||
for cclass in input_classes:
|
||
for item in cclass:
|
||
all_classes.append({
|
||
'label': item['label'],
|
||
'score': item['score']
|
||
})
|
||
|
||
# sorted_classes = sorted(all_classes, key=lambda x: x['class'])
|
||
classes_df = pd.DataFrame(all_classes)
|
||
# گروه بندی بر اساس کلاس
|
||
grouped_df = classes_df.groupby("label").agg(
|
||
total_value=("score", "sum"), # مجموع امتیازها
|
||
count=("score", "count") # تعداد تکرار هر کلاس
|
||
).reset_index()
|
||
# تعریف فاکتور وزن بر اساس تعداد تکرار کلاس
|
||
grouped_df["weight"] = grouped_df["count"]
|
||
# بازسازی امتیاز با دخالت دادن وزن
|
||
grouped_df["score"] = grouped_df["total_value"] * grouped_df["weight"]
|
||
# حذف ستونهای اضافی و ایجاد دیتافریم نهایی
|
||
final_df = grouped_df[["label", "count", "score"]]
|
||
# مرتب سازی دیتافریم نهایی بر اساس بالاترین امتیاز کلاسها
|
||
sorted_df = final_df.sort_values(by="score", ascending=False)
|
||
# تبدیل دیتافریم به دیکشنری
|
||
top_n_classes = sorted_df.head(Top_k).to_dict(orient="records")
|
||
|
||
for item in top_n_classes:
|
||
# تبدیل امتیاز در مبنای درصد
|
||
item['score'] = (item['score']*100)/sorted_df['score'].sum()
|
||
item.pop('count')
|
||
|
||
return top_n_classes
|
||
|
||
def get_window_classes(text):
|
||
"""
|
||
این متد، متن ورودی را بررسی می کند و اگر اندازه آن بیشتر از اندازه ورودی قابل قبول برای مدل بود، یک پنجره به اندازه ای که قبلا تعریف شده روی متن حرکت می دهد و برای هر حرکت، متن پنجره را کلاس بندی می کند و لیست آبجکت کلاس ها را بر می گرداند
|
||
در صورتی که متن ورودی از اندازه مدل بزرگتر نبود، متن ورودی بدون ورود به فرایند پنجره ها کلاس بندی می شود
|
||
|
||
Args:
|
||
text(str): متن ورودی که باید کلاس بندی شود
|
||
|
||
Returns:
|
||
text_classes(list[obj]): لیست آبجکت ها شامل عنوان و امتیاز کلاس تشخیص داده شده برای این متن
|
||
"""
|
||
text_classes = []
|
||
tokens = tokenizer(text)['input_ids'][1:-1]
|
||
|
||
if len(tokens) > window_size:
|
||
for i in range(0, len(tokens), step_size):
|
||
start_window_slice = tokens[0: i]
|
||
window_slice = tokens[i: i + window_size]
|
||
start_char = len(tokenizer.decode(start_window_slice).replace('[UNK]', ''))
|
||
char_len = len(tokenizer.decode(window_slice).replace('[UNK]', ''))
|
||
context_slice = text[start_char: start_char + char_len]
|
||
# tokens_len = len(tokenizer(context_slice)['input_ids'][1:-1])
|
||
# print(f'i: {i},token-len: {tokens_len}', flush=True)
|
||
results = get_class(context_slice, Top_k)
|
||
text_classes.append(results)
|
||
|
||
# محاسبه بهترین کلاس ها بر اساس میانگین امتیازات کلاسهای تشخیص داده شده
|
||
text_classes = mean_classes(text_classes)
|
||
else:
|
||
text_classes = get_class(text, Top_k)
|
||
|
||
return text_classes
|
||
|
||
def full_path_text_maker(full_path):
|
||
"""
|
||
این متد مسیر یک سکشن را می گیرد و متنی را بر اساس ترتیب بخش های آن از جزء به کل بازسازی می کند و بر می گرداند
|
||
|
||
Args:
|
||
full_path_text(str): متن اولیه از مسیر یک سکشن
|
||
|
||
Returns:
|
||
full_path_text(str): متن بازسازی شده از مسیر یک سکشن
|
||
|
||
"""
|
||
full_path_text = ""
|
||
for i, path_item in enumerate(reversed(full_path)):
|
||
if i == len(full_path) - 1:
|
||
full_path_text += ''.join(f'{path_item}')
|
||
break
|
||
full_path_text += ''.join(f'{path_item} از ')
|
||
full_path_text = full_path_text.strip()
|
||
return full_path_text
|
||
|
||
def single_section_classification(id, section_source):
|
||
"""
|
||
این متد، متن ورودی را کلاسبندی می کند
|
||
|
||
**Args:
|
||
id (str): شناسه سکشن
|
||
section_source (obj): سورس یک سکشن که شامل متن قانون و متادیتاهای مربوط به آن می شود
|
||
|
||
**Returns:
|
||
classification_result(obj): چند کلاس پیشنهادی به ترتیب اولویت
|
||
classification_status(bool): بیان می کند که عملیات کلاس بندی موفق بوده یا خیر
|
||
desc(str): توضیحی در مورد موفقیت یا خطای عملیات ارائه می دهد
|
||
"""
|
||
classification_result ={
|
||
"best-class":{},
|
||
"other-classes": []}
|
||
content = section_source['content']
|
||
# اگر متن سکشن خالی بود، کلاس ها را به صورت خالی برگردان
|
||
if content =='':
|
||
return classification_result, True, 'Classification was successful'
|
||
|
||
# اگر نوع سکشن، عنوان یا موخره یا امضاء باشد، نیازی به فرایند کلاسبندی نیست و لیست کلاس های مربوط به این سکشن را خالی قرار می دهیم
|
||
filtered_keys = ['فصل','موخره','امضاء','عنوان']
|
||
section_path = section_source['other_info']['full_path']
|
||
if '>' in section_path:
|
||
path_parts = section_path.split('>')
|
||
for key in filtered_keys:
|
||
if key in path_parts[-1]:
|
||
return classification_result, True, 'Classification was successful'
|
||
else:
|
||
for key in filtered_keys:
|
||
if key in section_path:
|
||
return classification_result, True, 'Classification was successful'
|
||
|
||
qanon_title = section_source['qanon_title']
|
||
# این متغیر ریشه سکشن اخیر تا رسیدن به قانون را مشخص می کند
|
||
# مثلا: ماده5>تبصره یک>بند الف
|
||
full_path = section_source['other_info']['full_path'].split(">")
|
||
# بازسازی متن مسیر سکشن از جزء به کل
|
||
full_path_text = full_path_text_maker(full_path)
|
||
|
||
"""
|
||
به دلیل اینکه متن برخی از سکشن ها به تنهایی معنادار نیستند، مسیر جزء به کل ماده و عنوان قانون را به اول همه سکشن ها اضافه می کنیم تا هم نقیصه معنادار نبودن موارد این چنینی را برطرف کنیم و هم برای کلاس بندی، انتخاب مدل را بر اساس عنوان قانون جهت دهی کنیم. به این صورت، مدل، عنوان قانون را هم در کلاس بندی دخیل خواهد کرد
|
||
"""
|
||
pre_content = f"محتوای {full_path_text} {cleaning(qanon_title)} متن زیر است."
|
||
try:
|
||
content = cleaning(content)
|
||
except Exception as error:
|
||
# شناسه ماده هایی که محتوای آنها خالی است در این مسیر ذخیره می شوند
|
||
with open('./data/cleaning_content_log.txt', 'a', encoding='utf-8') as output_file:
|
||
output_file.write(id + " >> " + str(error) + "\n")
|
||
print('cleaning content error!')
|
||
return classification_result, False, str(error)
|
||
try:
|
||
# دریافت کلاس های مربوط به یک سکشن
|
||
section_classes = get_window_classes(f"{pre_content} {content}")
|
||
|
||
except Exception as error:
|
||
error_content = f"{id} -- Classification Error Content:{error}\n"
|
||
with open('./data/classification/errors.txt', 'a', encoding='utf-8') as output_file:
|
||
output_file.write(error_content)
|
||
print(error_content)
|
||
return classification_result, False, error_content
|
||
|
||
classification_result ={
|
||
"best-class" : section_classes[0],
|
||
"other-classes" : section_classes[1:]
|
||
}
|
||
return classification_result, True, 'Classification was successful'
|
||
|
||
def do_classify(sections):
|
||
"""
|
||
کلاسبندی مجموعه ای از سکشن های قانون به صورت یکجا در صورت نیاز
|
||
|
||
Args:
|
||
sections(list[obj]): لیستی از آبجکت سکشن های قانونی که در هر آبجکت، متادیتاهای مختلف آن سکشن قرار دارد
|
||
|
||
Returns:
|
||
sections(list[obj]): لیستی از آبجکت متادیتاهای سکشن های قانونی که در ورودی دریافت کرده بود که در این پردازش، متادیتای مربوط به کلاس نیز به آنها اضافه شده است
|
||
"""
|
||
print(f'start classification: {datetime.datetime.now()}')
|
||
|
||
test_counter = 1
|
||
# all = تعداد سکشن های فایل قبلی 282671
|
||
all = len(sections)
|
||
# لیستی جهت ذخیره عناوین قانون ها
|
||
qanon_title_list = []
|
||
# دیکشنری برای ذخیره نتایج که شامل شناسه سکشن ها و 4 کلاس به ترتیب اولویت است
|
||
new_sections_dict = {}
|
||
|
||
for index, id in enumerate(sections):
|
||
|
||
source = sections[id]
|
||
classification_result, classification_status, desc = single_section_classification(id, source)
|
||
|
||
if not classification_status:
|
||
print(f'id: {id} classification error. error description: {desc}')
|
||
|
||
# ساماندهی کلاس های پیش بینی شده در عنوان بهترین کلاس و دیگر کلاسها بر اساس امتیاز تخمین مدل و ذخیره در دیکشنری
|
||
# new_sections_dict[id] = classification_result
|
||
sections[id]['ai_codes'] = classification_result
|
||
|
||
""" برای حالت تست که می خواهیم عملکرد مدل کلاسیفایر را ارزیابی کنیم، بدین جهت که تنوعی از قوانین مختلف را بررسی کنیم، عنوان قوانین را ذخیره می کنیم تا از تکرار بررسی سکشن های متعدد از یک قانون پرهیز شود"""
|
||
# qanon_title = source['qanon_title']
|
||
# qanon_title_list.append(qanon_title)
|
||
print(f'section: {all}/{index+1}/{id}', flush=True)
|
||
# ذخیره دیکشنری شناسه های قانون و کلاس های تخمین زده شده در فایل جیسون
|
||
with open('./data/classification/all_sections_classes_new_1404--.json', 'w', encoding='utf-8') as output_file:
|
||
json_data = json.dumps(new_sections_dict, indent=4, ensure_ascii=False)
|
||
output_file.write(json_data)
|
||
|
||
print(f'end: {datetime.datetime.now()}')
|
||
print('classification finished!')
|
||
|
||
# classified_sections_dict = new_sections_dict
|
||
|
||
return sections
|
||
|
||
if __name__ == '__main__':
|
||
|
||
sections = get_sections()
|
||
|
||
# اجرای عملیات کلاس بندی
|
||
classified_sections = do_classify(sections)
|
||
|
||
with open(f'classified_sections_{today}.json', 'w', encoding='utf-8') as output:
|
||
data = json.dumps(classified_sections, ensure_ascii=False, indent=4)
|
||
output.write(data)
|
||
|
||
print(f'end: {datetime.datetime.now()}')
|
||
|
||
print('finished ner recognization!') |