Compare commits

...

1 Commit
master...md

Author SHA1 Message Date
d831e3ee52 adding 512tokenizer and split content 2025-10-09 15:08:13 +03:30
4 changed files with 251 additions and 11 deletions

512_tokens_finder.py (new file, 135 lines)

@ -0,0 +1,135 @@
# In the name of God
from transformers import AutoTokenizer
from typing import List
import unicodedata
import json
import re

# -----------------------------
# Normalization Utilities
# -----------------------------
_ARABIC_TO_PERSIAN = {
    "ي": "ی",
    "ك": "ک",
    "ۀ": "ه",
    "ة": "ه",
    "ؤ": "و",
    "إ": "ا",
    "أ": "ا",
    "ٱ": "ا",
    "آ": "ا",
}

# Map every non-spacing mark (Unicode category 'Mn') to None so it can be stripped.
sys_max = 0x110000  # one past the highest Unicode code point
_DIACRITICS = dict.fromkeys(
    i for i in range(sys_max) if unicodedata.category(chr(i)) == 'Mn'
)

_ZWNJ = "\u200c"     # zero-width non-joiner (Persian half-space)
_TATWEEL = "\u0640"  # kashida (elongation character)

# Regex pattern for finding words (letters, digits and underscore)
_TOKEN_RE = re.compile(r"[\w\u0600-\u06FF]+", re.UNICODE)


def nfkc(text: str) -> str:
    """Convert the text to the standard Unicode NFKC form."""
    return unicodedata.normalize("NFKC", text)


def strip_diacritics(text: str) -> str:
    """Remove diacritics and other non-spacing marks from the text."""
    return text.translate(_DIACRITICS)


def unify_persian_arabic(text: str) -> str:
    """Convert Arabic characters to their Persian equivalents."""
    return text.translate(str.maketrans(_ARABIC_TO_PERSIAN))


def normalize_spaces(text: str) -> str:
    """Remove half-spaces and kashida and collapse extra whitespace."""
    text = text.replace(_ZWNJ, " ")
    text = text.replace(_TATWEEL, "")
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def normalize_text(text: str) -> str:
    """Main normalization routine that applies all steps to the text in order."""
    if not isinstance(text, str):
        t = str(text)
    else:
        t = text
    t = nfkc(t)
    t = unify_persian_arabic(t)
    t = strip_diacritics(t)
    t = t.lower()
    t = normalize_spaces(t)
    return t


def tokenize(text: str) -> List[str]:
    """Split the text into tokens (words)."""
    return _TOKEN_RE.findall(text)


model_name = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)

longs_count = 0
shorts_count = 0
all_sections_count = 0

file_path = "ALL_SECTIONS.json"
with open(file_path, 'r', encoding='utf-8') as file:
    ALL_SECTIONS = json.load(file)

NEW_ALL_SECTIONS = []
for section in ALL_SECTIONS:
    all_sections_count += 1
    sentence = section["sentence-content"]
    normal_txt = normalize_text(sentence.strip())
    tokens = tokenizer.tokenize(normal_txt)
    if len(tokens) > 512:
        longs_count += 1
        section["tokens_len"] = len(tokens)
        section["is_long"] = True
        section["is_short"] = False
    elif len(tokens) < 10:
        shorts_count += 1
        section["tokens_len"] = len(tokens)
        section["is_long"] = False
        section["is_short"] = True
    else:
        section["tokens_len"] = len(tokens)
        section["is_long"] = False
        section["is_short"] = False
    NEW_ALL_SECTIONS.append(section)

with open('512t_ALL_SECTIONS.json', 'w', encoding='utf-8') as f:
    json.dump(NEW_ALL_SECTIONS, f, indent=4, ensure_ascii=False)

print(f"All Sections : {all_sections_count}")
print(f"Long Sections : {longs_count}")
print(f"Short Sections : {shorts_count}")


@ -14,16 +14,16 @@ class ElasticHelper():
     def __init__(self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings = ""):
-        if path_mappings :
-            self.path_mappings = path_mappings
-        if es_pass == '' :
-            self.es = Elasticsearch(es_url)
-        else:
-            self.es = Elasticsearch(
-                es_url,
-                http_auth=(es_user, es_pass),
-            )
+        # if path_mappings :
+        #     self.path_mappings = path_mappings
+        # if es_pass == '' :
+        #     self.es = Elasticsearch(es_url)
+        # else:
+        #     self.es = Elasticsearch(
+        #         es_url,
+        #         http_auth=(es_user, es_pass),
+        #     )
         # print(es_url)
         # print(self.es)
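For reference, a minimal sketch (assuming the elasticsearch-py client and the same constructor arguments as above) of the connection logic that this hunk comments out:

# Sketch of the client construction being disabled here; not part of the commit.
from elasticsearch import Elasticsearch

def make_client(es_url, es_user="elastic", es_pass=""):
    # empty password -> connect without credentials, otherwise use basic auth
    if es_pass == '':
        return Elasticsearch(es_url)
    return Elasticsearch(es_url, http_auth=(es_user, es_pass))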


@ -30,7 +30,7 @@ from transformers import AutoTokenizer
 from sklearn.decomposition import PCA
 from sklearn.manifold import TSNE
 from sklearn.metrics.pairwise import cosine_similarity
-#from normalizer import cleaning
+from normalizer import cleaning
 try:
     from elastic_helper import ElasticHelper
 except Exception as error:
@ -44,7 +44,7 @@ except Exception as error:
 # Persian text processing
 # import hazm
-# from hazm import Normalizer, word_tokenize, POSTagger
+from hazm import Normalizer, word_tokenize, POSTagger
 # Configure logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


@ -0,0 +1,105 @@
# In the name of God
import json
from normalizer import cleaning
# try:
from elastic_helper import ElasticHelper
# except Exception as error:
#     eee = error
#     pass


def full_path_text_maker(full_path):
    """
    Takes the path of a section and rebuilds a text from its parts, ordered
    from the most specific to the most general, and returns it.

    Args:
        full_path (list): list of the elements describing the tree path of this section
    Returns:
        full_path_text (str): the text reconstructed from the section's path
    """
    full_path_text = ""
    for i, path_item in enumerate(reversed(full_path)):
        if i == len(full_path) - 1:
            full_path_text += ''.join(f'{path_item}')
            break
        full_path_text += ''.join(f'{path_item} از ')
    full_path_text = full_path_text.strip()
    return full_path_text


if __name__ == "__main__":
    eh_obj = ElasticHelper()
    path = r".\data\mj_qa_section-v02.zip"
    sections_elastic = eh_obj.iterateJsonFile(path, True)
    all_count = 0
    dont_cares = []
    ALL_SECTIONS = []
    n = 0
    for index, item in enumerate(sections_elastic):
        source = item['source']
        section_path = source['other_info']['full_path']
        id = item['id']
        filtered_keys = ['فصل', 'موخره', 'امضاء', 'عنوان']
        flag = False
        if '>' in section_path:
            path_parts = section_path.split('>')
            for key in filtered_keys:
                if key in path_parts[-1]:
                    dont_cares.append(id)
                    flag = True
                    break
            if flag:
                continue
        else:
            for key in filtered_keys:
                if key in section_path:
                    dont_cares.append(id)
                    flag = True
                    break
            if flag:
                continue
        qanon_title = source['qanon_title']
        full_path_text = full_path_text_maker(section_path.split('>'))
        section_prefix = f"محتوای {full_path_text} {cleaning(qanon_title)} عبارت است از: "
        try:
            content = cleaning(item['source']['content'])
            sentences = content.split(".")
            # # discard very small sections that effectively have no content
            # if len(content.split()) <= 10:
            #     continue
        except Exception as error:
            print(error)
            continue
        for sentence in sentences:
            if sentence == "":
                continue
            all_count += 1
            sentence_id = f"sn{n}"
            n += 1
            data = {
                'id': id,
                "sentence_id": sentence_id,
                'fullpath': section_path,
                'qanon-title': qanon_title,
                'section-prefix': section_prefix,
                'sentence-content': sentence
            }
            ALL_SECTIONS.append(data)

    with open('ALL_SECTIONS.json', 'w', encoding='utf-8') as f:
        json.dump(ALL_SECTIONS, f, indent=4, ensure_ascii=False)

    print(f'all_count: {all_count}')
    print(f'dont_cares: {len(dont_cares)}')
    print(f'ALL_SECTIONS without dont-cares: {len(ALL_SECTIONS)}')
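A small illustration (not part of the commit) of what full_path_text_maker returns for a hypothetical section path; the path elements are invented:

# Example using the function defined in the file above; it joins the path
# parts from leaf to root with "از" ("of").
parts = "قانون نمونه>ماده ۱>تبصره ۲".split('>')
print(full_path_text_maker(parts))
# -> "تبصره ۲ از ماده ۱ از قانون نمونه"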