# In the name of God
# !pip install hazm
# !pip install transformers==4.26.0
# !pip install --upgrade numpy
# !pip install --upgrade sentence-transformers
"""
Persian Sentence Processing and Vector Analysis
==============================================

This script processes Persian sentences from a JSON file and performs:
1. Word extraction and preprocessing
2. Sentence vector representation using a fastText model
3. Similarity analysis for key words
4. Dimensionality reduction to 3D (PCA or t-SNE)

The default pipeline (`process_pipeline`) only loads the input records,
embeds every record's text and writes the result to a JSON file; the other
methods are standalone helpers.

Author: NLP Expert Assistant
"""

import json
import re
import csv
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Set, Union
from collections import Counter
import logging
from pathlib import Path
from fasttext import tokenize

# NLP and ML libraries
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics.pairwise import cosine_similarity

# Visualization libraries
# import matplotlib.pyplot as plt
# import plotly.graph_objects as go
# import plotly.express as px
# from plotly.subplots import make_subplots

# Persian text processing
# import hazm
# from hazm import Normalizer, word_tokenize, POSTagger
from normalizer import normalize_persian
from datetime import datetime
import fasttext.util
from gensim.models.fasttext import FastText

# fasttext.util.download_model('fa', if_exists='ignore')  # Persian model

# Wall-clock start of the run, printed next to the end time by the script tail.
strt_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Compiled once at import time because they are applied to every sentence.
_WHITESPACE_RE = re.compile(r'\s+')
_NON_PERSIAN_RE = re.compile(
    r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s]'
)


