from typing import List
from pathlib import Path
import os, orjson, time, json, re, asyncio, traceback
from openai import AsyncOpenAI

# --------------------------------------------------------------------
# ------------------------------ API processing ------------------------------
class AsyncCore:
    def __init__(
        self,
        model_name,
        task_name,
        data_path,
        output_schema,
        api_url,
        reasoning_effort="low",
        top_p=1,
        temperature=0.0,
        max_token=128000,
        output_path=None,
        ai_code_version=None,
        request_timeout=30,  # seconds
        api_key="EMPTY",
        save_number=2,
    ):
        self.save_number = save_number
        # JSON file holding the input data
        self.data_path = data_path
        self.task_name = task_name
        if output_path is None:
            output_path = f"./{task_name}"
        self.output_path = Path(output_path)
        self._temp_path = self.output_path / "batch_data"
        self._temp_processed_id_path = self._temp_path / "processed_id.json"
        # Create output directory and subdirectories if they don't exist
        self.output_path.mkdir(parents=True, exist_ok=True)
        self._temp_path.mkdir(parents=True, exist_ok=True)
        self.request_timeout = request_timeout
        self.model_name = model_name
        self.api_key = api_key
        self.output_schema = output_schema
        self.api_url = api_url
        self.reasoning_effort = reasoning_effort
        self.top_p = top_p
        self.temperature = temperature
        self.max_token = max_token
        if ai_code_version is None:
            ai_code_version = f"{model_name}_{reasoning_effort}"
        self.ai_code_version = ai_code_version
        self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"}
        try:
            self.data = self.__data_process()
            print(f"📦 Loaded {len(self.data)} items")
        except Exception as e:
            raise ValueError(
                f"Data loading/validation failed: {e}\n{traceback.format_exc()}"
            )

    def __validate_item(self, item, idx):
        # Mandatory fields
        for key in self.PRIMARY_KEY:
            if key not in item:
                raise ValueError(f"Missing mandatory key '{key}' in item #{idx}")
            if not isinstance(item[key], str):
                raise TypeError(
                    f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}"
                )
        # Optional field: assistant_prompt
        if "assistant_prompt" not in item or item["assistant_prompt"] is None:
            item["assistant_prompt"] = None
        else:
            if not isinstance(item["assistant_prompt"], str):
                raise TypeError(
                    f"Item #{idx}: 'assistant_prompt' must be a string or absent, "
                    f"got {type(item['assistant_prompt']).__name__}"
                )
        return item  # now normalized

    def __data_process(self):
        raw_data = self.__load_orjson(self.data_path)
        if not isinstance(raw_data, list):
            raise ValueError("Data must be a list of dictionaries.")
        processed_data = []
        for idx, item in enumerate(raw_data):
            if not isinstance(item, dict):
                raise ValueError(f"Item #{idx} is not a dictionary.")
            validated_item = self.__validate_item(item, idx)
            processed_data.append(validated_item)
        return processed_data
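    # Expected input record (a sketch; the field names come from PRIMARY_KEY
    # above, the values are hypothetical). The data file is a JSON list of
    # such dictionaries; "assistant_prompt" is optional and is normalized to
    # None when absent:
    #
    #   [
    #     {
    #       "id": "item_0001",
    #       "system_prompt": "You are a helpful lexicographer.",
    #       "user_prompt": "Define the word 'example'.",
    #       "assistant_prompt": null
    #     }
    #   ]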
pattern = re.compile(r"output_(\d+)\.json$") max_num = 0 for filename in os.listdir(directory): match = pattern.match(filename) if match: num = int(match.group(1)) if num > max_num: max_num = num return max_num + 1 def __load_orjson(self, path: str | Path): path = Path(path) with path.open("rb") as f: # باید باینری باز بشه برای orjson return orjson.loads(f.read()) def __save_orjson(self, path, data): with open(path, "wb") as f: f.write( orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) ) def merge_json_dir(self, input_path, output_path): directory = Path(input_path) if not directory.is_dir(): raise ValueError(f"Not valid PATH: {input_path}") seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) unique_data = [] # فقط داده‌های یکتا failed_files = [] json_files = list(directory.glob("*.json")) if not json_files: print("⚠️ NO JSON File Found In This PATH") return for json_file in json_files: try: data = self.__load_orjson(json_file) if not data: # خالی یا None failed_files.append(json_file.name) continue if isinstance(data, list) and isinstance(data[0], dict): for item in data: item_id = item.get("id") if item_id is None: # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند continue if item_id not in seen_ids: seen_ids.add(item_id) unique_data.append(item) else: raise ValueError(f"no list available in this json -> {json_file}") except ( json.JSONDecodeError, ValueError, OSError, KeyError, TypeError, ) as e: # print(f"❌ Failed in process '{json_file.name}': {e}") failed_files.append(json_file.name) # گزارش خطاها if failed_files: print("\n❌ We lose this file:") for name in failed_files: print(f" - {name}") else: print("\n✅ All JSON added") # ذخیره خروجی try: self.__save_orjson(data=unique_data, path=output_path) print( f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" ) except Exception as e: print(f"❌ Error in saving final file: {e}") def make_new_proccessed_ids_from_file(self, json_in, out_path): data = self.__load_orjson(json_in) finall_data = [] for d in data: if d["id"]: finall_data.append(d["id"]) finall_data = set(finall_data) finall_data = list(finall_data) print(f"-- len ids {len(finall_data)}") self.__save_orjson(data=finall_data, path=out_path) # ------------------------------ Main ------------------------------ async def __process_item(self, client, item): try: messages = [ {"role": "system", "content": item["system_prompt"]}, {"role": "user", "content": item["user_prompt"]}, ] if item.get("assistant_prompt"): messages.append( {"role": "assistant", "content": item["assistant_prompt"]} ) response = await client.chat.completions.parse( model=self.model_name, messages=messages, temperature=self.temperature, top_p=self.top_p, reasoning_effort=self.reasoning_effort, max_tokens=self.max_token, stop=None, response_format=self.output_schema, ) parsed = ( response.choices[0].message.parsed if response and response.choices and response.choices[0].message.parsed else {"raw_text": str(response)} ) parsed = self.output_schema.model_validate(parsed) parsed = dict(parsed) parsed["ai_code_version"] = self.ai_code_version parsed["id"] = item["id"] return parsed, 200 except asyncio.TimeoutError: print(f"⏳ Timeout on item {item['id']}") return None, 408 except Exception as e: print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") return None, 400 def async_eval(self, processed_id: List = []): try: asyncio.run(self.__async_eval(processed_id)) except KeyboardInterrupt: 
print("\n🛑 Interrupted by user.") traceback.print_exc() async def __async_eval(self, processed_id: List): """ اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. """ print("🔹 Starting async data processing...") # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ if not processed_id: try: processed_id = self.__load_orjson(self._temp_processed_id_path) print( f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" ) except Exception: print("⚠️ No valid processed_id found. Starting fresh.") processed_id = [] # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ all_processed_id = set(processed_id) all_results = [] total_time = [] data = [item for item in self.data if item.get("id") not in all_processed_id] print( f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" ) # اگر چیزی برای پردازش نیست if not data: print("✅ Nothing new to process. All items are already done.") return # ------------------ مرحله ۳: شروع پردازش ------------------ print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: semaphore = asyncio.Semaphore(5) async def limited_process(item): async with semaphore: return await self.__process_item(client, item) tasks = [asyncio.create_task(limited_process(item)) for item in data] total_i = 0 # ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست) for i, task in enumerate(asyncio.as_completed(tasks), start=1): start = time.time() try: parsed, status_code = await asyncio.wait_for( task, timeout=self.request_timeout ) # ⏱ حداکثر 2 دقیقه except asyncio.TimeoutError: print(f"⏳ Task {i} timed out completely") parsed, status_code = None, 408 total_time.append(time.time() - start) if status_code == 200: all_results.append(parsed) all_processed_id.add(parsed.get("id")) else: print(f"⚠️ Skipped item {parsed.get('id')} (status={status_code})") total_i += 1 # ✅ ذخیره‌ی موقت هر n مورد if total_i >= self.save_number: print(f"total_i {total_i}") print(f"self.save_number {self.save_number}") total_i = 0 self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") number = self.__get_max_number_file(self._temp_path) print(f"number {number}") temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) print(f"💾 Auto-saved partial data: {len(all_results)}") all_results.clear() # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده if total_i > 0 or len(all_results) > 0: print("💾 Final save of remaining data...") self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") number = self.__get_max_number_file(self._temp_path) print(f"number {number}") temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) print(f"💾 Auto-saved partial data: {len(all_results)}") all_results.clear() # ------------------ مرحله ۴: ذخیره خروجی ------------------ final_data_path = self.output_path / f"final_data_{self.task_name}.json" processed_id_path = self.output_path / "processed_id.json" self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) all_results = self.__load_orjson(final_data_path) # make_new_proccessed_ids_from_file() self.__save_orjson(data=list(all_processed_id), path=processed_id_path) 
        # ------------------ Step 4: save the output ------------------
        final_data_path = self.output_path / f"final_data_{self.task_name}.json"
        processed_id_path = self.output_path / "processed_id.json"
        self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path)
        all_results = self.__load_orjson(final_data_path)
        # make_new_proccessed_ids_from_file()
        self.__save_orjson(data=list(all_processed_id), path=processed_id_path)
        self.__save_orjson(data=all_results, path=final_data_path)
        # Note: all_results is reloaded from the merged file, so Processed-Data
        # counts unique items across all runs, not just this one.
        avg_time = (sum(total_time) / len(total_time)) if total_time else 0
        print(
            f"\n✅ Processing completed!\n"
            f"📊 Total-Data: {len(data)} | "
            f"⭕ Ignored-Data: {len(processed_id)} | "
            f"📦 Processed-Data: {len(all_results)} | "
            f"❌ Loss-Data: {len(data) - len(all_results)} | "
            f"🕒 Avg Time: {avg_time:.2f}s per item | "
            f"🕒 Total Time: {sum(total_time):.4f}s | "
            f"💾 Results saved to: {final_data_path}"
        )
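
# ------------------------------ Usage sketch ------------------------------
# A minimal, hypothetical driver showing how the class is wired together.
# Everything below is an assumption, not part of the class above: the endpoint
# URL, the model name, the data file, and the WordDefinition schema are
# placeholders (see the output_schema sketch above).
if __name__ == "__main__":
    from pydantic import BaseModel

    class WordDefinition(BaseModel):
        word: str
        definition: str

    core = AsyncCore(
        model_name="my-model",                # assumption: served model name
        task_name="define_words",
        data_path="./words.json",             # assumption: file matching the input sketch
        output_schema=WordDefinition,
        api_url="http://localhost:8000/v1",   # assumption: OpenAI-compatible endpoint
    )
    core.async_eval()  # resumes from <task_name>/batch_data/processed_id.json if present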