from typing import List, Dict, Optional, Union, Tuple from pydantic import BaseModel from pathlib import Path import os, orjson, time, json, re, asyncio, traceback, requests from openai import OpenAI, AsyncOpenAI, LengthFinishReasonError from datetime import datetime, timedelta from itertools import product __version__ = "1.0.3" ############################################################################## # ------------------------------ Start BaseModel ------------------------------ ############################################################################## class LLMServer(BaseModel): api_url: str # "http://ip1:8000/v1" model_name: str api_key: str = "EMPTY" ai_code_version: Optional[str] = None priority: int # 1 semaphore_number: int reasoning_effort: str = "low" top_p: float = 1 temperature: float = 0.0 max_token: Optional[int] = None ############################################################################## # ------------------------------ End BaseModel ------------------------------ ############################################################################## ############################################################################## # ------------------------------ Start Code ------------------------------ ############################################################################## # ------------------------------ Single LLm API ------------------------------ class AsyncCore: def __init__( self, model_name, task_name, api_url, priority, output_schema=None, data_path=None, reasoning_effort="low", top_p=1, temperature=0.0, max_token=4096, output_path=None, ai_code_version=None, request_timeout=30, # ثانیه api_key="EMPTY", save_number=2, semaphore_number=5, ): self.save_number = save_number self.priority = priority # json file of data self.data_path = data_path self.semaphore_number = semaphore_number self.task_name = task_name if output_path is None: output_path = f"./{task_name}" self.output_path = Path(output_path) self._temp_path = self.output_path / "batch_data" self._temp_processed_id_path = self._temp_path / "processed_id.json" # Create output directory and subdirectories if they don't exist self.output_path.mkdir(parents=True, exist_ok=True) self._temp_path.mkdir(parents=True, exist_ok=True) # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True) self.request_timeout = request_timeout self.model_name = model_name self.api_key = api_key self.output_schema = output_schema self.api_url = api_url self.reasoning_effort = reasoning_effort self.top_p = top_p self.temperature = temperature self.max_token = max_token if ai_code_version is None: ai_code_version = f"{model_name}_{reasoning_effort}" self.ai_code_version = ai_code_version self.PRIMARY_KEY = {"user_prompt", "id"} if data_path != None: try: self.data = self.__data_process() print(f"📦 Loaded {len(self.data)} words") except Exception as e: raise ValueError( f"Data loading/validation failed: {e}\n{traceback.format_exc()}" ) def __validate_item(self, item, idx): # Mandatory fields for key in self.PRIMARY_KEY: if key not in item: raise ValueError(f"Missing mandatory key '{key}' in item #{idx}") if not isinstance(item[key], str): raise TypeError( f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}" ) # Optional field: assistant_prompt if "assistant_prompt" not in item or item["assistant_prompt"] is None: item["assistant_prompt"] = None else: if not isinstance(item["assistant_prompt"], str): raise TypeError( f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" ) return item # now normalized def __data_process(self): raw_data = self.__load_orjson(self.data_path) if not isinstance(raw_data, list): raise ValueError("Data must be a list of dictionaries.") processed_data = [] for idx, item in enumerate(raw_data): if not isinstance(item, dict): raise ValueError(f"Item #{idx} is not a dictionary.") validated_item = self.__validate_item(item, idx) processed_data.append(validated_item) return processed_data def __get_max_number_file(self, directory): # Pattern to match filenames like out_1.json, out_25.json, etc. pattern = re.compile(r"output_(\d+)\.json$") max_num = 0 for filename in os.listdir(directory): match = pattern.match(filename) if match: num = int(match.group(1)) if num > max_num: max_num = num return max_num + 1 def __load_orjson(self, path: str | Path): path = Path(path) with path.open("rb") as f: # باید باینری باز بشه برای orjson return orjson.loads(f.read()) def __save_orjson(self, path, data): with open(path, "wb") as f: f.write( orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) ) def merge_json_dir(self, input_path, output_path): directory = Path(input_path) if not directory.is_dir(): raise ValueError(f"Not valid PATH: {input_path}") seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) unique_data = [] # فقط داده‌های یکتا failed_files = [] json_files = list(directory.glob("*.json")) if not json_files: print("⚠️ NO JSON File Found In This PATH") return for json_file in json_files: try: data = self.__load_orjson(json_file) if not data: # خالی یا None failed_files.append(json_file.name) continue if isinstance(data, list) and isinstance(data[0], dict): for item in data: item_id = item.get("id") if item_id is None: # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند continue if item_id not in seen_ids: seen_ids.add(item_id) unique_data.append(item) else: raise ValueError(f"no list available in this json -> {json_file}") except ( json.JSONDecodeError, ValueError, OSError, KeyError, TypeError, ) as e: # print(f"❌ Failed in process '{json_file.name}': {e}") failed_files.append(json_file.name) # گزارش خطاها if failed_files: print(f"\n❌ We lose This Files: {len(failed_files)}") else: print("\n✅ All JSON added") # ذخیره خروجی try: self.__save_orjson(data=unique_data, path=output_path) print( f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" ) except Exception as e: print(f"❌ Error in saving final file: {e}") def make_new_proccessed_ids_from_file(self, json_in, out_path): data = self.__load_orjson(json_in) finall_data = [] for d in data: if d["id"]: finall_data.append(d["id"]) finall_data = set(finall_data) finall_data = list(finall_data) print(f"-- len ids {len(finall_data)}") self.__save_orjson(data=finall_data, path=out_path) # ------------------------------ Main ------------------------------ async def __process_item(self, client, item): try: messages = [ {"role": "user", "content": item["user_prompt"]}, ] if item.get("system_prompt"): messages.append({"role": "system", "content": item["system_prompt"]}) if item.get("assistant_prompt"): messages.append( {"role": "assistant", "content": item["assistant_prompt"]} ) # print(f"self.max_token {self.max_token}") response = await client.chat.completions.parse( model=self.model_name, messages=messages, temperature=self.temperature, top_p=self.top_p, reasoning_effort=self.reasoning_effort, max_tokens=self.max_token, stop=None, response_format=self.output_schema, extra_body={"priority": self.priority}, ) parsed = ( response.choices[0].message.parsed if response and response.choices and response.choices[0].message.parsed else {"raw_text": str(response)} ) parsed = self.output_schema.model_validate(parsed) item["ai_code_version"] = self.ai_code_version item["llm_output"] = dict(parsed.model_dump()) return item, 200 except asyncio.TimeoutError: print(f"⏳ Timeout on item {item['id']}") return None, 408 except Exception as e: print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") return None, 400 async def __process_item_string_chat_only(self, client: AsyncOpenAI, item): try: messages = [] if item.get("system_prompt"): messages.append({"role": "system", "content": item["system_prompt"]}) messages.append({"role": "user", "content": item["user_prompt"]}) if item.get("assistant_prompt"): messages.append( {"role": "assistant", "content": item["assistant_prompt"]} ) # if item["input_token"]: # _max_token = item["input_token"] + self.max_token # else: _max_token = self.max_token response = await client.chat.completions.create( model=self.model_name, messages=messages, temperature=self.temperature, top_p=self.top_p, max_tokens=_max_token, stop=None, extra_body={"priority": self.priority}, ) text_output = response.choices[0].message.content item["ai_code_version"] = self.ai_code_version item["llm_output"] = str(text_output) return item, 200 except asyncio.TimeoutError: print(f"⏳ Timeout on item {item['id']}") return None, 408 except Exception: print(f"⚠️ Error __process_item {item['id']}:") traceback.print_exc() return None, 400 def _get_processor(self, mode: str): print(f"Model Run In {mode}-mode") processors = { "parse": self.__process_item, "chat": self.__process_item_string_chat_only, } return processors.get(mode, self.__process_item) def sync_eval(self, processed_id: List = [], mode="parse"): try: asyncio.run(self.async_eval(processed_id, mode)) except KeyboardInterrupt: print("\n🛑 Interrupted by user.") traceback.print_exc() async def async_eval(self, processed_id: List, mode: str): """ اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. """ print("🔹 Starting async data processing...") # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ if not processed_id: try: processed_id = self.__load_orjson(self._temp_processed_id_path) print( f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" ) except Exception: print("⚠️ No valid processed_id found. Starting fresh.") processed_id = [] # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ all_processed_id = set(processed_id) all_results = [] total_time = [] data = [item for item in self.data if item.get("id") not in all_processed_id] print( f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" ) # اگر چیزی برای پردازش نیست if not data: print("✅ Nothing new to process. All items are already done.") return if self.output_schema == None: processor = self._get_processor(mode="chat") else: processor = self._get_processor(mode) # ------------------ مرحله ۳: شروع پردازش ------------------ print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: semaphore = asyncio.Semaphore(self.semaphore_number) async def limited_process(item): async with semaphore: return await processor(client, item) tasks = [asyncio.create_task(limited_process(item)) for item in data] total_i = 0 # ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست) for i, task in enumerate(asyncio.as_completed(tasks), start=1): start = time.time() try: parsed, status_code = await asyncio.wait_for( task, timeout=self.request_timeout ) # ⏱ حداکثر 2 دقیقه except asyncio.TimeoutError: print(f"⏳ Task {i} timed out completely") parsed, status_code = None, 408 total_time.append(time.time() - start) if status_code == 200: all_results.append(parsed) all_processed_id.add(parsed.get("id")) else: print(f"⚠️ Skipped item (status={status_code})") total_i += 1 # ✅ ذخیره‌ی موقت هر n مورد if total_i >= self.save_number: total_i = 0 self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) number = self.__get_max_number_file(self._temp_path) temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) print(f"💾 Auto-saved partial data: {len(all_results)}") all_results.clear() # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده if total_i > 0 or len(all_results) > 0: print("💾 Final save of remaining data...") self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") number = self.__get_max_number_file(self._temp_path) print(f"number {number}") temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) print(f"💾 Auto-saved partial data: {len(all_results)}") all_results.clear() # ------------------ مرحله ۴: ذخیره خروجی ------------------ final_data_path = self.output_path / f"final_data_{self.task_name}.json" processed_id_path = self.output_path / "processed_id.json" self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) all_results = self.__load_orjson(final_data_path) # make_new_proccessed_ids_from_file() self.__save_orjson(data=list(all_processed_id), path=processed_id_path) self.__save_orjson(data=all_results, path=final_data_path) avg_time = (sum(total_time) / len(total_time)) if total_time else 0 print( f"\n✅ Processing completed!\n" f"📊 Total-Data: {len(data)} | " f"⭕ Ignored-Data: {len(processed_id)} | " f"📦 Proccessed-Data: {len(all_results)} | " f"❌ Loss-Data: {len(data)-len(all_results)} | " f"🕒 Avg Time: {avg_time:.2f}'s per item | " f"🕒 Total Time: {sum(total_time):.4f}'s | " f"💾 Results saved to: {final_data_path}" ) async def single_async_item( self, item, reasoning_effort, temperature, top_p, output_schema=None, max_token=4096, stream=False, print_logs=False, return_reason=False, stop=None, return_used_token=False, ): try: async with AsyncOpenAI( base_url=self.api_url, api_key=self.api_key ) as client: semaphore = asyncio.Semaphore(self.semaphore_number) async with semaphore: messages = [{"role": "user", "content": item["user_prompt"]}] if item.get("system_prompt"): messages.insert( 0, {"role": "system", "content": item["system_prompt"]} ) if item.get("assistant_prompt"): messages.append( {"role": "assistant", "content": item["assistant_prompt"]} ) # output_schema =None if output_schema is not None: # Use .parse for structured output response = await client.chat.completions.parse( model=self.model_name, messages=messages, temperature=temperature, top_p=top_p, max_tokens=max_token, stop=stop, response_format=output_schema, reasoning_effort=reasoning_effort, extra_body={"priority": self.priority}, ) if print_logs: print(f"parse response ---- {response}") parsed_obj = response.choices[0].message.parsed # print(f'parsed_obj {parsed_obj}') if parsed_obj is None: return { "error": "Failed to parse response", "raw": str(response), } # Validate just in case (optional, چون .parse already does it) if return_reason: reasoning_content = response.choices[ 0 ].message.reasoning_content if return_used_token: _total_token = response.usage.total_tokens return ( output_schema.model_validate(parsed_obj), str(reasoning_content), int(_total_token), ) return output_schema.model_validate(parsed_obj), str( reasoning_content ) return output_schema.model_validate(parsed_obj) else: # Use .create for raw text response = await client.chat.completions.create( model=self.model_name, messages=messages, # temperature=temperature, # top_p=top_p, # max_tokens=max_token, stop=None, stream=stream, reasoning_effort=reasoning_effort, # No response_format extra_body={"priority": self.priority}, ) content = response.choices[0].message.content print(f"content {content}") if not content: return {"error": "Empty response", "raw": str(response)} return content except LengthFinishReasonError as le: # اینجاست که جادو اتفاق می‌افته if max_token >= 50000: return {"error": "MAX_TOKEN_LIMIT_REACHED", "max_token": max_token} new_max = min(max_token * 2, 50000) return await self.single_async_item( item=item, reasoning_effort=reasoning_effort, temperature=temperature, top_p=top_p, output_schema=output_schema, max_token=new_max, stream=stream, print_logs=print_logs, return_reason=return_reason, return_used_token=return_used_token, stop=stop, ) except asyncio.TimeoutError: print(f"⏳ Timeout on item {item}") return None except Exception as e: print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}") return None async def single_async_item_stream( self, item, reasoning_effort, temperature, top_p, max_token=None, ): async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: messages = [{"role": "user", "content": item["user_prompt"]}] if item.get("system_prompt"): messages.insert(0, {"role": "system", "content": item["system_prompt"]}) stream = await client.chat.completions.create( model=self.model_name, messages=messages, temperature=temperature, top_p=top_p, max_tokens=max_token, reasoning_effort=reasoning_effort, stream=True, # ⭐ مهم # stream_options= extra_body={"priority": self.priority}, ) async for chunk in stream: if not chunk.choices: continue delta = chunk.choices[0].delta if delta and delta.content: yield delta.content # ------------------------------ Multi LLm API ------------------------------ class AsyncMultiServerCore: """ ProductionGrade-JsonBased-LoadBalanced-AsyncMaxConcurrency """ def __init__( self, servers: List[LLMServer | Dict], task_name, output_schema: BaseModel | None = None, data_path=None, output_path=None, request_timeout: int = 30, save_number=2, PRIMARY_KEY={"user_prompt", "id"}, ): # تعریف چند سرور try: self.servers = [LLMServer.model_validate(s) for s in servers] except Exception as e: traceback.print_exc() self.request_timeout = request_timeout self.save_number = save_number self.task_name = task_name if output_path is None: output_path = f"./{task_name}" self.output_path = Path(output_path) self._temp_path = self.output_path / "batch_data" self._temp_processed_id_path = self._temp_path / "processed_id.json" # Create output directory and subdirectories if they don't exist self.output_path.mkdir(parents=True, exist_ok=True) self._temp_path.mkdir(parents=True, exist_ok=True) # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True) self.output_schema = output_schema self.before_t = time.time() self.after_t = time.time() self.PRIMARY_KEY = PRIMARY_KEY if data_path != None: self.data_path = Path(data_path) if self.data_path.is_dir(): self.data = None elif self.data_path.is_file() and self.data_path.suffix.lower() == ".json": try: self.data = self.__data_process() print(f"📦 Loaded {len(self.data)} words") except Exception as e: raise ValueError( f"Data loading/validation failed: {e}\n{traceback.format_exc()}" ) def __validate_item(self, item, idx): # Mandatory fields for key in self.PRIMARY_KEY: if key not in item: raise ValueError(f"Missing mandatory key '{key}' in item #{idx}") if key == "id": try: hash(item[key]) except TypeError: raise TypeError( f"Item #{idx}: '{key}' must be hashable (immutable), " f"got {type(item[key]).__name__}" ) else: if not isinstance(item[key], str): raise TypeError( f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}" ) # Optional field: assistant_prompt if "assistant_prompt" not in item or item["assistant_prompt"] is None: item["assistant_prompt"] = None else: if not isinstance(item["assistant_prompt"], str): raise TypeError( f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" ) return item # now normalized def __data_process(self): raw_data = self.__load_orjson(self.data_path) if not isinstance(raw_data, list): raise ValueError("Data must be a list of dictionaries.") processed_data = [] for idx, item in enumerate(raw_data): if not isinstance(item, dict): raise ValueError(f"Item #{idx} is not a dictionary.") validated_item = self.__validate_item(item, idx) processed_data.append(validated_item) return processed_data def __get_max_number_file(self, directory): # Pattern to match filenames like out_1.json, out_25.json, etc. pattern = re.compile(r"output_(\d+)\.json$") max_num = 0 for filename in os.listdir(directory): match = pattern.match(filename) if match: num = int(match.group(1)) if num > max_num: max_num = num return max_num + 1 def __load_orjson(self, path: str | Path): path = Path(path) with path.open("rb") as f: # باید باینری باز بشه برای orjson return orjson.loads(f.read()) def __save_orjson(self, path, data): with open(path, "wb") as f: f.write( orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) ) def merge_json_dir(self, input_path, output_path): directory = Path(input_path) if not directory.is_dir(): raise ValueError(f"Not valid PATH: {input_path}") seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) unique_data = [] # فقط داده‌های یکتا failed_files = [] json_files = list(directory.glob("*.json")) if not json_files: print("⚠️ NO JSON File Found In This PATH") return for json_file in json_files: try: data = self.__load_orjson(json_file) if not data: # خالی یا None failed_files.append(json_file.name) continue if isinstance(data, list) and isinstance(data[0], dict): for item in data: item_id = item.get("id") if item_id is None: # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند continue if item_id not in seen_ids: seen_ids.add(item_id) unique_data.append(item) else: raise ValueError(f"no list available in this json -> {json_file}") except ( json.JSONDecodeError, ValueError, OSError, KeyError, TypeError, ) as e: # print(f"❌ Failed in process '{json_file.name}': {e}") failed_files.append(json_file.name) # # گزارش خطاها # if failed_files: # print(f"\n❌ We lose this file: {len(failed_files)}") # # for name in failed_files: # # print(f" - {name}") # else: # print("\n✅ All JSON added") # ذخیره خروجی try: self.__save_orjson(data=unique_data, path=output_path) print( f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" ) except Exception as e: print(f"❌ Error in saving final file: {e}") def make_new_proccessed_ids_from_file(self, json_in, out_path): data = self.__load_orjson(json_in) finall_data = [] for d in data: if d["id"]: finall_data.append(d["id"]) finall_data = set(finall_data) finall_data = list(finall_data) print(f"-- len ids {len(finall_data)}") self.__save_orjson(data=finall_data, path=out_path) # ------------------------------ Main ------------------------------ async def __process_item(self, client: AsyncOpenAI, server_config: LLMServer, item): try: messages = [ {"role": "user", "content": item["user_prompt"]}, ] if item.get("system_prompt"): messages.append({"role": "system", "content": item["system_prompt"]}) if item.get("assistant_prompt"): messages.append( {"role": "assistant", "content": item["assistant_prompt"]} ) response = await client.chat.completions.parse( model=server_config.model_name, messages=messages, temperature=server_config.temperature, top_p=server_config.top_p, reasoning_effort=server_config.reasoning_effort, max_tokens=server_config.max_token, stop=None, response_format=self.output_schema, extra_body={"priority": server_config.priority}, ) parsed = ( response.choices[0].message.parsed if response and response.choices and response.choices[0].message.parsed else {"raw_text": str(response)} ) parsed = self.output_schema.model_validate(parsed) # parsed["ai_code_version"] = server_config.ai_code_version item["llm_output"] = parsed.model_dump() return item, 200 except asyncio.TimeoutError: print(f"⏳ Timeout on item {item['id']}") return None, 408 except Exception as e: print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") return None, 400 def sync_eval(self, processed_id: List = []): try: asyncio.run(self.async_eval(processed_id)) except KeyboardInterrupt: print("\n🛑 Interrupted by user.") traceback.print_exc() async def async_eval( self, processed_id: Optional[List] = None, log_percent_step: float = 0.5, # هر چند درصد لاگ بده ): print("🔹 Starting async data processing...") self.start_t = time.time() # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ if not processed_id: try: processed_id = self.__load_orjson(self._temp_processed_id_path) print( f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" ) except Exception: print("⚠️ No valid processed_id found. Starting fresh.") processed_id = [] # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ all_processed_id = set(processed_id) all_results = [] if self.data == None: raise ValueError("Need Single Json Data") data = [item for item in self.data if item.get("id") not in all_processed_id] total_items = len(data) processed_counter = 0 next_log_percent = log_percent_step print( f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" ) # اگر چیزی برای پردازش نیست if not data: print("✅ Nothing new to process. All items are already done.") return # ------------------ مرحله ۳: شروع پردازش ------------------ server_pool = [] for server in self.servers: print( f"+= Model: {server.model_name} | Reasoning: {server.reasoning_effort} | API: {server.api_url}" ) server_pool.append( { "config": server, "client": AsyncOpenAI( base_url=server.api_url, api_key=server.api_key, ), "semaphore": asyncio.Semaphore(server.semaphore_number), } ) self.server_pool = server_pool # round robin state if not hasattr(self, "_rr_index"): self._rr_index = 0 self._rr_lock = asyncio.Lock() async def _get_next_server(): async with self._rr_lock: server = self.server_pool[self._rr_index] self._rr_index = (self._rr_index + 1) % len(self.server_pool) return server # ------------------ محدودکننده پردازش ------------------ async def limited_process(item): server = await _get_next_server() async with server["semaphore"]: try: result = await asyncio.wait_for( self.__process_item( client=server["client"], server_config=server["config"], item=item, ), timeout=self.request_timeout, ) return result except asyncio.TimeoutError: # print(f"⏳ Timeout item {item['id']}") return f"⏳ Timeout item {item['id']}", 408 except Exception: traceback.print_exc() return None, 500 # ------------------ Queue + Worker ------------------ queue = asyncio.Queue() for item in data: await queue.put(item) total_concurrency = sum(s["semaphore"]._value for s in server_pool) total_i = 0 lock = asyncio.Lock() async def worker(): nonlocal total_i, processed_counter, next_log_percent while True: try: item = await queue.get() except asyncio.CancelledError: break parsed, status_code = await limited_process(item) async with lock: if status_code == 200 and not isinstance(parsed, str): all_results.append(parsed) all_processed_id.add(parsed.get("id")) else: print(f"⚠️ Skipped item (status={status_code}) {parsed}") total_i += 1 processed_counter += 1 # ---------- لاگ درصدی امن ---------- progress_percent = (processed_counter / total_items) * 100 if progress_percent >= next_log_percent: now = time.time() elapsed = now - self.start_t # میانگین زمان هر آیتم avg_time_per_item = elapsed / processed_counter # زمان باقی‌مانده remaining_items = total_items - processed_counter eta_seconds = avg_time_per_item * remaining_items # زمان پایان واقعی # finish_time = datetime.now() + timedelta(seconds=eta_seconds) # تبدیل Elapsed به ساعت/دقیقه/ثانیه elapsed_h = int(elapsed // 3600) elapsed_m = int((elapsed % 3600) // 60) elapsed_s = int(elapsed % 60) # تبدیل ETA به ساعت/دقیقه/ثانیه eta_h = int(eta_seconds // 3600) eta_m = int((eta_seconds % 3600) // 60) eta_s = int(eta_seconds % 60) print( f"% Progress: {progress_percent:.1f}% " f"({processed_counter}/{total_items}) | " f"Elapsed: {elapsed_h:02d}:{elapsed_m:02d}:{elapsed_s:02d} | " f"ETA: {eta_h:02d}:{eta_m:02d}:{eta_s:02d} | " # f"Now : {datetime.now()} Finish at: {finish_time.strftime('%H:%M:%S')}" ) # برای جلوگیری از چند چاپ پشت سر هم while progress_percent >= next_log_percent: next_log_percent += log_percent_step # ----------------------------------- # auto save if total_i >= self.save_number: total_i = 0 self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) number = self.__get_max_number_file(self._temp_path) temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson( data=list(all_results), path=temp_output_path ) all_results.clear() # if # print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") queue.task_done() # ایجاد workerها workers = [asyncio.create_task(worker()) for _ in range(total_concurrency)] await queue.join() # توقف workerها for w in workers: w.cancel() # ------------------ ذخیره نهایی ------------------ # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده if total_i > 0 or len(all_results) > 0: print("💾 Final save of remaining data...") self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) number = self.__get_max_number_file(self._temp_path) print(f"number {number}") temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) # print(f"💾 Auto-saved partial data: {len(all_results)}") print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") all_results.clear() # ------------------ مرحله ۴: ذخیره خروجی ------------------ final_data_path = self.output_path / f"final_data_{self.task_name}.json" processed_id_path = self.output_path / "processed_id.json" self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) all_results = self.__load_orjson(final_data_path) # make_new_proccessed_ids_from_file() self.__save_orjson(data=list(all_processed_id), path=processed_id_path) self.__save_orjson(data=all_results, path=final_data_path) print( f"\n✅ Processing completed!\n" f"📊 Total-Data: {len(data)} | " f"⭕ Ignored-Data: {len(processed_id)} | " f"📦 Proccessed-Data: {len(all_results)} | " f"❌ Loss-Data: {len(data)+len(processed_id)-len(all_results)} | " f"💾 Results saved to: {final_data_path}" ) def sync_multi_part_eval(self, processed_id: List = []): try: asyncio.run(self.async_multi_part_eval(processed_id)) except KeyboardInterrupt: print("\n🛑 Interrupted by user.") traceback.print_exc() async def async_multi_part_eval( self, processed_id: Optional[List] = None, log_percent_step: float = 0.5, # هر چند درصد لاگ بده ): print("🔹 Starting async data processing...") self.start_t = time.time() # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ if not processed_id: try: processed_id = self.__load_orjson(self._temp_processed_id_path) print( f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" ) except Exception: print("⚠️ No valid processed_id found. Starting fresh.") processed_id = [] # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ all_processed_id = set(processed_id) all_results = [] if self.data != None: raise ValueError("Need Multi Part Json Data") for data_path in self.data_path.rglob("*.json"): raw_data = self.__load_orjson(data_path) if not raw_data: print(f"⚠️ Empty file: {data_path.name}") continue data = [item for item in raw_data if item.get("id") not in all_processed_id] if not data: print(f"✅ No new items in {data_path.name}. Skipping.") continue total_items = len(data) processed_counter = 0 next_log_percent = log_percent_step print( f"➕ Total items: {len(data)} - {len(all_processed_id)} = {len(data)}" ) # اگر چیزی برای پردازش نیست if not data: print("✅ Nothing new to process. All items are already done.") return # ------------------ مرحله ۳: شروع پردازش ------------------ server_pool = [] for server in self.servers: print( f"+= Model: {server.model_name} | Reasoning: {server.reasoning_effort} | API: {server.api_url}" ) server_pool.append( { "config": server, "client": AsyncOpenAI( base_url=server.api_url, api_key=server.api_key, ), "semaphore": asyncio.Semaphore(server.semaphore_number), } ) self.server_pool = server_pool # round robin state if not hasattr(self, "_rr_index"): self._rr_index = 0 self._rr_lock = asyncio.Lock() async def _get_next_server(): async with self._rr_lock: server = self.server_pool[self._rr_index] self._rr_index = (self._rr_index + 1) % len(self.server_pool) return server # ------------------ محدودکننده پردازش ------------------ async def limited_process(item): server = await _get_next_server() async with server["semaphore"]: try: result = await asyncio.wait_for( self.__process_item( client=server["client"], server_config=server["config"], item=item, ), timeout=self.request_timeout, ) return result except asyncio.TimeoutError: # print(f"⏳ Timeout item {item['id']}") return f"⏳ Timeout item {item['id']}", 408 except Exception: traceback.print_exc() return None, 500 # ------------------ Queue + Worker ------------------ queue = asyncio.Queue() for item in data: await queue.put(item) total_concurrency = sum(s["semaphore"]._value for s in server_pool) total_i = 0 lock = asyncio.Lock() async def worker(): nonlocal total_i, processed_counter, next_log_percent while True: try: item = await queue.get() except asyncio.CancelledError: break parsed, status_code = await limited_process(item) async with lock: if status_code == 200 and not isinstance(parsed, str): all_results.append(parsed) all_processed_id.add(parsed.get("id")) else: print(f"⚠️ Skipped item (status={status_code}) {parsed}") total_i += 1 processed_counter += 1 # ---------- لاگ درصدی امن ---------- progress_percent = (processed_counter / total_items) * 100 if progress_percent >= next_log_percent: now = time.time() elapsed = now - self.start_t # میانگین زمان هر آیتم avg_time_per_item = elapsed / processed_counter # زمان باقی‌مانده remaining_items = total_items - processed_counter eta_seconds = avg_time_per_item * remaining_items # زمان پایان واقعی # finish_time = datetime.now() + timedelta(seconds=eta_seconds) # تبدیل Elapsed به ساعت/دقیقه/ثانیه elapsed_h = int(elapsed // 3600) elapsed_m = int((elapsed % 3600) // 60) elapsed_s = int(elapsed % 60) # تبدیل ETA به ساعت/دقیقه/ثانیه eta_h = int(eta_seconds // 3600) eta_m = int((eta_seconds % 3600) // 60) eta_s = int(eta_seconds % 60) print( f"% Progress: {progress_percent:.1f}% " f"({processed_counter}/{total_items}) | " f"Elapsed: {elapsed_h:02d}:{elapsed_m:02d}:{elapsed_s:02d} | " f"ETA: {eta_h:02d}:{eta_m:02d}:{eta_s:02d} | " # f"Now : {datetime.now()} Finish at: {finish_time.strftime('%H:%M:%S')}" ) # برای جلوگیری از چند چاپ پشت سر هم while progress_percent >= next_log_percent: next_log_percent += log_percent_step # ----------------------------------- # auto save if total_i >= self.save_number: total_i = 0 self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) number = self.__get_max_number_file(self._temp_path) temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson( data=list(all_results), path=temp_output_path ) all_results.clear() # if # print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") queue.task_done() # ایجاد workerها workers = [asyncio.create_task(worker()) for _ in range(total_concurrency)] await queue.join() # توقف workerها for w in workers: w.cancel() # ------------------ ذخیره نهایی ------------------ # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده if total_i > 0 or len(all_results) > 0: # print("💾 Final save of remaining data...") self.__save_orjson( data=list(all_processed_id), path=self._temp_processed_id_path, ) number = self.__get_max_number_file(self._temp_path) # print(f"number {number}") temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) # print(f"💾 Auto-saved partial data: {len(all_results)}") print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") all_results.clear() # ------------------ مرحله ۴: ذخیره خروجی ------------------ final_data_path = self.output_path / f"final_data_{self.task_name}.json" processed_id_path = self.output_path / "processed_id.json" self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) all_results = self.__load_orjson(final_data_path) # make_new_proccessed_ids_from_file() self.__save_orjson(data=list(all_processed_id), path=processed_id_path) self.__save_orjson(data=all_results, path=final_data_path) print( f"\n✅ Processing completed!\n" f"📊 Total-Data: {len(data)} | " f"⭕ Ignored-Data: {len(processed_id)} | " f"📦 Proccessed-Data: {len(all_results)} | " f"❌ Loss-Data: {len(data)+len(processed_id)-len(all_results)} | " f"💾 Results saved to: {final_data_path}" ) # --- پایان کلی --- final_processed_id_path = self.output_path / "processed_id_final.json" self.__save_orjson(data=list(all_processed_id), path=final_processed_id_path) print(f"\n🏁 All files processed. Final IDs saved to {final_processed_id_path}") # ------------------------------ Multi LLm Multi Client Smart API ------------------------------ class SmartMultiServerCore: """ ProductionGrade-Smart-Handle-Request """ def __init__( self, llm_server: List = None, model_names: List = [ "gpt-oss-20b", "gemma-3-27b", "gpt-oss-120b", # "magistral-small-2509", # "Qwen3-Coder-30B", ], api_urls: List = [ "http://127.0.0.1:8001/v1/", "http://172.16.29.102:8001/v1/", "http://127.0.0.1:8003/v1/", "http://172.16.29.102:8003/v1/", "http://127.0.0.1:8005/v1/", "http://172.16.29:8005/v1/", ], api_keys: List = ["EMPTY"], max_concurrent_requests_per_server: int = 20, ): if not llm_server: # تولید تمام ترکیب‌ها combinations = list(product(model_names, api_urls, api_keys)) total_count = len(combinations) print(f"total_count {total_count}") # ساخت دیکشنری‌ها self.llm_configs = [] for model, url, key in combinations: self.llm_configs.append( {"model_name": model, "api_url": url, "api_key": key} ) else: self.llm_configs = llm_server self.validate_configs() self.semaphore = asyncio.Semaphore(max_concurrent_requests_per_server) self.config_index = 0 self.num_configs = len(self.active_configs) print(f"=== Smart Service Initialized ===") if len(self.active_configs) > 0: print(f"=== self.active_configs ===") for i, j in enumerate(self.active_configs, start=1): print(f"=== Config{i}: {j}") else: print(f"Not Found Any Active Service !!!") def validate_configs(self): self.active_configs = [] for config in self.llm_configs: model_name = config["model_name"] base_url = config.get("api_url", "") # اگر url با v1/ تمام می‌شود، endpoint مدل‌ها را به v1/models وصل کن if base_url.endswith("/v1/"): models_endpoint = base_url + "models" else: models_endpoint = base_url + "/models" headers = { "Authorization": f"Bearer {config.get('api_key', 'EMPTY')}", "Content-Type": "application/json", } is_active = False try: response = requests.get(models_endpoint, headers=headers, timeout=0.1) if response.status_code == 200: models_list = response.json() # لیست مدل‌ها معمولاً شامل 'data' است که آرایه‌ای از مدل‌هاست if isinstance(models_list, dict) and "data" in models_list: available_model_ids = [ m.get("id", "") for m in models_list["data"] ] else: available_model_ids = [] if model_name in available_model_ids: is_active = True if is_active: self.active_configs.append(config) except: continue def _get_next_config(self): """انتخاب کانفیگ بعدی به صورت چرخشی (Round-Robin).""" config = self.active_configs[self.config_index] self.config_index = (self.config_index + 1) % self.num_configs return config async def _process_request( self, config: Dict, user_prompt: str, reasoning_effort: str = "medium", priority: int = 5, temperature: float = 0.2, top_p: float = 0.8, system_prompt: str = None, assistant_prompt: str = None, stop=None, max_token: int = None, output_schema: BaseModel = None, return_reason: bool = False, return_used_token: bool = False, ) -> Union[BaseModel, str, Tuple]: model_name = config["model_name"] api_url = config["api_url"] api_key = config["api_key"] async with AsyncOpenAI(base_url=api_url, api_key=api_key) as client: messages = [{"role": "user", "content": user_prompt}] if system_prompt: messages.insert(0, {"role": "system", "content": system_prompt}) if assistant_prompt: messages.append({"role": "assistant", "content": assistant_prompt}) if output_schema: # Use .parse for structured output response = await client.chat.completions.parse( model=model_name, messages=messages, temperature=temperature, top_p=top_p, max_tokens=max_token, stop=stop, response_format=output_schema, reasoning_effort=reasoning_effort, extra_body={"priority": priority}, ) parsed_obj = response.choices[0].message.parsed # print(f'parsed_obj {parsed_obj}') if parsed_obj is None: print("Failed to parse response") return str(response) # Validate just in case (optional, چون .parse already does it) if return_reason: reasoning_content = response.choices[0].message.reasoning_content if return_used_token: _total_token = response.usage.total_tokens return ( output_schema.model_validate(parsed_obj), str(reasoning_content), int(_total_token), ) return output_schema.model_validate(parsed_obj), str( reasoning_content ) return output_schema.model_validate(parsed_obj) else: response = await client.chat.completions.create( model=model_name, messages=messages, temperature=temperature, top_p=top_p, max_tokens=max_token, stop=None, reasoning_effort=reasoning_effort, extra_body={"priority": priority}, ) content = response.choices[0].message.content if not content: print("Empty response") return str(response) return content async def single_item( self, user_prompt, reasoning_effort: str = "medium", priority: int = 5, temperature: float = 0.2, top_p: float = 0.8, system_prompt: str = None, assistant_prompt: str = None, stop=None, max_token: int = None, output_schema: BaseModel = None, return_reason: bool = False, return_used_token: bool = False, ): """ تابعی که درخواست را می‌پذیرد، آن را در صف قرار می‌دهد و منتظر اجرای آن می‌ماند. """ # 1. انتخاب کانفیگ فعال بعدی به صورت چرخشی config = self._get_next_config() async with self.semaphore: result = await self._process_request( config=config, user_prompt=user_prompt, reasoning_effort=reasoning_effort, priority=priority, temperature=temperature, top_p=top_p, system_prompt=system_prompt, assistant_prompt=assistant_prompt, stop=stop, max_token=max_token, output_schema=output_schema, return_reason=return_reason, return_used_token=return_used_token, ) return result async def stream_single_item(self): raise NotImplementedError("Not Created Yet.") ############################################################################## # ------------------------------ End Code ------------------------------ ##############################################################################