This commit is contained in:
init_mahdi 2025-11-05 17:51:03 +00:00
commit bd85df3f23
5 changed files with 375 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
test/test_data.json
/task/
/test

1
__init__.py Normal file
View File

@ -0,0 +1 @@
from core import AsyncCore

1
core/__init__.py Normal file
View File

@ -0,0 +1 @@
from ai_parser import AsyncCore

369
core/ai_parser.py Normal file
View File

@ -0,0 +1,369 @@
from typing import List
from pathlib import Path
import os, orjson, time, json, re, asyncio, traceback
from openai import AsyncOpenAI
# --------------------------------------------------------------------
# ------------------------------ پردازش API ------------------------------
class AsyncCore:
def __init__(
self,
model_name,
task_name,
data_path,
output_schema,
api_url,
reasoning_effort,
top_p,
temperature,
max_token,
output_path=None,
ai_code_version=None,
request_timeout=30, # ثانیه
api_key="EMPTY",
save_number=2,
):
self.save_number = save_number
# json file of data
self.data_path = data_path
self.task_name = task_name
if output_path is None:
output_path = f"./{task_name}"
self.output_path = Path(output_path)
self._temp_path = self.output_path / "batch_data"
self._temp_processed_id_path = self._temp_path / "processed_id.json"
# Create output directory and subdirectories if they don't exist
self.output_path.mkdir(parents=True, exist_ok=True)
self._temp_path.mkdir(parents=True, exist_ok=True)
# self._temp_processed_id_path.mkdir(parents=True, exist_ok=True)
self.request_timeout = request_timeout
self.model_name = model_name
self.api_key = api_key
self.output_schema = output_schema
self.api_url = api_url
self.reasoning_effort = reasoning_effort
self.top_p = top_p
self.temperature = temperature
self.max_token = max_token
if ai_code_version is None:
ai_code_version = f"{model_name}_{reasoning_effort}"
self.ai_code_version = ai_code_version
self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"}
try:
self.data = self.__data_process()
print(f"📦 Loaded {len(self.data)} words")
except Exception as e:
raise ValueError(
f"Data loading/validation failed: {e}\n{traceback.format_exc()}"
)
def __validate_item(self, item, idx):
# Mandatory fields
for key in self.PRIMARY_KEY:
if key not in item:
raise ValueError(f"Missing mandatory key '{key}' in item #{idx}")
if not isinstance(item[key], str):
raise TypeError(
f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}"
)
# Optional field: assistant_prompt
if "assistant_prompt" not in item or item["assistant_prompt"] is None:
item["assistant_prompt"] = None
else:
if not isinstance(item["assistant_prompt"], str):
raise TypeError(
f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}"
)
return item # now normalized
def __data_process(self):
raw_data = self.__load_orjson(self.data_path)
if not isinstance(raw_data, list):
raise ValueError("Data must be a list of dictionaries.")
processed_data = []
for idx, item in enumerate(raw_data):
if not isinstance(item, dict):
raise ValueError(f"Item #{idx} is not a dictionary.")
validated_item = self.__validate_item(item, idx)
processed_data.append(validated_item)
return processed_data
def __get_max_number_file(self, directory):
# Pattern to match filenames like out_1.json, out_25.json, etc.
pattern = re.compile(r"output_(\d+)\.json$")
max_num = 0
for filename in os.listdir(directory):
match = pattern.match(filename)
if match:
num = int(match.group(1))
if num > max_num:
max_num = num
return max_num + 1
def __load_orjson(self, path: str | Path):
path = Path(path)
with path.open("rb") as f: # باید باینری باز بشه برای orjson
return orjson.loads(f.read())
def __save_orjson(self, path, data):
with open(path, "wb") as f:
f.write(
orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
)
def merge_json_dir(self, input_path, output_path):
directory = Path(input_path)
if not directory.is_dir():
raise ValueError(f"Not valid PATH: {input_path}")
seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!)
unique_data = [] # فقط داده‌های یکتا
failed_files = []
json_files = list(directory.glob("*.json"))
if not json_files:
print("⚠️ NO JSON File Found In This PATH")
return
for json_file in json_files:
try:
data = self.__load_orjson(json_file)
if not data: # خالی یا None
failed_files.append(json_file.name)
continue
if isinstance(data, list) and isinstance(data[0], dict):
for item in data:
item_id = item.get("id")
if item_id is None:
# اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی
# اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند
continue
if item_id not in seen_ids:
seen_ids.add(item_id)
unique_data.append(item)
else:
raise ValueError(f"no list available in this json -> {json_file}")
except (
json.JSONDecodeError,
ValueError,
OSError,
KeyError,
TypeError,
) as e:
# print(f"❌ Failed in process '{json_file.name}': {e}")
failed_files.append(json_file.name)
# گزارش خطاها
if failed_files:
print("\n❌ We lose this file:")
for name in failed_files:
print(f" - {name}")
else:
print("\n✅ All JSON added")
# ذخیره خروجی
try:
self.__save_orjson(data=unique_data, path=output_path)
print(
f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})"
)
except Exception as e:
print(f"❌ Error in saving final file: {e}")
def make_new_proccessed_ids_from_file(self, json_in, out_path):
data = self.__load_orjson(json_in)
finall_data = []
for d in data:
if d["id"]:
finall_data.append(d["id"])
finall_data = set(finall_data)
finall_data = list(finall_data)
print(f"-- len ids {len(finall_data)}")
self.__save_orjson(data=finall_data, path=out_path)
# ------------------------------ Main ------------------------------
async def __process_item(self, client, item):
try:
messages = [
{"role": "system", "content": item["system_prompt"]},
{"role": "user", "content": item["user_prompt"]},
]
if item.get("assistant_prompt"):
messages.append(
{"role": "assistant", "content": item["assistant_prompt"]}
)
response = await client.chat.completions.parse(
model=self.model_name,
messages=messages,
temperature=self.temperature,
top_p=self.top_p,
reasoning_effort=self.reasoning_effort,
max_tokens=self.max_token,
stop=None,
response_format=self.output_schema,
)
parsed = (
response.choices[0].message.parsed
if response and response.choices and response.choices[0].message.parsed
else {"raw_text": str(response)}
)
parsed = self.output_schema.model_validate(parsed)
parsed = dict(parsed)
parsed["ai_code_version"] = self.ai_code_version
parsed["id"] = item["id"]
print(f"parsed")
return parsed, 200
except asyncio.TimeoutError:
print(f"⏳ Timeout on item {item['id']}")
return None, 408
except Exception as e:
print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}")
return None, 400
def async_eval(self, processed_id: List = []):
try:
asyncio.run(self.__async_eval(processed_id))
except KeyboardInterrupt:
print("\n🛑 Interrupted by user.")
traceback.print_exc()
async def __async_eval(self, processed_id: List):
"""
اجرای اصلی تکهستهای و async برای تولید خروجی نهایی.
"""
print("🔹 Starting async data processing...")
# ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------
if not processed_id:
try:
processed_id = self.__load_orjson(self._temp_processed_id_path)
print(
f"📂 Loaded existing processed_id from {self._temp_processed_id_path}"
)
except Exception:
print("⚠️ No valid processed_id found. Starting fresh.")
processed_id = []
# ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------
all_processed_id = set(processed_id)
all_results = []
total_time = []
data = [item for item in self.data if item.get("id") not in all_processed_id]
print(
f" Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}"
)
# اگر چیزی برای پردازش نیست
if not data:
print("✅ Nothing new to process. All items are already done.")
return
# ------------------ مرحله ۳: شروع پردازش ------------------
print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}")
async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
semaphore = asyncio.Semaphore(5)
async def limited_process(item):
async with semaphore:
return await self.__process_item(client, item)
tasks = [asyncio.create_task(limited_process(item)) for item in data]
total_i = 0
# ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست)
for i, task in enumerate(asyncio.as_completed(tasks), start=1):
start = time.time()
try:
parsed, status_code = await asyncio.wait_for(
task, timeout=self.request_timeout
) # ⏱ حداکثر 2 دقیقه
except asyncio.TimeoutError:
print(f"⏳ Task {i} timed out completely")
parsed, status_code = None, 408
total_time.append(time.time() - start)
if status_code == 200:
all_results.append(parsed)
all_processed_id.add(parsed.get("id"))
else:
print(f"⚠️ Skipped item {parsed.get('id')} (status={status_code})")
total_i += 1
# ✅ ذخیره‌ی موقت هر n مورد
if total_i >= self.save_number:
print(f"total_i {total_i}")
print(f"self.save_number {self.save_number}")
total_i = 0
self.__save_orjson(
data=list(all_processed_id),
path=self._temp_processed_id_path,
)
print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
number = self.__get_max_number_file(self._temp_path)
print(f"number {number}")
temp_output_path = self._temp_path / f"output_{number}.json"
self.__save_orjson(data=list(all_results), path=temp_output_path)
print(f"💾 Auto-saved partial data: {len(all_results)}")
all_results.clear()
# ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده
if total_i > 0 or len(all_results) > 0:
print("💾 Final save of remaining data...")
self.__save_orjson(
data=list(all_processed_id),
path=self._temp_processed_id_path,
)
print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
number = self.__get_max_number_file(self._temp_path)
print(f"number {number}")
temp_output_path = self._temp_path / f"output_{number}.json"
self.__save_orjson(data=list(all_results), path=temp_output_path)
print(f"💾 Auto-saved partial data: {len(all_results)}")
all_results.clear()
# ------------------ مرحله ۴: ذخیره خروجی ------------------
final_data_path = self.output_path / f"final_data_{self.task_name}.json"
processed_id_path = self.output_path / "processed_id.json"
self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path)
all_results = self.__load_orjson(final_data_path)
# make_new_proccessed_ids_from_file()
self.__save_orjson(data=list(all_processed_id), path=processed_id_path)
self.__save_orjson(data=all_results, path=final_data_path)
avg_time = (sum(total_time) / len(total_time)) if total_time else 0
print(
f"\n✅ Processing completed!\n"
f"📊 Total-Data: {len(data)} | "
f"⭕ Ignored-Data: {len(processed_id)} | "
f"📦 Proccessed-Data: {len(all_results)} | "
f"❌ Loss-Data: {len(data)-len(all_results)} | "
f"🕒 Avg Time: {avg_time:.2f}'s per item | "
f"🕒 Total Time: {sum(total_time):.4f}'s | "
f"💾 Results saved to: {final_data_path}"
)

1
requierments.txt Normal file
View File

@ -0,0 +1 @@
orjson