diff --git a/__init__.py b/__init__.py index c6cf2a7..9d83305 100644 --- a/__init__.py +++ b/__init__.py @@ -1 +1,3 @@ -from aiDataParser.core import AsyncCore \ No newline at end of file +from aiDataParser.core.ai_parser import AsyncCore +from aiDataParser.core.ai_stream_parser import StreamAsyncCore +from aiDataParser.core.data_normalizer import load_orjson, save_orjson diff --git a/core/__init__.py b/core/__init__.py index a8b6da5..e69de29 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1 +0,0 @@ -from aiDataParser.core.ai_parser import AsyncCore \ No newline at end of file diff --git a/core/ai_parser.py b/core/ai_parser.py index e01e924..72f6c1b 100644 --- a/core/ai_parser.py +++ b/core/ai_parser.py @@ -1,24 +1,56 @@ -from typing import List +from typing import List, Dict, Optional, Union, Tuple +from pydantic import BaseModel from pathlib import Path -import os, orjson, time, json, re, asyncio, traceback -from openai import AsyncOpenAI +import os, orjson, time, json, re, asyncio, traceback, requests +from openai import OpenAI, AsyncOpenAI, LengthFinishReasonError +from datetime import datetime, timedelta +from itertools import product -# -------------------------------------------------------------------- +__version__ = "1.0.3" -# ------------------------------ پردازش API ------------------------------ +############################################################################## +# ------------------------------ Start BaseModel ------------------------------ +############################################################################## + + +class LLMServer(BaseModel): + api_url: str # "http://ip1:8000/v1" + model_name: str + api_key: str = "EMPTY" + ai_code_version: Optional[str] = None + priority: int # 1 + semaphore_number: int + reasoning_effort: str = "low" + top_p: float = 1 + temperature: float = 0.0 + max_token: Optional[int] = None + + +############################################################################## +# ------------------------------ End BaseModel ------------------------------ +############################################################################## + + +############################################################################## +# ------------------------------ Start Code ------------------------------ +############################################################################## + + +# ------------------------------ Single LLm API ------------------------------ class AsyncCore: def __init__( self, model_name, task_name, - data_path, - output_schema, api_url, - reasoning_effort='low', + priority, + output_schema=None, + data_path=None, + reasoning_effort="low", top_p=1, temperature=0.0, - max_token=128000, + max_token=4096, output_path=None, ai_code_version=None, request_timeout=30, # ثانیه @@ -28,6 +60,7 @@ class AsyncCore: ): self.save_number = save_number + self.priority = priority # json file of data self.data_path = data_path self.semaphore_number = semaphore_number @@ -59,15 +92,15 @@ class AsyncCore: ai_code_version = f"{model_name}_{reasoning_effort}" self.ai_code_version = ai_code_version - self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"} - - try: - self.data = self.__data_process() - print(f"📦 Loaded {len(self.data)} words") - except Exception as e: - raise ValueError( - f"Data loading/validation failed: {e}\n{traceback.format_exc()}" - ) + self.PRIMARY_KEY = {"user_prompt", "id"} + if data_path != None: + try: + self.data = self.__data_process() + print(f"📦 Loaded {len(self.data)} words") + except Exception as e: + raise ValueError( + f"Data loading/validation failed: {e}\n{traceback.format_exc()}" + ) def __validate_item(self, item, idx): # Mandatory fields @@ -173,9 +206,7 @@ class AsyncCore: # گزارش خطاها if failed_files: - print("\n❌ We lose this file:") - for name in failed_files: - print(f" - {name}") + print(f"\n❌ We lose This Files: {len(failed_files)}") else: print("\n✅ All JSON added") @@ -205,14 +236,15 @@ class AsyncCore: async def __process_item(self, client, item): try: messages = [ - {"role": "system", "content": item["system_prompt"]}, {"role": "user", "content": item["user_prompt"]}, ] + if item.get("system_prompt"): + messages.append({"role": "system", "content": item["system_prompt"]}) if item.get("assistant_prompt"): messages.append( {"role": "assistant", "content": item["assistant_prompt"]} ) - + # print(f"self.max_token {self.max_token}") response = await client.chat.completions.parse( model=self.model_name, messages=messages, @@ -222,8 +254,7 @@ class AsyncCore: max_tokens=self.max_token, stop=None, response_format=self.output_schema, - - + extra_body={"priority": self.priority}, ) parsed = ( @@ -233,12 +264,11 @@ class AsyncCore: ) parsed = self.output_schema.model_validate(parsed) - parsed = parsed.model_dump() - parsed = dict(parsed) - parsed["ai_code_version"] = self.ai_code_version - parsed["id"] = item["id"] - # parsed["item"] = item - return parsed, 200 + + item["ai_code_version"] = self.ai_code_version + item["llm_output"] = dict(parsed.model_dump()) + + return item, 200 except asyncio.TimeoutError: print(f"⏳ Timeout on item {item['id']}") @@ -248,14 +278,66 @@ class AsyncCore: print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") return None, 400 - def async_eval(self, processed_id: List = []): + async def __process_item_string_chat_only(self, client: AsyncOpenAI, item): try: - asyncio.run(self.__async_eval(processed_id)) + messages = [] + + if item.get("system_prompt"): + messages.append({"role": "system", "content": item["system_prompt"]}) + + messages.append({"role": "user", "content": item["user_prompt"]}) + + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + # if item["input_token"]: + # _max_token = item["input_token"] + self.max_token + # else: + _max_token = self.max_token + + response = await client.chat.completions.create( + model=self.model_name, + messages=messages, + temperature=self.temperature, + top_p=self.top_p, + max_tokens=_max_token, + stop=None, + extra_body={"priority": self.priority}, + ) + + text_output = response.choices[0].message.content + + item["ai_code_version"] = self.ai_code_version + item["llm_output"] = str(text_output) + + return item, 200 + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item['id']}") + return None, 408 + + except Exception: + print(f"⚠️ Error __process_item {item['id']}:") + traceback.print_exc() + return None, 400 + + def _get_processor(self, mode: str): + print(f"Model Run In {mode}-mode") + processors = { + "parse": self.__process_item, + "chat": self.__process_item_string_chat_only, + } + return processors.get(mode, self.__process_item) + + def sync_eval(self, processed_id: List = [], mode="parse"): + try: + asyncio.run(self.async_eval(processed_id, mode)) except KeyboardInterrupt: print("\n🛑 Interrupted by user.") traceback.print_exc() - async def __async_eval(self, processed_id: List): + async def async_eval(self, processed_id: List, mode: str): """ اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. """ @@ -287,14 +369,19 @@ class AsyncCore: print("✅ Nothing new to process. All items are already done.") return + if self.output_schema == None: + processor = self._get_processor(mode="chat") + else: + processor = self._get_processor(mode) + # ------------------ مرحله ۳: شروع پردازش ------------------ print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: - semaphore = asyncio.Semaphore(5) + semaphore = asyncio.Semaphore(self.semaphore_number) async def limited_process(item): async with semaphore: - return await self.__process_item(client, item) + return await processor(client, item) tasks = [asyncio.create_task(limited_process(item)) for item in data] @@ -325,9 +412,7 @@ class AsyncCore: data=list(all_processed_id), path=self._temp_processed_id_path, ) - print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") number = self.__get_max_number_file(self._temp_path) - print(f"number {number}") temp_output_path = self._temp_path / f"output_{number}.json" self.__save_orjson(data=list(all_results), path=temp_output_path) print(f"💾 Auto-saved partial data: {len(all_results)}") @@ -371,20 +456,422 @@ class AsyncCore: f"💾 Results saved to: {final_data_path}" ) - def async_eval_with_retry(self, processed_id: List = [], classification_list: List=[], classification_result_field:str='word'): + async def single_async_item( + self, + item, + reasoning_effort, + temperature, + top_p, + output_schema=None, + max_token=4096, + stream=False, + print_logs=False, + return_reason=False, + stop=None, + return_used_token=False, + ): try: - asyncio.run(self.__async_eval_with_retry(processed_id, classification_list, classification_result_field)) + async with AsyncOpenAI( + base_url=self.api_url, api_key=self.api_key + ) as client: + semaphore = asyncio.Semaphore(self.semaphore_number) + async with semaphore: + messages = [{"role": "user", "content": item["user_prompt"]}] + if item.get("system_prompt"): + messages.insert( + 0, {"role": "system", "content": item["system_prompt"]} + ) + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + # output_schema =None + if output_schema is not None: + # Use .parse for structured output + response = await client.chat.completions.parse( + model=self.model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + stop=stop, + response_format=output_schema, + reasoning_effort=reasoning_effort, + extra_body={"priority": self.priority}, + ) + if print_logs: + print(f"parse response ---- {response}") + parsed_obj = response.choices[0].message.parsed + # print(f'parsed_obj {parsed_obj}') + if parsed_obj is None: + return { + "error": "Failed to parse response", + "raw": str(response), + } + # Validate just in case (optional, چون .parse already does it) + if return_reason: + reasoning_content = response.choices[ + 0 + ].message.reasoning_content + if return_used_token: + _total_token = response.usage.total_tokens + return ( + output_schema.model_validate(parsed_obj), + str(reasoning_content), + int(_total_token), + ) + + return output_schema.model_validate(parsed_obj), str( + reasoning_content + ) + + return output_schema.model_validate(parsed_obj) + + else: + # Use .create for raw text + response = await client.chat.completions.create( + model=self.model_name, + messages=messages, + # temperature=temperature, + # top_p=top_p, + # max_tokens=max_token, + stop=None, + stream=stream, + reasoning_effort=reasoning_effort, + # No response_format + extra_body={"priority": self.priority}, + ) + content = response.choices[0].message.content + print(f"content {content}") + if not content: + return {"error": "Empty response", "raw": str(response)} + return content + + except LengthFinishReasonError as le: + # اینجاست که جادو اتفاق می‌افته + if max_token >= 50000: + return {"error": "MAX_TOKEN_LIMIT_REACHED", "max_token": max_token} + + new_max = min(max_token * 2, 50000) + + return await self.single_async_item( + item=item, + reasoning_effort=reasoning_effort, + temperature=temperature, + top_p=top_p, + output_schema=output_schema, + max_token=new_max, + stream=stream, + print_logs=print_logs, + return_reason=return_reason, + return_used_token=return_used_token, + stop=stop, + ) + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item}") + return None + + except Exception as e: + print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}") + return None + + async def single_async_item_stream( + self, + item, + reasoning_effort, + temperature, + top_p, + max_token=None, + ): + async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: + + messages = [{"role": "user", "content": item["user_prompt"]}] + if item.get("system_prompt"): + messages.insert(0, {"role": "system", "content": item["system_prompt"]}) + + stream = await client.chat.completions.create( + model=self.model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + reasoning_effort=reasoning_effort, + stream=True, # ⭐ مهم + # stream_options= + extra_body={"priority": self.priority}, + ) + + async for chunk in stream: + if not chunk.choices: + continue + + delta = chunk.choices[0].delta + + if delta and delta.content: + yield delta.content + + +# ------------------------------ Multi LLm API ------------------------------ +class AsyncMultiServerCore: + """ + ProductionGrade-JsonBased-LoadBalanced-AsyncMaxConcurrency + """ + + def __init__( + self, + servers: List[LLMServer | Dict], + task_name, + output_schema: BaseModel | None = None, + data_path=None, + output_path=None, + request_timeout: int = 30, + save_number=2, + PRIMARY_KEY={"user_prompt", "id"}, + ): + # تعریف چند سرور + try: + self.servers = [LLMServer.model_validate(s) for s in servers] + + except Exception as e: + traceback.print_exc() + self.request_timeout = request_timeout + self.save_number = save_number + + self.task_name = task_name + if output_path is None: + output_path = f"./{task_name}" + + self.output_path = Path(output_path) + self._temp_path = self.output_path / "batch_data" + self._temp_processed_id_path = self._temp_path / "processed_id.json" + + # Create output directory and subdirectories if they don't exist + self.output_path.mkdir(parents=True, exist_ok=True) + self._temp_path.mkdir(parents=True, exist_ok=True) + # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True) + + self.output_schema = output_schema + self.before_t = time.time() + self.after_t = time.time() + self.PRIMARY_KEY = PRIMARY_KEY + + if data_path != None: + self.data_path = Path(data_path) + if self.data_path.is_dir(): + self.data = None + + elif self.data_path.is_file() and self.data_path.suffix.lower() == ".json": + + try: + self.data = self.__data_process() + print(f"📦 Loaded {len(self.data)} words") + except Exception as e: + raise ValueError( + f"Data loading/validation failed: {e}\n{traceback.format_exc()}" + ) + + def __validate_item(self, item, idx): + # Mandatory fields + for key in self.PRIMARY_KEY: + if key not in item: + raise ValueError(f"Missing mandatory key '{key}' in item #{idx}") + if key == "id": + try: + hash(item[key]) + except TypeError: + raise TypeError( + f"Item #{idx}: '{key}' must be hashable (immutable), " + f"got {type(item[key]).__name__}" + ) + + else: + if not isinstance(item[key], str): + raise TypeError( + f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}" + ) + + # Optional field: assistant_prompt + if "assistant_prompt" not in item or item["assistant_prompt"] is None: + item["assistant_prompt"] = None + else: + if not isinstance(item["assistant_prompt"], str): + raise TypeError( + f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" + ) + + return item # now normalized + + def __data_process(self): + raw_data = self.__load_orjson(self.data_path) + if not isinstance(raw_data, list): + raise ValueError("Data must be a list of dictionaries.") + + processed_data = [] + for idx, item in enumerate(raw_data): + if not isinstance(item, dict): + raise ValueError(f"Item #{idx} is not a dictionary.") + validated_item = self.__validate_item(item, idx) + processed_data.append(validated_item) + + return processed_data + + def __get_max_number_file(self, directory): + # Pattern to match filenames like out_1.json, out_25.json, etc. + pattern = re.compile(r"output_(\d+)\.json$") + max_num = 0 + + for filename in os.listdir(directory): + match = pattern.match(filename) + if match: + num = int(match.group(1)) + if num > max_num: + max_num = num + return max_num + 1 + + def __load_orjson(self, path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + def __save_orjson(self, path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + def merge_json_dir(self, input_path, output_path): + directory = Path(input_path) + if not directory.is_dir(): + raise ValueError(f"Not valid PATH: {input_path}") + + seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) + unique_data = [] # فقط داده‌های یکتا + failed_files = [] + + json_files = list(directory.glob("*.json")) + if not json_files: + print("⚠️ NO JSON File Found In This PATH") + return + + for json_file in json_files: + try: + data = self.__load_orjson(json_file) + if not data: # خالی یا None + failed_files.append(json_file.name) + continue + + if isinstance(data, list) and isinstance(data[0], dict): + for item in data: + item_id = item.get("id") + if item_id is None: + # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی + # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند + continue + if item_id not in seen_ids: + seen_ids.add(item_id) + unique_data.append(item) + else: + raise ValueError(f"no list available in this json -> {json_file}") + except ( + json.JSONDecodeError, + ValueError, + OSError, + KeyError, + TypeError, + ) as e: + # print(f"❌ Failed in process '{json_file.name}': {e}") + failed_files.append(json_file.name) + + # # گزارش خطاها + # if failed_files: + # print(f"\n❌ We lose this file: {len(failed_files)}") + # # for name in failed_files: + # # print(f" - {name}") + # else: + # print("\n✅ All JSON added") + + # ذخیره خروجی + try: + self.__save_orjson(data=unique_data, path=output_path) + print( + f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" + ) + except Exception as e: + print(f"❌ Error in saving final file: {e}") + + def make_new_proccessed_ids_from_file(self, json_in, out_path): + data = self.__load_orjson(json_in) + + finall_data = [] + for d in data: + if d["id"]: + finall_data.append(d["id"]) + finall_data = set(finall_data) + finall_data = list(finall_data) + print(f"-- len ids {len(finall_data)}") + + self.__save_orjson(data=finall_data, path=out_path) + + # ------------------------------ Main ------------------------------ + + async def __process_item(self, client: AsyncOpenAI, server_config: LLMServer, item): + try: + messages = [ + {"role": "user", "content": item["user_prompt"]}, + ] + if item.get("system_prompt"): + messages.append({"role": "system", "content": item["system_prompt"]}) + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + + response = await client.chat.completions.parse( + model=server_config.model_name, + messages=messages, + temperature=server_config.temperature, + top_p=server_config.top_p, + reasoning_effort=server_config.reasoning_effort, + max_tokens=server_config.max_token, + stop=None, + response_format=self.output_schema, + extra_body={"priority": server_config.priority}, + ) + + parsed = ( + response.choices[0].message.parsed + if response and response.choices and response.choices[0].message.parsed + else {"raw_text": str(response)} + ) + + parsed = self.output_schema.model_validate(parsed) + # parsed["ai_code_version"] = server_config.ai_code_version + item["llm_output"] = parsed.model_dump() + return item, 200 + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item['id']}") + return None, 408 + + except Exception as e: + print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") + return None, 400 + + def sync_eval(self, processed_id: List = []): + try: + asyncio.run(self.async_eval(processed_id)) except KeyboardInterrupt: print("\n🛑 Interrupted by user.") traceback.print_exc() - - - async def __async_eval_with_retry(self, processed_id: List, classification_list:List, classification_result_field:str): - """ - اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. - """ - print("🔹 Starting async data processing...") + async def async_eval( + self, + processed_id: Optional[List] = None, + log_percent_step: float = 0.5, # هر چند درصد لاگ بده + ): + print("🔹 Starting async data processing...") + self.start_t = time.time() # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ if not processed_id: try: @@ -399,9 +886,14 @@ class AsyncCore: # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ all_processed_id = set(processed_id) all_results = [] - total_time = [] + if self.data == None: + raise ValueError("Need Single Json Data") data = [item for item in self.data if item.get("id") not in all_processed_id] + total_items = len(data) + processed_counter = 0 + next_log_percent = log_percent_step + print( f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" ) @@ -412,64 +904,171 @@ class AsyncCore: return # ------------------ مرحله ۳: شروع پردازش ------------------ - print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") - async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: - semaphore = asyncio.Semaphore(self.semaphore_number) + server_pool = [] - async def limited_process(item): - async with semaphore: - return await self.__process_item(client, item) + for server in self.servers: + print( + f"+= Model: {server.model_name} | Reasoning: {server.reasoning_effort} | API: {server.api_url}" + ) + server_pool.append( + { + "config": server, + "client": AsyncOpenAI( + base_url=server.api_url, + api_key=server.api_key, + ), + "semaphore": asyncio.Semaphore(server.semaphore_number), + } + ) - tasks = [asyncio.create_task(limited_process(item)) for item in data] + self.server_pool = server_pool - total_i = 0 - # ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست) - for i, task in enumerate(asyncio.as_completed(tasks), start=1): - start = time.time() + # round robin state + if not hasattr(self, "_rr_index"): + self._rr_index = 0 + self._rr_lock = asyncio.Lock() + + async def _get_next_server(): + async with self._rr_lock: + server = self.server_pool[self._rr_index] + self._rr_index = (self._rr_index + 1) % len(self.server_pool) + return server + + # ------------------ محدودکننده پردازش ------------------ + async def limited_process(item): + server = await _get_next_server() + + async with server["semaphore"]: try: - parsed, status_code = await asyncio.wait_for( - task, timeout=self.request_timeout - ) # ⏱ حداکثر 2 دقیقه - except asyncio.TimeoutError: - print(f"⏳ Task {i} timed out completely") - parsed, status_code = None, 408 - total_time.append(time.time() - start) - if status_code == 200: - all_results.append(parsed) - all_processed_id.add(parsed.get("id")) - else: - print(f"⚠️ Skipped item (status={status_code})") - - total_i += 1 - # ✅ ذخیره‌ی موقت هر n مورد - if total_i >= self.save_number: - total_i = 0 - self.__save_orjson( - data=list(all_processed_id), - path=self._temp_processed_id_path, + result = await asyncio.wait_for( + self.__process_item( + client=server["client"], + server_config=server["config"], + item=item, + ), + timeout=self.request_timeout, ) - print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") - number = self.__get_max_number_file(self._temp_path) - print(f"--- {number}/{len(data)} ---") - temp_output_path = self._temp_path / f"output_{number}.json" - self.__save_orjson(data=list(all_results), path=temp_output_path) - all_results.clear() + return result + except asyncio.TimeoutError: + # print(f"⏳ Timeout item {item['id']}") + return f"⏳ Timeout item {item['id']}", 408 + except Exception: + traceback.print_exc() + return None, 500 - # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده - if total_i > 0 or len(all_results) > 0: - print("💾 Final save of remaining data...") - self.__save_orjson( - data=list(all_processed_id), - path=self._temp_processed_id_path, - ) - print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") - number = self.__get_max_number_file(self._temp_path) - print(f"--- {number}/{len(data)} ---") + # ------------------ Queue + Worker ------------------ + queue = asyncio.Queue() - temp_output_path = self._temp_path / f"output_{number}.json" - self.__save_orjson(data=list(all_results), path=temp_output_path) - # print(f"💾 Auto-saved partial data: {len(all_results)}") - all_results.clear() + for item in data: + await queue.put(item) + + total_concurrency = sum(s["semaphore"]._value for s in server_pool) + + total_i = 0 + lock = asyncio.Lock() + + async def worker(): + nonlocal total_i, processed_counter, next_log_percent + + while True: + try: + item = await queue.get() + except asyncio.CancelledError: + break + + parsed, status_code = await limited_process(item) + + async with lock: + if status_code == 200 and not isinstance(parsed, str): + all_results.append(parsed) + all_processed_id.add(parsed.get("id")) + else: + print(f"⚠️ Skipped item (status={status_code}) {parsed}") + + total_i += 1 + processed_counter += 1 + + # ---------- لاگ درصدی امن ---------- + progress_percent = (processed_counter / total_items) * 100 + if progress_percent >= next_log_percent: + now = time.time() + elapsed = now - self.start_t + + # میانگین زمان هر آیتم + avg_time_per_item = elapsed / processed_counter + + # زمان باقی‌مانده + remaining_items = total_items - processed_counter + eta_seconds = avg_time_per_item * remaining_items + + # زمان پایان واقعی + # finish_time = datetime.now() + timedelta(seconds=eta_seconds) + + # تبدیل Elapsed به ساعت/دقیقه/ثانیه + elapsed_h = int(elapsed // 3600) + elapsed_m = int((elapsed % 3600) // 60) + elapsed_s = int(elapsed % 60) + + # تبدیل ETA به ساعت/دقیقه/ثانیه + eta_h = int(eta_seconds // 3600) + eta_m = int((eta_seconds % 3600) // 60) + eta_s = int(eta_seconds % 60) + + print( + f"% Progress: {progress_percent:.1f}% " + f"({processed_counter}/{total_items}) | " + f"Elapsed: {elapsed_h:02d}:{elapsed_m:02d}:{elapsed_s:02d} | " + f"ETA: {eta_h:02d}:{eta_m:02d}:{eta_s:02d} | " + # f"Now : {datetime.now()} Finish at: {finish_time.strftime('%H:%M:%S')}" + ) + # برای جلوگیری از چند چاپ پشت سر هم + while progress_percent >= next_log_percent: + next_log_percent += log_percent_step + # ----------------------------------- + + # auto save + if total_i >= self.save_number: + total_i = 0 + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + number = self.__get_max_number_file(self._temp_path) + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson( + data=list(all_results), path=temp_output_path + ) + all_results.clear() + # if + # print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + + queue.task_done() + + # ایجاد workerها + workers = [asyncio.create_task(worker()) for _ in range(total_concurrency)] + + await queue.join() + + # توقف workerها + for w in workers: + w.cancel() + + # ------------------ ذخیره نهایی ------------------ + # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده + if total_i > 0 or len(all_results) > 0: + print("💾 Final save of remaining data...") + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + # print(f"💾 Auto-saved partial data: {len(all_results)}") + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + all_results.clear() # ------------------ مرحله ۴: ذخیره خروجی ------------------ final_data_path = self.output_path / f"final_data_{self.task_name}.json" @@ -481,62 +1080,491 @@ class AsyncCore: self.__save_orjson(data=list(all_processed_id), path=processed_id_path) self.__save_orjson(data=all_results, path=final_data_path) - avg_time = (sum(total_time) / len(total_time)) if total_time else 0 print( f"\n✅ Processing completed!\n" f"📊 Total-Data: {len(data)} | " f"⭕ Ignored-Data: {len(processed_id)} | " f"📦 Proccessed-Data: {len(all_results)} | " - f"❌ Loss-Data: {len(data)-len(all_results)} | " - f"🕒 Avg Time: {avg_time:.2f}'s per item | " - f"🕒 Total Time: {sum(total_time):.4f}'s | " + f"❌ Loss-Data: {len(data)+len(processed_id)-len(all_results)} | " f"💾 Results saved to: {final_data_path}" ) - async def single_simple_async_proccess_item(self, item): - async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: - semaphore = asyncio.Semaphore(5) - async with semaphore: - try: - messages = [ - {"role": "user", "content": item["user_prompt"]}, - ] - if item.get("system_prompt"): - messages.append( - {"role": "system", "content": item["system_prompt"]} + def sync_multi_part_eval(self, processed_id: List = []): + try: + asyncio.run(self.async_multi_part_eval(processed_id)) + except KeyboardInterrupt: + print("\n🛑 Interrupted by user.") + traceback.print_exc() + + async def async_multi_part_eval( + self, + processed_id: Optional[List] = None, + log_percent_step: float = 0.5, # هر چند درصد لاگ بده + ): + print("🔹 Starting async data processing...") + self.start_t = time.time() + # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ + if not processed_id: + try: + processed_id = self.__load_orjson(self._temp_processed_id_path) + print( + f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" + ) + except Exception: + print("⚠️ No valid processed_id found. Starting fresh.") + processed_id = [] + + # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ + all_processed_id = set(processed_id) + all_results = [] + + if self.data != None: + raise ValueError("Need Multi Part Json Data") + + for data_path in self.data_path.rglob("*.json"): + raw_data = self.__load_orjson(data_path) + if not raw_data: + print(f"⚠️ Empty file: {data_path.name}") + continue + + data = [item for item in raw_data if item.get("id") not in all_processed_id] + if not data: + print(f"✅ No new items in {data_path.name}. Skipping.") + continue + + total_items = len(data) + processed_counter = 0 + next_log_percent = log_percent_step + + print( + f"➕ Total items: {len(data)} - {len(all_processed_id)} = {len(data)}" + ) + + # اگر چیزی برای پردازش نیست + if not data: + print("✅ Nothing new to process. All items are already done.") + return + + # ------------------ مرحله ۳: شروع پردازش ------------------ + server_pool = [] + + for server in self.servers: + print( + f"+= Model: {server.model_name} | Reasoning: {server.reasoning_effort} | API: {server.api_url}" + ) + server_pool.append( + { + "config": server, + "client": AsyncOpenAI( + base_url=server.api_url, + api_key=server.api_key, + ), + "semaphore": asyncio.Semaphore(server.semaphore_number), + } + ) + + self.server_pool = server_pool + + # round robin state + if not hasattr(self, "_rr_index"): + self._rr_index = 0 + self._rr_lock = asyncio.Lock() + + async def _get_next_server(): + async with self._rr_lock: + server = self.server_pool[self._rr_index] + self._rr_index = (self._rr_index + 1) % len(self.server_pool) + return server + + # ------------------ محدودکننده پردازش ------------------ + async def limited_process(item): + server = await _get_next_server() + + async with server["semaphore"]: + try: + result = await asyncio.wait_for( + self.__process_item( + client=server["client"], + server_config=server["config"], + item=item, + ), + timeout=self.request_timeout, ) - if item.get("assistant_prompt"): - messages.append( - {"role": "assistant", "content": item["assistant_prompt"]} + return result + except asyncio.TimeoutError: + # print(f"⏳ Timeout item {item['id']}") + return f"⏳ Timeout item {item['id']}", 408 + except Exception: + traceback.print_exc() + return None, 500 + + # ------------------ Queue + Worker ------------------ + queue = asyncio.Queue() + + for item in data: + await queue.put(item) + + total_concurrency = sum(s["semaphore"]._value for s in server_pool) + + total_i = 0 + lock = asyncio.Lock() + + async def worker(): + nonlocal total_i, processed_counter, next_log_percent + + while True: + try: + item = await queue.get() + except asyncio.CancelledError: + break + + parsed, status_code = await limited_process(item) + + async with lock: + if status_code == 200 and not isinstance(parsed, str): + all_results.append(parsed) + all_processed_id.add(parsed.get("id")) + else: + print(f"⚠️ Skipped item (status={status_code}) {parsed}") + + total_i += 1 + processed_counter += 1 + + # ---------- لاگ درصدی امن ---------- + progress_percent = (processed_counter / total_items) * 100 + if progress_percent >= next_log_percent: + now = time.time() + elapsed = now - self.start_t + + # میانگین زمان هر آیتم + avg_time_per_item = elapsed / processed_counter + + # زمان باقی‌مانده + remaining_items = total_items - processed_counter + eta_seconds = avg_time_per_item * remaining_items + + # زمان پایان واقعی + # finish_time = datetime.now() + timedelta(seconds=eta_seconds) + + # تبدیل Elapsed به ساعت/دقیقه/ثانیه + elapsed_h = int(elapsed // 3600) + elapsed_m = int((elapsed % 3600) // 60) + elapsed_s = int(elapsed % 60) + + # تبدیل ETA به ساعت/دقیقه/ثانیه + eta_h = int(eta_seconds // 3600) + eta_m = int((eta_seconds % 3600) // 60) + eta_s = int(eta_seconds % 60) + + print( + f"% Progress: {progress_percent:.1f}% " + f"({processed_counter}/{total_items}) | " + f"Elapsed: {elapsed_h:02d}:{elapsed_m:02d}:{elapsed_s:02d} | " + f"ETA: {eta_h:02d}:{eta_m:02d}:{eta_s:02d} | " + # f"Now : {datetime.now()} Finish at: {finish_time.strftime('%H:%M:%S')}" + ) + # برای جلوگیری از چند چاپ پشت سر هم + while progress_percent >= next_log_percent: + next_log_percent += log_percent_step + # ----------------------------------- + + # auto save + if total_i >= self.save_number: + total_i = 0 + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + number = self.__get_max_number_file(self._temp_path) + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson( + data=list(all_results), path=temp_output_path + ) + all_results.clear() + # if + # print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + + queue.task_done() + + # ایجاد workerها + workers = [asyncio.create_task(worker()) for _ in range(total_concurrency)] + + await queue.join() + + # توقف workerها + for w in workers: + w.cancel() + + # ------------------ ذخیره نهایی ------------------ + # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده + if total_i > 0 or len(all_results) > 0: + # print("💾 Final save of remaining data...") + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + number = self.__get_max_number_file(self._temp_path) + # print(f"number {number}") + + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + # print(f"💾 Auto-saved partial data: {len(all_results)}") + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + all_results.clear() + + # ------------------ مرحله ۴: ذخیره خروجی ------------------ + final_data_path = self.output_path / f"final_data_{self.task_name}.json" + processed_id_path = self.output_path / "processed_id.json" + + self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) + all_results = self.__load_orjson(final_data_path) + # make_new_proccessed_ids_from_file() + self.__save_orjson(data=list(all_processed_id), path=processed_id_path) + self.__save_orjson(data=all_results, path=final_data_path) + + print( + f"\n✅ Processing completed!\n" + f"📊 Total-Data: {len(data)} | " + f"⭕ Ignored-Data: {len(processed_id)} | " + f"📦 Proccessed-Data: {len(all_results)} | " + f"❌ Loss-Data: {len(data)+len(processed_id)-len(all_results)} | " + f"💾 Results saved to: {final_data_path}" + ) + # --- پایان کلی --- + final_processed_id_path = self.output_path / "processed_id_final.json" + self.__save_orjson(data=list(all_processed_id), path=final_processed_id_path) + print(f"\n🏁 All files processed. Final IDs saved to {final_processed_id_path}") + + +# ------------------------------ Multi LLm Multi Client Smart API ------------------------------ +class SmartMultiServerCore: + """ + ProductionGrade-Smart-Handle-Request + """ + + def __init__( + self, + llm_server: List = None, + model_names: List = [ + "gpt-oss-20b", + "gemma-3-27b", + "gpt-oss-120b", + # "magistral-small-2509", + # "Qwen3-Coder-30B", + ], + api_urls: List = [ + "http://127.0.0.1:8001/v1/", + "http://172.16.29.102:8001/v1/", + "http://127.0.0.1:8003/v1/", + "http://172.16.29.102:8003/v1/", + "http://127.0.0.1:8005/v1/", + "http://172.16.29:8005/v1/", + ], + api_keys: List = ["EMPTY"], + max_concurrent_requests_per_server: int = 20, + ): + + if not llm_server: + # تولید تمام ترکیب‌ها + combinations = list(product(model_names, api_urls, api_keys)) + total_count = len(combinations) + print(f"total_count {total_count}") + # ساخت دیکشنری‌ها + self.llm_configs = [] + for model, url, key in combinations: + self.llm_configs.append( + {"model_name": model, "api_url": url, "api_key": key} + ) + else: + self.llm_configs = llm_server + + self.validate_configs() + + self.semaphore = asyncio.Semaphore(max_concurrent_requests_per_server) + self.config_index = 0 + self.num_configs = len(self.active_configs) + + print(f"=== Smart Service Initialized ===") + if len(self.active_configs) > 0: + + print(f"=== self.active_configs ===") + for i, j in enumerate(self.active_configs, start=1): + print(f"=== Config{i}: {j}") + + else: + print(f"Not Found Any Active Service !!!") + + def validate_configs(self): + self.active_configs = [] + + for config in self.llm_configs: + model_name = config["model_name"] + base_url = config.get("api_url", "") + + # اگر url با v1/ تمام می‌شود، endpoint مدل‌ها را به v1/models وصل کن + if base_url.endswith("/v1/"): + models_endpoint = base_url + "models" + else: + models_endpoint = base_url + "/models" + + headers = { + "Authorization": f"Bearer {config.get('api_key', 'EMPTY')}", + "Content-Type": "application/json", + } + + is_active = False + try: + response = requests.get(models_endpoint, headers=headers, timeout=0.1) + + if response.status_code == 200: + models_list = response.json() + # لیست مدل‌ها معمولاً شامل 'data' است که آرایه‌ای از مدل‌هاست + if isinstance(models_list, dict) and "data" in models_list: + available_model_ids = [ + m.get("id", "") for m in models_list["data"] + ] + else: + available_model_ids = [] + + if model_name in available_model_ids: + is_active = True + + if is_active: + self.active_configs.append(config) + except: + continue + + def _get_next_config(self): + """انتخاب کانفیگ بعدی به صورت چرخشی (Round-Robin).""" + config = self.active_configs[self.config_index] + self.config_index = (self.config_index + 1) % self.num_configs + return config + + async def _process_request( + self, + config: Dict, + user_prompt: str, + reasoning_effort: str = "medium", + priority: int = 5, + temperature: float = 0.2, + top_p: float = 0.8, + system_prompt: str = None, + assistant_prompt: str = None, + stop=None, + max_token: int = None, + output_schema: BaseModel = None, + return_reason: bool = False, + return_used_token: bool = False, + ) -> Union[BaseModel, str, Tuple]: + model_name = config["model_name"] + api_url = config["api_url"] + api_key = config["api_key"] + + async with AsyncOpenAI(base_url=api_url, api_key=api_key) as client: + messages = [{"role": "user", "content": user_prompt}] + if system_prompt: + messages.insert(0, {"role": "system", "content": system_prompt}) + if assistant_prompt: + messages.append({"role": "assistant", "content": assistant_prompt}) + + if output_schema: + # Use .parse for structured output + response = await client.chat.completions.parse( + model=model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + stop=stop, + response_format=output_schema, + reasoning_effort=reasoning_effort, + extra_body={"priority": priority}, + ) + parsed_obj = response.choices[0].message.parsed + # print(f'parsed_obj {parsed_obj}') + if parsed_obj is None: + print("Failed to parse response") + return str(response) + + # Validate just in case (optional, چون .parse already does it) + if return_reason: + reasoning_content = response.choices[0].message.reasoning_content + if return_used_token: + _total_token = response.usage.total_tokens + return ( + output_schema.model_validate(parsed_obj), + str(reasoning_content), + int(_total_token), ) - response = await client.chat.completions.parse( - model=self.model_name, - messages=messages, - temperature=self.temperature, - top_p=self.top_p, - reasoning_effort=self.reasoning_effort, - max_tokens=self.max_token, - stop=None, - response_format=self.output_schema, + return output_schema.model_validate(parsed_obj), str( + reasoning_content ) - parsed = ( - response.choices[0].message.parsed - if response and response.choices and response.choices[0].message.parsed - else {"raw_text": str(response)} - ) + return output_schema.model_validate(parsed_obj) - parsed = self.output_schema.model_validate(parsed) - parsed = parsed.model_dump() - parsed = dict(parsed) - parsed["ai_code_version"] = self.ai_code_version - return parsed, 200 + else: + response = await client.chat.completions.create( + model=model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + stop=None, + reasoning_effort=reasoning_effort, + extra_body={"priority": priority}, + ) + content = response.choices[0].message.content + if not content: + print("Empty response") + return str(response) + return content - except asyncio.TimeoutError: - print(f"⏳ Timeout on item {item}") - return None, 408 + async def single_item( + self, + user_prompt, + reasoning_effort: str = "medium", + priority: int = 5, + temperature: float = 0.2, + top_p: float = 0.8, + system_prompt: str = None, + assistant_prompt: str = None, + stop=None, + max_token: int = None, + output_schema: BaseModel = None, + return_reason: bool = False, + return_used_token: bool = False, + ): + """ + تابعی که درخواست را می‌پذیرد، آن را در صف قرار می‌دهد + و منتظر اجرای آن می‌ماند. + """ - except Exception as e: - print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}") - return None, 400 \ No newline at end of file + # 1. انتخاب کانفیگ فعال بعدی به صورت چرخشی + config = self._get_next_config() + + async with self.semaphore: + result = await self._process_request( + config=config, + user_prompt=user_prompt, + reasoning_effort=reasoning_effort, + priority=priority, + temperature=temperature, + top_p=top_p, + system_prompt=system_prompt, + assistant_prompt=assistant_prompt, + stop=stop, + max_token=max_token, + output_schema=output_schema, + return_reason=return_reason, + return_used_token=return_used_token, + ) + return result + + async def stream_single_item(self): + raise NotImplementedError("Not Created Yet.") + + +############################################################################## +# ------------------------------ End Code ------------------------------ +############################################################################## diff --git a/core/ai_stream_parser.py b/core/ai_stream_parser.py new file mode 100644 index 0000000..1c9ff83 --- /dev/null +++ b/core/ai_stream_parser.py @@ -0,0 +1,534 @@ +from typing import List +from pathlib import Path +import os, orjson, time, json, re, asyncio, traceback +from openai import AsyncOpenAI, LengthFinishReasonError + + + +# ------------------------------ پردازش API ------------------------------ +class StreamAsyncCore: + def __init__( + self, + model_name, + task_name, + api_url, + output_schema=None, + data_path=None, + reasoning_effort="low", + top_p=1, + temperature=0.0, + max_token=128000, + output_path=None, + ai_code_version=None, + request_timeout=30, # ثانیه + api_key="EMPTY", + save_number=2, + semaphore_number=5, + ): + self.unvalid_chunk = [] + self.sample_chunk = [] + self.save_number = save_number + # json file of data + self.data_path = data_path + self.semaphore_number = semaphore_number + + self.task_name = task_name + if output_path is None: + output_path = f"./{task_name}" + + self.output_path = Path(output_path) + self._temp_path = self.output_path / "batch_data" + self._temp_processed_id_path = self._temp_path / "processed_id.json" + + # Create output directory and subdirectories if they don't exist + self.output_path.mkdir(parents=True, exist_ok=True) + self._temp_path.mkdir(parents=True, exist_ok=True) + # self._temp_processed_id_path.mkdir(parents=True, exist_ok=True) + + self.request_timeout = request_timeout + self.model_name = model_name + self.api_key = api_key + self.output_schema = output_schema + self.api_url = api_url + self.reasoning_effort = reasoning_effort + self.top_p = top_p + self.temperature = temperature + self.max_token = max_token + + if ai_code_version is None: + ai_code_version = f"{model_name}_{reasoning_effort}" + self.ai_code_version = ai_code_version + + self.PRIMARY_KEY = {"system_prompt", "user_prompt", "id"} + if data_path != None: + try: + self.data = self.__data_process() + print(f"📦 Loaded {len(self.data)} words") + except Exception as e: + raise ValueError( + f"Data loading/validation failed: {e}\n{traceback.format_exc()}" + ) + + def __validate_item(self, item, idx): + # Mandatory fields + for key in self.PRIMARY_KEY: + if key not in item: + raise ValueError(f"Missing mandatory key '{key}' in item #{idx}") + if not isinstance(item[key], str): + raise TypeError( + f"Item #{idx}: '{key}' must be a string, got {type(item[key]).__name__}" + ) + + # Optional field: assistant_prompt + if "assistant_prompt" not in item or item["assistant_prompt"] is None: + item["assistant_prompt"] = None + else: + if not isinstance(item["assistant_prompt"], str): + raise TypeError( + f"Item #{idx}: 'assistant_prompt' must be a string or absent, got {type(item['assistant_prompt']).__name__}" + ) + + return item # now normalized + + def __data_process(self): + raw_data = self.__load_orjson(self.data_path) + if not isinstance(raw_data, list): + raise ValueError("Data must be a list of dictionaries.") + + processed_data = [] + for idx, item in enumerate(raw_data): + if not isinstance(item, dict): + raise ValueError(f"Item #{idx} is not a dictionary.") + validated_item = self.__validate_item(item, idx) + processed_data.append(validated_item) + + return processed_data + + def __get_max_number_file(self, directory): + # Pattern to match filenames like out_1.json, out_25.json, etc. + pattern = re.compile(r"output_(\d+)\.json$") + max_num = 0 + + for filename in os.listdir(directory): + match = pattern.match(filename) + if match: + num = int(match.group(1)) + if num > max_num: + max_num = num + return max_num + 1 + + def __load_orjson(self, path: str | Path): + path = Path(path) + with path.open("rb") as f: # باید باینری باز بشه برای orjson + return orjson.loads(f.read()) + + def __save_orjson(self, path, data): + with open(path, "wb") as f: + f.write( + orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) + ) + + def merge_json_dir(self, input_path, output_path): + directory = Path(input_path) + if not directory.is_dir(): + raise ValueError(f"Not valid PATH: {input_path}") + + seen_ids = set() # برای ردیابی idهای دیده‌شده (سریع!) + unique_data = [] # فقط داده‌های یکتا + failed_files = [] + + json_files = list(directory.glob("*.json")) + if not json_files: + print("⚠️ NO JSON File Found In This PATH") + return + + for json_file in json_files: + try: + data = self.__load_orjson(json_file) + if not data: # خالی یا None + failed_files.append(json_file.name) + continue + + if isinstance(data, list) and isinstance(data[0], dict): + for item in data: + item_id = item.get("id") + if item_id is None: + # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی + # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند + continue + if item_id not in seen_ids: + seen_ids.add(item_id) + unique_data.append(item) + else: + raise ValueError(f"no list available in this json -> {json_file}") + except ( + json.JSONDecodeError, + ValueError, + OSError, + KeyError, + TypeError, + ) as e: + # print(f"❌ Failed in process '{json_file.name}': {e}") + failed_files.append(json_file.name) + + # گزارش خطاها + if failed_files: + print("\n❌ We lose this file:") + for name in failed_files: + print(f" - {name}") + else: + print("\n✅ All JSON added") + + # ذخیره خروجی + try: + self.__save_orjson(data=unique_data, path=output_path) + print( + f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})" + ) + except Exception as e: + print(f"❌ Error in saving final file: {e}") + + def make_new_proccessed_ids_from_file(self, json_in, out_path): + data = self.__load_orjson(json_in) + + finall_data = [] + for d in data: + if d["id"]: + finall_data.append(d["id"]) + finall_data = set(finall_data) + finall_data = list(finall_data) + print(f"-- len ids {len(finall_data)}") + + self.__save_orjson(data=finall_data, path=out_path) + + # ------------------------------ Main ------------------------------ + async def __process_item(self, client, item): + try: + messages = [ + {"role": "user", "content": item["user_prompt"]}, + ] + if item.get("system_prompt"): + messages.append({"role": "system", "content": item["system_prompt"]}) + if item.get("assistant_prompt"): + messages.append( + {"role": "assistant", "content": item["assistant_prompt"]} + ) + + response = await client.chat.completions.parse( + model=self.model_name, + messages=messages, + temperature=self.temperature, + top_p=self.top_p, + reasoning_effort=self.reasoning_effort, + max_tokens=self.max_token, + stop=None, + response_format=self.output_schema, + ) + + parsed = ( + response.choices[0].message.parsed + if response and response.choices and response.choices[0].message.parsed + else {"raw_text": str(response)} + ) + + parsed = self.output_schema.model_validate(parsed) + parsed = parsed.model_dump() + parsed = dict(parsed) + parsed["ai_code_version"] = self.ai_code_version + parsed["id"] = item["id"] + # parsed["item"] = item + return parsed, 200 + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item['id']}") + return None, 408 + + except Exception as e: + print(f"⚠️ Error __process_item {item['id']}: {traceback.print_exc()}") + return None, 400 + + def async_eval(self, processed_id: List = []): + try: + asyncio.run(self.__async_eval(processed_id)) + except KeyboardInterrupt: + print("\n🛑 Interrupted by user.") + traceback.print_exc() + + async def __async_eval(self, processed_id: List): + """ + اجرای اصلی تک‌هسته‌ای و async برای تولید خروجی نهایی. + """ + print("🔹 Starting async data processing...") + + # ------------------ مرحله ۱: بازیابی شناسه‌های قبلاً پردازش‌شده ------------------ + if not processed_id: + try: + processed_id = self.__load_orjson(self._temp_processed_id_path) + print( + f"📂 Loaded existing processed_id from {self._temp_processed_id_path}" + ) + except Exception: + print("⚠️ No valid processed_id found. Starting fresh.") + processed_id = [] + + # ------------------ مرحله ۲: آماده‌سازی داده‌ها ------------------ + all_processed_id = set(processed_id) + all_results = [] + total_time = [] + + data = [item for item in self.data if item.get("id") not in all_processed_id] + print( + f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}" + ) + + # اگر چیزی برای پردازش نیست + if not data: + print("✅ Nothing new to process. All items are already done.") + return + + # ------------------ مرحله ۳: شروع پردازش ------------------ + print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}") + async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: + semaphore = asyncio.Semaphore(5) + + async def limited_process(item): + async with semaphore: + return await self.__process_item(client, item) + + tasks = [asyncio.create_task(limited_process(item)) for item in data] + + total_i = 0 + # ✅ پردازش به ترتیب تکمیل (نه ترتیب لیست) + for i, task in enumerate(asyncio.as_completed(tasks), start=1): + start = time.time() + try: + parsed, status_code = await asyncio.wait_for( + task, timeout=self.request_timeout + ) # ⏱ حداکثر 2 دقیقه + except asyncio.TimeoutError: + print(f"⏳ Task {i} timed out completely") + parsed, status_code = None, 408 + total_time.append(time.time() - start) + + if status_code == 200: + all_results.append(parsed) + all_processed_id.add(parsed.get("id")) + else: + print(f"⚠️ Skipped item (status={status_code})") + + total_i += 1 + # ✅ ذخیره‌ی موقت هر n مورد + if total_i >= self.save_number: + print(f"total_i {total_i}") + print(f"self.save_number {self.save_number}") + total_i = 0 + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + print(f"💾 Auto-saved partial data: {len(all_results)}") + all_results.clear() + + # ✅ بعد از پایان تمام تسک‌ها، ذخیره نهایی برای داده‌های باقیمانده + if total_i > 0 or len(all_results) > 0: + print("💾 Final save of remaining data...") + self.__save_orjson( + data=list(all_processed_id), + path=self._temp_processed_id_path, + ) + print(f"💾 Auto-saved processed ids: {len(all_processed_id)}") + number = self.__get_max_number_file(self._temp_path) + print(f"number {number}") + + temp_output_path = self._temp_path / f"output_{number}.json" + self.__save_orjson(data=list(all_results), path=temp_output_path) + print(f"💾 Auto-saved partial data: {len(all_results)}") + all_results.clear() + + # ------------------ مرحله ۴: ذخیره خروجی ------------------ + final_data_path = self.output_path / f"final_data_{self.task_name}.json" + processed_id_path = self.output_path / "processed_id.json" + + self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path) + all_results = self.__load_orjson(final_data_path) + # make_new_proccessed_ids_from_file() + self.__save_orjson(data=list(all_processed_id), path=processed_id_path) + self.__save_orjson(data=all_results, path=final_data_path) + + avg_time = (sum(total_time) / len(total_time)) if total_time else 0 + print( + f"\n✅ Processing completed!\n" + f"📊 Total-Data: {len(data)} | " + f"⭕ Ignored-Data: {len(processed_id)} | " + f"📦 Proccessed-Data: {len(all_results)} | " + f"❌ Loss-Data: {len(data)-len(all_results)} | " + f"🕒 Avg Time: {avg_time:.2f}'s per item | " + f"🕒 Total Time: {sum(total_time):.4f}'s | " + f"💾 Results saved to: {final_data_path}" + ) + + async def single_async_item( + self, item, reasoning_effort, temperature, top_p, output_schema=None, max_token=4096, + stream=False, print_logs=False, return_reason=False, stop=None, return_used_token=False, + ): + try: + async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client: + semaphore = asyncio.Semaphore(self.semaphore_number) + async with semaphore: + messages = [{"role": "user", "content": item["user_prompt"]}] + if item.get("system_prompt"): + messages.insert(0, {"role": "system", "content": item["system_prompt"]}) + if item.get("assistant_prompt"): + messages.append({"role": "assistant", "content": item["assistant_prompt"]}) + # output_schema =None + if output_schema is not None: + # Use .parse for structured output + response = await client.chat.completions.parse( + model=self.model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + stop=stop, + response_format=output_schema, + reasoning_effort=reasoning_effort + ) + if print_logs: + print(f'parse response ---- {response}') + parsed_obj = response.choices[0].message.parsed + # print(f'parsed_obj {parsed_obj}') + if parsed_obj is None: + return {"error": "Failed to parse response", "raw": str(response)} + # Validate just in case (optional, چون .parse already does it) + if return_reason: + reasoning_content = response.choices[0].message.reasoning_content + if return_used_token: + _total_token = response.usage.total_tokens + return output_schema.model_validate(parsed_obj), str(reasoning_content), int(_total_token) + + return output_schema.model_validate(parsed_obj), str(reasoning_content) + + return output_schema.model_validate(parsed_obj) + + else: + # Use .create for raw text + response = await client.chat.completions.create( + model=self.model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + stop=None, + stream=stream, + reasoning_effort=reasoning_effort + # No response_format + ) + # print(f'response-stream {stream}-> {response}') + content = response.choices[0].message.content + if return_reason: + reasoning_content = response.choices[0].message.reasoning_content + if return_used_token: + _total_token = response.usage.total_tokens + return content, str(reasoning_content), int(_total_token) + + return content, str(reasoning_content) + + if not content: + return {"error": "Empty response", "raw": str(response)} + + return content + + except LengthFinishReasonError as le: + # اینجاست که جادو اتفاق می‌افته + if max_token >= 50000: + return { + "error": "MAX_TOKEN_LIMIT_REACHED", + "max_token": max_token + } + + new_max = min(max_token * 2, 50000) + + return await self.single_async_item( + item=item, + reasoning_effort=reasoning_effort, + temperature=temperature, + top_p=top_p, + output_schema=output_schema, + max_token=new_max, + stream=stream, + print_logs=print_logs, + return_reason=return_reason, + return_used_token=return_used_token, + stop=stop + ) + + except asyncio.TimeoutError: + print(f"⏳ Timeout on item {item}") + return None + + except Exception as e: + print(f"⚠️ Error __process_item {item}: {traceback.print_exc()}") + return None + + async def single_async_item_stream( + self, + item, + reasoning_effort, + temperature, + top_p, + max_token=None, + ): + async with AsyncOpenAI( + base_url=self.api_url, + api_key=self.api_key + ) as client: + + messages = [{"role": "user", "content": item["user_prompt"]}] + if item.get("system_prompt"): + messages.insert(0, {"role": "system", "content": item["system_prompt"]}) + + stream = await client.chat.completions.create( + model=self.model_name, + messages=messages, + temperature=temperature, + top_p=top_p, + max_tokens=max_token, + reasoning_effort=reasoning_effort, + stream=True, # ⭐ مهم + # stream_options= + ) + + # c = 0 + # v = 0 + # v1 = 0 + # q = 0 + async for chunk in stream: + # if c == 0: + # self.sample_chunk.append(chunk) + # c += 1 + # print(f'c {c}') + if not chunk.choices: + continue + + delta = chunk.choices[0].delta + + if delta and delta.content: + # if v1 == 0: + # self.sample_chunk.append(chunk) + # v += 1 + # v1 += 1 + # print(f'v {v}') + yield delta.content + + # if v != c: + # if q == 0: + # self.sample_chunk.append(chunk) + # q +=1 + # print('add-unvalid_chunk') + # self.unvalid_chunk.append(chunk) + # v = c diff --git a/core/data_normalizer.py b/core/data_normalizer.py index 07f96d8..2a4c6ea 100644 --- a/core/data_normalizer.py +++ b/core/data_normalizer.py @@ -35,6 +35,7 @@ def merge_json_dir(input_path, output_path): for json_file in json_files: try: data = load_orjson(json_file) + # print(f'data {type(data)}') if not data: # خالی یا None failed_files.append(json_file.name) continue @@ -44,6 +45,9 @@ def merge_json_dir(input_path, output_path): if isinstance(data, list) and isinstance(data[0], dict): for item in data: item_id = item.get("id") + if not item_id: + item_id = item.get("_id") + if item_id is None: # اگر id نداشت، می‌تونی تصمیم بگیری: نگه داری یا ردش کنی # اینجا فرض می‌کنیم فقط مواردی با id معتبر مهم هستند @@ -188,10 +192,10 @@ def count_tokens(model_name, system_prompt, user_prompt): # --- نحوه استفاده --- if __name__ == "__main__": # ##### یکی کردن تمام بچ های خروجی در یک فایل - # merge_json_dir( - # input_path= '/home1/ava3/project/aiDataParser/task/keyword_extractor/output/batch_data', - # output_path='/home1/ava3/project/aiDataParser/task/keyword_extractor/output/merged_1.json' - # ) + merge_json_dir( + input_path= '/home1/ava3/init_mahdi/project/rule_extractor/im_based/data/sequential/subject_unity/output_6152/batch_data', + output_path='/home1/ava3/init_mahdi/project/rule_extractor/im_based/data/sequential/subject_unity/output_6152/final_data_subject_unity.json' + ) ###### ساخت یک proccessed id از فایل نهایی # make_new_proccessed_ids_from_file( @@ -507,3 +511,4 @@ if __name__ == "__main__": ########################################################################## print(":D") +# python3 -m aiDataParser.core.data_normalizer \ No newline at end of file diff --git a/core/example.py b/core/example.py index e529e78..aa97889 100644 --- a/core/example.py +++ b/core/example.py @@ -1,43 +1,72 @@ -######################################################################1 -class Output(BaseModel): - simplify_list : List[str] +# ###################################################################### +# import asyncio, time +# from pydantic import BaseModel -if __name__ == '__main__': - ruuner = AsyncCore( - model_name='gpt-oss-120b', - data_path='/home1/ava3/project/aiDataParser/task/simplify/input/prompt.json', - output_path='/home1/ava3/project/aiDataParser/task/simplify/output', - api_url="http://172.16.29.102:8001/v1/", - task_name='simplify-all-v1-oss-120b-med', - output_schema=Output, - reasoning_effort='medium', - ai_code_version='oss120b_med', - request_timeout=60, - save_number=2, - max_token=50000, - ) - ruuner.async_eval() -######################################################################2 +# class Response(BaseModel): +# result: int -class Output(BaseModel): - simplify_list : List[str] -if __name__ == '__main__': - ruuner = AsyncCore( - model_name='gpt-oss-120b', - data_path='/home1/ava3/project/aiDataParser/task/simplify/input/prompt.json', - output_path='/home1/ava3/project/aiDataParser/task/simplify/output', - api_url="http://172.16.29.102:8001/v1/", - task_name='simplify-all-v1-oss-120b-med', - output_schema=Output, - reasoning_effort='medium', - ai_code_version='oss120b_med', - request_timeout=60, - save_number=2, - max_token=50000, - ) +# async def main(): +# runner = SmartMultiServerCore() +# start = time.time() +# # res = await runner.single_item(user_prompt="سلام جواب این عبارت ریاضی چی میشه ؟ 145/5*2") +# res = await runner.single_item( +# user_prompt="سلام جواب این عبارت ریاضی چی میشه ؟ 145/5*2", +# # output_schema=Response, +# ) +# end = time.time() +# print(f"res {res}", f"{end - start:.2f}", sep="\n") - llm_answer, _ = await RUNNER_PROMPT.single_simple_async_proccess_item( - item={"user_prompt": prompt, "system_prompt": SYSTEM_PROPMT2}, - ) \ No newline at end of file + +# asyncio.run(main()) + +# ###################################################################### +# class Output(BaseModel): +# simplify_list : List[str] + +# if __name__ == '__main__': +# ruuner = AsyncCore( +# model_name='gpt-oss-120b', +# data_path='/home1/ava3/project/aiDataParser/task/simplify/input/prompt.json', +# output_path='/home1/ava3/project/aiDataParser/task/simplify/output', +# api_url="http://172.16.29.102:8001/v1/", +# task_name='simplify-all-v1-oss-120b-med', +# output_schema=Output, +# reasoning_effort='medium', +# ai_code_version='oss120b_med', +# request_timeout=60, +# save_number=2, +# max_token=50000, +# ) +# ruuner.async_eval() + +# ######################################################################2 + +# class Output(BaseModel): +# simplify_list : List[str] + +# if __name__ == '__main__': +# ruuner = AsyncCore( +# model_name='gpt-oss-120b', +# data_path='/home1/ava3/project/aiDataParser/task/simplify/input/prompt.json', +# output_path='/home1/ava3/project/aiDataParser/task/simplify/output', +# api_url="http://172.16.29.102:8001/v1/", +# task_name='simplify-all-v1-oss-120b-med', +# output_schema=Output, +# reasoning_effort='medium', +# ai_code_version='oss120b_med', +# request_timeout=60, +# save_number=2, +# max_token=50000, +# ) + +# llm_answer, _ = await RUNNER_PROMPT.single_simple_async_proccess_item( +# item={"user_prompt": prompt, "system_prompt": SYSTEM_PROPMT2}, +# ) + +# ######################################################################2 +# from aiDataParser.core.data_normalizer import merge_json_dir + +# merge_json_dir(input_path="/home1/ava3/init_mahdi/data/mj_qa_section", output_path="/home1/ava3/init_mahdi/data/mj_qa_section_28_11_1404.json") +# python3 -m aiDataParser.core.example \ No newline at end of file diff --git a/requierments.txt b/requierments.txt index d0d0bb5..280b9b0 100644 --- a/requierments.txt +++ b/requierments.txt @@ -1 +1,4 @@ -orjson \ No newline at end of file +orjson +openpyxl +beautifulsoup4 +sacrebleu \ No newline at end of file