mj_bale_chat/core/core.py
2026-01-25 11:29:33 +00:00

1438 lines
49 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

################# modularity
### import from external-package
import unicodedata, requests, logging, asyncio, httpx, os, json, uuid, traceback, orjson, copy, uvicorn, time, re
from pathlib import Path
from time import sleep
from collections import defaultdict
from typing import Dict, List, Tuple
from elasticsearch import Elasticsearch, helpers
from pathlib import Path
### import from internal-file
from router.bale.base_model import *
from router.bale.bale_buttons import *
from router.bale.bale_massages import *
from core.static import *
from core.base_model import *
############## Global-Params
DATA_DIR = os.path.join(".", "_data_json")
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
PERSIAN_BOUNDARIES = set(" \n،.؟!؛:")
class Formatter:
"""
Formatting options
Bold : \s*TEXT*\s
Italic : \s_TEXT_\s
Link: [متن](آدرس‌لینک)
```[متن]‌توضیحات‌```
answerCallbackQuery -> {callback_query_id:str, text:str, show_alert:bool}
setChatDescription -> {chat_id:str, description:str}
editMessageText -> {chat_id, message_id, text }
"""
###########################################################
# توابع برای رفتار کلاس
###########################################################
def __init__(self, max_len: int = 4000):
self.max_len = max_len
self.dash = "--------------------------------------------------------------------"
self._number_map = {
"0": "0",
"1": "1",
"2": "2",
"3": "3",
"4": "4",
"5": "5",
"6": "6",
"7": "7",
"8": "8",
"9": "9",
}
def __getattr__(self, name: str):
# فقط برای روش‌های مجاز (مثل bold, number) واکنش نشان بده
if name == "bold":
return self._bold
if name == "number":
return self._number
if name == "format_text":
return self._pretier1
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'"
)
###########################################################
# توابع استایل دهی
###########################################################
def _bold(self, _string: str) -> str:
return f" *{_string}* "
def _number(self, value: Union[int, str]) -> Union[str, int]:
"""
اگر int بود، تبدیل به str برای پردازش
اگر رشته‌ای بود که فقط از ارقام تشکیل شده → تبدیل
اگر رشته بود اما عدد نبود → خودش را برگردان
هر نوع دیگری → بدون تغییر برگردان
تبدیل هر رقم به ایموجی مربوطه
"""
if isinstance(value, int):
num_str = str(value)
elif isinstance(value, str):
if value.isdigit():
num_str = value
else:
return value
else:
return value
return "".join(
self._number_map.get(d, d) for d in num_str[::-1]
) # handle array of number
def _pretier1(self, text: str) -> str:
"""
مشکل :
عدم تشخیص پاراگراف
عدم تشخیص اعداد پشت سر هم با - و فاصله
عدم تشخیص اعداد 1 0 - با یک فاصله از اول هستند
"""
pattern = r"(?<!ماده\s)(?<!تبصره\s)(\d|[٠-٩])\s*-\s*"
text = re.sub(pattern, r"\n\1 -", text)
if text.startswith("\n") or text.endswith("\n"):
text = text.strip("\n")
return text
###########################################################
# توابع اصلی
###########################################################
def __make_link_qq(self, src, ref_text=REF_TEXT):
return f" [{ref_text}]({QQ_WEB_LINK}{src}) "
def __make_link_qs(self, src, ref_text=REF_TEXT):
return f" [{ref_text}]({QS_WEB_LINK}{src}) "
###########################################################
# توابع فرمت و ساختار
###########################################################
def form_search_in_law_rules(self, header: str, body: List[SemanticSearchP2P], footer: str = None) -> List[str]:
"""
گروه‌بندی بر اساس in_rule.rule_id و ساخت بلوک‌های مجزا برای هر گروه.
"""
if footer:
footer = '\n' + footer
# گروه‌بندی بر اساس in_rule.rule_id
grouped = defaultdict(list)
for item in body:
key = item.in_rule.rule_id
grouped[key].append(item)
print(f'form_search_in_law_rules -> {len(grouped)}')
chunks = []
current = header
# برای هر گروه (یعنی یک in_rule.rule_id منحصربه‌فرد)
for group_id, items in grouped.items():
# فرض: همه‌ی in_rule در یک گروه، rule_content یکسانی دارند — از اولی استفاده می‌کنیم
in_rule_content = items[0].in_rule.rule_content
block_lines = [in_rule_content]
# لیست db_ruleها با شماره‌گذاری
for i, item in enumerate(items, start=1):
db_content = item.db_rule.rule_content
section_id = self.__make_link_qs(src=item.db_rule.section_id)
block_lines.append(f"{self.number(i)} گزاره: {db_content} در {section_id}")
block = "\n".join(block_lines) + "\n\n"
# بررسی سایز و تقسیم در صورت نیاز
if len(current) + len(block) > self.max_len:
if current.strip() != header.strip():
chunks.append(current.rstrip())
current = header + block # شروع چانک جدید با header دوباره (یا بدون header؟)
else:
current += block
# اضافه کردن آخرین چانک
if current.strip() and current.strip() != header.strip():
chunks.append(current.rstrip())
# footer
if footer and chunks:
last = chunks[-1]
if len(last) + len(footer) <= self.max_len:
chunks[-1] = last + footer
else:
chunks.append(footer)
return chunks
def form_search_in_law(self, header: str, sections: List[SingleSearchData], footer:str=None) -> List:
"""
خروجی به صورت چانک بدون دکمه هر خروجی لینک دارد
برش امن لینک ها و اده ها
"""
footer = '\n\n'+footer
chunks = []
current = header
for i, data in enumerate(sections, start=1):
sec_text = data.content
idx = data.id
# ساخت ref کامل
ref = self.__make_link_qs(src=idx)
# متن کامل آیتم
block = (
f"{self.number(i)} {sec_text}\n{ref}\n\n" # self.format_text(sec_text)
)
# اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید
if len(current) + len(block) > self.max_len:
chunks.append(current.rstrip())
current = ""
current += block
# آخرین چانک را هم اضافه کن
if current.strip():
chunks.append(current.rstrip())
if footer :
last = chunks[-1]
if len(last) + len(footer) <= self.max_len:
chunks[-1] = last + footer
else:
chunks.append(footer)
return chunks
def form_law_chat(self, answer_text: str):
"""
answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد
sources: مثل ['qs2117427']
"""
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
# مثلا: (qs123) یا (qs123, qs456, qs789)
pattern = r"\((?:منبع[: ]+)?([a-zA-Z0-9_, ]+)\)"
def replace_source(m):
content = m.group(1)
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
links = [make_link_qs(src=code) for code in codes]
full_match = m.group(0)
# if "منبع" in full_match:
# print(f'Found explicit source(s): {links}')
# else:
# print(f'Found implicit source(s): {links}')
return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان
# جایگزینی در متن
answer_text = re.sub(pattern, replace_source, answer_text)
# اگر طول کمتر از MAX_LEN بود → تمام
if len(answer_text) <= MAX_LEN:
return [answer_text]
# تقسیم متن اگر طول زیاد شد
chunks = []
current = ""
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > MAX_LEN:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
return chunks
async def form_title_repeated(self, _input: List[TitleRepeat]) -> List:
if len(_input) == 0:
return ["هیچ عنوان تکراری و یا حتی مشابه یافت نشد."]
chunks = []
current = "نزدیک‌ترین عناوین مشابه عنوان قانون موارد زیر می باشد:\n\n"
for i, item in enumerate(_input, start=1):
if not item.title or not item.id:
continue
ref = self.__make_link_qq(src=item.id)
# بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم
# block = f"{i}. {title}(وزن {score})\n{ref}\n"
block = (
f"{self.number(i)} {self.bold(item.title)}؛ میزان تشابه: %{item.score} ؛{ref}\n"
)
# اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن
if len(current) + len(block) > self.max_len and current.strip():
chunks.append(current.rstrip())
current += block
# ذخیره آخرین چانک
if current.strip():
chunks.append(current.rstrip())
return chunks
def replace_source(self, m):
content = m.group(1)
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
links = [self.__make_link_qs(src=code) for code in codes]
full_match = m.group(0)
# if "منبع" in full_match:
# print(f'Found explicit source(s): {links}')
# else:
# print(f'Found implicit source(s): {links}')
return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان
async def form_chat(self, llm_text: str, header: str, footer: str=None):
"""
answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد
"""
if footer:
footer = '\n\n'+footer
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
# مثلا: (qs123) یا (qs123, qs456, qs789)
pattern = r"\((?:منبع[: ]+)?([a-zA-Z0-9_, ]+)\)"
# جایگزینی در متن
answer_text = re.sub(pattern, self.replace_source, llm_text)
# اگر طول کمتر از MAX_LEN بود → تمام
if len(answer_text) <= self.max_len:
return [header + answer_text]
# تقسیم متن اگر طول زیاد شد
chunks = []
current = header
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > self.max_len:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
if footer and chunks:
last = chunks[-1]
if len(last) + len(footer) <= self.max_len:
chunks[-1] = last + footer
else:
chunks.append(footer)
return chunks
async def form_llm_answer_chat(self, _input, header):
if len(_input) > 0:
return await self.form_chat(llm_text=_input, header=header)
# _input['source']
return ["هیچ ماده مرتبطی یافت نشد!"]
async def form_subject_unity(
self,
_input: Union[List[RuleRelation], str],
header="نتایج اولیه مغایرت های احتمالی :\n",
):
if isinstance(_input, str):
_input = self.form_law_chat(_input)
return _input, [], []
else:
chunks = []
buttons = []
seen_qanon_titles = set()
groups = defaultdict(set)
for item in _input:
title = item.db_rule.qanon_title
groups[title].add(item.db_rule.section_id)
current = header
for idx, (qanon_title, section_ids) in enumerate(groups.items(), start=1):
block_lines = [f"{self.number(idx)} در قانون {self.bold(qanon_title)}"]
sample_items_by_section = {}
for item in _input:
if (
item.db_rule.qanon_title == qanon_title
and item.db_rule.section_id in section_ids
):
sid = item.db_rule.section_id
if sid not in sample_items_by_section:
sample_items_by_section[sid] = item
for sub_idx, section_id in enumerate(sorted(section_ids), start=1):
item = sample_items_by_section[section_id] # representative item
link = self.__make_link_qs(src=section_id)
unity = item.subject_unity
if not unity:
block_lines.append("\t\t")
continue
if unity.has_subject_unity == "yes":
block_lines.append(f"توضیح {sub_idx} بر اساس {link}:")
block_lines.append(f"\t{unity.reasoning or ''}")
elif unity.has_subject_unity == "yes_under_assumptions":
block_lines.append(f"توضیح {sub_idx} بر اساس {link}:")
block_lines.append(f"\t{unity.reasoning or ''}")
block_lines.append("\tتوضیحات بیشتر (فرضیات لازم):")
block_lines.append(f"\t{unity.required_assumptions or ''}")
if len(block_lines) > 2:
block = "\n".join(block_lines) + "\n"
else:
continue
# Auto-chunk based on length
if len(current) + len(block) > MAX_LEN and current != header:
chunks.append(current.rstrip())
current = header
current += block
# Button: add *once* per qanon_title
if qanon_title and qanon_title not in seen_qanon_titles:
seen_qanon_titles.add(qanon_title)
buttons.append(
[
{
"text": f"بررسی مغایرت با {qanon_title}",
"callback_data": f"subject_unities:qq:{qanon_title}",
}
]
)
# Final flush
if current.strip() and (
len(chunks) == 0 or current.strip() != header.rstrip()
):
chunks.append(current.rstrip())
input_dict = {item.db_rule.section_id: item for item in _input}
mapping_data = defaultdict(list)
for k, v in groups.items():
for i in v:
mapping_data[k].append(input_dict[i])
return chunks, buttons, mapping_data
async def form_rule_making(
self, _input, header="گزاره های حقوقی زیر استخراج شد:\n\n", footer=None
):
if len(_input) > 0:
if footer:
footer = '\n\n'+footer
chunks = []
current = header
for i, item in enumerate(_input, start=1):
block = f'{self.number(i)} {item["rule_content"]}\n'
if len(current) + len(block) > self.max_len and current.strip():
chunks.append(current.rstrip())
current += block
if current.strip():
chunks.append(current.rstrip())
if footer and chunks:
last = chunks[-1]
if len(last) + len(footer) <= self.max_len:
chunks[-1] = last + footer
else:
chunks.append(footer)
return chunks
return ["هیچ گزاره حقوقی یافت و استخراج نشد!"]
def get_asl(self, _in: str):
return _in.replace("qs_", "اصل ")
def get_in_form_single(self, asl: str, _in_dict: Dict, _id: int) -> str:
f_list = []
if _in_dict["is_conflict"]:
f_list += [f"{_id}. *{self.get_asl(asl)}*: ❌ دارای مغایرت ❌"]
f_list += [
f"موضوع این اصل قانون اساسی {_in_dict['principle_subject']} می باشد."
]
f_list += [f"موضوع متن ورودی شامل {_in_dict['text_subject']} است."]
if _in_dict["has_subject_relation"] == True:
unity_text = "می باشد"
if _in_dict["has_subject_relation"] == False:
unity_text = "نمی باشد"
f_list += [f"دارای وحدت در موضوع {unity_text}."]
if _in_dict["conflict_type"] != "":
f_list += [f"نوع مغایرت تشحیص داده شده: {_in_dict['conflict_type']}"]
else:
f_list += [f"{_id}. *{self.get_asl(asl)}*: ✅ عدم مغایرت ✅"]
f_list += [f"توضیحات: {_in_dict['legal_reasoning']}"]
f_list += ["\n\n"]
return "\n".join(f_list)
async def form_constitution(self, input: Dict):
""" """
chunks = []
header = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n"
current = header
_id = 1
for k, v in input.items():
block = self.get_in_form_single(asl=k, _in_dict=v, _id=_id)
# اگر این بلاک جا نشد → چانک جدید
if len(current) + len(block) > self.max_len:
chunks.append(current.rstrip())
current = header + block
else:
current += block
_id += 1
# آخرین چانک
if current.strip():
chunks.append(current.rstrip())
return chunks
async def form_constitution_low(self, input: Dict, _id, _header='نتایچ بررسی مغایرت با اصول مهم قانون اساسی\n\n'):
""" """
chunks = []
current = ""
for k, v in input.items():
block = self.get_in_form_single(asl=k, _in_dict=v, _id=_id)
# اگر این بلاک جا نشد → چانک جدید
if len(current) + len(block) > self.max_len:
chunks.append(current.rstrip())
current = _header + block
else:
current += block
# آخرین چانک
if current.strip():
chunks.append(current.rstrip())
return ''.join(chunks)
async def form_ss_rules(self, _input: List[Dict], header):
if len(_input) > 1:
chunks = []
current = header
_i = 0
# -------- 1. group by qanon_id / qanon_title
groups = defaultdict(set)
for item in _input:
key = item["db_rule"]["qanon_title"]
groups[key].add(item["db_rule"]["section_id"])
for qanon_title, ids in groups.items():
_i += 1
links = "و ".join([self.__make_link_qs(id) for id in ids])
block = f"{self.number(_i)} در قانون {self.bold(qanon_title)} تشابه با گزاره های حقوقی ماده:{links}\n\n"
if len(current) + len(block) > self.max_len:
if current:
chunks.append(current)
current = header + block
else:
current += block
if current and current != header:
chunks.append(current)
return chunks
return ["هیچ ماده مرتبطی یافت نشد!"]
async def form_conflict_detection(
self, _input: RuleRelation, header="نتیجه تشخیص مغایرت :\n"
):
current = header
# ساخت لینک
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
current += f"به صورت خلاصه {_input.conflict_detection.has_confict}\n"
current += f"توضیحات : {_input.conflict_detection.explanation_of_conflict}\n"
return current
async def form_conflict_type_detection(
self, _input: RuleRelation, header="نتیجه تشخیص نوع مغایرت :\n"
):
current = header
# ساخت لینک
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
current += f"به صورت خلاصه {_input.conflict_type_detection.conflict_type}\n"
current += f"توضیحات : {_input.conflict_type_detection.explanation_of_type}\n"
return current
async def form_relation_identification(
self, _input: RuleRelation, header="نتیجه رابطه مغایرت :\n"
):
current = header
# ساخت لینک
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
current += f"به صورت خلاصه {_input.relation_identification.relation_type}\n"
current += f"توضیحات : {_input.relation_identification.reasoning}\n"
return current
async def form_evaluation(
self, _input: Evaluation, header="نتیجه نهایی بررسی مغایرت :\n"
):
current = header
# ساخت لینک
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
current += f"1. آیا ارزیابی وحدت موضوع صحیح است؟ {_input.is_subject_unity_assessment_correct}\n"
current += f"2. آیا ارزیابی تشخیص نوع درست است ؟ {_input.is_conflict_detection_correct}\n"
current += f"3. آیا ارزیابی نوع درست است ؟ {_input.is_conflict_type_detection_correct}\n"
current += (
f"4. رابطه مغایرت چطور؟ {_input.is_relation_type_detection_correct}\n"
)
current += f"5. نوع رابطه ؟ {_input.valid_relation_type}\n"
current += f"6.توضیح بیشتر: {_input.comments}\n"
return current
async def from_law_writing_policy(
self, _input_dict: Dict, header: str
) -> List[str]:
f_list = [self.bold(header)]
_str = {
"analyze": "گزارش تحلیلی بندبه‌بند",
"strength": "بیان نقاط قوت",
"weakness": "بیان نقاط ضعف و ریسک‌های تقنینی",
"conclusion_score": "جمع‌بندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)",
"suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی",
}
for k, v in _input_dict.items():
_title = _str[k]
_title = "*" + _title + "*"
f_list += [_title]
# f_list += ['\n']
f_list += [v]
f_list += ["\n"]
return ["\n".join(f_list)]
class RequestManager:
def __init__(
self,
host_url: str,
url_time_out=1200,
step_time_out=600,
):
if host_url.endswith('/'):
self.host_url = host_url
else:
self.host_url = host_url + '/'
self.url_time_out = url_time_out
self.step_time_out = step_time_out
TASK_URL = {
# stream
"": "/stream/chat_logical",
# none-stream
"": "/conflict/general_policy/qs_unity",
"": "/conflict/all_qanon/qs_unity",
"": "/conflict/general_policy/unity_eval",
"": "/conflict/law_writing_policy",
"": "/conflict/constitution",
"": "/rule_making",
"": "/chat",
"": "/talk",
"": "/semantic_search/chat_logical",
"": "/semantic_search/run_semantic_search",
"": "/semantic_search/run_chat",
}
async def get_result(
self,
payload,
url: str,
section_id: str = "qs_10001",
mode_type="bale",
):
if url.startswith('/'):
url = url[1:]
_url = self.host_url + url
print(f"get_result _url {_url}")
try:
async with httpx.AsyncClient(timeout=self.url_time_out) as client:
response = await client.post(url=_url, json=payload)
response.raise_for_status()
data = response.json()
result = data.get("result", "❌ پاسخی دریافت نشد")
return result
except Exception as e:
print(f"❌ خطای RAG:\n{str(e)}")
return "❌ ارتباط با سرور قطع می‌باشد"
async def stream_result(
self,
url: str,
payload: Dict,
):
"""
هر مرحله شامل:
{
step : "اسم مرحله"
data : "داده در این مرحله"
}
"""
if url.startswith('/'):
url = url[1:]
timeout = httpx.Timeout(self.step_time_out, read=self.url_time_out)
_url = self.host_url + url
async with httpx.AsyncClient(timeout=timeout) as client:
# ارسال درخواست به صورت Stream
async with client.stream("POST", url=_url, json=payload) as r:
# بررسی وضعیت پاسخ
if r.status_code != 200:
print(f"Error: {r.status_code}")
return
# خواندن خط به خط (هر خط یک JSON است که سرور Yield کرده)
async for line in r.aiter_lines():
if line.strip(): # جلوگیری از پردازش خطوط خالی
try:
# تبدیل متن JSON به دیکشنری پایتون
step_data = json.loads(line)
yield step_data
except json.JSONDecodeError:
print(f"Failed to decode: {line}")
def unique_id(prefix="wai_") -> str:
return f"{prefix}{uuid.uuid4().hex[:16]}"
def load_orjson(path: str | Path):
path = Path(path)
with path.open("rb") as f: # باید باینری باز بشه برای orjson
return orjson.loads(f.read())
def save_orjson(path, data):
with open(path, "wb") as f:
f.write(
orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
)
def split_text_chunks(text: str):
"""Split a long text into safe chunks."""
return [text[i : i + MAX_LEN] for i in range(0, len(text), MAX_LEN)]
class ElasticHelper:
"""
کلاس ElasticHelper:
نوع ورودی: بدون ورودی مستقیم در تعریف کلاس
نوع خروجی: شیء از نوع ElasticHelper
عملیات:
- متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف می‌کند
- مسیر پیش‌فرض مپینگ‌ها را تنظیم می‌کند
"""
counter = 0
total = 0
id = ""
path_mappings = os.getcwd() + "/repo/_other/"
def __init__(
self,
es_url="http://127.0.0.1:6900",
es_pass="",
es_user="elastic",
path_mappings="",
):
"""
نوع ورودی:
- es_url: آدرس Elasticsearch (str) - پیش‌فرض "http://127.0.0.1:6900"
- es_pass: رمز عبور (str) - پیش‌فرض خالی
- es_user: نام کاربری (str) - پیش‌فرض "elastic"
- path_mappings: مسیر مپینگ‌ها (str) - پیش‌فرض خالی
نوع خروجی: شیء ElasticHelper
عملیات:
- اتصال به Elasticsearch را برقرار می‌کند
- در صورت وجود رمز عبور، از احراز هویت استفاده می‌کند
- تا 10 بار برای اتصال مجدد تلاش می‌کند (هر بار 5 ثانیه انتظار)
- در صورت عدم موفقیت، پیام خطا نمایش داده می‌شود
"""
if path_mappings:
self.path_mappings = path_mappings
if es_pass == "":
self.es = Elasticsearch(es_url)
else:
self.es = Elasticsearch(
es_url,
basic_auth=(es_user, es_pass),
verify_certs=False,
)
# print(es_url)
# print(self.es)
self.success_connect = False
for a in range(0, 10):
try:
if not self.es.ping():
print("Elastic Connection Not ping, sleep 30 s : ", a)
sleep(5)
continue
else:
self.success_connect = True
break
except Exception as e:
break
if not self.success_connect:
print("******", "not access to elastic service")
return
self.counter = 0
self.total = 0
self.id = ""
def search(self, **params):
try:
res = self.es.search(**params)
except:
return {"hits": {"hits": []}}
return res
def get_document(self, index_name, id):
res = self.es.get(index=index_name, id=id)
return res
def exist_document(self, index_name, id):
res = self.es.exists(index=index_name, id=id)
return res
def update_index_doc(self, is_update_state, index_name_o, eid, data):
"""
نوع ورودی:
- is_update_state: تعیین عملیات (update یا index) (bool)
- index_name_o: نام اندیس (str)
- eid: شناسه سند (str)
- data: داده‌های سند (dict)
نوع خروجی: پاسخ Elasticsearch (dict)
عملیات:
- اگر is_update_state=True باشد: سند را آپدیت می‌کند
- در غیر این صورت: سند جدید ایجاد می‌کند
"""
if is_update_state:
resp = self.es.update(index=index_name_o, id=eid, doc=data)
# resp = self.es.update(index=index_name_o, id=eid, body={'doc':data})
else:
resp = self.es.index(index=index_name_o, id=eid, document=data)
return resp
def make_link_qq(src, ref_text=REF_TEXT):
return f"[{ref_text}]({QQ_WEB_LINK}{src})"
def make_link_qs(src, ref_text=REF_TEXT):
return f"[{ref_text}]({QS_WEB_LINK}{src})"
async def get_from_gpl(in_dict: Dict) -> List[str]:
f_list = []
_str = {
"analyze": "گزارش تحلیلی بندبه‌بند",
"strength": "بیان نقاط قوت",
"weakness": "بیان نقاط ضعف و ریسک‌های تقنینی",
"conclusion_score": "جمع‌بندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)",
"suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی",
}
for k, v in in_dict.items():
_title = _str[k]
_title = "*" + _title + "*"
f_list += [_title]
# f_list += ['\n']
f_list += [v]
f_list += ["\n"]
return ["\n".join(f_list)]
def cer(ref: str, hyp: str) -> float:
m, n = len(ref), len(hyp)
dp = list(range(n + 1))
for i in range(1, m + 1):
prev, dp[0] = dp[0], i
for j in range(1, n + 1):
cur = dp[j]
dp[j] = min(
dp[j] + 1, # deletion
dp[j - 1] + 1, # insertion
prev + (ref[i - 1] != hyp[j - 1]), # substitution
)
prev = cur
return (dp[n] / m) * 100
import nltk
from nltk.metrics import edit_distance
def cer_ntlk(exist: str, new: str) -> float:
"""
این روش دقیق‌تر است، چون تعداد کاراکترهای اضافی یا کم در متن طولانی،
CER را به شکل اغراق‌آمیز کاهش نمی‌دهد، بلکه روی شباهت معنایی و واژه‌ای تمرکز می‌کند.
"""
# edit distance روی کلمات
return round(float(1 - edit_distance(new, exist) / len(exist)) * 100, 2)
def wer_nltk(new: str, exist: str) -> float:
new = new.split()
exist = exist.split()
return round(float(1 - edit_distance(new, exist) / len(exist)) * 100, 2)
def normalize_persian(text: str) -> str:
# حذف کنترل‌کاراکترها
text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
# حذف فاصله بین حروف فارسی
text = re.sub(r"(?<=[آ-ی])\s+(?=[آ-ی])", "", text)
# اصلاح فاصله قبل و بعد از علائم
text = re.sub(r"\s+([،؛:؟!])", r"\1", text)
text = re.sub(r"([،؛:؟!])\s*", r"\1 ", text)
# فاصله‌های چندتایی
text = re.sub(r"\s{2,}", " ", text)
return text.strip()
async def get_in_from_rule_making(_input):
print(f"_input {_input}")
o_put = "گزاره های حقوقی زیر استخراج شد:\n"
for i, item in enumerate(_input, start=1):
o_put += f'{i}. {item["rule_content"]}\n'
return o_put
async def get_in_from_title_repeated(data: List[Dict[str, str]]):
if len(data) == 0:
return ["هیچ عنوانی تکراری یافت نشد."]
chunks = []
current = "نزدیک‌ترین عناوین مشابه عنوان قانون موارد زیر می باشد::\n\n"
for i, item in enumerate(data, start=1):
title = item.get("title", "").strip()
sec_id = item.get("id", "").strip()
score = item.get("score", "")
if not title or not sec_id:
continue
ref = make_link_qq(src=sec_id)
# بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم
# block = f"{i}. {title}(وزن {score})\n{ref}\n"
block = f"{i}. {title}\n{ref}\n"
# اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن
if len(current) + len(block) > 4000 and current.strip():
chunks.append(current.rstrip())
current += block
# ذخیره آخرین چانک
if current.strip():
chunks.append(current.rstrip())
return chunks
async def get_multi_qs_advance_gp(
_inputs: List[RuleRelation],
):
chunks = ["لطفا یک شماره را جهت بررسی جزئی تر مغایرت انتخاب کنید: "]
buttons = []
for i, item in enumerate(_inputs, start=1):
unity = item.subject_unity
section_id = item.db_rule.section_id
if unity:
if unity.has_subject_unity != "no":
buttons.append(
[
{
"text": f"{i} بررسی مورد ",
"callback_data": f"advanced_check_conflict_qsids:{section_id}",
}
]
)
return chunks, buttons
async def get_form_gp_p1(
_inputs: Union[List[RuleRelation], List],
):
f_result = []
if not _inputs or len(_inputs) <= 1:
f_result += ["قوانین مرتبط زیر از سیاستهای کلی نظام یافت شد : "]
for i, item in enumerate(_inputs, start=1):
link = make_link_qs(src=item.db_rule.section_id)
f_result += [f"{i}. {item.subject_unity.reasoning}. {link}"]
f_result += [
"و بعد از بررسی گزاره های حقوقی هر یک با متن شما ، موضوعات مرتبط مستقیم یافت نشد"
]
return ["\n".join(f_result)], []
# -------- 1. group by qanon_id / qanon_title
groups = defaultdict(list)
for item in _inputs:
key = item.db_rule.qanon_id or item.db_rule.qanon_title
groups[key].append(item)
# -------- 2. build output per group
for qanon_key, items in groups.items():
chunks = []
buttons = []
qanon_title = items[0].db_rule.qanon_title or "قانون نامشخص"
current = f"موضوعات مرتبط در قانون *{qanon_title}*:\n\n"
for i, item in enumerate(items, start=1):
unity = item.subject_unity
link = make_link_qs(src=item.db_rule.section_id)
lines = []
if unity:
if unity.has_subject_unity == "yes":
lines.append(f"{i}- " + unity.reasoning or "")
lines.append(link)
elif unity.has_subject_unity == "yes_under_assumptions":
lines.append(f"{i}- " + unity.reasoning or "")
lines.append("مشروط به فرض زیر :")
lines.append("\t" + unity.required_assumptions or "")
lines.append(link)
block = "\n".join(lines) + "\n\n"
if len(current) + len(block) > MAX_LEN and current.strip():
chunks.append(current.rstrip())
current = ""
current += block
if current.strip():
chunks.append(current.rstrip())
# -------- 3. one button per law
buttons.append(
[
{
"text": f"بررسی وجود مغایرت",
"callback_data": f"advanced_check_conflict_qqids:{qanon_key}",
}
]
)
f_result.append([chunks, buttons])
return f_result
async def get_form_gp_old(_inputs: Union[List[RuleRelation], List]):
chunks = []
_button = []
print(f"_inputs {_inputs}")
if len(_inputs) > 1:
current = "نتایج اولیه مغایرت های احتمالی :\n"
for i, item in enumerate(_inputs, start=1):
# ساخت لینک
_link = make_link_qs(src=item.db_rule.section_id)
# ساخت بلوک متنی کامل مربوط به این item — بدون قطع شدن
lines = [f"{i}. {item.db_rule.qanon_title} \n{_link}"]
unity = item.subject_unity
print(f"unity.has_subject_unity {unity.has_subject_unity}")
_qs_title = item.db_rule.qanon_title + "-" + str(i)
if unity.has_subject_unity == "yes":
print(f"yes")
lines.append("توضیح:")
lines.append(unity.reasoning or "")
elif unity.has_subject_unity == "yes_under_assumptions":
print(f"yes_under_assumptions")
lines.append("توضیح:")
lines.append(unity.reasoning or "")
lines.append("توضیحات بیشتر (فرضیات لازم):")
lines.append(unity.required_assumptions or "")
block = "\n".join(lines) + "\n"
if len(current) + len(block) > MAX_LEN and current.strip():
# قبلی را ذخیره کن
chunks.append(current.rstrip())
current += block
_button.append(
[{"text": f"بررسی {_qs_title}", "callback_data": f"not_yet"}]
)
if current.strip():
chunks.append(current.rstrip())
else:
chunks = ["هیچ مغایرتی یافت نشد."]
return chunks, _button
async def get_form_gp_advanced(_input: RuleRelation):
"""
ما در نظر میگیریم که subject_unity را داریم
"""
finall = ["نتیجه:\n"]
qs_id = _input.db_rule.section_id
button = []
if _input.relation_identification:
pass
if _input.conflict_type_detection:
pass
if _input.conflict_detection:
print("conflict_detection----------------------------")
_end = "آیا میخواهید نتیجه بررسی تزاحم را ببینید؟"
if _input.conflict_detection.has_confict == True:
# finall.append(
# 'باهم تعارض دارند !'
# )
button.append(
[
{
"text": "بررسی نوع تعارض",
"callback_data": f"advanced_check_conflict_qsids:{qs_id}",
}
]
)
finall += ["توضیحات"]
finall += [_input.conflict_detection.explanation_of_conflict]
finall += [_end]
else:
# finall.append(
# 'باهم تعارض مستقیم ندارند'
# )
finall += ["توضیحات"]
finall += [_input.conflict_detection.explanation_of_conflict]
finall = ["\n".join(finall)]
return finall, button
if _input.subject_unity:
pass
# _input.subject_unity.has_subject_unity
# _input.subject_unity.required_assumptions
# _input.subject_unity.reasoning
return _input.model_dump()
async def result_gp(text, url, effort="low") -> Dict:
print(
f"text {type(text)}\n-> {text}",
)
try:
async with httpx.AsyncClient(timeout=TIME_OUT) as client:
response = await client.post(
url,
json={
"section_content": text,
"effort": "medium",
"mode_type": "bale",
},
)
response.raise_for_status()
response = response.json()
data = response.get("result", "❌ پاسخی دریافت نشد")
if isinstance(data, str):
return data
_output = []
for item in data:
_output.append(RuleRelation.parse_obj(item))
# print('results_chat ',type(result))
return _output
except Exception as e:
print(f"❌ خطای RAG:\n{str(e)}")
return "❌ ارتباط با سرور قطع می‌باشد"
def extract_other_info(update: BaleUpdate) -> dict:
other_info = {}
if update.message:
user = update.message.from_user
elif update.callback_query:
user = update.callback_query.from_user
else:
return other_info # خالی برگردان اگر هیچ‌کدام نبود
# ایمن در برابر None
other_info["username"] = user.username or ""
other_info["first_name"] = user.first_name or ""
other_info["last_name"] = getattr(user, "last_name", "") or ""
return other_info
def get_in_form(title: str, sections: list):
chunks = []
current = f"برای پرسش: {title}\n\n"
for i, data in enumerate(sections, start=1):
sec_text = data.get("content", "")
idx = data.get("id")
# ساخت ref کامل
ref = make_link_qs(src=idx)
# متن کامل آیتم
block = f"{i}: {sec_text}\n{ref}\n\n"
# اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید
if len(current) + len(block) > MAX_LEN:
chunks.append(current.rstrip())
current = ""
current += block
# آخرین چانک را هم اضافه کن
if current.strip():
chunks.append(current.rstrip())
return chunks
def form_search_in_law(title: str, sections: List) -> List:
chunks = []
current = f"برای پرسش: {title}\n\n"
for i, data in enumerate(sections, start=1):
sec_text = data.get("content", "")
idx = data.get("id")
# ساخت ref کامل
ref = make_link_qs(src=idx)
# متن کامل آیتم
block = f"{i}: {sec_text}\n{ref}\n\n"
# اگر با اضافه شدن این آیتم از حد مجاز عبور می‌کنیم → شروع چانک جدید
if len(current) + len(block) > MAX_LEN:
chunks.append(current.rstrip())
current = ""
current += block
# آخرین چانک را هم اضافه کن
if current.strip():
chunks.append(current.rstrip())
return chunks
def format_answer_bale(answer_text: str):
"""
answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد
sources: مثل ['qs2117427']
"""
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
# مثلا: (qs123) یا (qs123, qs456, qs789)
pattern = r"\((?:منبع[: ]+)?([a-zA-Z0-9_, ]+)\)"
def replace_source(m):
content = m.group(1)
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
links = [make_link_qs(src=code) for code in codes]
full_match = m.group(0)
# if "منبع" in full_match:
# print(f'Found explicit source(s): {links}')
# else:
# print(f'Found implicit source(s): {links}')
return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان
# جایگزینی در متن
answer_text = re.sub(pattern, replace_source, answer_text)
# اگر طول کمتر از MAX_LEN بود → تمام
if len(answer_text) <= MAX_LEN:
return [answer_text]
# تقسیم متن اگر طول زیاد شد
chunks = []
current = ""
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > MAX_LEN:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
return chunks
def form_answer_bale(answer_text: str):
"""
answer_text: متن خروجی مدل که داخلش عبارت‌های مثل (منبع: qs2117427) وجود دارد
sources: مثل ['qs2117427']
"""
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
# مثلا: (qs123) یا (qs123, qs456, qs789)
pattern = r"\((?:منبع[: ]+)?([a-zA-Z0-9_, ]+)\)"
def replace_source(m):
content = m.group(1)
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
links = [make_link_qs(src=code) for code in codes]
full_match = m.group(0)
# if "منبع" in full_match:
# print(f'Found explicit source(s): {links}')
# else:
# print(f'Found implicit source(s): {links}')
return ", ".join(links) # جایگزینی همه کدها با لینک‌هایشان
# جایگزینی در متن
answer_text = re.sub(pattern, replace_source, answer_text)
# اگر طول کمتر از MAX_LEN بود → تمام
if len(answer_text) <= MAX_LEN:
return [answer_text]
# تقسیم متن اگر طول زیاد شد
chunks = []
current = ""
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > MAX_LEN:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
return chunks
def chunked_simple_text(answer_text):
chunks = []
current = ""
sentences = answer_text.split(". ")
for sentence in sentences:
st = sentence.strip()
if not st.endswith("."):
st += "."
if len(current) + len(st) > MAX_LEN:
chunks.append(current.strip())
current = ""
current += st + " "
if current.strip():
chunks.append(current.strip())
return chunks