# !pip install hazm
# !pip install transformers==4.26.0
# !pip install --upgrade numpy
# !pip install --upgrade sentence-transformers
"""
|
|
Persian Sentence Processing and Vector Analysis
|
|
==============================================
|
|
|
|
This script processes Persian sentences from a JSON file and performs:
|
|
1. Word extraction and preprocessing
|
|
2. Vector representation using multilingual transformer
|
|
3. Similarity analysis for key words
|
|
4. Dimensionality reduction to 3D
|
|
5. 3D visualization with Persian labels
|
|
|
|
Author: NLP Expert Assistant
|
|
"""
|
|
|
|
import json
import re
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Set
from collections import Counter
import logging
from pathlib import Path

# NLP and ML libraries
from sentence_transformers import SentenceTransformer
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity

# Visualization libraries
# import matplotlib.pyplot as plt
# import plotly.graph_objects as go
# import plotly.express as px
# from plotly.subplots import make_subplots

# Persian text processing
import hazm
from hazm import Normalizer, word_tokenize, POSTagger

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class PersianVectorAnalyzer:
    """
    A comprehensive class for Persian text processing and vector analysis.
    """

    def __init__(self, model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
        """
        Initialize the analyzer with the specified model.

        Args:
            model_name: The sentence transformer model to use
        """
        self.model_name = model_name
        self.model = None
        self.normalizer = Normalizer()
        self.stop_words = self._load_persian_stop_words()
        self.key_words = [
            "خدا", "بنده", "جهاد", "ولی", "زکات",
            "نماز", "صبر", "عبادت", "ولایت", "خلافت", "پیامبر"
        ]

        logger.info(f"Initializing Persian Vector Analyzer with model: {model_name}")

    def _load_persian_stop_words(self) -> Set[str]:
        """
        Load Persian stop words.

        Returns:
            Set of Persian stop words
        """
        # Common Persian stop words
        stop_words = {
            'و', 'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'برای', 'تا',
            'را', 'هم', 'یا', 'اما', 'اگر', 'چون', 'چرا', 'چگونه', 'کجا',
            'چه', 'کی', 'چند', 'چقدر', 'همه', 'هیچ', 'بعضی', 'هر', 'همه',
            'خود', 'خویش', 'ما', 'شما', 'آنها', 'ایشان', 'اینها', 'آنها',
            'من', 'تو', 'او', 'ما', 'شما', 'آنها', 'ایشان', 'اینها',
            'است', 'هست', 'بود', 'شد', 'می', 'باید', 'خواهد', 'دارد',
            'کرد', 'شد', 'بود', 'هست', 'است', 'میشود', 'میکند',
            'یک', 'دو', 'سه', 'چهار', 'پنج', 'شش', 'هفت', 'هشت', 'نه', 'ده',
            'اول', 'دوم', 'سوم', 'چهارم', 'پنجم', 'ششم', 'هفتم', 'هشتم', 'نهم', 'دهم',
            'سال', 'ماه', 'روز', 'هفته', 'ساعت', 'دقیقه', 'ثانیه', 'پس',
            'بله', 'نه', 'آری', 'خیر', 'بلی', 'نخیر',
            'حالا', 'الان', 'امروز', 'دیروز', 'فردا', 'هفته', 'ماه', 'سال',
            'بالا', 'پایین', 'چپ', 'راست', 'جلو', 'عقب', 'داخل', 'خارج',
            'بزرگ', 'کوچک', 'بلند', 'کوتاه', 'پهن', 'باریک', 'ضخیم', 'نازک',
        }
        return stop_words

    def load_model(self):
        """
        Load the sentence transformer model.
        """
        try:
            logger.info("Loading sentence transformer model...")
            self.model = SentenceTransformer(self.model_name)
            logger.info("Model loaded successfully!")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise

    def load_json_data(self, file_path: str) -> List[str]:
        """
        Load Persian sentences from JSON file.

        Args:
            file_path: Path to the JSON file

        Returns:
            List of Persian sentences
        """
        try:
            logger.info(f"Loading data from {file_path}")
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # If the top level is a dict, flatten it to a list of its values
            if isinstance(data, dict):
                data = list(data.values())

            sentences = []
            for item in data:
                if isinstance(item, dict):
                    # Extract sentences from the expected key(s)
                    for key in ['persian_translate']:
                        if key in item and item[key]:
                            sentences.append(str(item[key]))
                elif isinstance(item, str):
                    sentences.append(item)

            logger.info(f"Loaded {len(sentences)} sentences")
            return sentences

        except Exception as e:
            logger.error(f"Error loading JSON data: {e}")
            raise

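    # Note: the exact schema of the input JSON is not specified here; based on the
    # keys used above, a plausible (hypothetical) input shape is either a list of
    # records or a dict of records, e.g.:
    #   [{"persian_translate": "..."}, ...]
    #   {"1": {"persian_translate": "..."}, "2": {...}}
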
    def preprocess_text(self, text: str) -> str:
        """
        Preprocess Persian text.

        Args:
            text: Raw Persian text

        Returns:
            Preprocessed text
        """
        # Normalize text
        text = self.normalizer.normalize(text)

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove special characters but keep Persian characters
        text = re.sub(r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s]', '', text)

        return text.strip()

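    # Rough (hypothetical) example of the cleanup above: ASCII punctuation, Latin
    # letters and digits fall outside the listed Unicode ranges and are removed,
    # so something like "نماز ستون دین است! (hadith 12)" becomes "نماز ستون دین است".
    # Arabic-script punctuation such as "،" lies inside U+0600-U+06FF and is kept.
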
    def extract_words(self, sentences: List[str]) -> List[str]:
        """
        Extract all words from sentences.

        Args:
            sentences: List of Persian sentences

        Returns:
            List of all words
        """
        all_words = []

        for sentence in sentences:
            # Preprocess sentence
            processed_sentence = self.preprocess_text(sentence)

            # Tokenize
            words = word_tokenize(processed_sentence)
            # words = processed_sentence.split()

            # Filter out empty strings and very short words
            words = [word for word in words if len(word) > 1]

            all_words.extend(words)

        logger.info(f"Extracted {len(all_words)} words from {len(sentences)} sentences")
        return all_words

    def remove_stop_words(self, words: List[str]) -> List[str]:
        """
        Remove stop words from the word list.

        Args:
            words: List of words

        Returns:
            List of words without stop words
        """
        filtered_words = [word for word in words if word not in self.stop_words]
        logger.info(f"Removed {len(words) - len(filtered_words)} stop words")
        return filtered_words

    def get_unique_words(self, words: List[str]) -> List[str]:
        """
        Get unique words from the list.

        Args:
            words: List of words

        Returns:
            List of unique words
        """
        unique_words = list(set(words))
        logger.info(f"Found {len(unique_words)} unique words from {len(words)} total words")
        return unique_words

    def compute_word_vectors(self, words: List[str]) -> Dict[str, List[float]]:
        """
        Compute vector representations for words.

        Args:
            words: List of unique words

        Returns:
            Dictionary mapping words to their vector representations
        """
        if self.model is None:
            self.load_model()

        logger.info(f"Computing vectors for {len(words)} words...")

        # Compute embeddings
        embeddings = self.model.encode(words, show_progress_bar=True)

        # Create dictionary
        word_vectors = {}
        for i, word in enumerate(words):
            word_vectors[word] = embeddings[i].tolist()

        logger.info("Word vectors computed successfully!")
        return word_vectors

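    # Output sketch (illustrative values): {"خدا": [0.12, -0.08, ...], ...}.
    # With the default paraphrase-multilingual-MiniLM-L12-v2 model each vector
    # should have 384 dimensions.
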
    def find_closest_words(self, word_vectors: Dict[str, List[float]],
                           key_words: List[str], top_k: int = 20) -> Dict[str, List[str]]:
        """
        Find the closest words to each key word.

        Args:
            word_vectors: Dictionary of word vectors
            key_words: List of key words to find neighbors for
            top_k: Number of closest words to find

        Returns:
            Dictionary mapping key words to their closest neighbors
        """
        logger.info(f"Finding {top_k} closest words for {len(key_words)} key words...")

        # Convert to numpy arrays for faster computation
        words = list(word_vectors.keys())
        vectors = np.array(list(word_vectors.values()))

        closest_words = {}

        for key_word in key_words:
            if key_word in word_vectors:
                # Get the key word vector
                key_vector = np.array(word_vectors[key_word]).reshape(1, -1)

                # Compute cosine similarities
                similarities = cosine_similarity(key_vector, vectors)[0]

                # Get indices sorted by similarity, highest first
                word_indices = np.argsort(similarities)[::-1]

                # Filter out the key word itself and keep the top k
                closest_indices = []
                for idx in word_indices:
                    if words[idx] != key_word and len(closest_indices) < top_k:
                        closest_indices.append(idx)

                # Get the closest words
                closest_words[key_word] = [words[idx] for idx in closest_indices]
                logger.info(f"Found {len(closest_words[key_word])} closest words for '{key_word}'")
            else:
                logger.warning(f"Key word '{key_word}' not found in word vectors")
                closest_words[key_word] = []

        return closest_words

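    # Usage sketch (hypothetical values):
    #   neighbors = analyzer.find_closest_words(word_vectors, ["نماز"], top_k=5)
    #   # -> {"نماز": ["عبادت", "روزه", ...]}
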
    def reduce_to_3d(self, word_vectors: Dict[str, List[float]],
                     method: str = 'tsne') -> Dict[str, List[float]]:
        """
        Reduce word vectors to 3D coordinates.

        Args:
            word_vectors: Dictionary of word vectors
            method: Dimensionality reduction method ('pca' or 'tsne')

        Returns:
            Dictionary mapping words to their 3D coordinates
        """
        logger.info(f"Reducing dimensions to 3D using {method.upper()}...")

        words = list(word_vectors.keys())
        vectors = np.array(list(word_vectors.values()))

        if method.lower() == 'pca':
            reducer = PCA(n_components=3, random_state=42)
        elif method.lower() == 'tsne':
            # t-SNE requires perplexity to be smaller than the number of samples
            reducer = TSNE(n_components=3, random_state=42, perplexity=min(30, len(vectors) - 1))
        else:
            raise ValueError("Method must be 'pca' or 'tsne'")

        # Reduce dimensions
        reduced_vectors = reducer.fit_transform(vectors)

        # Create dictionary
        word_vectors_3d = {}
        for i, word in enumerate(words):
            word_vectors_3d[word] = reduced_vectors[i].tolist()

        logger.info("Dimensionality reduction completed!")
        return word_vectors_3d

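    # The pipeline below calls this with method='tsne'; switching to
    # reduce_to_3d(word_vectors, method='pca') gives a deterministic and typically
    # faster projection, at the cost of usually weaker visual cluster separation.
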
    def save_json(self, data: dict, file_path: str):
        """
        Save data to JSON file.

        Args:
            data: Data to save
            file_path: Output file path
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"Data saved to {file_path}")
        except Exception as e:
            logger.error(f"Error saving to {file_path}: {e}")
            raise

    # def create_3d_visualization(self, word_vectors_3d: Dict[str, List[float]],
    #                             selected_words: Dict[str, List[str]],
    #                             output_path: str = "persian_words_3d.html"):
    #     """
    #     Create 3D visualization of words.
    #
    #     Args:
    #         word_vectors_3d: Dictionary of 3D word coordinates
    #         selected_words: Dictionary of selected words for each key word
    #         output_path: Output file path for the visualization
    #     """
    #     logger.info("Creating 3D visualization...")
    #
    #     # Prepare data for plotting
    #     words = list(word_vectors_3d.keys())
    #     coords = np.array(list(word_vectors_3d.values()))
    #
    #     # Create color mapping for key words and their neighbors
    #     colors = []
    #     sizes = []
    #     hover_texts = []
    #
    #     for word in words:
    #         # Check if word is a key word
    #         is_key_word = word in self.key_words
    #
    #         # Check if word is in selected words
    #         in_selected = False
    #         key_word_group = None
    #         for key_word, selected_list in selected_words.items():
    #             if word in selected_list:
    #                 in_selected = True
    #                 key_word_group = key_word
    #                 break
    #
    #         if is_key_word:
    #             colors.append('red')
    #             sizes.append(15)
    #             hover_texts.append(f"کلیدواژه: {word}")
    #         elif in_selected:
    #             colors.append('blue')
    #             sizes.append(10)
    #             hover_texts.append(f"کلمه مرتبط با '{key_word_group}': {word}")
    #         else:
    #             colors.append('lightgray')
    #             sizes.append(5)
    #             hover_texts.append(f"کلمه: {word}")
    #
    #     # Create 3D scatter plot
    #     fig = go.Figure()
    #
    #     # Add scatter plot
    #     fig.add_trace(go.Scatter3d(
    #         x=coords[:, 0],
    #         y=coords[:, 1],
    #         z=coords[:, 2],
    #         mode='markers+text',
    #         marker=dict(
    #             size=sizes,
    #             color=colors,
    #             opacity=0.8
    #         ),
    #         text=words,
    #         textposition="middle center",
    #         hovertext=hover_texts,
    #         hoverinfo='text'
    #     ))
    #
    #     # Update layout
    #     fig.update_layout(
    #         title={
    #             'text': 'نمایش سه‌بعدی کلمات فارسی',
    #             'x': 0.5,
    #             'xanchor': 'center',
    #             'font': {'size': 20}
    #         },
    #         scene=dict(
    #             xaxis_title='محور X',
    #             yaxis_title='محور Y',
    #             zaxis_title='محور Z',
    #             camera=dict(
    #                 eye=dict(x=1.5, y=1.5, z=1.5)
    #             )
    #         ),
    #         width=1000,
    #         height=800,
    #         showlegend=False
    #     )
    #
    #     # Save the plot
    #     fig.write_html(output_path)
    #     logger.info(f"3D visualization saved to {output_path}")
    #
    #     return fig

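    # To re-enable this visualization, uncomment the plotly imports at the top of
    # the file and the create_3d_visualization call in process_pipeline below.
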
def process_pipeline(self, input_file: str, output_dir: str = "output"):
|
|
"""
|
|
Run the complete processing pipeline.
|
|
|
|
Args:
|
|
input_file: Path to input JSON file
|
|
output_dir: Output directory for results
|
|
"""
|
|
# Create output directory
|
|
Path(output_dir).mkdir(exist_ok=True)
|
|
|
|
logger.info("Starting Persian Vector Analysis Pipeline...")
|
|
|
|
# Step 1: Load data
|
|
sentences = self.load_json_data(input_file)
|
|
|
|
# Step 2: Extract words
|
|
all_words = self.extract_words(sentences)
|
|
|
|
# Step 3: Remove stop words
|
|
# filtered_words = self.remove_stop_words(all_words)
|
|
filtered_words = all_words
|
|
|
|
# Step 4: Get unique words
|
|
unique_words = self.get_unique_words(filtered_words)
|
|
|
|
# Step 5: Compute word vectors
|
|
word_vectors = self.compute_word_vectors(unique_words)
|
|
|
|
# Step 6: Save word vectors
|
|
self.save_json(word_vectors, f"{output_dir}/words_vector.json")
|
|
|
|
# Step 7: Find closest words to key words
|
|
selected_words = self.find_closest_words(word_vectors, self.key_words)
|
|
|
|
# Step 8: Save selected words
|
|
self.save_json(selected_words, f"{output_dir}/selected_words.json")
|
|
|
|
# Step 9: Reduce to 3D
|
|
word_vectors_3d = self.reduce_to_3d(word_vectors, method='tsne')
|
|
|
|
# Step 10: Save 3D vectors
|
|
self.save_json(word_vectors_3d, f"{output_dir}/words_vector_3d.json")
|
|
|
|
# Step 11: Create visualization
|
|
# self.create_3d_visualization(word_vectors_3d, selected_words,
|
|
# f"{output_dir}/persian_words_3d.html")
|
|
|
|
logger.info("Pipeline completed successfully!")
|
|
|
|
# Print summary
|
|
print("\n" + "="*50)
|
|
print("PIPELINE SUMMARY")
|
|
print("="*50)
|
|
print(f"Input sentences: {len(sentences)}")
|
|
print(f"Total words extracted: {len(all_words)}")
|
|
print(f"Unique words after preprocessing: {len(unique_words)}")
|
|
print(f"Word vectors computed: {len(word_vectors)}")
|
|
print(f"Key words processed: {len(self.key_words)}")
|
|
print(f"Output files saved to: {output_dir}/")
|
|
print("="*50)
|
|
|
|
|
|
def main():
    """
    Main function to run the Persian Vector Analysis.
    """
    # Initialize analyzer
    analyzer = PersianVectorAnalyzer()

    # Define input and output paths
    input_file = "./data/final_wisdom.json"
    output_dir = "output"

    # Run the complete pipeline
    analyzer.process_pipeline(input_file, output_dir)


if __name__ == "__main__":
    main()
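
# Ad-hoc usage sketch (illustrative only; hypothetical word list and output path):
#   analyzer = PersianVectorAnalyzer()
#   vectors = analyzer.compute_word_vectors(["خدا", "نماز", "صبر", "عبادت"])
#   coords = analyzer.reduce_to_3d(vectors, method='pca')
#   analyzer.save_json(coords, "sample_3d.json")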