nahj_rag/nahj_get_metadata_oss.py

434 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import ast
from typing import Dict, Any
import time
import datetime
from openai import OpenAI
from langchain_openai import ChatOpenAI
# OSS Reqs
import asyncio
import traceback
from openai import AsyncOpenAI
import copy, asyncio, traceback
from openai import OpenAI, AsyncOpenAI, LengthFinishReasonError
from typing import List, Union
from pydantic import BaseModel
today = f'{datetime.datetime.now().year}{datetime.datetime.now().month}{datetime.datetime.now().day}'
SYSTEM_PROMPT = """
تو یک استخراج‌گر ساختاریافته‌ی اطلاعات (Information Extractor) برای زبان فارسی هستی. وظیفه تو تبدیل متن به یک دیکشنری پایتون (Python Dictionary) دقیقاً مطابق با ساختار و تعاریف زیر است:
### قوانین بنیادین:
1. فقط بر اساس متن ورودی کار کن و هیچ دانش خارجی اضافه نکن.
2. خروجی باید "فقط" یک دیکشنری معتبر پایتون باشد؛ بدون هیچ مقدمه، موخره یا توضیح اضافه.
3. تمامی مقادیر (Values) باید به زبان فارسی باشند.
4. اگر موردی در متن یافت نشد، مقدار آن را لیست خالی [] یا None قرار بده.
### تعاریف عملیاتی ساختار خروجی:
- **title**: (رشته) بین 4 تا 7 کلمه. خلاصه‌ای دقیق که فقط با واژگان موجود در متن ساخته شده و جهت‌گیری یا دوراهی اصلی متن را نشان دهد.
- **central_concepts**: (لیست اشیاء) مفاهیم محوری 2 کلمه‌ای مستقیماً از متن (مضاف‌ومضاف‌الیه یا صفت‌وموصوف؛ بدون حروف عطف).
- **type**: اگر مفهوم منجر به رفاه و سعادت است "زندگی‌ساز" و اگر مانع خیر است "زندگی‌سوز".
- **paragraph_effect**: اگر لحن متن درباره آن مثبت/ترغیبی است "تقویت" و اگر منفی/نهی است "تضعیف".
- **entities**: (لیست رشته) اسامی خاص اشخاص که صریحاً در متن ذکر شده‌اند.
- **rules**: (لیست رشته) حداکثر 10 قاعده کلی اجتماعی/سیاسی/مذهبی. جملات کوتاه، مستقل و انتزاعی (بدون وابستگی به واژگان خاص متن).
### ساختار نهایی خروجی:
{
"title": str,
"central_concepts": [{"concept": str, "type": str, "paragraph_effect": str}],
"entities": [str],
"rules": [str]
}
"""
USER_PROMPT = '''
متن زیر را بر اساس دستورالعمل‌های سیستمی تحلیل کن و خروجی را در قالب دیکشنری پایتون ارائه بده:
## متن برای تحلیل:
'''
def get_key():
key = 'aa-YUMjnFkdfIZk1p6XkiWvnw3vGfzVEFNg5A5e2qxOn0iJZgkO' # khamenei_metadata
return key
def get_client():
url = "https://api.avalai.ir/v1"
client = OpenAI(
api_key=get_key(),
base_url=url,
)
return client
async def llm_request(text, model="gemini-2.5-flash-lite"):
# print(f'using model: {model}')
user_prompt = f"{USER_PROMPT}\n{text}"
try:
answer = await single_simple_async_proccess_item(system_prompt= SYSTEM_PROMPT, user_prompt= user_prompt, api_url="http://2.188.15.101:8001/v1", model_name='gpt-oss-120b', priority=10)
except Exception as error:
with open('./leader-answer/error-in-getting-metadata.txt', mode='a+', encoding='utf-8') as file:
error_message = f'\n\ntext: {text.strip()}\nerror:{error} \n-------------------------------\n'
file.write(error_message)
return 'با عرض پوزش؛ متاسفانه خطایی رخ داده است.'
return answer
async def text_to_dict(text: str) -> Dict[str, Any]:
text = text.replace('\n','')
text = text.lstrip('```json')
text = text.lstrip('json')
text = text.lstrip('```python')
text = text.rstrip('```')
text = text.strip()
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError):
return ast.literal_eval(text)
client = get_client()
models = [ "gemini-2.5-flash-lite", "gpt-4o-mini","deepseek-reasoner"]
date = str((datetime.datetime.now())).replace(' ','-').replace(':','').replace('.','-')
async def single_simple_async_proccess_item(
user_prompt: str,
api_url: str,
model_name: str,
top_p: float = 0.9,
temperature: float = 0.2,
reasoning_effort: str = "medium",
max_token: int = None,
system_prompt: str = None,
assistant_prompt: str = None,
semaphore_limit: int = 5,
api_key: str = "EMPTY",
priority: int = 1,
timeout: int = 300
):
client = AsyncOpenAI(base_url=api_url, api_key=api_key)
sem = asyncio.Semaphore(semaphore_limit)
async with sem:
try:
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": user_prompt})
if assistant_prompt:
messages.append({"role": "assistant", "content": assistant_prompt})
response = await asyncio.wait_for(
client.chat.completions.create(
model=model_name,
messages=messages,
temperature=temperature,
top_p=top_p,
extra_body={"reasoning_effort": reasoning_effort},
max_tokens=max_token,
stop=None,
extra_headers={"priority": str(priority)}
),
timeout=timeout
)
if response.choices and len(response.choices) > 0:
message = response.choices[0].message.content
return message.strip() if message else ""
else:
raise ValueError("پاسخی از مدل دریافت نشد.")
except asyncio.TimeoutError:
raise TimeoutError(f"⚠️ Timeout: Request took too long (more than {timeout} seconds)")
except Exception as e:
traceback.print_exc()
raise RuntimeError(f"⚠️ Error in API call: {str(e)}")
class Result(BaseModel):
result : str
async def single_async_item(
api_url,
api_key,
item,
reasoning_effort,
temperature,
top_p,
semaphore_number,
model_name,
priority,
output_schema=None,
max_token=4096,
print_logs=False,
return_reason=False,
stop=None,
return_used_token=False,
timeout=300,
):
try:
async with AsyncOpenAI(
base_url=api_url, api_key=api_key
) as client:
semaphore = asyncio.Semaphore(semaphore_number)
async with semaphore:
messages = [{"role": "user", "content": item["user_prompt"]}]
if item.get("system_prompt"):
messages.insert(
0, {"role": "system", "content": item["system_prompt"]}
)
if item.get("assistant_prompt"):
messages.append(
{"role": "assistant", "content": item["assistant_prompt"]}
)
coro = client.chat.completions.parse(
model=model_name,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_token,
stop=stop,
response_format=output_schema,
reasoning_effort=reasoning_effort,
extra_body={"priority": priority},
)
# response = await asyncio.wait_for(coro, timeout=timeout)
response = await asyncio.wait_for(
client.chat.completions.parse(
model=model_name,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_token,
stop=stop,
response_format=output_schema,
reasoning_effort=reasoning_effort,
extra_body={"priority": priority},
),
timeout=timeout
)
print('----------------------------------------------------')
print('----------------------------------------------------')
print(response)
print('----------------------------------------------------')
print('----------------------------------------------------')
if print_logs:
print(f"parse response ---- {response}")
parsed_obj = response.choices[0].message.parsed
# print(f'parsed_obj {parsed_obj}')
if parsed_obj is None:
return {
"error": "Failed to parse response",
"raw": str(response),
}
parsed_obj = output_schema.model_validate(parsed_obj)
# Validate just in case (optional, چون .parse already does it)
if return_reason:
reasoning_content = response.choices[
0
].message.reasoning_content
if return_used_token:
_total_token = response.usage.total_tokens
item["llm_output"] = (
parsed_obj.model_dump(),
str(reasoning_content),
int(_total_token),
)
return item
item["llm_output"] = (
parsed_obj.model_dump(),
str(reasoning_content)
)
return item
item["llm_output"] = parsed_obj.model_dump()
return item
except asyncio.TimeoutError:
print(f"⏳ Timeout on item {item}")
return None
except Exception as e:
print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}")
return None
async def main():
with open('./leader_data/khamenei_messages_4.json', 'r', encoding='utf-8') as file:
data = json.load(file)
start = (datetime.datetime.now())
# len_pars = 0
# for item in data:
# len_pars += len(item['paragraphs'])
test_enteries = []
all_paragraphs = 0
for index , entery in enumerate(data, 1):
# if index > 20:
# break
id = entery['id']
if not id == 60106:# شناسه حوزه پیشرو و سرآمد: 60106
continue
print(f'{index}/{len(data)}')
paragraphs = entery["paragraphs"]
new_paragraphs = []
# by full text of each entry
# full_text = ''
# for counter, paragraph in enumerate(paragraphs, 1):
# full_text += f'{paragraph["text"]}\n'
# result_data = await llm_request(full_text)#gpt-4o
# llm_answer_data = await text_to_dict(result_data)
# entery["central_concepts"]= llm_answer_data['central_concepts']
# entery["characters"]= llm_answer_data['entities']
# entery["rules"]= llm_answer_data['rules']
for counter, paragraph in enumerate(paragraphs, 1):
print(f'row: {counter}')
paragraph_id = paragraph["paragraph_id"]
heading = paragraph["heading"] or ''
text = paragraph["text"]
sentences = paragraph["sentences"]
normalized_sentences = paragraph["normalized_sentences"]
keyword_matches = paragraph["keyword_matches"]
result_data = await llm_request(text)#gpt-4o
llm_answer_data = await text_to_dict(result_data)
new_paragraphs.append({
"paragraph_id" : paragraph_id,
"heading" : heading,
"text" : text,
"sentences" : sentences,
"normalized_sentences" : normalized_sentences,
"keyword_matches" : keyword_matches,
"paragraph_title": llm_answer_data['title'],
"central_concepts": llm_answer_data['central_concepts'],
"characters": llm_answer_data['entities'],
"rules": llm_answer_data['rules'],
})
time.sleep(1)
entery['paragraphs'] = new_paragraphs
test_enteries.append(entery)
all_paragraphs += len(new_paragraphs)
with open('./leader_data/leader-metadata-bayanat-howze-fulltext-oss-paragraphs.json', mode='w', encoding='utf-8') as file:
result_message = json.dump(test_enteries, file, ensure_ascii=False, indent=2)
print('---------------------------------------------')
print(f'full duration: {(datetime.datetime.now() - start).total_seconds()}')
print(f'all_paragraphs: {all_paragraphs}')
print('---------------------------------------------')
def process_item(
item:dict,
output_schema:BaseModel,
model_name:str="gpt-oss-120b",
api_url:str="http://172.16.29.102:8001/v1/",
api_key:str="EMPTY",
temperature:float=0.1,
top_p:float=1,
reasoning_effort:str="medium",
max_token:int=1024,
priority:int=1,
):
try:
client = OpenAI(
base_url=api_url,
api_key=api_key,
)
messages = [
{"role": "user", "content": item["user_prompt"]},
]
if item.get("system_prompt"):
messages.append({"role": "system", "content": item["system_prompt"]})
if item.get("assistant_prompt"):
messages.append(
{"role": "assistant", "content": item["assistant_prompt"]}
)
response = client.chat.completions.parse(
model=model_name,
messages=messages,
temperature=temperature,
top_p=top_p,
reasoning_effort=reasoning_effort,
max_tokens=max_token,
stop=None,
response_format=output_schema,
extra_body={"priority": priority},
)
parsed = (
response.choices[0].message.parsed
if response and response.choices and response.choices[0].message.parsed
else {"raw_text": str(response)}
)
parsed = output_schema.model_validate(parsed)
item["llm_output"] = dict(parsed.model_dump())
return item, 200
except Exception as e:
print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}")
return None, 400
# res = process_item(
# item={"user_prompt":"سلام خوبی"},
# output_schema=Result,
# )
async def oss_test(SYSTEM_PROMPT,USER_PROMPT,Dictt):
item['system_prompt'] = SYSTEM_PROMPT
item['user_prompt'] = f"{USER_PROMPT}\n{Dictt}"
# item = {}
# item['assistant_prompt'] = "تو یک دستیار خبره در زمینه تدوین متون علمی هستی"
# item['system_prompt'] = "پاسخ ها فقط باید علمی باشند و سبک نگارش طنز، سرگرمی، ادبی،احساسی و ... قابل قبول نیست. پاسخ فقط به زبان فارسی باشد و فقط اصطلاحات علمی را در صورت نیاز به زبان انگلیسی بیاور."
# item['user_prompt'] = "ابعاد مختلف علوم اجتماعی محاسباتی کدام است؟"
response = await single_async_item(
api_url="http://2.188.15.102:8001/v1/",
api_key="EMPTY",
item=item,
reasoning_effort="medium",
temperature=0.1,
top_p=1,
semaphore_number=1,
model_name="gpt-oss-120b",
priority=1,
output_schema=Result,
max_token=None,
return_reason=True,
return_used_token=True,
timeout=1000
)
print(response['llm_output'])
return response['llm_output']
if __name__ == "__main__":
# asyncio.run(main())
asyncio.run(oss_test())