import json import ast from typing import Dict, Any import time import datetime from openai import OpenAI from langchain_openai import ChatOpenAI # OSS Reqs import asyncio import traceback from openai import AsyncOpenAI import copy, asyncio, traceback from openai import OpenAI, AsyncOpenAI, LengthFinishReasonError from typing import List, Union from pydantic import BaseModel today = f'{datetime.datetime.now().year}{datetime.datetime.now().month}{datetime.datetime.now().day}' SYSTEM_PROMPT = """ تو یک استخراج‌گر ساختاریافته‌ی اطلاعات (Information Extractor) برای زبان فارسی هستی. وظیفه تو تبدیل متن به یک دیکشنری پایتون (Python Dictionary) دقیقاً مطابق با ساختار و تعاریف زیر است: ### قوانین بنیادین: 1. فقط بر اساس متن ورودی کار کن و هیچ دانش خارجی اضافه نکن. 2. خروجی باید "فقط" یک دیکشنری معتبر پایتون باشد؛ بدون هیچ مقدمه، موخره یا توضیح اضافه. 3. تمامی مقادیر (Values) باید به زبان فارسی باشند. 4. اگر موردی در متن یافت نشد، مقدار آن را لیست خالی [] یا None قرار بده. ### تعاریف عملیاتی ساختار خروجی: - **title**: (رشته) بین 4 تا 7 کلمه. خلاصه‌ای دقیق که فقط با واژگان موجود در متن ساخته شده و جهت‌گیری یا دوراهی اصلی متن را نشان دهد. - **central_concepts**: (لیست اشیاء) مفاهیم محوری 2 کلمه‌ای مستقیماً از متن (مضاف‌ومضاف‌الیه یا صفت‌وموصوف؛ بدون حروف عطف). - **type**: اگر مفهوم منجر به رفاه و سعادت است "زندگی‌ساز" و اگر مانع خیر است "زندگی‌سوز". - **paragraph_effect**: اگر لحن متن درباره آن مثبت/ترغیبی است "تقویت" و اگر منفی/نهی است "تضعیف". - **entities**: (لیست رشته) اسامی خاص اشخاص که صریحاً در متن ذکر شده‌اند. - **rules**: (لیست رشته) حداکثر 10 قاعده کلی اجتماعی/سیاسی/مذهبی. جملات کوتاه، مستقل و انتزاعی (بدون وابستگی به واژگان خاص متن). ### ساختار نهایی خروجی: { "title": str, "central_concepts": [{"concept": str, "type": str, "paragraph_effect": str}], "entities": [str], "rules": [str] } """ USER_PROMPT = ''' متن زیر را بر اساس دستورالعمل‌های سیستمی تحلیل کن و خروجی را در قالب دیکشنری پایتون ارائه بده: ## متن برای تحلیل: ''' def get_key(): key = 'aa-YUMjnFkdfIZk1p6XkiWvnw3vGfzVEFNg5A5e2qxOn0iJZgkO' # khamenei_metadata return key def get_client(): url = "https://api.avalai.ir/v1" client = OpenAI( api_key=get_key(), base_url=url, ) return client async def llm_request(text, model="gemini-2.5-flash-lite"): # print(f'using model: {model}') user_prompt = f"{USER_PROMPT}\n{text}" try: answer = await single_simple_async_proccess_item(system_prompt= SYSTEM_PROMPT, user_prompt= user_prompt, api_url="http://2.188.15.101:8001/v1", model_name='gpt-oss-120b', priority=10) except Exception as error: with open('./leader-answer/error-in-getting-metadata.txt', mode='a+', encoding='utf-8') as file: error_message = f'\n\ntext: {text.strip()}\nerror:{error} \n-------------------------------\n' file.write(error_message) return 'با عرض پوزش؛ متاسفانه خطایی رخ داده است.' return answer async def text_to_dict(text: str) -> Dict[str, Any]: text = text.replace('\n','') text = text.lstrip('```json') text = text.lstrip('json') text = text.lstrip('```python') text = text.rstrip('```') text = text.strip() try: return json.loads(text) except (json.JSONDecodeError, TypeError): return ast.literal_eval(text) client = get_client() models = [ "gemini-2.5-flash-lite", "gpt-4o-mini","deepseek-reasoner"] date = str((datetime.datetime.now())).replace(' ','-').replace(':','').replace('.','-') async def single_simple_async_proccess_item( user_prompt: str, api_url: str, model_name: str, top_p: float = 0.9, temperature: float = 0.2, reasoning_effort: str = "medium", max_token: int = None, system_prompt: str = None, assistant_prompt: str = None, semaphore_limit: int = 5, api_key: str = "EMPTY", priority: int = 1, timeout: int = 300 ): client = AsyncOpenAI(base_url=api_url, api_key=api_key) sem = asyncio.Semaphore(semaphore_limit) async with sem: try: messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": user_prompt}) if assistant_prompt: messages.append({"role": "assistant", "content": assistant_prompt}) response = await asyncio.wait_for( client.chat.completions.create( model=model_name, messages=messages, temperature=temperature, top_p=top_p, extra_body={"reasoning_effort": reasoning_effort}, max_tokens=max_token, stop=None, extra_headers={"priority": str(priority)} ), timeout=timeout ) if response.choices and len(response.choices) > 0: message = response.choices[0].message.content return message.strip() if message else "" else: raise ValueError("پاسخی از مدل دریافت نشد.") except asyncio.TimeoutError: raise TimeoutError(f"⚠️ Timeout: Request took too long (more than {timeout} seconds)") except Exception as e: traceback.print_exc() raise RuntimeError(f"⚠️ Error in API call: {str(e)}") class Result(BaseModel): result : str async def single_async_item( api_url, api_key, item, reasoning_effort, temperature, top_p, semaphore_number, model_name, priority=1, output_schema=None, max_token=4096, print_logs=False, return_reason=False, stop=None, return_used_token=False, timeout=300, ): try: async with AsyncOpenAI( base_url=api_url, api_key=api_key ) as client: semaphore = asyncio.Semaphore(semaphore_number) async with semaphore: messages = [{"role": "user", "content": item["user_prompt"]}] if item.get("system_prompt"): messages.insert( 0, {"role": "system", "content": item["system_prompt"]} ) # if item.get("assistant_prompt"): # messages.append( # {"role": "assistant", "content": item["assistant_prompt"]} # ) coro = client.chat.completions.parse( model=model_name, messages=messages, temperature=temperature, top_p=top_p, max_tokens=max_token, stop=stop, response_format=output_schema, reasoning_effort=reasoning_effort, extra_body={"priority": priority}, # priority=1, ) response = await asyncio.wait_for(coro, timeout=timeout) if print_logs: print(f"parse response ---- {response}") parsed_obj = response.choices[0].message.parsed # print(f'parsed_obj {parsed_obj}') if parsed_obj is None: return { "error": "Failed to parse response", "raw": str(response), } parsed_obj = output_schema.model_validate(parsed_obj) # Validate just in case (optional, چون .parse already does it) if return_reason: reasoning_content = response.choices[ 0 ].message.reasoning_content if return_used_token: _total_token = response.usage.total_tokens item["llm_output"] = ( parsed_obj.model_dump(), str(reasoning_content), int(_total_token), ) return item item["llm_output"] = ( parsed_obj.model_dump(), str(reasoning_content) ) return item item["llm_output"] = parsed_obj.model_dump() return item except asyncio.TimeoutError: print(f"⏳ Timeout on item {item}") return None except Exception as e: print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}") return None async def main(): with open('./leader_data/khamenei_messages_4.json', 'r', encoding='utf-8') as file: data = json.load(file) start = (datetime.datetime.now()) # len_pars = 0 # for item in data: # len_pars += len(item['paragraphs']) test_enteries = [] all_paragraphs = 0 for index , entery in enumerate(data, 1): # if index > 20: # break id = entery['id'] if not id == 60106:# شناسه حوزه پیشرو و سرآمد: 60106 continue print(f'{index}/{len(data)}') paragraphs = entery["paragraphs"] new_paragraphs = [] # by full text of each entry # full_text = '' # for counter, paragraph in enumerate(paragraphs, 1): # full_text += f'{paragraph["text"]}\n' # result_data = await llm_request(full_text)#gpt-4o # llm_answer_data = await text_to_dict(result_data) # entery["central_concepts"]= llm_answer_data['central_concepts'] # entery["characters"]= llm_answer_data['entities'] # entery["rules"]= llm_answer_data['rules'] for counter, paragraph in enumerate(paragraphs, 1): print(f'row: {counter}') paragraph_id = paragraph["paragraph_id"] heading = paragraph["heading"] or '' text = paragraph["text"] sentences = paragraph["sentences"] normalized_sentences = paragraph["normalized_sentences"] keyword_matches = paragraph["keyword_matches"] result_data = await llm_request(text)#gpt-4o llm_answer_data = await text_to_dict(result_data) new_paragraphs.append({ "paragraph_id" : paragraph_id, "heading" : heading, "text" : text, "sentences" : sentences, "normalized_sentences" : normalized_sentences, "keyword_matches" : keyword_matches, "paragraph_title": llm_answer_data['title'], "central_concepts": llm_answer_data['central_concepts'], "characters": llm_answer_data['entities'], "rules": llm_answer_data['rules'], }) time.sleep(1) entery['paragraphs'] = new_paragraphs test_enteries.append(entery) all_paragraphs += len(new_paragraphs) with open('./leader_data/leader-metadata-bayanat-howze-fulltext-oss-paragraphs.json', mode='w', encoding='utf-8') as file: result_message = json.dump(test_enteries, file, ensure_ascii=False, indent=2) print('---------------------------------------------') print(f'full duration: {(datetime.datetime.now() - start).total_seconds()}') print(f'all_paragraphs: {all_paragraphs}') print('---------------------------------------------') async def oss_test(SYSTEM_PROMPT,USER_PROMPT,Dictt): item = {} # item['assistant_prompt'] = "تو یک دستیار خبره در زمینه تدوین متون علمی هستی" item['system_prompt'] = SYSTEM_PROMPT item['user_prompt'] = f"{USER_PROMPT}\n{Dictt}" response = await single_async_item( api_url="http://2.188.15.102:8001/v1/", api_key="EMPTY", item=item, reasoning_effort="medium", temperature=0.1, top_p=1, semaphore_number=1, model_name="gpt-oss-120b", priority=1, output_schema=Result, max_token=None, return_reason=True, return_used_token=True, timeout=300 ) print(response['llm_output']) return response['llm_output'] if __name__ == "__main__": # asyncio.run(main()) asyncio.run(oss_test())