From bd85df3f235650d1c59a631d875630d96991d184 Mon Sep 17 00:00:00 2001
From: init_mahdi
Date: Wed, 5 Nov 2025 17:51:03 +0000
Subject: [PATCH] step1

---
 .gitignore        |   3 +
 __init__.py       |   1 +
 core/__init__.py  |   1 +
 core/ai_parser.py | 369 ++++++++++++++++++++++++++++++++++++++++++++++
 requierments.txt  |   1 +
 5 files changed, 375 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 __init__.py
 create mode 100644 core/__init__.py
 create mode 100644 core/ai_parser.py
 create mode 100644 requierments.txt

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7ade791
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+test/test_data.json
+/task/
+/test
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..a5741f7
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1 @@
+from .core import AsyncCore
\ No newline at end of file
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..c6155c3
--- /dev/null
+++ b/core/__init__.py
@@ -0,0 +1 @@
+from .ai_parser import AsyncCore
\ No newline at end of file
diff --git a/core/ai_parser.py b/core/ai_parser.py
new file mode 100644
index 0000000..11bb7b2
--- /dev/null
+++ b/core/ai_parser.py
@@ -0,0 +1,369 @@
+from typing import List
+from pathlib import Path
+import os, orjson, time, json, re, asyncio, traceback
+from openai import AsyncOpenAI
+
+# --------------------------------------------------------------------
+
+
+# ------------------------------ API processing ------------------------------
+class AsyncCore:
+    def __init__(
+        self,
+        model_name,
+        task_name,
+        data_path,
+        output_schema,
+        api_url,
+        reasoning_effort,
+        top_p,
+        temperature,
+        max_token,
+        output_path=None,
+        ai_code_version=None,
+        request_timeout=30,  # seconds
+        api_key="EMPTY",
+        save_number=2,
+    ):
+
+        self.save_number = save_number
+        # path to the JSON data file
+        self.data_path = data_path
+
+        self.task_name = task_name
+        if output_path is None:
+            output_path = f"./{task_name}"
+
+        self.output_path = Path(output_path)
+        self._temp_path = self.output_path / "batch_data"
+        self._temp_processed_id_path = self._temp_path / "processed_id.json"
+
+        # Create output directory and subdirectories if they don't exist
+        self.output_path.mkdir(parents=True, exist_ok=True)
+        self._temp_path.mkdir(parents=True, exist_ok=True)
+        # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True)
+
+        self.request_timeout = request_timeout
+        self.model_name = model_name
+        self.api_key = api_key
+        self.output_schema = output_schema
+        self.api_url = api_url
+        self.reasoning_effort = reasoning_effort
+        self.top_p = top_p
+        self.temperature = temperature
+        self.max_token = max_token
+
+        if ai_code_version is None:
+            ai_code_version = f"{model_name}_{reasoning_effort}"
+        self.ai_code_version = ai_code_version
+
+        self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"}
+
+        try:
+            self.data = self.__data_process()
+            print(f"📦 Loaded {len(self.data)} words")
+        except Exception as e:
+            raise ValueError(
+                f"Data loading/validation failed: {e}\n{traceback.format_exc()}"
+            )
+
+    def __validate_item(self, item, idx):
+        # Mandatory fields
+        for key in self.PRIMARY_KEY:
+            if key not in item:
+                raise ValueError(f"Missing mandatory key '{key}' in item #{idx}")
+            if not isinstance(item[key], str):
+                raise TypeError(
+                    f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}"
+                )
+
+        # Optional field: assistant_prompt
+        if "assistant_prompt" not in item or item["assistant_prompt"] is None:
+            item["assistant_prompt"] = None
+        else:
+            if not isinstance(item["assistant_prompt"], str):
isinstance(item["assistant_prompt"], str): + raise TypeError( + f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" + ) + + return item # now normalized + + def __data_process(self): + raw_data = self.__load_orjson(self.data_path) + if not isinstance(raw_data, list): + raise ValueError("Data must be a list of dictionaries.") + + processed_data = [] + for idx, item in enumerate(raw_data): + if not isinstance(item, dict): + raise ValueError(f"Item #{idx} is not a dictionary.") + validated_item = self.__validate_item(item, idx) + processed_data.append(validated_item) + + return processed_data + + def __get_max_number_file(self, directory): + # Pattern to match filenames like out_1.json, out_25.json, etc. + pattern = re.compile(r"output_(\d+)\.json$") + max_num = 0 + + for filename in os.listdir(directory): + match = pattern.match(filename) + if match: + num = int(match.group(1)) + if num > max_num: + max_num = num + return max_num + 1 + + def __load_orjson(self, path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + def __save_orjson(self, path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + def merge_json_dir(self, input_path, output_path): + directory = Path(input_path) + if not directory.is_dir(): + raise ValueError(f"Not valid PATH: {input_path}") + + seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) + unique_data = [] # فقط داده‌های یکتا + failed_files = [] + + json_files = list(directory.glob("*.json")) + if not json_files: + print("⚠️ NO JSON File Found In This PATH") + return + + for json_file in json_files: + try: + data = self.__load_orjson(json_file) + if not data: # خالی یا None + failed_files.append(json_file.name) + continue + + if isinstance(data, list) and isinstance(data[0], dict): + for item in data: + item_id = item.get("id") + if item_id is None: + # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی + # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند + continue + if item_id not in seen_ids: + seen_ids.add(item_id) + unique_data.append(item) + else: + raise ValueError(f"no list available in this json -> {json_file}") + except ( + json.JSONDecodeError, + ValueError, + OSError, + KeyError, + TypeError, + ) as e: + # print(f"❌ Failed in process '{json_file.name}': {e}") + failed_files.append(json_file.name) + + # گزارش خطاها + if failed_files: + print("\n❌ We lose this file:") + for name in failed_files: + print(f" - {name}") + else: + print("\n✅ All JSON added") + + # ذخیره خروجی + try: + self.__save_orjson(data=unique_data, path=output_path) + print( + f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" + ) + except Exception as e: + print(f"❌ Error in saving final file: {e}") + + def make_new_proccessed_ids_from_file(self, json_in, out_path): + data = self.__load_orjson(json_in) + + finall_data = [] + for d in data: + if d["id"]: + finall_data.append(d["id"]) + finall_data = set(finall_data) + finall_data = list(finall_data) + print(f"-- len ids {len(finall_data)}") + + self.__save_orjson(data=finall_data, path=out_path) + + # ------------------------------ Main ------------------------------ + async def __process_item(self, client, item): + try: + messages = [ + {"role": "system", "content": item["system_prompt"]}, + {"role": "user", "content": item["user_prompt"]}, + ] + if item.get("assistant_prompt"): + 
+                messages.append(
+                    {"role": "assistant", "content": item["assistant_prompt"]}
+                )
+
+            response = await client.chat.completions.parse(
+                model=self.model_name,
+                messages=messages,
+                temperature=self.temperature,
+                top_p=self.top_p,
+                reasoning_effort=self.reasoning_effort,
+                max_tokens=self.max_token,
+                stop=None,
+                response_format=self.output_schema,
+            )
+
+            parsed = (
+                response.choices[0].message.parsed
+                if response and response.choices and response.choices[0].message.parsed
+                else {"raw_text": str(response)}
+            )
+
+            parsed = self.output_schema.model_validate(parsed)
+            parsed = dict(parsed)
+            parsed["ai_code_version"] = self.ai_code_version
+            parsed["id"] = item["id"]
+            print(f"✅ Parsed item {item['id']}")
+            return parsed, 200
+
+        except asyncio.TimeoutError:
+            print(f"⏳ Timeout on item {item['id']}")
+            return None, 408
+
+        except Exception as e:
+            print(f"⚠️ Error __process_item {item['id']}: {e}\n{traceback.format_exc()}")
+            return None, 400
+
+    def async_eval(self, processed_id: List = []):
+        try:
+            asyncio.run(self.__async_eval(processed_id))
+        except KeyboardInterrupt:
+            print("\n🛑 Interrupted by user.")
+            traceback.print_exc()
+
+    async def __async_eval(self, processed_id: List):
+        """
+        Main single-process async run that produces the final output.
+        """
+        print("🔹 Starting async data processing...")
+
+        # ------------------ Stage 1: recover previously processed ids ------------------
+        if not processed_id:
+            try:
+                processed_id = self.__load_orjson(self._temp_processed_id_path)
+                print(
+                    f"📂 Loaded existing processed_id from {self._temp_processed_id_path}"
+                )
+            except Exception:
+                print("⚠️ No valid processed_id found. Starting fresh.")
+                processed_id = []
+
+        # ------------------ Stage 2: prepare the data ------------------
+        all_processed_id = set(processed_id)
+        all_results = []
+        total_time = []
+
+        data = [item for item in self.data if item.get("id") not in all_processed_id]
+        print(
+            f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}"
+        )
+
+        # nothing left to process
+        if not data:
+            print("✅ Nothing new to process. All items are already done.")
+            return
+
+        # ------------------ Stage 3: start processing ------------------
+        print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}")
+        async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
+            semaphore = asyncio.Semaphore(5)
+
+            async def limited_process(item):
+                async with semaphore:
+                    return await self.__process_item(client, item)
+
+            tasks = [asyncio.create_task(limited_process(item)) for item in data]
+
+            total_i = 0
+            # ✅ handle results in completion order (not list order)
+            for i, task in enumerate(asyncio.as_completed(tasks), start=1):
+                start = time.time()
+                try:
+                    parsed, status_code = await asyncio.wait_for(
+                        task, timeout=self.request_timeout
+                    )  # ⏱ cap the wait at request_timeout seconds
+                except asyncio.TimeoutError:
+                    print(f"⏳ Task {i} timed out completely")
+                    parsed, status_code = None, 408
+                total_time.append(time.time() - start)
+
+                if status_code == 200:
+                    all_results.append(parsed)
+                    all_processed_id.add(parsed.get("id"))
+                else:
+                    print(f"⚠️ Skipped one item (status={status_code})")
+
+                total_i += 1
+                # ✅ periodic checkpoint every save_number items
+                if total_i >= self.save_number:
+                    print(f"total_i {total_i}")
+                    print(f"self.save_number {self.save_number}")
+                    total_i = 0
+                    self.__save_orjson(
+                        data=list(all_processed_id),
+                        path=self._temp_processed_id_path,
+                    )
+                    print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
+                    number = self.__get_max_number_file(self._temp_path)
+                    print(f"number {number}")
+                    temp_output_path = self._temp_path / f"output_{number}.json"
+                    self.__save_orjson(data=list(all_results), path=temp_output_path)
+                    print(f"💾 Auto-saved partial data: {len(all_results)}")
+                    all_results.clear()
+
+        # ✅ after all tasks finish, flush whatever is left
+        if total_i > 0 or len(all_results) > 0:
+            print("💾 Final save of remaining data...")
+            self.__save_orjson(
+                data=list(all_processed_id),
+                path=self._temp_processed_id_path,
+            )
+            print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
+            number = self.__get_max_number_file(self._temp_path)
+            print(f"number {number}")
+
+            temp_output_path = self._temp_path / f"output_{number}.json"
+            self.__save_orjson(data=list(all_results), path=temp_output_path)
+            print(f"💾 Auto-saved partial data: {len(all_results)}")
+            all_results.clear()
+
+        # ------------------ Stage 4: save the output ------------------
+        final_data_path = self.output_path / f"final_data_{self.task_name}.json"
+        processed_id_path = self.output_path / "processed_id.json"
+
+        self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path)
+        all_results = self.__load_orjson(final_data_path)
+        # make_new_proccessed_ids_from_file()
+        self.__save_orjson(data=list(all_processed_id), path=processed_id_path)
+        self.__save_orjson(data=all_results, path=final_data_path)
+
+        avg_time = (sum(total_time) / len(total_time)) if total_time else 0
+        print(
+            f"\n✅ Processing completed!\n"
+            f"📊 Total-Data: {len(data)} | "
+            f"⭕ Ignored-Data: {len(processed_id)} | "
+            f"📦 Processed-Data: {len(all_results)} | "
+            f"❌ Loss-Data: {len(data) - len(all_results)} | "
+            f"🕒 Avg Time: {avg_time:.2f}s per item | "
+            f"🕒 Total Time: {sum(total_time):.4f}s | "
+            f"💾 Results saved to: {final_data_path}"
+        )
diff --git a/requierments.txt b/requierments.txt
new file mode 100644
index 0000000..d0d0bb5
--- /dev/null
+++ b/requierments.txt
@@ -0,0 +1 @@
+orjson
\ No newline at end of file
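
For reference, a minimal usage sketch of the AsyncCore entry point added in this patch. The schema class, model name, endpoint, and file paths below are placeholders and not part of the patch; output_schema is assumed to be a Pydantic model because the parser passes it to chat.completions.parse(response_format=...) and later calls model_validate on the result.

# hypothetical driver script; adjust paths, model name, and endpoint to your setup
from pydantic import BaseModel

from core import AsyncCore


class WordInfo(BaseModel):  # placeholder schema, not defined in this patch
    word: str
    definition: str


core = AsyncCore(
    model_name="my-model",              # any model served at api_url
    task_name="demo_task",
    data_path="data/input.json",        # list of {"id", "system_prompt", "user_prompt"} strings
    output_schema=WordInfo,
    api_url="http://localhost:8000/v1",
    reasoning_effort="low",
    top_p=0.9,
    temperature=0.2,
    max_token=512,
)
core.async_eval()  # resumes from batch_data/processed_id.json if a previous run was interrupted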