diff --git a/core/ai_parser.py b/core/ai_parser.py
index 60739a3..e01e924 100644
--- a/core/ai_parser.py
+++ b/core/ai_parser.py
@@ -24,11 +24,13 @@ class AsyncCore:
         request_timeout=30,  # seconds
         api_key="EMPTY",
         save_number=2,
+        semaphore_number=5,
     ):
         self.save_number = save_number  # json file of data
         self.data_path = data_path
+        self.semaphore_number = semaphore_number
         self.task_name = task_name
 
         if output_path is None:
@@ -220,6 +222,8 @@ class AsyncCore:
                         max_tokens=self.max_token,
                         stop=None,
                         response_format=self.output_schema,
+
+
                     )
 
                     parsed = (
@@ -229,9 +233,11 @@ class AsyncCore:
                     )
 
                     parsed = self.output_schema.model_validate(parsed)
+                    parsed = parsed.model_dump()
                    parsed = dict(parsed)
                     parsed["ai_code_version"] = self.ai_code_version
                     parsed["id"] = item["id"]
+                    # parsed["item"] = item
                     return parsed, 200
 
                 except asyncio.TimeoutError:
@@ -309,13 +315,11 @@ class AsyncCore:
                     all_results.append(parsed)
                     all_processed_id.add(parsed.get("id"))
                 else:
-                    print(f"⚠️ Skipped item {parsed.get('id')} (status={status_code})")
+                    print(f"⚠️ Skipped item (status={status_code})")
 
                 total_i += 1
                 # ✅ temporary save every n items
                 if total_i >= self.save_number:
-                    print(f"total_i {total_i}")
-                    print(f"self.save_number {self.save_number}")
                     total_i = 0
                     self.__save_orjson(
                         data=list(all_processed_id),
@@ -366,3 +370,173 @@ class AsyncCore:
             f"🕒 Total Time: {sum(total_time):.4f}'s | "
             f"💾 Results saved to: {final_data_path}"
         )
+
+    def async_eval_with_retry(self, processed_id: List = [], classification_list: List = [], classification_result_field: str = 'word'):
+        try:
+            asyncio.run(self.__async_eval_with_retry(processed_id, classification_list, classification_result_field))
+        except KeyboardInterrupt:
+            print("\n🛑 Interrupted by user.")
+            traceback.print_exc()
+
+    async def __async_eval_with_retry(self, processed_id: List, classification_list: List, classification_result_field: str):
+        """
+        Main single-process async run that produces the final output.
+        """
+        print("🔹 Starting async data processing...")
+
+        # ------------------ Step 1: recover previously processed ids ------------------
+        if not processed_id:
+            try:
+                processed_id = self.__load_orjson(self._temp_processed_id_path)
+                print(
+                    f"📂 Loaded existing processed_id from {self._temp_processed_id_path}"
+                )
+            except Exception:
+                print("⚠️ No valid processed_id found. Starting fresh.")
+                processed_id = []
+
+        # ------------------ Step 2: prepare the data ------------------
+        all_processed_id = set(processed_id)
+        all_results = []
+        total_time = []
+
+        data = [item for item in self.data if item.get("id") not in all_processed_id]
+        print(
+            f"➕ Total items: {len(self.data)} - {len(all_processed_id)} = {len(data)}"
+        )
+
+        # nothing left to process
+        if not data:
+            print("✅ Nothing new to process. All items are already done.")
+            return
+
+        # ------------------ Step 3: start processing ------------------
+        print(f"🤖 Model: {self.model_name} | Reasoning: {self.reasoning_effort}")
+        async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
+            semaphore = asyncio.Semaphore(self.semaphore_number)
+
+            async def limited_process(item):
+                async with semaphore:
+                    return await self.__process_item(client, item)
+
+            tasks = [asyncio.create_task(limited_process(item)) for item in data]
+
+            total_i = 0
+            # ✅ handle tasks in completion order (not list order)
+            for i, task in enumerate(asyncio.as_completed(tasks), start=1):
+                start = time.time()
+                try:
+                    parsed, status_code = await asyncio.wait_for(
+                        task, timeout=self.request_timeout
+                    )  # ⏱ capped at request_timeout
+                except asyncio.TimeoutError:
+                    print(f"⏳ Task {i} timed out completely")
+                    parsed, status_code = None, 408
+                total_time.append(time.time() - start)
+                if status_code == 200:
+                    all_results.append(parsed)
+                    all_processed_id.add(parsed.get("id"))
+                else:
+                    print(f"⚠️ Skipped item (status={status_code})")
+
+                total_i += 1
+                # ✅ temporary save every n items
+                if total_i >= self.save_number:
+                    total_i = 0
+                    self.__save_orjson(
+                        data=list(all_processed_id),
+                        path=self._temp_processed_id_path,
+                    )
+                    print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
+                    number = self.__get_max_number_file(self._temp_path)
+                    print(f"--- {number}/{len(data)} ---")
+                    temp_output_path = self._temp_path / f"output_{number}.json"
+                    self.__save_orjson(data=list(all_results), path=temp_output_path)
+                    all_results.clear()
+
+        # ✅ after all tasks finish, do a final save for the remaining data
+        if total_i > 0 or len(all_results) > 0:
+            print("💾 Final save of remaining data...")
+            self.__save_orjson(
+                data=list(all_processed_id),
+                path=self._temp_processed_id_path,
+            )
+            print(f"💾 Auto-saved processed ids: {len(all_processed_id)}")
+            number = self.__get_max_number_file(self._temp_path)
+            print(f"--- {number}/{len(data)} ---")
+
+            temp_output_path = self._temp_path / f"output_{number}.json"
+            self.__save_orjson(data=list(all_results), path=temp_output_path)
+            # print(f"💾 Auto-saved partial data: {len(all_results)}")
+            all_results.clear()
+
+        # ------------------ Step 4: save the output ------------------
+        final_data_path = self.output_path / f"final_data_{self.task_name}.json"
+        processed_id_path = self.output_path / "processed_id.json"
+
+        self.merge_json_dir(input_path=self._temp_path, output_path=final_data_path)
+        all_results = self.__load_orjson(final_data_path)
+        # make_new_proccessed_ids_from_file()
+        self.__save_orjson(data=list(all_processed_id), path=processed_id_path)
+        self.__save_orjson(data=all_results, path=final_data_path)
+
+        avg_time = (sum(total_time) / len(total_time)) if total_time else 0
+        print(
+            f"\n✅ Processing completed!\n"
+            f"📊 Total-Data: {len(data)} | "
+            f"⭕ Ignored-Data: {len(processed_id)} | "
+            f"📦 Processed-Data: {len(all_results)} | "
+            f"❌ Loss-Data: {len(data)-len(all_results)} | "
+            f"🕒 Avg Time: {avg_time:.2f}'s per item | "
+            f"🕒 Total Time: {sum(total_time):.4f}'s | "
+            f"💾 Results saved to: {final_data_path}"
+        )
+
+    async def single_simple_async_proccess_item(self, item):
+        async with AsyncOpenAI(base_url=self.api_url, api_key=self.api_key) as client:
+            semaphore = asyncio.Semaphore(5)
+            async with semaphore:
+                try:
+                    messages = [
+                        {"role": "user", "content": item["user_prompt"]},
+                    ]
+                    if item.get("system_prompt"):
+                        messages.append(
+                            {"role": "system", "content": item["system_prompt"]}
+                        )
+                    if item.get("assistant_prompt"):
+                        messages.append(
+                            {"role": "assistant", "content": item["assistant_prompt"]}
+                        )
+
+                    response = await client.chat.completions.parse(
+                        model=self.model_name,
+                        messages=messages,
+                        temperature=self.temperature,
+                        top_p=self.top_p,
+                        reasoning_effort=self.reasoning_effort,
+                        max_tokens=self.max_token,
+                        stop=None,
+                        response_format=self.output_schema,
+                    )
+
+                    parsed = (
+                        response.choices[0].message.parsed
+                        if response and response.choices and response.choices[0].message.parsed
+                        else {"raw_text": str(response)}
+                    )
+
+                    parsed = self.output_schema.model_validate(parsed)
+                    parsed = parsed.model_dump()
+                    parsed = dict(parsed)
+                    parsed["ai_code_version"] = self.ai_code_version
+                    return parsed, 200
+
+                except asyncio.TimeoutError:
+                    print(f"⏳ Timeout on item {item}")
+                    return None, 408
+
+                except Exception:
+                    print(f"⚠️ Error single_simple_async_proccess_item {item}: {traceback.format_exc()}")
+                    return None, 400
\ No newline at end of file
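Not part of the patch: a minimal usage sketch of the two entry points added above. It assumes AsyncCore also accepts model_name, api_url, and output_schema in its constructor (those attributes are referenced in the diff but their parameters sit outside this hunk), and the schema, paths, and model name below are placeholders.

import asyncio

from pydantic import BaseModel

from core.ai_parser import AsyncCore


class KeywordSchema(BaseModel):
    # hypothetical output schema; the real task schema lives elsewhere in the repo
    keyword_list: list[str]


core = AsyncCore(
    data_path="input/valid_mj_qa_sections.json",  # placeholder input file
    task_name="keyword_extractor",
    api_key="EMPTY",
    save_number=2,
    semaphore_number=5,  # new knob: caps the number of concurrent requests
    # assumed constructor arguments, not shown in this hunk:
    model_name="openai/gpt-oss-120b",
    api_url="http://localhost:8000/v1",
    output_schema=KeywordSchema,
)

# New retry-style runner; it resumes from the processed-id file if one exists.
core.async_eval_with_retry()

# The new single-item helper is a coroutine and can be awaited on its own.
item = {"id": 1, "user_prompt": "Extract the keywords from this text ..."}
parsed, status = asyncio.run(core.single_simple_async_proccess_item(item))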
diff --git a/core/data_normalizer.py b/core/data_normalizer.py
new file mode 100644
index 0000000..07f96d8
--- /dev/null
+++ b/core/data_normalizer.py
@@ -0,0 +1,509 @@
+import os
+import json
+import orjson
+import tiktoken
+from pathlib import Path
+
+
+def load_orjson(path: str | Path):
+    path = Path(path)
+    with path.open("rb") as f:  # must be opened in binary mode for orjson
+        return orjson.loads(f.read())
+
+
+def save_orjson(path, data):
+    with open(path, "wb") as f:
+        f.write(
+            orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
+        )
+
+
+def merge_json_dir(input_path, output_path):
+    directory = Path(input_path)
+    if not directory.is_dir():
+        raise ValueError(f"Not a valid path: {input_path}")
+
+    seen_ids = set()  # tracks ids already seen (fast!)
+    unique_data = []  # unique items only
+    failed_files = []
+
+    json_files = list(directory.glob("*.json"))
+    if not json_files:
+        print("⚠️ No JSON files found in this path")
+        return
+
+    for json_file in json_files:
+        try:
+            data = load_orjson(json_file)
+            if not data:  # empty or None
+                failed_files.append(json_file.name)
+                continue
+
+            # if isinstance(data, dict):
+            #     unique_data.append(data)
+            if isinstance(data, list) and isinstance(data[0], dict):
+                for item in data:
+                    item_id = item.get("id")
+                    if item_id is None:
+                        # if there is no id you could keep or drop the item;
+                        # here we assume only items with a valid id matter
+                        continue
+                    if item_id not in seen_ids:
+                        seen_ids.add(item_id)
+                        unique_data.append(item)
+            else:
+                raise ValueError(f"no list of objects in this json -> {json_file}")
+        except (json.JSONDecodeError, ValueError, OSError, KeyError, TypeError) as e:
+            print(f"❌ Failed to process '{json_file.name}': {e}")
+            failed_files.append(json_file.name)
+
+    # report failures
+    if failed_files:
+        print("\n❌ These files could not be processed:")
+        for name in failed_files:
+            print(f"   - {name}")
+    else:
+        print("\n✅ All JSON files merged")
+
+    # save the output
+    try:
+        save_orjson(data=unique_data, path=output_path)
+        print(
+            f"\n💾 Final file saved: {output_path} (Total unique items: {len(unique_data)})"
+        )
+    except Exception as e:
+        print(f"❌ Error in saving final file: {e}")
+
+
+def make_new_proccessed_ids_from_file(json_in, out_path):
+    data = load_orjson(json_in)
+
+    finall_data = []
+    for d in data:
+        if d["id"]:
+            finall_data.append(d["id"])
+    finall_data = set(finall_data)
+    finall_data = list(finall_data)
+    print(f"-- len ids {len(finall_data)}")
+
+    save_orjson(data=finall_data, path=out_path)
+
+
+def __try_parse_json(value):
+    """If value is a string that can be parsed as JSON, return the parsed version."""
+    if isinstance(value, str):
+        try:
+            parsed = json.loads(value)
+            # only if it really is JSON (not a bare number or word)
+            if isinstance(parsed, (dict, list)):
+                return parsed
+            else:
+                return value  # e.g. the string "123" should not become a number
+        except (json.JSONDecodeError, TypeError):
+            pass
+    return value
+
+
+def __deep_parse_json_strings(obj):
+    """Recursively parse every string value that contains JSON."""
+    if isinstance(obj, dict):
+        return {
+            key: __deep_parse_json_strings(__try_parse_json(value))
+            for key, value in obj.items()
+        }
+    elif isinstance(obj, list):
+        return [__deep_parse_json_strings(__try_parse_json(item)) for item in obj]
+    else:
+        return obj
+
+
+def serialize_json_from_str_fields(json_in, out_path=None):
+    if out_path is None:
+        out_path = json_in
+
+    # load the data
+    data = load_orjson(json_in)
+
+    # deep-parse every JSON string
+    cleaned_data = __deep_parse_json_strings(data)
+
+    # save the result
+    save_orjson(data=cleaned_data, path=out_path)
+    print(f"✅ All done '{out_path}'")
+
+
+def make_format(in_path, out_path):
+    data = load_orjson(in_path)
+    f_data = []
+
+    for i in data:
+        form = {
+            "id": None,
+            "word": None,
+            "ai_code_version": None,
+            "ai_result": [],
+        }
+        form["id"] = i["id"]
+        form["word"] = i["word"]
+        form["ai_result"] = i["ai_code"]["ai_code"]["result"]
+        form["ai_code_version"] = i["ai_code"]["ai_code_version"]
+        f_data.append(form)
+
+    save_orjson(data=f_data, path=out_path)
+
+
+def count_tokens(model_name, system_prompt, user_prompt):
+    """
+    Count the input tokens for different model families.
+    """
+    text = f"<|system|>\n{system_prompt}\n<|user|>\n{user_prompt}"
+
+    # --- choose the tokenizer ---
+    if "openai" in model_name.lower() or "oss" in model_name.lower():
+        # GPT-like / OSS models use tiktoken
+        enc = tiktoken.get_encoding("cl100k_base")
+        tokens = enc.encode(text)
+        return len(tokens)
+
+    elif "gemma" in model_name.lower():
+        from transformers import AutoTokenizer
+
+        # SentencePiece tokenizer (Gemma from HuggingFace)
+        tokenizer = AutoTokenizer.from_pretrained(model_name)
+        tokens = tokenizer.encode(text)
+        return len(tokens)
+
+    elif "magistral" in model_name.lower() or "mistral" in model_name.lower():
+        from transformers import AutoTokenizer
+
+        # Mistral / Magistral tokenizer (BPE)
+        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
+        tokens = tokenizer.encode(text)
+        return len(tokens)
+
+    else:
+        raise ValueError(f"Model {model_name} not recognized.")
+
+
+# --- usage examples ---
+if __name__ == "__main__":
+    # ##### merge all output batches into a single file
+    # merge_json_dir(
+    #     input_path= '/home1/ava3/project/aiDataParser/task/keyword_extractor/output/batch_data',
+    #     output_path='/home1/ava3/project/aiDataParser/task/keyword_extractor/output/merged_1.json'
+    # )
+
+    ###### build a processed_id file from the final file
+    # make_new_proccessed_ids_from_file(
+    #     json_in ='/home1/ava3/word_bank_proccess/oss_120b_v1/merged_finall.json',
+    #     out_path='/home1/ava3/word_bank_proccess/oss_120b_v1/proccessed_id.json',
+    # )
+
+    # parse all stringified JSON fields
+    # serialize_json_from_str_fields(
+    #     json_in='/home1/ava3/keyword_simpify_proccess/data_keyword_gemma27/merged_1.json',
+    #     out_path='/home1/ava3/keyword_simpify_proccess/data_keyword_gemma27/merged_2.json'
+    # )
+
+    # finalize the format
+    # make_format(
+    #     in_path='/home1/ava3/keyword_simpify_proccess/data_keyword_gemma27/data.json',
+    #     out_path='/home1/ava3/keyword_simpify_proccess/data_keyword_gemma27/data_f1.json'
+    # )
+    # ---------------- filter items flagged is_correct
+    # input_path = "/home1/ava3/word_bank_proccess/oss_120b_v1/merged_finall.json"
+    # data = load_orjson(input_path)
+    # print(f"------{len(data)}-----")
+    # Truley = []
+    # for i in data:
+    #     if i['ai_code']['ai_code']['is_correct'] is True:  # and i['ai_code']['ai_code']['is_proper_noun'] is True
+    #         Truley.append(i)
+
+    # ---------------- filter words whose scope contains "حقوقی" (legal)
+    # input_path = "/home1/ava3/word_bank_proccess/oss_120b_v1/merged_finall.json"
+    # data = load_orjson(input_path)
+    # Truley = []
+    # for item in data:
+    #     if "حقوقی" in item['ai_code']['ai_code']['scope'] and item['ai_code']['ai_code']["is_correct"] is True:
+    #         Truley.append(item)
+
+    # print(f'Truley {len(Truley)}')
+    # save_orjson(
+    #     data=Truley,
+    #     path='/home1/ava3/project/aidataparser_test/motaradef_dataset/motaradef.json'
+    # )
+    ########################################################################
+    # raw_data = '/home1/ava3/data/mj_qa_section.json'
+    # raw_data = load_orjson(raw_data)
+    # ignore_data = '/home1/ava3/data/ignore_mj_qa_sections.json'
+    # ignore_data = load_orjson(ignore_data)
+    # ignore_data = set(ignore_data)
+
+    # f_data = []
+    # for item in raw_data:
+    #     if item['id'] not in ignore_data:
+    #         f_data.append(item)
+
+    # print(f'f_data {len(f_data)}')
+    # save_orjson(
+    #     data=f_data,
+    #     path='/home1/ava3/data/valid_mj_qa_sections.json'
+    # )
+
+    # ##########################################################################
+    # raw_data = load_orjson('/home1/ava3/data/valid_mj_qa_sections.json')
+    # f_data = []
+    # for item in raw_data:
+    #     f_data.append(
+    #         {
+    #             'id': item['id'],
+    #             'content': item['content']
+    #         }
+    #     )
+
+    # save_orjson(
+    #     data=f_data,
+    #     path='/home1/ava3/data/valid_mj_qa_sections_light.json')
+    ##################################### work with tree #####################################
+    # fr_tree = '/home1/ava3/project/aiDataParser/task/match_code_fr_per/prompt_ir_def.json'
+    # fr_tree = load_orjson(fr_tree)
+
+    # code_title = []
+    # for k, v in fr_tree.items():
+    #     code_title.append(
+    #         k
+    #     )
+
+    # save_orjson(
+    #     data=code_title,
+    #     path='/home1/ava3/project/aiDataParser/task/match_code_fr_per/all_code_title_persian.json'
+    # )
+
+    ##################################### make tree #####################################
+    # fr_tree = '/home1/ava3/franc_legal_codes/translate/all_tree_franc.json'
+    # fr_tree = load_orjson(fr_tree)
+
+    # pr_data = '/home1/ava3/project/aiDataParser/task/france_translate/all_pr_fr_title_cleaned_1.json'
+    # pr_data = load_orjson(pr_data)
+
+    # def build_lookup(flat_list):
+    #     lookup = {}
+    #     for item in flat_list:
+    #         key = item.get("france")
+    #         key = key.strip()
+    #         lookup[key] = item
+    #     return lookup
+
+    # def clean_title(title: str) -> str:
+    #     import re
+    #     title = re.sub(r'\n+', '', title)  # \n+ = any run of newlines
+    #     if r'-\s+' in title:
+    #         title = title.replace(r'-\s+', '-')
+    #     cleaned = re.sub(r'\s+', ' ', title)  # \s+ = any run of whitespace (space, \t, \n, \r, ...)
+    #     return cleaned.strip()
+
+    # def enrich_tree(node, flatted_list, lvl=0):
+    #     title = clean_title(node["title"])
+
+    #     # find the matching entry in the flat list
+    #     enriched = flatted_list.get(title, {})
+
+    #     persian = enriched.get("persian")
+    #     if persian is None:
+    #         unlist = {
+    #             'fr_title': title,
+    #             'pr_title': enriched.get('france')
+    #         }
+    #         print(unlist)
+    #         return (unlist, 1)
+    #     # build the new node
+    #     new_node = {
+    #         "france": title,
+    #         "persian": enriched.get("persian"),  # default: keep the French title if there is no translation
+    #         "id": enriched.get("id"),
+    #         "level": lvl
+    #         # "ai_code_version": enriched.get("ai_code_version"),
+    #     }
+    #     lvl += 1
+    #     # append sub-sections if present
+    #     if "sections" in node and node["sections"]:
+    #         new_node["sections"] = [
+    #             enrich_tree(child, flatted_list, lvl) for child in node["sections"]
+    #         ]
+
+    #     return new_node
+
+    # f_data = []
+    # lookup = build_lookup(pr_data)
+    # for node in fr_tree:
+    #     # f_data.append(enrich_tree(node, lookup))
+    #     res = enrich_tree(node, lookup)
+    #     if isinstance(res, tuple):
+    #         res, _ = res
+    #     f_data.append(
+    #         res
+    #     )
+
+    # save_orjson(
+    #     path='/home1/ava3/project/aiDataParser/task/france_translate/tree_test1_title.json',
+    #     data=f_data
+    # )
+
+    ##########################################################################
+    # input_per = '/home1/ava3/project/aiDataParser/task/france_translate/all_pr_fr_title.json'
+    # input_per = load_orjson(input_per)
+    # for i in input_per:
+    #     # strip markers like ')' and '°' from the title
+    #     title = i['persian']
+
+    #     # remove °
+    #     if '°' in title:
+    #         title = title.replace('°', '')
+
+    #     if '.' in title:
+    #         title = title.replace('.', '')
+
+    #     if ':' in title:
+    #         title = title.split(':', 1)[1].strip()
+
+    #     if ')' in title:  # and '(' not in title:
+    #         colon_count1 = title.count(')')
+    #         colon_count2 = title.count('(')
+    #         if colon_count1 == colon_count2:
+    #             continue
+    #         else:
+    #             # print(colon_count1)
+    #             # print(title)
+    #             title = title.split(')', 1)[1].strip()
+    #             # print(title)
+    #             # break
+    #     i['persian'] = title
+
+    # save_orjson(
+    #     data=input_per,
+    #     path='/home1/ava3/project/aiDataParser/task/france_translate/all_pr_fr_title_cleaned_1.json'
+    # )
+    ##########################################################################
+    # input_path1 = "/home1/ava3/project/aiDataParser/task/france_translate/all_title_persian/temp1.json"
+    # data_ = load_orjson(input_path1)
+    # data_1 = []
+    # unvalid = []
+    # for i in data_:
+    #     # if len(i['persian']) > 0 and isinstance(i['persian'], list) and bool(i['persian'][0] != None) and bool(i['persian'][0] != ''):
+    #     data_1.append(
+    #         {
+    #             "id": str(i["id"]),
+    #             # "ai_code_version": str(i["ai_code_version"]),
+    #             'persian': i['fa']
+    #         }
+    #     )
+    #     # else:
+    #     #     unvalid.append(str(i["id"]))
+
+    # input_path2 = "/home1/ava3/project/aiDataParser/task/france_translate/input_fr_title.json"
+    # data_2 = load_orjson(input_path2)
+    # data_2 = [
+    #     {
+    #         "id": str(i["id"]),
+    #         "fr": str(i["fr"])
+    #     }
+    #     for i in data_2  # if str(i['id']) in unvalid
+    # ]
+
+    # # print(
+    # #     f'-- data_1 {len(data_1)}\n',
+    # #     f'-- data_2 {len(data_2)}\n',
+    # # )
+
+    # f_data = []
+    # for i in data_1:
+    #     form = {}
+    #     for j in data_2:
+    #         if i['id'] == j['id'] and i['id'] not in unvalid:
+    #             form['id'] = j['id']
+    #             form['france'] = j['fr']
+    #             form['persian'] = i['persian']
+    #             # form['ai_code_version'] = i['ai_code_version']
+    #             f_data.append(
+    #                 form
+    #             )
+    #             break
+
+    # save_orjson(
+    #     data=f_data,
+    #     path="/home1/ava3/project/aiDataParser/task/france_translate/all_title_persian/temp2.json",
+    # )
+    # save_orjson(
+    #     data=data_2,
+    #     path="/home1/ava3/project/aiDataParser/task/france_translate/all_title_persian/unvalid_step1.json",
+    # )
+
+    ##########################################################################
+    # input_path1 = '/home1/ava3/project/aiDataParser/task/france_translate/all_title_persian/setp2_per.json'
+    # data_1 = load_orjson(input_path1)
+
+    # input_path2 = '/home1/ava3/project/aiDataParser/task/france_translate/input_fr_title.json'
+    # data_2 = load_orjson(input_path2)
+
+    # undata = []
+    # proccess_id = []
+    # for i in data_2:
+    #     proccess_id.append(str(i['id']))
+
+    # for i in data_1:
+    #     if isinstance(i, dict):
+    #         for key in i.keys():
+    #             if str(key) not in proccess_id:
+    #                 undata.append(i)
+
+    # print(f'--- translated qwen {len(undata)}')
+    # # print(f'--- NOT translated {len(undata)}')
+    # # save_orjson(
+    # #     data=undata,
+    # #     path='/home1/ava3/project/aiDataParser/task/france_translate/all_title_persian/step2_trnsalte_unfinished.json'
+    # # )
+
+    ######################################### make keyword final dataset #################################
+    # data1 = load_orjson(
+    #     "/home1/ava3/project/aiDataParser/task/keyword_extractor/input/valid_mj_qa_sections.json"
+    # )
+    # data2 = load_orjson(
+    #     "/home1/ava3/project/aiDataParser/task/keyword_extractor/output/merged_1.json"
+    # )
+    # data2_map = {item["id"]: item for item in data2}
+    # clean_data = []
+    # error_data = []
+    # for i1 in data1:
+    #     i2 = data2_map.get(i1["id"])
+    #     if i2 is None:
+    #         error_data.append(i1)
+    #         continue
+    #     try:
+    #         i1["keyword_list"] = i2["keyword_list"]
+    #         i1["ai_code_version"] = i2["ai_code_version"]
+    #         clean_data.append(i1)
+    #     except Exception:
+    #         error_data.append(i2)
+    # print(
+    #     f'data1 {len(data1)}\n',
+    #     f'data2 {len(data2)}\n',
+    #     f'clean_data {len(clean_data)}\n',
+    #     f'error_data {len(error_data)}\n',
+    # )
+
+    # save_orjson(
+    #     path='/home1/ava3/project/aiDataParser/task/keyword_extractor/output/finall_dataset.json',
+    #     data=clean_data
+    # )
+    # save_orjson(
+    #     path='/home1/ava3/project/aiDataParser/task/keyword_extractor/output/error_data.json',
+    #     data=error_data
+    # )
+
+    ##########################################################################
+    print(":D")
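Not part of the patch: a short sketch of how the data_normalizer helpers chain together, mirroring the commented-out recipes in __main__ above. All paths and the model name are placeholders.

from core.data_normalizer import (
    count_tokens,
    make_new_proccessed_ids_from_file,
    merge_json_dir,
    serialize_json_from_str_fields,
)

# 1. Merge every batch output in a directory into one file, de-duplicated by "id".
merge_json_dir(
    input_path="task/keyword_extractor/output/batch_data",
    output_path="task/keyword_extractor/output/merged_1.json",
)

# 2. Rebuild the processed-id list from the merged file so a rerun skips those items.
make_new_proccessed_ids_from_file(
    json_in="task/keyword_extractor/output/merged_1.json",
    out_path="task/keyword_extractor/output/processed_id.json",
)

# 3. Parse any stringified-JSON fields in place (out_path defaults to json_in).
serialize_json_from_str_fields(json_in="task/keyword_extractor/output/merged_1.json")

# 4. Rough prompt-size check; OpenAI/OSS-style model names go through tiktoken.
print(count_tokens("openai/gpt-oss-120b", system_prompt="You are a parser.", user_prompt="..."))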