data_processes/p1_classifier.py
2025-08-11 19:56:29 +03:30

206 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
این سورس برای خواندن کل سکشن های قانون و پیشنهاد تعدادی کلاس بر اساس مدل آموزش دیده جهت این ماموریت مورد استفاده قرار می گیرد
"""
from transformers import pipeline
from normalizer import cleaning
from elastic_helper import ElasticHelper
import transformers
import json
import datetime
import pandas as pd
from transformers import AutoTokenizer
print(transformers.__version__)
# finetuned model for classification path
model_checkpoint = './models/classifier/findtuned_classification_hoosh_with_path_v2__30/checkpoint-1680'
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
# window_size = tokenizer.model_max_length#512#200
window_size = 512
"""
(یعنی سایز پنجره) به این دلیل که تعداد توکن های ورودی به مدل، محدود به متغیر بالاست
متن هایی که سایز آنها بیشتر از این مقدار باشد را از طریق یک پنجره لغزان که روی کل متن حرکت می کند، به چند قسمت تقسیم می کنیم و برای هر پنجره، به صورت جداگانه، کلاس دریافت می کنیم.
سایز این قسمت ها را در step_size معین کرده ایم
"""
step_size = 350#100
# تعداد کلاس هایی که بازای هر سکشن از مدل درخواست می کنیم
Top_k = 4
classifier = pipeline("text-classification", model_checkpoint, framework="pt")
def get_class(sentences, top_k:int=4):
# sentences = cleaning(sentences)
out = classifier(sentences, top_k=top_k, truncation=True, max_length=window_size)
return out
def mean_classes(input_classes):
pass
all_classes = []
for cclass in input_classes:
for item in cclass:
all_classes.append({
'label': item['label'],
'score': item['score']
})
# sorted_classes = sorted(all_classes, key=lambda x: x['class'])
classes_df = pd.DataFrame(all_classes)
# گروه بندی بر اساس کلاس
grouped_df = classes_df.groupby("label").agg(
total_value=("score", "sum"), # مجموع امتیازها
count=("score", "count") # تعداد تکرار هر کلاس
).reset_index()
# تعریف فاکتور وزن بر اساس تعداد تکرار کلاس
grouped_df["weight"] = grouped_df["count"]
# بازسازی امتیاز با دخالت دادن وزن
grouped_df["score"] = grouped_df["total_value"] * grouped_df["weight"]
# حذف ستون‌های اضافی و ایجاد دیتافریم نهایی
final_df = grouped_df[["label", "count", "score"]]
# مرتب سازی دیتافریم نهایی بر اساس بالاترین امتیاز کلاسها
sorted_df = final_df.sort_values(by="score", ascending=False)
# تبدیل دیتافریم به دیکشنری
top_n_classes = sorted_df.head(Top_k).to_dict(orient="records")
for item in top_n_classes:
# تبدیل امتیاز در مبنای درصد
item['score'] = (item['score']*100)/sorted_df['score'].sum()
item.pop('count')
return top_n_classes
def get_window_classes(text):
text_classes = []
tokens = tokenizer(text)['input_ids'][1:-1]
#print(len(tokens))
if len(tokens) > window_size:
for i in range(0, len(tokens), step_size):#- window_size + 1
start_window_slice = tokens[0: i]
window_slice = tokens[i: i + window_size]
start_char = len(tokenizer.decode(start_window_slice).replace('[UNK]', ''))
char_len = len(tokenizer.decode(window_slice).replace('[UNK]', ''))
context_slice = text[start_char: start_char + char_len]
tokens_len = len(tokenizer(context_slice)['input_ids'][1:-1])
# print(f'i: {i},token-len: {tokens_len}', flush=True)
results = get_class(context_slice, Top_k)
text_classes.append(results)
text_classes = mean_classes(text_classes)
else:
text_classes = get_class(text, Top_k)
return text_classes
def full_path_text_maker(full_path):
"""
این متد مسیر یک سکشن را می گیرد و متنی را بر اساس ترتیب بخش های آن از جزء به کل بازسازی می کند و بر می گرداند
full_path_text متن بازسازی شده از مسیر یک سکشن
"""
full_path_text = ""
for i, path_item in enumerate(reversed(full_path)):
if i == len(full_path) - 1:
full_path_text += ''.join(f'{path_item}')
break
full_path_text += ''.join(f'{path_item} از ')
full_path_text = full_path_text.strip()
return full_path_text
def single_section_classification(id, section_source):
"""
این متد، متن ورودی را کلاسبندی می کند
**Args:
id (str): شناسه سکشن
section_source (obj): سورس یک سکشن که شامل متن قانون و متادیتاهای مربوط به آن می شود
**Returns:
classification_result(obj): چند کلاس پیشنهادی به ترتیب اولویت
classification_status(bool): بیان می کند که عملیات کلاس بندی موفق بوده یا خیر
desc(str): توضیحی در مورد موفقیت یا خطای عملیات ارائه می دهد
"""
classification_result ={
"best-class":{},
"other-classes": []}
content = section_source['content']
# اگر نوع سکشن، عنوان یا موخره یا امضاء باشد، نیازی به فرایند کلاسبندی نیست و لیست کلاس های مربوط به این سکشن را خالی قرار می دهیم
if content =='' or section_source['other_info']['full_path'] == 'عنوان' or section_source['other_info']['full_path'] == 'موخره' or section_source['other_info']['full_path'] == 'امضاء':
return classification_result, True, 'Classification was successful'
qanon_title = section_source['qanon_title']
# این متغیر ریشه سکشن اخیر تا رسیدن به قانون را مشخص می کند
# مثلا: ماده5>تبصره یک>بند الف
full_path = section_source['other_info']['full_path'].split(">")
# بازسازی متن مسیر سکشن از جزء به کل
full_path_text = full_path_text_maker(full_path)
"""
به دلیل اینکه متن برخی از سکشن ها به تنهایی معنادار نیستند، مسیر جزء به کل ماده و عنوان قانون را به اول همه سکشن ها اضافه می کنیم تا هم نقیصه معنادار نبودن موارد این چنینی را برطرف کنیم و هم برای کلاس بندی، انتخاب مدل را بر اساس عنوان قانون جهت دهی کنیم. به این صورت، مدل، عنوان قانون را هم در کلاس بندی دخیل خواهد کرد
"""
pre_content = f"محتوای {full_path_text} {cleaning(qanon_title)} متن زیر است."
try:
content = cleaning(content)
except Exception as error:
# شناسه ماده هایی که محتوای آنها خالی است در این مسیر ذخیره می شوند
with open('./data/cleaning_content_log.txt', 'a', encoding='utf-8') as output_file:
output_file.write(id + " >> " + str(error) + "\n")
print('cleaning content error!')
return classification_result, False, str(error)
try:
# دریافت کلاس های مربوط به یک سکشن
section_classes = get_window_classes(f"{pre_content} {content}")
except Exception as error:
error_content = f"{id} -- Classification Error Content:{error}\n"
with open('./data/classification/errors.txt', 'a', encoding='utf-8') as output_file:
output_file.write(error_content)
print(error_content)
return classification_result, False, error_content
classification_result ={
"best-class" : section_classes[0],
"other-classes" : section_classes[1:]
}
return classification_result, True, 'Classification was successful'
def do_classify(sections):
print(f'start classification: {datetime.datetime.now()}')
test_counter = 1
# all = تعداد سکشن های فایل قبلی 282671
all = len(sections)
# لیستی جهت ذخیره عناوین قانون ها
qanon_title_list = []
# دیکشنری برای ذخیره نتایج که شامل شناسه سکشن ها و 4 کلاس به ترتیب اولویت است
new_sections_dict = {}
for index, id in enumerate(sections):
source = sections[id]['source']
classification_result, classification_status, desc = single_section_classification(id, source)
if not classification_status:
print(f'id: {id} classification error. error description: {desc}')
# ساماندهی کلاس های پیش بینی شده در عنوان بهترین کلاس و دیگر کلاسها بر اساس امتیاز تخمین مدل و ذخیره در دیکشنری
new_sections_dict[id] = classification_result
""" برای حالت تست که می خواهیم عملکرد مدل کلاسیفایر را ارزیابی کنیم، بدین جهت که تنوعی از قوانین مختلف را بررسی کنیم، عنوان قوانین را ذخیره می کنیم تا از تکرار بررسی سکشن های متعدد از یک قانون پرهیز شود"""
# qanon_title = source['qanon_title']
# qanon_title_list.append(qanon_title)
print(f'section: {all}/{index+1}/{id}', flush=True)
# ذخیره دیکشنری شناسه های قانون و کلاس های تخمین زده شده در فایل جیسون
with open('./data/classification/all_sections_classes_new_140405.json', 'w', encoding='utf-8') as output_file:
json_data = json.dumps(new_sections_dict, indent=4, ensure_ascii=False)
output_file.write(json_data)
print(f'end: {datetime.datetime.now()}')
print('classification finished!')
classified_sections_dict = new_sections_dict
return classified_sections_dict