################# modularity ### import from external-package from fastapi import FastAPI, Request, HTTPException import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time, re from dotenv import load_dotenv from pathlib import Path from time import sleep from enum import Enum from typing import Dict, List, Tuple from collections import defaultdict from typing import Union, List from elasticsearch import Elasticsearch, helpers import requests, logging, asyncio, httpx, os, uuid, traceback, orjson, copy, uvicorn, time from pathlib import Path from time import sleep import re import unicodedata import httpx import json ### import from internal-file from router.bale.base_model import * from router.bale.bale_buttons import * from router.bale.bale_massages import * from core.static import * ############## Global-Params DATA_DIR = os.path.join(".", "_data_json") if not os.path.exists(DATA_DIR): os.makedirs(DATA_DIR) PERSIAN_BOUNDARIES = set(" \n،.؟!؛:") # f"https://YOUR_DOMAIN.com from pydantic import BaseModel class DbRule(BaseModel): rule_id: str rule_content: str rule_type: str section_id: str section_content: str section_full_path :str qanon_id: str qanon_etebar: str qanon_title: str state_etebar: str class InputRule(BaseModel): rule_id: str rule_content: str rule_type: str section_id: str section_content: str class SemanticSearchP2P(BaseModel): in_rule: InputRule db_rule: DbRule score: float = 0 metadata: Dict class Formatter: """ Formatting options Bold : \s*TEXT*\s Italic : \s_TEXT_\s Link: [متن](آدرس‌لینک) ```‌[متن]‌توضیحات‌``` answerCallbackQuery -> {callback_query_id:str, text:str, show_alert:bool} setChatDescription -> {chat_id:str, description:str} editMessageText -> {chat_id, message_id, text } """ ########################################################### # توابع برای رفتار کلاس ########################################################### def __init__(self, max_len: int = 4000): self.max_len = max_len self._number_map = { "0": "0️⃣", "1": "1️⃣", "2": "2️⃣", "3": "3️⃣", "4": "4️⃣", "5": "5️⃣", "6": "6️⃣", "7": "7️⃣", "8": "8️⃣", "9": "9️⃣", } def __getattr__(self, name: str): # فقط برای روش‌های مجاز (مثل bold, number) واکنش نشان بده if name == "bold": return self._bold if name == "number": return self._number if name == "format_text": return self._pretier1 raise AttributeError( f"'{self.__class__.__name__}' object has no attribute '{name}'" ) ########################################################### # توابع استایل دهی ########################################################### def _bold(self, _string: str) -> str: return f" *{_string}* " def _number(self, value: Union[int, str]) -> Union[str, int]: """ اگر int بود، تبدیل به str برای پردازش اگر رشته‌ای بود که فقط از ارقام تشکیل شده → تبدیل اگر رشته بود اما عدد نبود → خودش را برگردان هر نوع دیگری → بدون تغییر برگردان تبدیل هر رقم به ایموجی مربوطه """ if isinstance(value, int): num_str = str(value) elif isinstance(value, str): if value.isdigit(): num_str = value else: return value else: return value return "".join( self._number_map.get(d, d) for d in num_str[::-1] ) # handle array of number def _pretier1(self, text: str) -> str: """ مشکل : عدم تشخیص پاراگراف عدم تشخیص اعداد پشت سر هم با - و فاصله عدم تشخیص اعداد 1 0 - با یک فاصله از اول هستند """ pattern = r"(? List: """ خروجی به صورت چانک بدون دکمه هر خروجی لینک دارد برش امن لینک ها و اده ها """ chunks = [] current = f"برای پرسش: {title}\n\n" for i, data in enumerate(sections, start=1): sec_text = data.get("content", "") idx = data.get("id") # ساخت ref کامل ref = self.__make_link_qs(src=idx) # متن کامل آیتم block = ( f"{self.number(i)} {sec_text}\n{ref}\n\n" # self.format_text(sec_text) ) # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید if len(current) + len(block) > self.max_len: chunks.append(current.rstrip()) current = "" current += block # آخرین چانک را هم اضافه کن if current.strip(): chunks.append(current.rstrip()) return chunks def form_law_chat(self, answer_text: str): """ answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد sources: مثل ['qs2117427'] """ # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد # مثلا: (qs123) یا (qs123, qs456, qs789) pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" def replace_source(m): content = m.group(1) codes = [c.strip() for c in content.split(",")] # جداسازی چند کد links = [make_link_qs(src=code) for code in codes] full_match = m.group(0) # if "منبع" in full_match: # print(f'Found explicit source(s): {links}') # else: # print(f'Found implicit source(s): {links}') return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان # جایگزینی در متن answer_text = re.sub(pattern, replace_source, answer_text) # اگر طول کمتر از MAX_LEN بود → تمام if len(answer_text) <= MAX_LEN: return [answer_text] # تقسیم متن اگر طول زیاد شد chunks = [] current = "" sentences = answer_text.split(". ") for sentence in sentences: st = sentence.strip() if not st.endswith("."): st += "." if len(current) + len(st) > MAX_LEN: chunks.append(current.strip()) current = "" current += st + " " if current.strip(): chunks.append(current.strip()) return chunks async def form_title_repeated(self, data: List[Dict[str, str]]): if len(data) == 0: return ["هیچ عنوان تکراری و یا حتی مشابه یافت نشد."] chunks = [] current = "نزدیک‌ترین عناوین مشابه عنوان قانون موارد زیر می باشد:\n\n" for i, item in enumerate(data, start=1): title = item.get("title", "") sec_id = item.get("id", "") score = item.get("score", "") if not title or not sec_id: continue ref = self.__make_link_qq(src=sec_id) # بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم # block = f"{i}. {title}(وزن {score})\n{ref}\n" block = ( f"{self.number(i)} {self.bold(title)}؛ میزان تشابه: %{score} ؛{ref}\n" ) # اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن if len(current) + len(block) > self.max_len and current.strip(): chunks.append(current.rstrip()) current += block # ذخیره آخرین چانک if current.strip(): chunks.append(current.rstrip()) return chunks def replace_source(self, m): content = m.group(1) codes = [c.strip() for c in content.split(",")] # جداسازی چند کد links = [self.__make_link_qs(src=code) for code in codes] full_match = m.group(0) # if "منبع" in full_match: # print(f'Found explicit source(s): {links}') # else: # print(f'Found implicit source(s): {links}') return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان async def form_chat(self, llm_text: str, header: str): """ answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد """ # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد # مثلا: (qs123) یا (qs123, qs456, qs789) pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" # جایگزینی در متن answer_text = re.sub(pattern, self.replace_source, llm_text) # اگر طول کمتر از MAX_LEN بود → تمام if len(answer_text) <= self.max_len: return [header + answer_text] # تقسیم متن اگر طول زیاد شد chunks = [] current = header sentences = answer_text.split(". ") for sentence in sentences: st = sentence.strip() if not st.endswith("."): st += "." if len(current) + len(st) > self.max_len: chunks.append(current.strip()) current = "" current += st + " " if current.strip(): chunks.append(current.strip()) return chunks async def form_llm_answer_chat(self, _input, header): if len(_input) > 0: return await self.form_chat(llm_text=_input["text"], header=header) # _input['source'] return ["هیچ ماده مرتبطی یافت نشد!"] async def form_subject_unity(self, _input:Union[List[RuleRelation], str], header="نتایج اولیه مغایرت های احتمالی :\n" ): if isinstance(_input, str): _input = self.form_law_chat(_input) return _input, [], [] else: chunks = [] buttons = [] seen_qanon_titles = set() groups = defaultdict(set) for item in _input: title = item.db_rule.qanon_title groups[title].add(item.db_rule.section_id) current = header for idx, (qanon_title, section_ids) in enumerate(groups.items(), start=1): block_lines = [f"{self.number(idx)} در قانون {self.bold(qanon_title)}"] sample_items_by_section = {} for item in _input: if item.db_rule.qanon_title == qanon_title and item.db_rule.section_id in section_ids: sid = item.db_rule.section_id if sid not in sample_items_by_section: sample_items_by_section[sid] = item for sub_idx, section_id in enumerate(sorted(section_ids), start=1): item = sample_items_by_section[section_id] # representative item link = self.__make_link_qs(src=section_id) unity = item.subject_unity if not unity: block_lines.append("\t\t—") continue if unity.has_subject_unity == "yes": block_lines.append(f"توضیح {sub_idx} بر اساس {link}:") block_lines.append(f"\t{unity.reasoning or ''}") elif unity.has_subject_unity == "yes_under_assumptions": block_lines.append(f"توضیح {sub_idx} بر اساس {link}:") block_lines.append(f"\t{unity.reasoning or ''}") block_lines.append("\tتوضیحات بیشتر (فرضیات لازم):") block_lines.append(f"\t{unity.required_assumptions or ''}") if len(block_lines) > 2: block = "\n".join(block_lines) + "\n" else: continue # Auto-chunk based on length if len(current) + len(block) > MAX_LEN and current != header: chunks.append(current.rstrip()) current = header current += block # Button: add *once* per qanon_title if qanon_title and qanon_title not in seen_qanon_titles: seen_qanon_titles.add(qanon_title) buttons.append([ { "text": f"بررسی مغایرت با {qanon_title}", "callback_data": f"subject_unities:qq:{qanon_title}" } ]) # Final flush if current.strip() and (len(chunks) == 0 or current.strip() != header.rstrip()): chunks.append(current.rstrip()) input_dict = {item.db_rule.section_id : item for item in _input} mapping_data = defaultdict(list) for k, v in groups.items(): for i in v: mapping_data[k].append(input_dict[i]) return chunks, buttons, mapping_data async def form_rule_making( self, _input, header="گزاره های حقوقی زیر استخراج شد:\n\n" ): if len(_input) > 0: chunks = [] current = header for i, item in enumerate(_input, start=1): block = f'{self.number(i)} {item["rule_content"]}\n' if len(current) + len(block) > self.max_len and current.strip(): chunks.append(current.rstrip()) current += block if current.strip(): chunks.append(current.rstrip()) return chunks return ["هیچ گزاره حقوقی یافت و استخراج نشد!"] def get_asl(self, _in: str): return _in.replace("qs_", "اصل ") def get_in_form_single(self, asl: str, _in_dict: Dict, _id: int) -> str: f_list = [] if _in_dict["is_conflict"]: f_list += [f"{_id}. *{self.get_asl(asl)}*: ❌ دارای مغایرت ❌"] f_list += [ f"موضوع این اصل قانون اساسی {_in_dict['principle_subject']} می باشد." ] f_list += [f"موضوع متن ورودی شامل {_in_dict['text_subject']} است."] if _in_dict["has_subject_relation"] == True: unity_text = "می باشد" if _in_dict["has_subject_relation"] == False: unity_text = "نمی باشد" f_list += [f"دارای وحدت در موضوع {unity_text}."] if _in_dict["conflict_type"] != "": f_list += [f"نوع مغایرت تشحیص داده شده: {_in_dict['conflict_type']}"] else: f_list += [f"{_id}. *{self.get_asl(asl)}*: ✅ عدم مغایرت ✅"] f_list += [f"توضیحات: {_in_dict['legal_reasoning']}"] f_list += ["\n\n"] return "\n".join(f_list) async def form_constitution(self, input: Dict): """ """ chunks = [] header = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n" current = header _id = 1 for k, v in input.items(): block = self.get_in_form_single(asl=k, _in_dict=v, _id=_id) # اگر این بلاک جا نشد → چانک جدید if len(current) + len(block) > self.max_len: chunks.append(current.rstrip()) current = header + block else: current += block _id += 1 # آخرین چانک if current.strip(): chunks.append(current.rstrip()) return chunks async def form_ss_rules(self, _input:List[Dict], header): if len(_input) > 1: chunks = [] current = header _i = 0 # -------- 1. group by qanon_id / qanon_title groups = defaultdict(set) for item in _input: key = item['db_rule']['qanon_title'] groups[key].add(item['db_rule']['section_id']) for qanon_title, ids in groups.items(): _i += 1 links = "و ".join([self.__make_link_qs(id) for id in ids]) block = f"{self.number(_i)} در قانون {self.bold(qanon_title)} تشابه با گزاره های حقوقی ماده:{links}\n\n" if len(current) + len(block) > self.max_len: if current: chunks.append(current) current = header + block else: current += block if current and current != header: chunks.append(current) return chunks return ["هیچ ماده مرتبطی یافت نشد!"] async def form_conflict_detection(self, _input:RuleRelation, header="نتیجه تشخیص مغایرت :\n" ): current = header # ساخت لینک # _link = self.__make_link_qs(src=_input.db_rule.section_id) current += f"به صورت خلاصه {_input.conflict_detection.has_confict}\n" current += f"توضیحات : {_input.conflict_detection.explanation_of_conflict}\n" return current async def form_conflict_type_detection(self, _input:RuleRelation, header="نتیجه تشخیص نوع مغایرت :\n" ): current = header # ساخت لینک # _link = self.__make_link_qs(src=_input.db_rule.section_id) current += f"به صورت خلاصه {_input.conflict_type_detection.conflict_type}\n" current += f"توضیحات : {_input.conflict_type_detection.explanation_of_type}\n" return current async def form_relation_identification(self, _input:RuleRelation, header="نتیجه رابطه مغایرت :\n" ): current = header # ساخت لینک # _link = self.__make_link_qs(src=_input.db_rule.section_id) current += f"به صورت خلاصه {_input.relation_identification.relation_type}\n" current += f"توضیحات : {_input.relation_identification.reasoning}\n" return current async def form_evaluation(self, _input:Evaluation, header="نتیجه نهایی بررسی مغایرت :\n" ): current = header # ساخت لینک # _link = self.__make_link_qs(src=_input.db_rule.section_id) current += f"1. آیا ارزیابی وحدت موضوع صحیح است؟ {_input.is_subject_unity_assessment_correct}\n" current += f"2. آیا ارزیابی تشخیص نوع درست است ؟ {_input.is_conflict_detection_correct}\n" current += f"3. آیا ارزیابی نوع درست است ؟ {_input.is_conflict_type_detection_correct}\n" current += f"4. رابطه مغایرت چطور؟ {_input.is_relation_type_detection_correct}\n" current += f"5. نوع رابطه ؟ {_input.valid_relation_type}\n" current += f"6.توضیح بیشتر: {_input.comments}\n" return current async def from_law_writing_policy(self, _input_dict: Dict, header:str) -> List[str]: f_list = [ self.bold(header)] _str = { "analyze": "گزارش تحلیلی بندبه‌بند", "strength": "بیان نقاط قوت", "weakness": "بیان نقاط ضعف و ریسک‌های تقنینی", "conclusion_score": "جمع‌بندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)", "suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی", } for k, v in _input_dict.items(): _title = _str[k] _title = "*" + _title + "*" f_list += [_title] # f_list += ['\n'] f_list += [v] f_list += ['\n'] return ["\n".join(f_list)] """ deleteMessage message_id chat_id """ class RequestManager: def __init__(self, host_url:str, url_time_out=120, step_time_out=60, ): self.host_url = host_url self.url_time_out = url_time_out self.step_time_out = step_time_out TASK_URL ={ # stream "":"/stream/chat_logical", # none-stream "":"/conflict/general_policy/qs_unity", "":"/conflict/all_qanon/qs_unity", "":"/conflict/general_policy/unity_eval", "":"/conflict/law_writing_policy", "":"/conflict/constitution", "":"/rule_making", "":"/chat", "":"/talk", "":"/semantic_search/chat_logical", "":"/semantic_search/run_chat", "":"/semantic_search/run_semantic_search", } async def get_result( self, payload, url :str, section_id:str='qs_10001', mode_type='bale', ): _url = self.host_url+url print( f'get_result _url {_url}' ) try: async with httpx.AsyncClient(timeout=self.url_time_out) as client: response = await client.post( url=_url, json=payload ) response.raise_for_status() data = response.json() result = data.get("result", "❌ پاسخی دریافت نشد") return result except Exception as e: print(f"❌ خطای RAG:\n{str(e)}") return "❌ ارتباط با سرور قطع می‌باشد" async def stream_result( self, url :str, payload : Dict, ): """ هر مرحله شامل: { step : "اسم مرحله" data : "داده در این مرحله" } """ timeout = httpx.Timeout(self.step_time_out, read=self.url_time_out) _url = self.host_url+url async with httpx.AsyncClient(timeout=timeout) as client: # ارسال درخواست به صورت Stream async with client.stream( "POST", url=_url, json=payload ) as r: # بررسی وضعیت پاسخ if r.status_code != 200: print(f"Error: {r.status_code}") return # خواندن خط به خط (هر خط یک JSON است که سرور Yield کرده) async for line in r.aiter_lines(): if line.strip(): # جلوگیری از پردازش خطوط خالی try: # تبدیل متن JSON به دیکشنری پایتون step_data = json.loads(line) yield step_data except json.JSONDecodeError: print(f"Failed to decode: {line}") def unique_id(prefix="wai_") -> str: return f"{prefix}{uuid.uuid4().hex[:16]}" def load_orjson(path: str | Path): path = Path(path) with path.open("rb") as f: # باید باینری باز بشه برای orjson return orjson.loads(f.read()) def save_orjson(path, data): with open(path, "wb") as f: f.write( orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS) ) def split_text_chunks(text: str): """Split a long text into safe chunks.""" return [text[i : i + MAX_LEN] for i in range(0, len(text), MAX_LEN)] class ElasticHelper: """ کلاس ElasticHelper: نوع ورودی: بدون ورودی مستقیم در تعریف کلاس نوع خروجی: شیء از نوع ElasticHelper عملیات: - متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند - مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند """ counter = 0 total = 0 id = "" path_mappings = os.getcwd() + "/repo/_other/" def __init__( self, es_url="http://127.0.0.1:6900", es_pass="", es_user="elastic", path_mappings="", ): """ نوع ورودی: - es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900" - es_pass: رمز عبور (str) - پیش‌فرض خالی - es_user: نام کاربری (str) - پیش‌فرض "elastic" - path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی نوع خروجی: شیء ElasticHelper عملیات: - اتصال به Elasticsearch را برقرار می‌کند - در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند - تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار) - در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود """ if path_mappings: self.path_mappings = path_mappings if es_pass == "": self.es = Elasticsearch(es_url) else: self.es = Elasticsearch( es_url, basic_auth=(es_user, es_pass), verify_certs=False, ) # print(es_url) # print(self.es) self.success_connect = False for a in range(0, 10): try: if not self.es.ping(): print("Elastic Connection Not ping, sleep 30 s : ", a) sleep(5) continue else: self.success_connect = True break except Exception as e: break if not self.success_connect: print("******", "not access to elastic service") return self.counter = 0 self.total = 0 self.id = "" def search(self, **params): try: res = self.es.search(**params) except: return {"hits": {"hits": []}} return res def get_document(self, index_name, id): res = self.es.get(index=index_name, id=id) return res def exist_document(self, index_name, id): res = self.es.exists(index=index_name, id=id) return res def update_index_doc(self, is_update_state, index_name_o, eid, data): """ نوع ورودی: - is_update_state: تعیین عملیات (update یا index) (bool) - index_name_o: نام اندیس (str) - eid: شناسه سند (str) - data: داده‌های سند (dict) نوع خروجی: پاسخ Elasticsearch (dict) عملیات: - اگر is_update_state=True باشد: سند را آپدیت می‌کند - در غیر این صورت: سند جدید ایجاد می‌کند """ if is_update_state: resp = self.es.update(index=index_name_o, id=eid, doc=data) # resp = self.es.update(index=index_name_o, id=eid, body={'doc':data}) else: resp = self.es.index(index=index_name_o, id=eid, document=data) return resp def make_link_qq(src, ref_text=REF_TEXT): return f"[{ref_text}]({QQ_WEB_LINK}{src})" def make_link_qs(src, ref_text=REF_TEXT): return f"[{ref_text}]({QS_WEB_LINK}{src})" def encode_uc(update: BaleUpdate) -> str: if update.message: user = update.message.from_user chat = update.message.chat elif update.callback_query: user = update.callback_query.from_user chat = update.callback_query.message.chat else: return "unknown" username = user.username or user.id chat_id = chat.id # ✅ فقط chat_id return f"{username}:{chat_id}" def decode_uc(uc_id: str) -> dict: """ ورودی: 'username:chat_id' یا 'user_id:chat_id' خروجی: {'username': ..., 'chat_id': ...} """ try: username, chat_id = uc_id.split(":", 1) return (username, int(chat_id) if chat_id.isdigit() else chat_id) except ValueError: raise ValueError(f"decode_uc") async def get_from_gpl(in_dict: Dict) -> List[str]: f_list = [] _str = { "analyze": "گزارش تحلیلی بندبه‌بند", "strength": "بیان نقاط قوت", "weakness": "بیان نقاط ضعف و ریسک‌های تقنینی", "conclusion_score": "جمع‌بندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)", "suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی", } for k, v in in_dict.items(): _title = _str[k] _title = "*" + _title + "*" f_list += [_title] # f_list += ['\n'] f_list += [v] f_list += ["\n"] return ["\n".join(f_list)] def cer(ref: str, hyp: str) -> float: m, n = len(ref), len(hyp) dp = list(range(n + 1)) for i in range(1, m + 1): prev, dp[0] = dp[0], i for j in range(1, n + 1): cur = dp[j] dp[j] = min( dp[j] + 1, # deletion dp[j - 1] + 1, # insertion prev + (ref[i - 1] != hyp[j - 1]) # substitution ) prev = cur return (dp[n] / m) * 100 import nltk from nltk.metrics import edit_distance def cer_ntlk(exist: str, new: str) -> float: """ این روش دقیق‌تر است، چون تعداد کاراکترهای اضافی یا کم در متن طولانی، CER را به شکل اغراق‌آمیز کاهش نمی‌دهد، بلکه روی شباهت معنایی و واژه‌ای تمرکز می‌کند. """ # edit distance روی کلمات return round(float(1 - edit_distance(new, exist) / len(exist)) * 100, 2) async def title_repeated( qanontitle, search_range: int = 10, url=f"http://localhost:8010/v1/indices/qaqanon/search" ): """ - باید با سرویس از حاج آقا گرفته شود Fetch similar titles from the custom Elasticsearch-like REST API. """ # "/majles/similar/title/qaqanon/0/10/none" # headers["Authorization"]="GuestAccess" headers = {"accept": "application/json", "Content-Type": "application/json"} body = { "query": qanontitle, # "from_": 0, "size": search_range+10, "track_total_hits": True, } response = requests.request("POST", url, headers=headers, json=body, timeout=20) if response.status_code != 200: print("ERROR:", response.status_code) print(response.text) else: data = response.json() ids = [] # print('---------------------------------------> max_score', max_score) # print(data["hits"]) for i in data["hits"]["hits"]: title = i["_source"]["title"] ids.append( {"title": title, "id": i["_source"]["id"], "score" :cer_ntlk(exist=title, new=qanontitle)} ) return sorted(ids, key=lambda x: x['score'], reverse=True)[:search_range] def normalize_persian(text: str) -> str: # حذف کنترل‌کاراکترها text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C") # حذف فاصله بین حروف فارسی text = re.sub(r"(?<=[آ-ی])\s+(?=[آ-ی])", "", text) # اصلاح فاصله قبل و بعد از علائم text = re.sub(r"\s+([،؛:؟!])", r"\1", text) text = re.sub(r"([،؛:؟!])\s*", r"\1 ", text) # فاصله‌های چندتایی text = re.sub(r"\s{2,}", " ", text) return text.strip() async def get_in_from_rule_making(_input): print(f"_input {_input}") o_put = "گزاره های حقوقی زیر استخراج شد:\n" for i, item in enumerate(_input, start=1): o_put += f'{i}. {item["rule_content"]}\n' return o_put async def get_in_from_title_repeated(data: List[Dict[str, str]]): if len(data) == 0: return ["هیچ عنوانی تکراری یافت نشد."] chunks = [] current = "نزدیک‌ترین عناوین مشابه عنوان قانون موارد زیر می باشد::\n\n" for i, item in enumerate(data, start=1): title = item.get("title", "").strip() sec_id = item.get("id", "").strip() score = item.get("score", "") if not title or not sec_id: continue ref = make_link_qq(src=sec_id) # بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم # block = f"{i}. {title}(وزن {score})\n{ref}\n" block = f"{i}. {title}\n{ref}\n" # اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن if len(current) + len(block) > 4000 and current.strip(): chunks.append(current.rstrip()) current += block # ذخیره آخرین چانک if current.strip(): chunks.append(current.rstrip()) return chunks async def get_multi_qs_advance_gp( _inputs: List[RuleRelation], ): chunks = ["لطفا یک شماره را جهت بررسی جزئی تر مغایرت انتخاب کنید: "] buttons = [] for i, item in enumerate(_inputs, start=1): unity = item.subject_unity section_id = item.db_rule.section_id if unity: if unity.has_subject_unity != "no": buttons.append( [ { "text": f"{i} بررسی مورد ", "callback_data": f"advanced_check_conflict_qsids:{section_id}", } ] ) return chunks, buttons async def get_form_gp_p1( _inputs: Union[List[RuleRelation], List], ): f_result = [] if not _inputs or len(_inputs) <= 1: f_result += ["قوانین مرتبط زیر از سیاستهای کلی نظام یافت شد : "] for i, item in enumerate(_inputs, start=1): link = make_link_qs(src=item.db_rule.section_id) f_result += [f"{i}. {item.subject_unity.reasoning}. {link}"] f_result += [ "و بعد از بررسی گزاره های حقوقی هر یک با متن شما ، موضوعات مرتبط مستقیم یافت نشد" ] return ["\n".join(f_result)], [] # -------- 1. group by qanon_id / qanon_title groups = defaultdict(list) for item in _inputs: key = item.db_rule.qanon_id or item.db_rule.qanon_title groups[key].append(item) # -------- 2. build output per group for qanon_key, items in groups.items(): chunks = [] buttons = [] qanon_title = items[0].db_rule.qanon_title or "قانون نامشخص" current = f"موضوعات مرتبط در قانون *{qanon_title}*:\n\n" for i, item in enumerate(items, start=1): unity = item.subject_unity link = make_link_qs(src=item.db_rule.section_id) lines = [] if unity: if unity.has_subject_unity == "yes": lines.append(f"{i}- " + unity.reasoning or "") lines.append(link) elif unity.has_subject_unity == "yes_under_assumptions": lines.append(f"{i}- " + unity.reasoning or "") lines.append("مشروط به فرض زیر :") lines.append("\t" + unity.required_assumptions or "") lines.append(link) block = "\n".join(lines) + "\n\n" if len(current) + len(block) > MAX_LEN and current.strip(): chunks.append(current.rstrip()) current = "" current += block if current.strip(): chunks.append(current.rstrip()) # -------- 3. one button per law buttons.append( [ { "text": f"بررسی وجود مغایرت", "callback_data": f"advanced_check_conflict_qqids:{qanon_key}", } ] ) f_result.append([chunks, buttons]) return f_result async def get_form_gp_old(_inputs: Union[List[RuleRelation], List]): chunks = [] _button = [] print(f"_inputs {_inputs}") if len(_inputs) > 1: current = "نتایج اولیه مغایرت های احتمالی :\n" for i, item in enumerate(_inputs, start=1): # ساخت لینک _link = make_link_qs(src=item.db_rule.section_id) # ساخت بلوک متنی کامل مربوط به این item — بدون قطع شدن lines = [f"{i}. {item.db_rule.qanon_title} \n{_link}"] unity = item.subject_unity print(f"unity.has_subject_unity {unity.has_subject_unity}") _qs_title = item.db_rule.qanon_title + "-" + str(i) if unity.has_subject_unity == "yes": print(f"yes") lines.append("توضیح:") lines.append(unity.reasoning or "") elif unity.has_subject_unity == "yes_under_assumptions": print(f"yes_under_assumptions") lines.append("توضیح:") lines.append(unity.reasoning or "") lines.append("توضیحات بیشتر (فرضیات لازم):") lines.append(unity.required_assumptions or "") block = "\n".join(lines) + "\n" if len(current) + len(block) > MAX_LEN and current.strip(): # قبلی را ذخیره کن chunks.append(current.rstrip()) current += block _button.append( [{"text": f"بررسی {_qs_title}", "callback_data": f"not_yet"}] ) if current.strip(): chunks.append(current.rstrip()) else: chunks = ["هیچ مغایرتی یافت نشد."] return chunks, _button async def get_form_gp_advanced(_input: RuleRelation): """ ما در نظر میگیریم که subject_unity را داریم """ finall = ["نتیجه:\n"] qs_id = _input.db_rule.section_id button = [] if _input.relation_identification: pass if _input.conflict_type_detection: pass if _input.conflict_detection: print("conflict_detection----------------------------") _end = "آیا میخواهید نتیجه بررسی تزاحم را ببینید؟" if _input.conflict_detection.has_confict == True: # finall.append( # 'باهم تعارض دارند !' # ) button.append( [ { "text": "بررسی نوع تعارض", "callback_data": f"advanced_check_conflict_qsids:{qs_id}", } ] ) finall += ["توضیحات"] finall += [_input.conflict_detection.explanation_of_conflict] finall += [_end] else: # finall.append( # 'باهم تعارض مستقیم ندارند' # ) finall += ["توضیحات"] finall += [_input.conflict_detection.explanation_of_conflict] finall = ["\n".join(finall)] return finall, button if _input.subject_unity: pass # _input.subject_unity.has_subject_unity # _input.subject_unity.required_assumptions # _input.subject_unity.reasoning return _input.model_dump() async def result_gp(text, url, effort="low") -> Dict: print( f"text {type(text)}\n-> {text}", ) try: async with httpx.AsyncClient(timeout=TIME_OUT) as client: response = await client.post( url, json={ "section_content": text, "effort": "medium", "mode_type": "bale", }, ) response.raise_for_status() response = response.json() data = response.get("result", "❌ پاسخی دریافت نشد") if isinstance(data, str): return data _output = [] for item in data: _output.append(RuleRelation.parse_obj(item)) # print('results_chat ',type(result)) return _output except Exception as e: print(f"❌ خطای RAG:\n{str(e)}") return "❌ ارتباط با سرور قطع می‌باشد" def extract_other_info(update: BaleUpdate) -> dict: other_info = {} if update.message: user = update.message.from_user elif update.callback_query: user = update.callback_query.from_user else: return other_info # خالی برگردان اگر هیچ‌کدام نبود # ایمن در برابر None other_info["username"] = user.username or "" other_info["first_name"] = user.first_name or "" other_info["last_name"] = getattr(user, "last_name", "") or "" return other_info def get_in_form(title: str, sections: list): chunks = [] current = f"برای پرسش: {title}\n\n" for i, data in enumerate(sections, start=1): sec_text = data.get("content", "") idx = data.get("id") # ساخت ref کامل ref = make_link_qs(src=idx) # متن کامل آیتم block = f"{i}: {sec_text}\n{ref}\n\n" # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید if len(current) + len(block) > MAX_LEN: chunks.append(current.rstrip()) current = "" current += block # آخرین چانک را هم اضافه کن if current.strip(): chunks.append(current.rstrip()) return chunks def form_search_in_law(title: str, sections: List) -> List: chunks = [] current = f"برای پرسش: {title}\n\n" for i, data in enumerate(sections, start=1): sec_text = data.get("content", "") idx = data.get("id") # ساخت ref کامل ref = make_link_qs(src=idx) # متن کامل آیتم block = f"{i}: {sec_text}\n{ref}\n\n" # اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید if len(current) + len(block) > MAX_LEN: chunks.append(current.rstrip()) current = "" current += block # آخرین چانک را هم اضافه کن if current.strip(): chunks.append(current.rstrip()) return chunks def format_answer_bale(answer_text: str): """ answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد sources: مثل ['qs2117427'] """ # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد # مثلا: (qs123) یا (qs123, qs456, qs789) pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" def replace_source(m): content = m.group(1) codes = [c.strip() for c in content.split(",")] # جداسازی چند کد links = [make_link_qs(src=code) for code in codes] full_match = m.group(0) # if "منبع" in full_match: # print(f'Found explicit source(s): {links}') # else: # print(f'Found implicit source(s): {links}') return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان # جایگزینی در متن answer_text = re.sub(pattern, replace_source, answer_text) # اگر طول کمتر از MAX_LEN بود → تمام if len(answer_text) <= MAX_LEN: return [answer_text] # تقسیم متن اگر طول زیاد شد chunks = [] current = "" sentences = answer_text.split(". ") for sentence in sentences: st = sentence.strip() if not st.endswith("."): st += "." if len(current) + len(st) > MAX_LEN: chunks.append(current.strip()) current = "" current += st + " " if current.strip(): chunks.append(current.strip()) return chunks def form_answer_bale(answer_text: str): """ answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد sources: مثل ['qs2117427'] """ # الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد # مثلا: (qs123) یا (qs123, qs456, qs789) pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)" def replace_source(m): content = m.group(1) codes = [c.strip() for c in content.split(",")] # جداسازی چند کد links = [make_link_qs(src=code) for code in codes] full_match = m.group(0) # if "منبع" in full_match: # print(f'Found explicit source(s): {links}') # else: # print(f'Found implicit source(s): {links}') return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان # جایگزینی در متن answer_text = re.sub(pattern, replace_source, answer_text) # اگر طول کمتر از MAX_LEN بود → تمام if len(answer_text) <= MAX_LEN: return [answer_text] # تقسیم متن اگر طول زیاد شد chunks = [] current = "" sentences = answer_text.split(". ") for sentence in sentences: st = sentence.strip() if not st.endswith("."): st += "." if len(current) + len(st) > MAX_LEN: chunks.append(current.strip()) current = "" current += st + " " if current.strip(): chunks.append(current.strip()) return chunks def chunked_simple_text(answer_text): chunks = [] current = "" sentences = answer_text.split(". ") for sentence in sentences: st = sentence.strip() if not st.endswith("."): st += "." if len(current) + len(st) > MAX_LEN: chunks.append(current.strip()) current = "" current += st + " " if current.strip(): chunks.append(current.strip()) return chunks