class FasttextPersianVectorAnalyzer:
    """
    A comprehensive class for Persian text processing and vector analysis.

    The fastText model is loaded lazily: `__init__` only records the model
    path, and `load_model` (called on demand by `compute_word_vectors`)
    performs the actual load.
    """

    def __init__(self, model_name: str = './models/khamenei_all_180_clean_model.bin'):
        """
        Initialize the analyzer with the specified model.

        Args:
            model_name: Path of the fastText binary model to load.
        """
        self.model_name = model_name
        self.model = None
        # self.normalizer = Normalizer()
        self.stop_words = self._load_persian_stop_words()
        self.key_words = []

        logger.info(f"Initializing Persian Vector Analyzer with model: {model_name}")

    def _load_persian_stop_words(self) -> Set[str]:
        """Return the stop-word set (currently empty)."""
        # `{ }` would be an empty dict, not a set; build a real set so the
        # value matches the declared return type.
        stop_words: Set[str] = set()
        return stop_words

    def load_model(self):
        """
        Load the fastText model named by `self.model_name` into `self.model`.

        Raises:
            Exception: Whatever `fasttext.load_model` raises is logged and
                re-raised unchanged.
        """
        try:
            logger.info("Loading sentence transformer model...")
            self.model = fasttext.load_model(self.model_name)
            logger.info("Model loaded successfully!")
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise

    def split_sentence(self, sentence: str, max_tokens: int = 512) -> List[str]:
        """
        Split an over-long text into smaller pieces.

        A text with fewer than `max_tokens` fastText tokens is returned as a
        single piece. Otherwise it is split on '.', and any resulting piece
        that still has more than `max_tokens` tokens is split again on the
        Persian comma. Pieces produced by the second split are not checked
        again, so they may still exceed the limit.

        Args:
            sentence: Text to split.
            max_tokens: Token budget per piece (default 512).

        Returns:
            The list of pieces; it may contain empty strings.
        """
        if len(tokenize(sentence)) < max_tokens:
            return [sentence]

        pieces: List[str] = []
        for part in str(sentence).split('.'):
            if len(tokenize(part)) > max_tokens:
                pieces.extend(str(part).split('،'))
            else:
                pieces.append(part)
        return pieces

    def load_json_data(self, file_path: str) -> List[str]:
        """
        Load sentences from a JSON file.

        The top-level value may be a list or a dict (in which case its values
        are used). Each item is either a dict, whose 'sentece_text' entry is
        read, or a plain string. Every text is passed through
        `split_sentence`, and empty pieces are dropped.

        Args:
            file_path: Path of the UTF-8 encoded JSON file.

        Returns:
            The non-empty sentence pieces, in input order.

        Raises:
            Exception: Any error while reading or parsing is logged and
                re-raised unchanged.
        """
        try:
            logger.info(f"Loading data from {file_path}")
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # A top-level mapping is treated as a collection of its values.
            if isinstance(data, dict):
                data = list(data.values())

            sentences: List[str] = []
            if isinstance(data, list):
                for index, item in enumerate(data):
                    print(f'split sentence {index}')
                    if isinstance(item, dict):
                        # Extract sentences from different possible keys
                        for key in ['sentece_text']:
                            if key in item and item[key]:
                                sentences.extend(self.split_sentence(item[key]))
                    elif isinstance(item, str):
                        # The item itself is the text; there is no key to
                        # index a string with.
                        sentences.extend(self.split_sentence(item))

            sentences = [senten for senten in sentences if senten]
            logger.info(f"Loaded {len(sentences)} sentences")
            return sentences

        except Exception as e:
            logger.error(f"Error loading JSON data: {e}")
            raise

    def preprocess_text(self, text: str) -> str:
        """
        Normalize a text and keep only Arabic-script characters and spaces.

        Args:
            text: Raw input text.

        Returns:
            The normalized text with every character outside the listed
            Arabic-script Unicode ranges removed, whitespace runs collapsed
            to a single space, and leading/trailing whitespace stripped.
        """
        # Normalize text
        text = normalize_persian(text)

        # Remove special characters but keep Persian characters
        text = _NON_PERSIAN_RE.sub('', text)

        # Collapse whitespace last: removing characters can leave runs of
        # spaces behind (e.g. "a , b" -> "a  b").
        text = _WHITESPACE_RE.sub(' ', text)

        return text.strip()

    def extract_words(self, sentences: List[str]) -> List[str]:
        """
        Extract every word longer than one character from the sentences.

        Args:
            sentences: Raw sentences.

        Returns:
            All kept words, in order of appearance (duplicates included).
        """
        all_words: List[str] = []

        for sentence in sentences:
            # Preprocess sentence
            processed_sentence = self.preprocess_text(sentence)

            # Tokenize on whitespace. hazm's word_tokenize is not imported in
            # this file, so calling it would raise NameError.
            words = processed_sentence.split()

            # Filter out empty strings and very short words
            all_words.extend(word for word in words if len(word) > 1)

        logger.info(f"Extracted {len(all_words)} words from {len(sentences)} sentences")
        return all_words

    def remove_stop_words(self, words: List[str]) -> List[str]:
        """Return `words` without the entries found in `self.stop_words`."""
        filtered_words = [word for word in words if word not in self.stop_words]
        logger.info(f"Removed {len(words) - len(filtered_words)} stop words")
        return filtered_words

    def get_unique_words(self, words: List[str]) -> List[str]:
        """Return the distinct words, keeping first-occurrence order."""
        # dict.fromkeys de-duplicates while preserving insertion order, which
        # makes the result reproducible between runs (a set does not).
        unique_words = list(dict.fromkeys(words))
        logger.info(f"Found {len(unique_words)} unique words from {len(words)} total words")
        return unique_words

    def compute_word_vectors(self, ids, dates, sentences: List[str], titles, urls) -> Dict[str, dict]:
        """
        Compute sentence vectors using FastText model.

        The five input sequences are parallel: element *i* of each describes
        the same record. Sentences that are empty after whitespace cleanup,
        or that the model fails to embed, are skipped together with their
        metadata, so every output record keeps its own id/date/title/url.

        Args:
            ids: Record identifiers.
            dates: Record dates.
            sentences: Record texts to embed.
            titles: Record titles.
            urls: Record URLs.

        Returns:
            A dict keyed 'sentence-1', 'sentence-2', ... (numbered over the
            embedded sentences only) whose values hold 'id', 'date', 'title',
            'sentence', 'url' and 'embeddings' (a list of floats).
        """
        if self.model is None:
            self.load_model()

        logger.info(f"Computing vectors for {len(sentences)} sentences...")

        if len({len(ids), len(dates), len(sentences), len(titles), len(urls)}) > 1:
            # zip() below stops at the shortest input; make that visible.
            logger.warning("Input sequences differ in length; extra items are ignored")

        sentences_vectors: Dict[str, dict] = {}
        records = zip(ids, dates, sentences, titles, urls)
        for index, (rec_id, date, sent, title, url) in enumerate(records):
            # Collapse every whitespace run (newlines, tabs, repeated spaces)
            # into a single space and trim the ends.
            cleaned_sent = _WHITESPACE_RE.sub(' ', sent).strip()

            # Skip texts that are empty after cleanup.
            if not cleaned_sent:
                continue

            try:
                # get_sentence_vector produces a single vector for the text.
                vector = self.model.get_sentence_vector(cleaned_sent)
            except Exception as e:
                logger.warning(f"Error processing sentence: {e}")
                continue

            print(f'sentence {index} embedded: {cleaned_sent[:10]}...')
            sentences_vectors[f'sentence-{len(sentences_vectors) + 1}'] = {
                'id': rec_id,
                'date': date,
                'title': title,
                'sentence': cleaned_sent,
                'url': url,
                'embeddings': np.asarray(vector).tolist(),
            }

        logger.info(f"Successfully computed vectors for {len(sentences_vectors)} sentences!")
        return sentences_vectors

    def find_closest_words(self, word_vectors: Dict[str, List[float]],
                           key_words: List[str], top_k: int = 20) -> Dict[str, List[str]]:
        """
        Find the `top_k` most cosine-similar words for each key word.

        Args:
            word_vectors: Mapping of word to its vector.
            key_words: Words to look up.
            top_k: Maximum number of neighbours per key word.

        Returns:
            Mapping of each key word to its neighbours, most similar first.
            Key words missing from `word_vectors` map to an empty list.
        """
        logger.info(f"Finding {top_k} closest words for {len(key_words)} key words...")

        # Convert to numpy arrays for faster computation
        words = list(word_vectors.keys())
        vectors = np.array(list(word_vectors.values()))

        closest_words: Dict[str, List[str]] = {}

        for key_word in key_words:
            if key_word not in word_vectors:
                logger.warning(f"Key word '{key_word}' not found in word vectors")
                closest_words[key_word] = []
                continue

            # Compute cosine similarities between the key word and all words
            key_vector = np.array(word_vectors[key_word]).reshape(1, -1)
            similarities = cosine_similarity(key_vector, vectors)[0]

            # Walk the words from most to least similar, skipping the key
            # word itself, until top_k neighbours are collected.
            neighbours: List[str] = []
            for idx in np.argsort(similarities)[::-1]:
                if len(neighbours) >= top_k:
                    break
                if words[idx] != key_word:
                    neighbours.append(words[idx])

            closest_words[key_word] = neighbours
            logger.info(f"Found {len(closest_words[key_word])} closest words for '{key_word}'")

        return closest_words

    def reduce_to_3d(self, word_vectors: Dict[str, List[float]],
                     method: str = 'tsne') -> Dict[str, List[float]]:
        """
        Project the vectors to three dimensions.

        Args:
            word_vectors: Mapping of word to its vector.
            method: 'pca' or 'tsne' (case-insensitive).

        Returns:
            Mapping of each word to its 3-element coordinate list; empty when
            `word_vectors` is empty.

        Raises:
            ValueError: If `method` is neither 'pca' nor 'tsne'.
        """
        logger.info(f"Reducing dimensions to 3D using {method.upper()}...")

        words = list(word_vectors.keys())
        vectors = np.array(list(word_vectors.values()))

        if method.lower() == 'pca':
            reducer = PCA(n_components=3, random_state=42)
        elif method.lower() == 'tsne':
            # Cap perplexity below the sample count.
            reducer = TSNE(n_components=3, random_state=42,
                           perplexity=min(30, len(vectors) - 1))
        else:
            raise ValueError("Method must be 'pca' or 'tsne'")

        if not words:
            # Nothing to fit; the reducers cannot handle an empty matrix.
            logger.info("Dimensionality reduction completed!")
            return {}

        # Reduce dimensions
        reduced_vectors = reducer.fit_transform(vectors)

        # Create dictionary
        word_vectors_3d = {word: reduced_vectors[i].tolist() for i, word in enumerate(words)}

        logger.info("Dimensionality reduction completed!")
        return word_vectors_3d

    def save_json(self, data: dict, file_path: str):
        """
        Write `data` to `file_path` as indented UTF-8 JSON.

        Raises:
            Exception: Any error while writing is logged and re-raised.
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"Data saved to {file_path}")
        except Exception as e:
            logger.error(f"Error saving to {file_path}: {e}")
            raise

    @staticmethod
    def convert_csv_to_json(csv_path, json_path):
        """
        Convert a CSV file to a JSON file holding a list of rows.

        Every CSV row (the first row included) becomes a list of strings.
        Declared static because it uses no instance state; without the
        decorator an instance call would pass the instance as `csv_path`.

        Args:
            csv_path: Path of the UTF-8 encoded CSV file to read.
            json_path: Path of the JSON file to write.
        """
        with open(csv_path, 'r', encoding='utf-8') as csv_file:
            data = list(csv.reader(csv_file))

        with open(json_path, 'w', encoding='utf-8') as json_file:
            json.dump(data, json_file, ensure_ascii=False, indent=2)

    def process_pipeline(self, input_file: str, output_dir: str = "output"):
        """
        Embed every record of `input_file` and save the result as JSON.

        The input must be a JSON list of objects with the keys 'id', 'date',
        'text', 'title' and 'url'. The output is written to
        `<output_dir>/embedding_FastText_khamenei.json`.

        Args:
            input_file: Path of the UTF-8 encoded input JSON file.
            output_dir: Directory for the output file; created if missing.
        """
        # Create output directory (including missing parents)
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        logger.info("Starting Persian Vector Analysis Pipeline...")

        # Step 1: Load data
        # sentences = self.load_json_data(input_file)
        with open(input_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        ids = []
        dates = []
        sentences = []
        titles = []
        urls = []
        for sec in data:
            ids.append(sec['id'])
            dates.append(sec['date'])
            sentences.append(sec['text'])
            titles.append(sec['title'])
            urls.append(sec['url'])

        print(f'len sentences: {len(sentences)}')

        # Step 5: Compute word vectors
        sentences_vectors = self.compute_word_vectors(ids, dates, sentences, titles, urls)

        # Step 6: Save word vectors
        self.save_json(sentences_vectors, f"{output_dir}/embedding_FastText_khamenei.json")

        logger.info("Pipeline completed successfully!")

        # Print summary
        print("\n" + "="*50)
        print("PIPELINE SUMMARY")
        print("="*50)
        print(f"Input sentences: {len(sentences)}")
        print(f"Output files saved to: {output_dir}/")
        print("="*50)


def main():
    """
    Main function to run the Persian Vector Analysis.
    """
    # Initialize analyzer
    analyzer = FasttextPersianVectorAnalyzer()

    # Define input and output paths
    # input_file = "./output/512_final_final_wisdom.json"
    # input_file = "./output/512_final_nahj_letters.json"
    input_file = "./data/raw-khamenei.json"
    output_dir = "output-khamenei"

    # Run the complete pipeline
    analyzer.process_pipeline(input_file, output_dir)


if __name__ == "__main__":
    main()

    end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"start time : {strt_time}\nend time : {end_time}")