434 lines
16 KiB
Python
434 lines
16 KiB
Python
import json
|
||
import ast
|
||
from typing import Dict, Any
|
||
import time
|
||
import datetime
|
||
from openai import OpenAI
|
||
from langchain_openai import ChatOpenAI
|
||
|
||
# OSS Reqs
|
||
import asyncio
|
||
import traceback
|
||
from openai import AsyncOpenAI
|
||
|
||
import copy, asyncio, traceback
|
||
from openai import OpenAI, AsyncOpenAI, LengthFinishReasonError
|
||
from typing import List, Union
|
||
from pydantic import BaseModel
|
||
|
||
today = f'{datetime.datetime.now().year}{datetime.datetime.now().month}{datetime.datetime.now().day}'
|
||
|
||
SYSTEM_PROMPT = """
|
||
تو یک استخراجگر ساختاریافتهی اطلاعات (Information Extractor) برای زبان فارسی هستی. وظیفه تو تبدیل متن به یک دیکشنری پایتون (Python Dictionary) دقیقاً مطابق با ساختار و تعاریف زیر است:
|
||
|
||
### قوانین بنیادین:
|
||
1. فقط بر اساس متن ورودی کار کن و هیچ دانش خارجی اضافه نکن.
|
||
2. خروجی باید "فقط" یک دیکشنری معتبر پایتون باشد؛ بدون هیچ مقدمه، موخره یا توضیح اضافه.
|
||
3. تمامی مقادیر (Values) باید به زبان فارسی باشند.
|
||
4. اگر موردی در متن یافت نشد، مقدار آن را لیست خالی [] یا None قرار بده.
|
||
|
||
### تعاریف عملیاتی ساختار خروجی:
|
||
- **title**: (رشته) بین 4 تا 7 کلمه. خلاصهای دقیق که فقط با واژگان موجود در متن ساخته شده و جهتگیری یا دوراهی اصلی متن را نشان دهد.
|
||
- **central_concepts**: (لیست اشیاء) مفاهیم محوری 2 کلمهای مستقیماً از متن (مضافومضافالیه یا صفتوموصوف؛ بدون حروف عطف).
|
||
- **type**: اگر مفهوم منجر به رفاه و سعادت است "زندگیساز" و اگر مانع خیر است "زندگیسوز".
|
||
- **paragraph_effect**: اگر لحن متن درباره آن مثبت/ترغیبی است "تقویت" و اگر منفی/نهی است "تضعیف".
|
||
- **entities**: (لیست رشته) اسامی خاص اشخاص که صریحاً در متن ذکر شدهاند.
|
||
- **rules**: (لیست رشته) حداکثر 10 قاعده کلی اجتماعی/سیاسی/مذهبی. جملات کوتاه، مستقل و انتزاعی (بدون وابستگی به واژگان خاص متن).
|
||
|
||
### ساختار نهایی خروجی:
|
||
{
|
||
"title": str,
|
||
"central_concepts": [{"concept": str, "type": str, "paragraph_effect": str}],
|
||
"entities": [str],
|
||
"rules": [str]
|
||
}
|
||
"""
|
||
|
||
USER_PROMPT = '''
|
||
متن زیر را بر اساس دستورالعملهای سیستمی تحلیل کن و خروجی را در قالب دیکشنری پایتون ارائه بده:
|
||
|
||
## متن برای تحلیل:
|
||
'''
|
||
|
||
def get_key():
|
||
key = 'aa-YUMjnFkdfIZk1p6XkiWvnw3vGfzVEFNg5A5e2qxOn0iJZgkO' # khamenei_metadata
|
||
return key
|
||
|
||
def get_client():
|
||
url = "https://api.avalai.ir/v1"
|
||
client = OpenAI(
|
||
api_key=get_key(),
|
||
base_url=url,
|
||
)
|
||
return client
|
||
|
||
async def llm_request(text, model="gemini-2.5-flash-lite"):
|
||
# print(f'using model: {model}')
|
||
|
||
user_prompt = f"{USER_PROMPT}\n{text}"
|
||
|
||
try:
|
||
answer = await single_simple_async_proccess_item(system_prompt= SYSTEM_PROMPT, user_prompt= user_prompt, api_url="http://2.188.15.101:8001/v1", model_name='gpt-oss-120b', priority=10)
|
||
|
||
except Exception as error:
|
||
with open('./leader-answer/error-in-getting-metadata.txt', mode='a+', encoding='utf-8') as file:
|
||
error_message = f'\n\ntext: {text.strip()}\nerror:{error} \n-------------------------------\n'
|
||
file.write(error_message)
|
||
return 'با عرض پوزش؛ متاسفانه خطایی رخ داده است.'
|
||
return answer
|
||
|
||
async def text_to_dict(text: str) -> Dict[str, Any]:
|
||
text = text.replace('\n','')
|
||
text = text.lstrip('```json')
|
||
text = text.lstrip('json')
|
||
text = text.lstrip('```python')
|
||
text = text.rstrip('```')
|
||
text = text.strip()
|
||
try:
|
||
return json.loads(text)
|
||
except (json.JSONDecodeError, TypeError):
|
||
return ast.literal_eval(text)
|
||
|
||
client = get_client()
|
||
models = [ "gemini-2.5-flash-lite", "gpt-4o-mini","deepseek-reasoner"]
|
||
|
||
date = str((datetime.datetime.now())).replace(' ','-').replace(':','').replace('.','-')
|
||
|
||
async def single_simple_async_proccess_item(
|
||
user_prompt: str,
|
||
api_url: str,
|
||
model_name: str,
|
||
top_p: float = 0.9,
|
||
temperature: float = 0.2,
|
||
reasoning_effort: str = "medium",
|
||
max_token: int = None,
|
||
system_prompt: str = None,
|
||
assistant_prompt: str = None,
|
||
semaphore_limit: int = 5,
|
||
api_key: str = "EMPTY",
|
||
priority: int = 1,
|
||
timeout: int = 300
|
||
):
|
||
|
||
client = AsyncOpenAI(base_url=api_url, api_key=api_key)
|
||
|
||
sem = asyncio.Semaphore(semaphore_limit)
|
||
|
||
async with sem:
|
||
try:
|
||
messages = []
|
||
|
||
if system_prompt:
|
||
messages.append({"role": "system", "content": system_prompt})
|
||
|
||
messages.append({"role": "user", "content": user_prompt})
|
||
|
||
if assistant_prompt:
|
||
messages.append({"role": "assistant", "content": assistant_prompt})
|
||
|
||
response = await asyncio.wait_for(
|
||
client.chat.completions.create(
|
||
model=model_name,
|
||
messages=messages,
|
||
temperature=temperature,
|
||
top_p=top_p,
|
||
extra_body={"reasoning_effort": reasoning_effort},
|
||
max_tokens=max_token,
|
||
stop=None,
|
||
extra_headers={"priority": str(priority)}
|
||
),
|
||
timeout=timeout
|
||
)
|
||
|
||
if response.choices and len(response.choices) > 0:
|
||
message = response.choices[0].message.content
|
||
return message.strip() if message else ""
|
||
else:
|
||
raise ValueError("پاسخی از مدل دریافت نشد.")
|
||
|
||
except asyncio.TimeoutError:
|
||
raise TimeoutError(f"⚠️ Timeout: Request took too long (more than {timeout} seconds)")
|
||
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
raise RuntimeError(f"⚠️ Error in API call: {str(e)}")
|
||
|
||
class Result(BaseModel):
|
||
result : str
|
||
|
||
async def single_async_item(
|
||
api_url,
|
||
api_key,
|
||
item,
|
||
reasoning_effort,
|
||
temperature,
|
||
top_p,
|
||
semaphore_number,
|
||
model_name,
|
||
priority,
|
||
output_schema=None,
|
||
max_token=4096,
|
||
print_logs=False,
|
||
return_reason=False,
|
||
stop=None,
|
||
return_used_token=False,
|
||
timeout=300,
|
||
):
|
||
try:
|
||
async with AsyncOpenAI(
|
||
base_url=api_url, api_key=api_key
|
||
) as client:
|
||
semaphore = asyncio.Semaphore(semaphore_number)
|
||
async with semaphore:
|
||
messages = [{"role": "user", "content": item["user_prompt"]}]
|
||
if item.get("system_prompt"):
|
||
messages.insert(
|
||
0, {"role": "system", "content": item["system_prompt"]}
|
||
)
|
||
if item.get("assistant_prompt"):
|
||
messages.append(
|
||
{"role": "assistant", "content": item["assistant_prompt"]}
|
||
)
|
||
|
||
coro = client.chat.completions.parse(
|
||
model=model_name,
|
||
messages=messages,
|
||
temperature=temperature,
|
||
top_p=top_p,
|
||
max_tokens=max_token,
|
||
stop=stop,
|
||
response_format=output_schema,
|
||
reasoning_effort=reasoning_effort,
|
||
extra_body={"priority": priority},
|
||
)
|
||
|
||
# response = await asyncio.wait_for(coro, timeout=timeout)
|
||
response = await asyncio.wait_for(
|
||
client.chat.completions.parse(
|
||
model=model_name,
|
||
messages=messages,
|
||
temperature=temperature,
|
||
top_p=top_p,
|
||
max_tokens=max_token,
|
||
stop=stop,
|
||
response_format=output_schema,
|
||
reasoning_effort=reasoning_effort,
|
||
extra_body={"priority": priority},
|
||
),
|
||
timeout=timeout
|
||
)
|
||
print('----------------------------------------------------')
|
||
print('----------------------------------------------------')
|
||
print(response)
|
||
print('----------------------------------------------------')
|
||
print('----------------------------------------------------')
|
||
|
||
if print_logs:
|
||
print(f"parse response ---- {response}")
|
||
|
||
parsed_obj = response.choices[0].message.parsed
|
||
# print(f'parsed_obj {parsed_obj}')
|
||
if parsed_obj is None:
|
||
return {
|
||
"error": "Failed to parse response",
|
||
"raw": str(response),
|
||
}
|
||
|
||
parsed_obj = output_schema.model_validate(parsed_obj)
|
||
# Validate just in case (optional, چون .parse already does it)
|
||
if return_reason:
|
||
reasoning_content = response.choices[
|
||
0
|
||
].message.reasoning_content
|
||
if return_used_token:
|
||
_total_token = response.usage.total_tokens
|
||
item["llm_output"] = (
|
||
parsed_obj.model_dump(),
|
||
str(reasoning_content),
|
||
int(_total_token),
|
||
)
|
||
return item
|
||
|
||
item["llm_output"] = (
|
||
parsed_obj.model_dump(),
|
||
str(reasoning_content)
|
||
)
|
||
return item
|
||
|
||
item["llm_output"] = parsed_obj.model_dump()
|
||
return item
|
||
|
||
except asyncio.TimeoutError:
|
||
print(f"⏳ Timeout on item {item}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}")
|
||
return None
|
||
|
||
async def main():
|
||
with open('./leader_data/khamenei_messages_4.json', 'r', encoding='utf-8') as file:
|
||
data = json.load(file)
|
||
|
||
start = (datetime.datetime.now())
|
||
|
||
# len_pars = 0
|
||
# for item in data:
|
||
# len_pars += len(item['paragraphs'])
|
||
|
||
test_enteries = []
|
||
all_paragraphs = 0
|
||
for index , entery in enumerate(data, 1):
|
||
# if index > 20:
|
||
# break
|
||
id = entery['id']
|
||
if not id == 60106:# شناسه حوزه پیشرو و سرآمد: 60106
|
||
continue
|
||
print(f'{index}/{len(data)}')
|
||
paragraphs = entery["paragraphs"]
|
||
new_paragraphs = []
|
||
|
||
# by full text of each entry
|
||
# full_text = ''
|
||
# for counter, paragraph in enumerate(paragraphs, 1):
|
||
# full_text += f'{paragraph["text"]}\n'
|
||
# result_data = await llm_request(full_text)#gpt-4o
|
||
|
||
# llm_answer_data = await text_to_dict(result_data)
|
||
|
||
# entery["central_concepts"]= llm_answer_data['central_concepts']
|
||
# entery["characters"]= llm_answer_data['entities']
|
||
# entery["rules"]= llm_answer_data['rules']
|
||
|
||
for counter, paragraph in enumerate(paragraphs, 1):
|
||
print(f'row: {counter}')
|
||
paragraph_id = paragraph["paragraph_id"]
|
||
heading = paragraph["heading"] or ''
|
||
text = paragraph["text"]
|
||
sentences = paragraph["sentences"]
|
||
normalized_sentences = paragraph["normalized_sentences"]
|
||
keyword_matches = paragraph["keyword_matches"]
|
||
result_data = await llm_request(text)#gpt-4o
|
||
|
||
llm_answer_data = await text_to_dict(result_data)
|
||
|
||
new_paragraphs.append({
|
||
"paragraph_id" : paragraph_id,
|
||
"heading" : heading,
|
||
"text" : text,
|
||
"sentences" : sentences,
|
||
"normalized_sentences" : normalized_sentences,
|
||
"keyword_matches" : keyword_matches,
|
||
"paragraph_title": llm_answer_data['title'],
|
||
"central_concepts": llm_answer_data['central_concepts'],
|
||
"characters": llm_answer_data['entities'],
|
||
"rules": llm_answer_data['rules'],
|
||
})
|
||
|
||
time.sleep(1)
|
||
|
||
entery['paragraphs'] = new_paragraphs
|
||
test_enteries.append(entery)
|
||
all_paragraphs += len(new_paragraphs)
|
||
|
||
with open('./leader_data/leader-metadata-bayanat-howze-fulltext-oss-paragraphs.json', mode='w', encoding='utf-8') as file:
|
||
result_message = json.dump(test_enteries, file, ensure_ascii=False, indent=2)
|
||
|
||
print('---------------------------------------------')
|
||
print(f'full duration: {(datetime.datetime.now() - start).total_seconds()}')
|
||
print(f'all_paragraphs: {all_paragraphs}')
|
||
print('---------------------------------------------')
|
||
|
||
def process_item(
|
||
item:dict,
|
||
output_schema:BaseModel,
|
||
model_name:str="gpt-oss-120b",
|
||
api_url:str="http://172.16.29.102:8001/v1/",
|
||
api_key:str="EMPTY",
|
||
temperature:float=0.1,
|
||
top_p:float=1,
|
||
reasoning_effort:str="medium",
|
||
max_token:int=1024,
|
||
priority:int=1,
|
||
):
|
||
try:
|
||
client = OpenAI(
|
||
base_url=api_url,
|
||
api_key=api_key,
|
||
)
|
||
messages = [
|
||
{"role": "user", "content": item["user_prompt"]},
|
||
]
|
||
if item.get("system_prompt"):
|
||
messages.append({"role": "system", "content": item["system_prompt"]})
|
||
if item.get("assistant_prompt"):
|
||
messages.append(
|
||
{"role": "assistant", "content": item["assistant_prompt"]}
|
||
)
|
||
response = client.chat.completions.parse(
|
||
model=model_name,
|
||
messages=messages,
|
||
temperature=temperature,
|
||
top_p=top_p,
|
||
reasoning_effort=reasoning_effort,
|
||
max_tokens=max_token,
|
||
stop=None,
|
||
response_format=output_schema,
|
||
extra_body={"priority": priority},
|
||
)
|
||
|
||
parsed = (
|
||
response.choices[0].message.parsed
|
||
if response and response.choices and response.choices[0].message.parsed
|
||
else {"raw_text": str(response)}
|
||
)
|
||
|
||
parsed = output_schema.model_validate(parsed)
|
||
|
||
item["llm_output"] = dict(parsed.model_dump())
|
||
|
||
return item, 200
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}")
|
||
return None, 400
|
||
|
||
|
||
# res = process_item(
|
||
# item={"user_prompt":"سلام خوبی"},
|
||
# output_schema=Result,
|
||
# )
|
||
|
||
async def oss_test(SYSTEM_PROMPT,USER_PROMPT,Dictt):
|
||
item['system_prompt'] = SYSTEM_PROMPT
|
||
item['user_prompt'] = f"{USER_PROMPT}\n{Dictt}"
|
||
# item = {}
|
||
# item['assistant_prompt'] = "تو یک دستیار خبره در زمینه تدوین متون علمی هستی"
|
||
# item['system_prompt'] = "پاسخ ها فقط باید علمی باشند و سبک نگارش طنز، سرگرمی، ادبی،احساسی و ... قابل قبول نیست. پاسخ فقط به زبان فارسی باشد و فقط اصطلاحات علمی را در صورت نیاز به زبان انگلیسی بیاور."
|
||
# item['user_prompt'] = "ابعاد مختلف علوم اجتماعی محاسباتی کدام است؟"
|
||
response = await single_async_item(
|
||
api_url="http://2.188.15.102:8001/v1/",
|
||
api_key="EMPTY",
|
||
item=item,
|
||
reasoning_effort="medium",
|
||
temperature=0.1,
|
||
top_p=1,
|
||
semaphore_number=1,
|
||
model_name="gpt-oss-120b",
|
||
priority=1,
|
||
output_schema=Result,
|
||
max_token=None,
|
||
return_reason=True,
|
||
return_used_token=True,
|
||
timeout=1000
|
||
)
|
||
print(response['llm_output'])
|
||
return response['llm_output']
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# asyncio.run(main())
|
||
asyncio.run(oss_test())
|
||
|
||
|