Classifier/sections_window_3.py
2025-07-13 17:32:37 +03:30

180 lines
7.3 KiB
Python

from transformers import pipeline
from normalizer import cleaning
from elastic_helper import ElasticHelper
import transformers
import json
import datetime
import pandas as pd
from transformers import AutoTokenizer
print(transformers.__version__)
#model_checkpoint = "./BERT/findtuned_classification_model_15"# 15 epoch
# NLP/MLM/CODES/BERT/similarity/findtuned_classification_model_with_path_v3.2_15/checkpoint-18525
# model_checkpoint = '/home/gpu/NLP/MLM/CODES/BERT/similarity/findtuned_classification_model_with_path_v3.2_15/checkpoint-18525'
# # آموزش با 15 ایپاک بدون پیش پردازش
# model_checkpoint = '/home/gpu/NLP/MLM/CODES/BERT/similarity/findtuned_classification_model_with_path_v2_15/checkpoint-915'
# # آموزش با 60 ایپاک بدون پیش پردازش
# model_checkpoint = '/home/gpu/NLP/MLM/CODES/BERT/similarity/findtuned_classification_model_with_path_v2__60/checkpoint-3660'
# # آموزش با 60 ایپاک با پیش پردازش
# model_checkpoint = '/home/gpu/NLP/MLM/CODES/BERT/similarity/findtuned_classification_model_with_path_v2_cleaning_60/checkpoint-3660'
# آموزش با 120 ایپاک بدون پیش پردازش
model_checkpoint = '/home/gpu/NLP/MLM/CODES/BERT/similarity/findtuned_classification_model_with_path_v2__120/checkpoint-7320'
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
window_size = tokenizer.model_max_length#512#200
step_size = 350#100
Top_k = 10
# with open('./data/errors.txt', 'r', encoding='utf-8') as input_file:
# error_sections_id = input_file.read().splitlines()
eh_obj = ElasticHelper()
path = "/home/gpu/data_11/mj_qa_section.zip"
sections = eh_obj.iterateJsonFile(path, True)
classifier = pipeline("text-classification", model_checkpoint, framework="pt")
def get_class(sentences, top_k:int=4):
# sentences = cleaning(sentences)
out = classifier(sentences, top_k=top_k, truncation=True, max_length=window_size)
return out
def mean_classes(input_classes):
pass
all_classes = []
for cclass in input_classes:
for item in cclass:
all_classes.append({
'label': item['label'],
'score': item['score']
})
# sorted_classes = sorted(all_classes, key=lambda x: x['class'])
classes_df = pd.DataFrame(all_classes)
# گروه بندی بر اساس کلاس
grouped_df = classes_df.groupby("label").agg(
total_value=("score", "sum"), # مجموع امتیازها
count=("score", "count") # تعداد تکرار هر کلاس
).reset_index()
# تعریف فاکتور وزن بر اساس تعداد تکرار کلاس
grouped_df["weight"] = grouped_df["count"]
# بازسازی امتیاز با دخالت دادن وزن
grouped_df["score"] = grouped_df["total_value"] * grouped_df["weight"]
# حذف ستون‌های اضافی و ایجاد دیتافریم نهایی
final_df = grouped_df[["label", "count", "score"]]
# مرتب سازی دیتافریم نهایی بر اساس بالاترین امتیاز کلاسها
sorted_df = final_df.sort_values(by="score", ascending=False)
# تبدیل دیتافریم به دیکشنری
top_n_classes = sorted_df.head(Top_k).to_dict(orient="records")
for item in top_n_classes:
# تبدیل امتیاز در مبنای درصد
item['score'] = (item['score']*100)/sorted_df['score'].sum()
item.pop('count')
return top_n_classes
def get_window_classes(text):
text_classes = []
tokens = tokenizer(text)['input_ids'][1:-1]
#print(len(tokens))
if len(tokens) > window_size:
for i in range(0, len(tokens), step_size):#- window_size + 1
start_window_slice = tokens[0: i]
window_slice = tokens[i: i + window_size]
start_char = len(tokenizer.decode(start_window_slice).replace('[UNK]', ''))
char_len = len(tokenizer.decode(window_slice).replace('[UNK]', ''))
context_slice = text[start_char: start_char + char_len]
tokens_len = len(tokenizer(context_slice)['input_ids'][1:-1])
# print(f'i: {i},token-len: {tokens_len}', flush=True)
results = get_class(context_slice, Top_k)
text_classes.append(results)
text_classes = mean_classes(text_classes)
else:
text_classes = get_class(text, Top_k)
return text_classes
print(f'start: {datetime.datetime.now()}')
cc_counter = 1
test_counter = 1
all = 282671
qanon_title_list = []
new_sections_dict = {}
for index, item in enumerate(sections):
if index > 500:
break
id = item['id']
source = item['source']
if source['other_info']['full_path'] == 'عنوان' or source['other_info']['full_path'] == 'موخره' or source['other_info']['full_path'] == 'امضاء':
new_sections_dict[id] ={
"best-class":{},
"other-classes": []
}
qanon_title_list.append(qanon_title)
print(f'section: {all}/{id}/{index+1}', flush=True)
print(cc_counter)
cc_counter += 1
continue
content0 = source['content']
qanon_title = source['qanon_title']
full_path = source['other_info']['full_path'].split(">")
full_path_text = ''
for i, path_item in enumerate(reversed(full_path)):
if i == len(full_path) - 1:
full_path_text += ''.join(f'{path_item}')
break
full_path_text += ''.join(f'{path_item} از ')
full_path_text = full_path_text.strip()
try:
content = cleaning(content0)
pre_content = f"{full_path_text} {cleaning(qanon_title)} عبارت است از: "
except Exception as e:
with open('./data/errors_log.txt', 'a', encoding='utf-8') as output_file:
output_file.write(id + " >> " + str(e) + "\n")
continue
try:
section_classes = get_window_classes(f"{pre_content} {content}")
if (len(tokenizer(f"{pre_content} {content}")['input_ids'][1:-1]) < 1500) and not qanon_title in qanon_title_list:
with open('./data/test_log.txt', 'a', encoding='utf-8') as output_file:
message = f"\n{test_counter}\n{id} : {pre_content} {content}\nclasses:\n"
for cls in section_classes:
message += f"{cls['label']} >> {cls['score']}\n"
output_file.write(message + "\n")
test_counter+=1
except Exception as e:
error = e
with open('./data/errors.txt', 'a', encoding='utf-8') as output_file:
output_file.write(id + "\n")
continue
# item['classes'] = section_classes
new_sections_dict[id] ={
"content": f"{pre_content} {content}",
"best-class":section_classes[0],
"other-classes": section_classes[1:]
}
qanon_title_list.append(qanon_title)
print(f'section: {all}/{id}/{index+1}', flush=True)
# with open('./data/all_sections_classes_new2.json', 'w', encoding='utf-8') as output_file:
# json_data = json.dumps(new_sections_dict, indent=4, ensure_ascii=False)
# output_file.write(json_data)
with open('./data/all_sections_classes_tttttesttttt.json', 'w', encoding='utf-8') as output_file:
json_data = json.dumps(new_sections_dict, indent=4, ensure_ascii=False)
output_file.write(json_data)
print(f'end: {datetime.datetime.now()}')
print('finished!')