# aiDataParser/core/ai_parser.py


from typing import List
from pathlib import Path
import os, orjson, time, json, re, asyncio, traceback
from openai import AsyncOpenAI


# --------------------------------------------------------------------
# ------------------------------ API processing ----------------------
class AsyncCore:
    def __init__(
        self,
        model_name,
        task_name,
        data_path,
        output_schema,
        api_url,
        reasoning_effort="low",
        top_p=1,
        temperature=0.0,
        max_token=128000,
        output_path=None,
        ai_code_version=None,
        request_timeout=30,  # seconds
        api_key="EMPTY",
        save_number=2,
    ):
        self.save_number = save_number
        # Path to the input JSON data file
        self.data_path = data_path
        self.task_name = task_name
        if output_path is None:
            output_path = f"./{task_name}"
        self.output_path = Path(output_path)
        self._temp_path = self.output_path / "batch_data"
        self._temp_processed_id_path = self._temp_path / "processed_id.json"
        # Create the output directory and subdirectories if they don't exist
        self.output_path.mkdir(parents=True, exist_ok=True)
        self._temp_path.mkdir(parents=True, exist_ok=True)
        self.request_timeout = request_timeout
        self.model_name = model_name
        self.api_key = api_key
        self.output_schema = output_schema
        self.api_url = api_url
        self.reasoning_effort = reasoning_effort
        self.top_p = top_p
        self.temperature = temperature
        self.max_token = max_token
        if ai_code_version is None:
            ai_code_version = f"{model_name}_{reasoning_effort}"
        self.ai_code_version = ai_code_version
        self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"}
        try:
            self.data = self.__data_process()
            print(f"📦 Loaded {len(self.data)} items")
        except Exception as e:
            raise ValueError(
                f"Data loading/validation failed: {e}\n{traceback.format_exc()}"
            )
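
    # Expected input shape for data_path (illustrative example, inferred from
    # __validate_item below): a JSON array of objects, each with string "id",
    # "system_prompt", and "user_prompt" fields, plus an optional string
    # "assistant_prompt", e.g.
    #   [{"id": "w1", "system_prompt": "...", "user_prompt": "...",
    #     "assistant_prompt": null}]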

    def __validate_item(self, item, idx):
        # Mandatory fields
        for key in self.PRIMARY_KEY:
            if key not in item:
                raise ValueError(f"Missing mandatory key '{key}' in item #{idx}")
            if not isinstance(item[key], str):
                raise TypeError(
                    f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}"
                )
        # Optional field: assistant_prompt
        if "assistant_prompt" not in item or item["assistant_prompt"] is None:
            item["assistant_prompt"] = None
        else:
            if not isinstance(item["assistant_prompt"], str):
                raise TypeError(
                    f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}"
                )
        return item  # now normalized

    def __data_process(self):
        raw_data = self.__load_orjson(self.data_path)
        if not isinstance(raw_data, list):
            raise ValueError("Data must be a list of dictionaries.")
        processed_data = []
        for idx, item in enumerate(raw_data):
            if not isinstance(item, dict):
                raise ValueError(f"Item #{idx} is not a dictionary.")
            validated_item = self.__validate_item(item, idx)
            processed_data.append(validated_item)
        return processed_data

    def __get_max_number_file(self, directory):
        # Match filenames like output_1.json, output_25.json, etc.,
        # and return the next available number.
        pattern = re.compile(r"output_(\d+)\.json$")
        max_num = 0
        for filename in os.listdir(directory):
            match = pattern.match(filename)
            if match:
                num = int(match.group(1))
                if num > max_num:
                    max_num = num
        return max_num + 1

    def __load_orjson(self, path: str | Path):
        path = Path(path)
        with path.open("rb") as f:  # orjson requires the file to be opened in binary mode
            return orjson.loads(f.read())

    def __save_orjson(self, path, data):
        with open(path, "wb") as f:
            f.write(
                orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
            )

    def merge_json_dir(self, input_path, output_path):
        directory = Path(input_path)
        if not directory.is_dir():
            raise ValueError(f"Not a valid path: {input_path}")
        seen_ids = set()  # tracks ids we have already seen (fast lookups)
        unique_data = []  # only unique items
        failed_files = []
        json_files = list(directory.glob("*.json"))
        if not json_files:
            print("⚠️ No JSON files found in this path")
            return
        for json_file in json_files:
            try:
                data = self.__load_orjson(json_file)
                if not data:  # empty or None
                    failed_files.append(json_file.name)
                    continue
                if isinstance(data, list) and isinstance(data[0], dict):
                    for item in data:
                        item_id = item.get("id")
                        if item_id is None:
                            # If an item has no id, you could keep or drop it;
                            # here we assume only items with a valid id matter.
                            continue
                        if item_id not in seen_ids:
                            seen_ids.add(item_id)
                            unique_data.append(item)
                else:
                    raise ValueError(f"No list of dicts in this JSON -> {json_file}")
            except (
                json.JSONDecodeError,
                ValueError,
                OSError,
                KeyError,
                TypeError,
            ) as e:
                # print(f"❌ Failed to process '{json_file.name}': {e}")
                failed_files.append(json_file.name)
        # Report failures
        if failed_files:
            print("\n❌ These files were skipped:")
            for name in failed_files:
                print(f" - {name}")
        else:
            print("\n✅ All JSON files merged")
        # Save the merged output
        try:
            self.__save_orjson(data=unique_data, path=output_path)
            print(
                f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})"
            )
        except Exception as e:
            print(f"❌ Error saving final file: {e}")

    def make_new_proccessed_ids_from_file(self, json_in, out_path):
        data = self.__load_orjson(json_in)
        final_data = []
        for d in data:
            if d["id"]:
                final_data.append(d["id"])
        final_data = list(set(final_data))  # deduplicate ids
        print(f"-- len ids {len(final_data)}")
        self.__save_orjson(data=final_data, path=out_path)

    # ------------------------------ Main ------------------------------
    async def __process_item(self, client, item):
        try:
            messages = [
                {"role": "system", "content": item["system_prompt"]},
                {"role": "user", "content": item["user_prompt"]},
            ]
            if item.get("assistant_prompt"):
                messages.append(
                    {"role": "assistant", "content": item["assistant_prompt"]}
                )
            response = await client.chat.completions.parse(
                model=self.model_name,
                messages=messages,
                temperature=self.temperature,
                top_p=self.top_p,
                reasoning_effort=self.reasoning_effort,
                max_tokens=self.max_token,
                stop=None,
                response_format=self.output_schema,
            )
            parsed = (
                response.choices[0].message.parsed
                if response and response.choices and response.choices[0].message.parsed
                else {"raw_text": str(response)}
            )
            # Re-validate against the schema; the raw_text fallback will fail
            # validation and be handled by the except block below.
            parsed = self.output_schema.model_validate(parsed)
            parsed = parsed.model_dump()
            parsed["ai_code_version"] = self.ai_code_version
            parsed["id"] = item["id"]
            return parsed, 200
        except asyncio.TimeoutError:
            print(f"⏳ Timeout on item {item['id']}")
            return None, 408
        except Exception:
            print(f"⚠️ Error in __process_item {item['id']}: {traceback.format_exc()}")
            return None, 400

    def async_eval(self, processed_id: List | None = None):
        try:
            asyncio.run(self.__async_eval(processed_id))
        except KeyboardInterrupt:
            print("\n🛑 Interrupted by user.")
            traceback.print_exc()

    async def __async_eval(self, processed_id: List | None):
        """
        Main single-process async run that produces the final output.
        """
        print("🔹 Starting async data processing...")
        # ------------------ Step 1: recover previously processed ids ------------------
        if not processed_id:
            try:
                processed_id = self.__load_orjson(self._temp_processed_id_path)
                print(
                    f"📂 Loaded existing processed_id from {self._temp_processed_id_path}"
                )
            except Exception:
                print("⚠️ No valid processed_id found. Starting fresh.")
                processed_id = []
        # ------------------ Step 2: prepare the data ------------------
        all_processed_id = set(processed_id)
        all_results = []
        total_time = []
        data = [item for item in self.data if item.get("id") not in all_processed_id]
        print(
            f" Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}"
        )
        # Nothing left to process
        if not data:
            print("✅ Nothing new to process. All items are already done.")
            return
        # ------------------ Step 3: start processing ------------------
        print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}")
        async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
            semaphore = asyncio.Semaphore(5)

            async def limited_process(item):
                async with semaphore:
                    return await self.__process_item(client, item)

            tasks = [asyncio.create_task(limited_process(item)) for item in data]
            total_i = 0
            # ✅ Process in completion order (not list order)
            for i, task in enumerate(asyncio.as_completed(tasks), start=1):
                start = time.time()
                try:
                    parsed, status_code = await asyncio.wait_for(
                        task, timeout=self.request_timeout
                    )  # ⏱ at most request_timeout seconds
                except asyncio.TimeoutError:
                    print(f"⏳ Task {i} timed out completely")
                    parsed, status_code = None, 408
                total_time.append(time.time() - start)
                if status_code == 200:
                    all_results.append(parsed)
                    all_processed_id.add(parsed.get("id"))
                else:
                    # parsed is None here, so the item id is unknown
                    print(f"⚠️ Skipped an item (status={status_code})")
                total_i += 1
                # ✅ Checkpoint every save_number items
                if total_i >= self.save_number:
                    total_i = 0
                    self.__save_orjson(
                        data=list(all_processed_id),
                        path=self._temp_processed_id_path,
                    )
                    print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
                    number = self.__get_max_number_file(self._temp_path)
                    temp_output_path = self._temp_path / f"output_{number}.json"
                    self.__save_orjson(data=list(all_results), path=temp_output_path)
                    print(f"💾 Auto-saved partial data: {len(all_results)}")
                    all_results.clear()
            # ✅ After all tasks finish, save any remaining data
            if total_i > 0 or len(all_results) > 0:
                print("💾 Final save of remaining data...")
                self.__save_orjson(
                    data=list(all_processed_id),
                    path=self._temp_processed_id_path,
                )
                print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
                number = self.__get_max_number_file(self._temp_path)
                temp_output_path = self._temp_path / f"output_{number}.json"
                self.__save_orjson(data=list(all_results), path=temp_output_path)
                print(f"💾 Auto-saved partial data: {len(all_results)}")
                all_results.clear()
        # ------------------ Step 4: save the final output ------------------
        final_data_path = self.output_path / f"final_data_{self.task_name}.json"
        processed_id_path = self.output_path / "processed_id.json"
        self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path)
        all_results = self.__load_orjson(final_data_path)
        self.__save_orjson(data=list(all_processed_id), path=processed_id_path)
        self.__save_orjson(data=all_results, path=final_data_path)
        avg_time = (sum(total_time) / len(total_time)) if total_time else 0
        print(
            f"\n✅ Processing completed!\n"
            f"📊 Total-Data: {len(data)} | "
            f"⭕ Ignored-Data: {len(processed_id)} | "
            f"📦 Processed-Data: {len(all_results)} | "
            f"❌ Lost-Data: {len(data) - len(all_results)} | "
            f"🕒 Avg Time: {avg_time:.2f}s per item | "
            f"🕒 Total Time: {sum(total_time):.4f}s | "
            f"💾 Results saved to: {final_data_path}"
        )
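

# Minimal usage sketch (illustrative, not part of the library): the schema,
# model name, file paths, and API URL below are assumptions — substitute your
# own pydantic output schema and any OpenAI-compatible endpoint.
if __name__ == "__main__":
    from pydantic import BaseModel

    class WordInfo(BaseModel):  # hypothetical output schema
        word: str
        definition: str

    core = AsyncCore(
        model_name="gpt-4o-mini",            # any model your endpoint serves
        task_name="word_definitions",
        data_path="./data/words.json",       # JSON list of prompt items (see format above)
        output_schema=WordInfo,
        api_url="http://localhost:8000/v1",  # OpenAI-compatible base URL
        api_key="EMPTY",
    )
    # Resumes from ./word_definitions/batch_data/processed_id.json if present,
    # and writes final_data_word_definitions.json under ./word_definitions/.
    core.async_eval()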