1438 lines
49 KiB
Python
Executable File
1438 lines
49 KiB
Python
Executable File
################# modularity
|
||
### import from external-package
|
||
import unicodedata, requests, logging, asyncio, httpx, os, json, uuid, traceback, orjson, copy, uvicorn, time, re
|
||
from pathlib import Path
|
||
from time import sleep
|
||
from collections import defaultdict
|
||
from typing import Dict, List, Tuple
|
||
from elasticsearch import Elasticsearch, helpers
|
||
from pathlib import Path
|
||
|
||
### import from internal-file
|
||
from router.bale.base_model import *
|
||
from router.bale.bale_buttons import *
|
||
from router.bale.bale_massages import *
|
||
from core.static import *
|
||
from core.base_model import *
|
||
|
||
|
||
############## Global-Params
|
||
DATA_DIR = os.path.join(".", "_data_json")
|
||
if not os.path.exists(DATA_DIR):
|
||
os.makedirs(DATA_DIR)
|
||
|
||
PERSIAN_BOUNDARIES = set(" \n،.؟!؛:")
|
||
|
||
|
||
class Formatter:
|
||
"""
|
||
Formatting options
|
||
Bold : \s*TEXT*\s
|
||
Italic : \s_TEXT_\s
|
||
Link: [متن](آدرسلینک)
|
||
```[متن]توضیحات```
|
||
|
||
answerCallbackQuery -> {callback_query_id:str, text:str, show_alert:bool}
|
||
setChatDescription -> {chat_id:str, description:str}
|
||
editMessageText -> {chat_id, message_id, text }
|
||
"""
|
||
|
||
###########################################################
|
||
# توابع برای رفتار کلاس
|
||
###########################################################
|
||
def __init__(self, max_len: int = 4000):
|
||
self.max_len = max_len
|
||
|
||
self.dash = "--------------------------------------------------------------------"
|
||
self._number_map = {
|
||
"0": "0️⃣",
|
||
"1": "1️⃣",
|
||
"2": "2️⃣",
|
||
"3": "3️⃣",
|
||
"4": "4️⃣",
|
||
"5": "5️⃣",
|
||
"6": "6️⃣",
|
||
"7": "7️⃣",
|
||
"8": "8️⃣",
|
||
"9": "9️⃣",
|
||
}
|
||
|
||
def __getattr__(self, name: str):
|
||
# فقط برای روشهای مجاز (مثل bold, number) واکنش نشان بده
|
||
if name == "bold":
|
||
return self._bold
|
||
if name == "number":
|
||
return self._number
|
||
if name == "format_text":
|
||
return self._pretier1
|
||
raise AttributeError(
|
||
f"'{self.__class__.__name__}' object has no attribute '{name}'"
|
||
)
|
||
|
||
###########################################################
|
||
# توابع استایل دهی
|
||
###########################################################
|
||
|
||
def _bold(self, _string: str) -> str:
|
||
return f" *{_string}* "
|
||
|
||
def _number(self, value: Union[int, str]) -> Union[str, int]:
|
||
"""
|
||
اگر int بود، تبدیل به str برای پردازش
|
||
اگر رشتهای بود که فقط از ارقام تشکیل شده → تبدیل
|
||
اگر رشته بود اما عدد نبود → خودش را برگردان
|
||
هر نوع دیگری → بدون تغییر برگردان
|
||
تبدیل هر رقم به ایموجی مربوطه
|
||
"""
|
||
|
||
if isinstance(value, int):
|
||
num_str = str(value)
|
||
elif isinstance(value, str):
|
||
if value.isdigit():
|
||
num_str = value
|
||
else:
|
||
return value
|
||
else:
|
||
return value
|
||
|
||
return "".join(
|
||
self._number_map.get(d, d) for d in num_str[::-1]
|
||
) # handle array of number
|
||
|
||
def _pretier1(self, text: str) -> str:
|
||
"""
|
||
مشکل :
|
||
عدم تشخیص پاراگراف
|
||
عدم تشخیص اعداد پشت سر هم با - و فاصله
|
||
عدم تشخیص اعداد 1 0 - با یک فاصله از اول هستند
|
||
"""
|
||
pattern = r"(?<!ماده\s)(?<!تبصره\s)(\d|[٠-٩])\s*-\s*"
|
||
text = re.sub(pattern, r"\n\1 -", text)
|
||
|
||
if text.startswith("\n") or text.endswith("\n"):
|
||
text = text.strip("\n")
|
||
|
||
return text
|
||
|
||
###########################################################
|
||
# توابع اصلی
|
||
###########################################################
|
||
|
||
def __make_link_qq(self, src, ref_text=REF_TEXT):
|
||
return f" [{ref_text}]({QQ_WEB_LINK}{src}) "
|
||
|
||
def __make_link_qs(self, src, ref_text=REF_TEXT):
|
||
return f" [{ref_text}]({QS_WEB_LINK}{src}) "
|
||
|
||
###########################################################
|
||
# توابع فرمت و ساختار
|
||
###########################################################
|
||
|
||
def form_search_in_law_rules(self, header: str, body: List[SemanticSearchP2P], footer: str = None) -> List[str]:
|
||
"""
|
||
گروهبندی بر اساس in_rule.rule_id و ساخت بلوکهای مجزا برای هر گروه.
|
||
"""
|
||
if footer:
|
||
footer = '\n' + footer
|
||
|
||
# گروهبندی بر اساس in_rule.rule_id
|
||
grouped = defaultdict(list)
|
||
for item in body:
|
||
key = item.in_rule.rule_id
|
||
grouped[key].append(item)
|
||
|
||
print(f'form_search_in_law_rules -> {len(grouped)}')
|
||
|
||
chunks = []
|
||
current = header
|
||
|
||
# برای هر گروه (یعنی یک in_rule.rule_id منحصربهفرد)
|
||
for group_id, items in grouped.items():
|
||
# فرض: همهی in_rule در یک گروه، rule_content یکسانی دارند — از اولی استفاده میکنیم
|
||
in_rule_content = items[0].in_rule.rule_content
|
||
block_lines = [in_rule_content]
|
||
|
||
# لیست db_ruleها با شمارهگذاری
|
||
for i, item in enumerate(items, start=1):
|
||
db_content = item.db_rule.rule_content
|
||
section_id = self.__make_link_qs(src=item.db_rule.section_id)
|
||
block_lines.append(f"{self.number(i)} گزاره: {db_content} در {section_id}")
|
||
|
||
block = "\n".join(block_lines) + "\n\n"
|
||
|
||
# بررسی سایز و تقسیم در صورت نیاز
|
||
if len(current) + len(block) > self.max_len:
|
||
if current.strip() != header.strip():
|
||
chunks.append(current.rstrip())
|
||
current = header + block # شروع چانک جدید با header دوباره (یا بدون header؟)
|
||
else:
|
||
current += block
|
||
|
||
# اضافه کردن آخرین چانک
|
||
if current.strip() and current.strip() != header.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
# footer
|
||
if footer and chunks:
|
||
last = chunks[-1]
|
||
if len(last) + len(footer) <= self.max_len:
|
||
chunks[-1] = last + footer
|
||
else:
|
||
chunks.append(footer)
|
||
|
||
return chunks
|
||
|
||
def form_search_in_law(self, header: str, sections: List[SingleSearchData], footer:str=None) -> List:
|
||
"""
|
||
خروجی به صورت چانک بدون دکمه هر خروجی لینک دارد
|
||
برش امن لینک ها و اده ها
|
||
"""
|
||
footer = '\n\n'+footer
|
||
chunks = []
|
||
current = header
|
||
|
||
for i, data in enumerate(sections, start=1):
|
||
sec_text = data.content
|
||
idx = data.id
|
||
|
||
# ساخت ref کامل
|
||
ref = self.__make_link_qs(src=idx)
|
||
# متن کامل آیتم
|
||
block = (
|
||
f"{self.number(i)} {sec_text}\n{ref}\n\n" # self.format_text(sec_text)
|
||
)
|
||
|
||
# اگر با اضافه شدن این آیتم از حد مجاز عبور میکنیم → شروع چانک جدید
|
||
if len(current) + len(block) > self.max_len:
|
||
chunks.append(current.rstrip())
|
||
current = ""
|
||
|
||
current += block
|
||
|
||
# آخرین چانک را هم اضافه کن
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
if footer :
|
||
last = chunks[-1]
|
||
if len(last) + len(footer) <= self.max_len:
|
||
chunks[-1] = last + footer
|
||
else:
|
||
chunks.append(footer)
|
||
|
||
return chunks
|
||
|
||
def form_law_chat(self, answer_text: str):
|
||
"""
|
||
answer_text: متن خروجی مدل که داخلش عبارتهای مثل (منبع: qs2117427) وجود دارد
|
||
sources: مثل ['qs2117427']
|
||
"""
|
||
|
||
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
|
||
# مثلا: (qs123) یا (qs123, qs456, qs789)
|
||
pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)"
|
||
|
||
def replace_source(m):
|
||
content = m.group(1)
|
||
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
|
||
links = [make_link_qs(src=code) for code in codes]
|
||
full_match = m.group(0)
|
||
# if "منبع" in full_match:
|
||
# print(f'Found explicit source(s): {links}')
|
||
# else:
|
||
# print(f'Found implicit source(s): {links}')
|
||
return ", ".join(links) # جایگزینی همه کدها با لینکهایشان
|
||
|
||
# جایگزینی در متن
|
||
answer_text = re.sub(pattern, replace_source, answer_text)
|
||
|
||
# اگر طول کمتر از MAX_LEN بود → تمام
|
||
if len(answer_text) <= MAX_LEN:
|
||
return [answer_text]
|
||
|
||
# تقسیم متن اگر طول زیاد شد
|
||
chunks = []
|
||
current = ""
|
||
|
||
sentences = answer_text.split(". ")
|
||
for sentence in sentences:
|
||
st = sentence.strip()
|
||
if not st.endswith("."):
|
||
st += "."
|
||
|
||
if len(current) + len(st) > MAX_LEN:
|
||
chunks.append(current.strip())
|
||
current = ""
|
||
|
||
current += st + " "
|
||
|
||
if current.strip():
|
||
chunks.append(current.strip())
|
||
|
||
return chunks
|
||
|
||
async def form_title_repeated(self, _input: List[TitleRepeat]) -> List:
|
||
if len(_input) == 0:
|
||
return ["هیچ عنوان تکراری و یا حتی مشابه یافت نشد."]
|
||
|
||
chunks = []
|
||
current = "نزدیکترین عناوین مشابه عنوان قانون موارد زیر می باشد:\n\n"
|
||
|
||
for i, item in enumerate(_input, start=1):
|
||
|
||
if not item.title or not item.id:
|
||
continue
|
||
|
||
ref = self.__make_link_qq(src=item.id)
|
||
|
||
# بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم
|
||
# block = f"{i}. {title}(وزن {score})\n{ref}\n"
|
||
block = (
|
||
f"{self.number(i)} {self.bold(item.title)}؛ میزان تشابه: %{item.score} ؛{ref}\n"
|
||
)
|
||
|
||
# اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن
|
||
if len(current) + len(block) > self.max_len and current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
current += block
|
||
|
||
# ذخیره آخرین چانک
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
return chunks
|
||
|
||
def replace_source(self, m):
|
||
content = m.group(1)
|
||
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
|
||
links = [self.__make_link_qs(src=code) for code in codes]
|
||
full_match = m.group(0)
|
||
# if "منبع" in full_match:
|
||
# print(f'Found explicit source(s): {links}')
|
||
# else:
|
||
# print(f'Found implicit source(s): {links}')
|
||
return ", ".join(links) # جایگزینی همه کدها با لینکهایشان
|
||
|
||
async def form_chat(self, llm_text: str, header: str, footer: str=None):
|
||
"""
|
||
answer_text: متن خروجی مدل که داخلش عبارتهای مثل (منبع: qs2117427) وجود دارد
|
||
"""
|
||
if footer:
|
||
footer = '\n\n'+footer
|
||
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
|
||
# مثلا: (qs123) یا (qs123, qs456, qs789)
|
||
pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)"
|
||
|
||
# جایگزینی در متن
|
||
answer_text = re.sub(pattern, self.replace_source, llm_text)
|
||
|
||
# اگر طول کمتر از MAX_LEN بود → تمام
|
||
if len(answer_text) <= self.max_len:
|
||
return [header + answer_text]
|
||
|
||
# تقسیم متن اگر طول زیاد شد
|
||
chunks = []
|
||
current = header
|
||
|
||
sentences = answer_text.split(". ")
|
||
for sentence in sentences:
|
||
st = sentence.strip()
|
||
if not st.endswith("."):
|
||
st += "."
|
||
|
||
if len(current) + len(st) > self.max_len:
|
||
chunks.append(current.strip())
|
||
current = ""
|
||
|
||
current += st + " "
|
||
|
||
if current.strip():
|
||
chunks.append(current.strip())
|
||
|
||
if footer and chunks:
|
||
last = chunks[-1]
|
||
if len(last) + len(footer) <= self.max_len:
|
||
chunks[-1] = last + footer
|
||
else:
|
||
chunks.append(footer)
|
||
|
||
return chunks
|
||
|
||
async def form_llm_answer_chat(self, _input, header):
|
||
if len(_input) > 0:
|
||
return await self.form_chat(llm_text=_input, header=header)
|
||
# _input['source']
|
||
return ["هیچ ماده مرتبطی یافت نشد!"]
|
||
|
||
async def form_subject_unity(
|
||
self,
|
||
_input: Union[List[RuleRelation], str],
|
||
header="نتایج اولیه مغایرت های احتمالی :\n",
|
||
):
|
||
if isinstance(_input, str):
|
||
_input = self.form_law_chat(_input)
|
||
return _input, [], []
|
||
else:
|
||
chunks = []
|
||
buttons = []
|
||
seen_qanon_titles = set()
|
||
groups = defaultdict(set)
|
||
|
||
for item in _input:
|
||
title = item.db_rule.qanon_title
|
||
groups[title].add(item.db_rule.section_id)
|
||
|
||
current = header
|
||
for idx, (qanon_title, section_ids) in enumerate(groups.items(), start=1):
|
||
block_lines = [f"{self.number(idx)} در قانون {self.bold(qanon_title)}"]
|
||
sample_items_by_section = {}
|
||
for item in _input:
|
||
if (
|
||
item.db_rule.qanon_title == qanon_title
|
||
and item.db_rule.section_id in section_ids
|
||
):
|
||
sid = item.db_rule.section_id
|
||
if sid not in sample_items_by_section:
|
||
sample_items_by_section[sid] = item
|
||
|
||
for sub_idx, section_id in enumerate(sorted(section_ids), start=1):
|
||
item = sample_items_by_section[section_id] # representative item
|
||
link = self.__make_link_qs(src=section_id)
|
||
|
||
unity = item.subject_unity
|
||
if not unity:
|
||
block_lines.append("\t\t—")
|
||
continue
|
||
|
||
if unity.has_subject_unity == "yes":
|
||
block_lines.append(f"توضیح {sub_idx} بر اساس {link}:")
|
||
block_lines.append(f"\t{unity.reasoning or ''}")
|
||
|
||
elif unity.has_subject_unity == "yes_under_assumptions":
|
||
block_lines.append(f"توضیح {sub_idx} بر اساس {link}:")
|
||
block_lines.append(f"\t{unity.reasoning or ''}")
|
||
block_lines.append("\tتوضیحات بیشتر (فرضیات لازم):")
|
||
block_lines.append(f"\t{unity.required_assumptions or ''}")
|
||
|
||
if len(block_lines) > 2:
|
||
block = "\n".join(block_lines) + "\n"
|
||
else:
|
||
continue
|
||
|
||
# Auto-chunk based on length
|
||
if len(current) + len(block) > MAX_LEN and current != header:
|
||
chunks.append(current.rstrip())
|
||
current = header
|
||
|
||
current += block
|
||
|
||
# Button: add *once* per qanon_title
|
||
if qanon_title and qanon_title not in seen_qanon_titles:
|
||
seen_qanon_titles.add(qanon_title)
|
||
buttons.append(
|
||
[
|
||
{
|
||
"text": f"بررسی مغایرت با {qanon_title}",
|
||
"callback_data": f"subject_unities:qq:{qanon_title}",
|
||
}
|
||
]
|
||
)
|
||
|
||
# Final flush
|
||
if current.strip() and (
|
||
len(chunks) == 0 or current.strip() != header.rstrip()
|
||
):
|
||
chunks.append(current.rstrip())
|
||
|
||
input_dict = {item.db_rule.section_id: item for item in _input}
|
||
mapping_data = defaultdict(list)
|
||
for k, v in groups.items():
|
||
for i in v:
|
||
mapping_data[k].append(input_dict[i])
|
||
|
||
return chunks, buttons, mapping_data
|
||
|
||
async def form_rule_making(
|
||
self, _input, header="گزاره های حقوقی زیر استخراج شد:\n\n", footer=None
|
||
):
|
||
if len(_input) > 0:
|
||
if footer:
|
||
footer = '\n\n'+footer
|
||
|
||
chunks = []
|
||
current = header
|
||
|
||
for i, item in enumerate(_input, start=1):
|
||
block = f'{self.number(i)} {item["rule_content"]}\n'
|
||
if len(current) + len(block) > self.max_len and current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
current += block
|
||
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
if footer and chunks:
|
||
last = chunks[-1]
|
||
if len(last) + len(footer) <= self.max_len:
|
||
chunks[-1] = last + footer
|
||
else:
|
||
chunks.append(footer)
|
||
return chunks
|
||
|
||
|
||
return ["هیچ گزاره حقوقی یافت و استخراج نشد!"]
|
||
|
||
def get_asl(self, _in: str):
|
||
return _in.replace("qs_", "اصل ")
|
||
|
||
def get_in_form_single(self, asl: str, _in_dict: Dict, _id: int) -> str:
|
||
f_list = []
|
||
if _in_dict["is_conflict"]:
|
||
f_list += [f"{_id}. *{self.get_asl(asl)}*: ❌ دارای مغایرت ❌"]
|
||
f_list += [
|
||
f"موضوع این اصل قانون اساسی {_in_dict['principle_subject']} می باشد."
|
||
]
|
||
f_list += [f"موضوع متن ورودی شامل {_in_dict['text_subject']} است."]
|
||
if _in_dict["has_subject_relation"] == True:
|
||
unity_text = "می باشد"
|
||
if _in_dict["has_subject_relation"] == False:
|
||
unity_text = "نمی باشد"
|
||
f_list += [f"دارای وحدت در موضوع {unity_text}."]
|
||
if _in_dict["conflict_type"] != "":
|
||
f_list += [f"نوع مغایرت تشحیص داده شده: {_in_dict['conflict_type']}"]
|
||
else:
|
||
f_list += [f"{_id}. *{self.get_asl(asl)}*: ✅ عدم مغایرت ✅"]
|
||
|
||
f_list += [f"توضیحات: {_in_dict['legal_reasoning']}"]
|
||
f_list += ["\n\n"]
|
||
return "\n".join(f_list)
|
||
|
||
async def form_constitution(self, input: Dict):
|
||
""" """
|
||
|
||
chunks = []
|
||
header = "*نتیجه بررسی مغایرت با اصول مهم قانون اساسی*:\n\n"
|
||
current = header
|
||
|
||
_id = 1
|
||
for k, v in input.items():
|
||
block = self.get_in_form_single(asl=k, _in_dict=v, _id=_id)
|
||
|
||
# اگر این بلاک جا نشد → چانک جدید
|
||
if len(current) + len(block) > self.max_len:
|
||
chunks.append(current.rstrip())
|
||
current = header + block
|
||
else:
|
||
current += block
|
||
|
||
_id += 1
|
||
|
||
# آخرین چانک
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
return chunks
|
||
|
||
async def form_constitution_low(self, input: Dict, _id, _header='نتایچ بررسی مغایرت با اصول مهم قانون اساسی\n\n'):
|
||
""" """
|
||
|
||
chunks = []
|
||
current = ""
|
||
|
||
for k, v in input.items():
|
||
block = self.get_in_form_single(asl=k, _in_dict=v, _id=_id)
|
||
|
||
# اگر این بلاک جا نشد → چانک جدید
|
||
if len(current) + len(block) > self.max_len:
|
||
chunks.append(current.rstrip())
|
||
current = _header + block
|
||
else:
|
||
current += block
|
||
|
||
|
||
# آخرین چانک
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
return ''.join(chunks)
|
||
|
||
async def form_ss_rules(self, _input: List[Dict], header):
|
||
|
||
if len(_input) > 1:
|
||
chunks = []
|
||
current = header
|
||
_i = 0
|
||
|
||
# -------- 1. group by qanon_id / qanon_title
|
||
groups = defaultdict(set)
|
||
for item in _input:
|
||
key = item["db_rule"]["qanon_title"]
|
||
groups[key].add(item["db_rule"]["section_id"])
|
||
|
||
for qanon_title, ids in groups.items():
|
||
_i += 1
|
||
links = "و ".join([self.__make_link_qs(id) for id in ids])
|
||
block = f"{self.number(_i)} در قانون {self.bold(qanon_title)} تشابه با گزاره های حقوقی ماده:{links}\n\n"
|
||
|
||
if len(current) + len(block) > self.max_len:
|
||
if current:
|
||
chunks.append(current)
|
||
|
||
current = header + block
|
||
else:
|
||
current += block
|
||
|
||
if current and current != header:
|
||
chunks.append(current)
|
||
|
||
return chunks
|
||
|
||
return ["هیچ ماده مرتبطی یافت نشد!"]
|
||
|
||
async def form_conflict_detection(
|
||
self, _input: RuleRelation, header="نتیجه تشخیص مغایرت :\n"
|
||
):
|
||
current = header
|
||
|
||
# ساخت لینک
|
||
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
|
||
current += f"به صورت خلاصه {_input.conflict_detection.has_confict}\n"
|
||
current += f"توضیحات : {_input.conflict_detection.explanation_of_conflict}\n"
|
||
|
||
return current
|
||
|
||
async def form_conflict_type_detection(
|
||
self, _input: RuleRelation, header="نتیجه تشخیص نوع مغایرت :\n"
|
||
):
|
||
current = header
|
||
|
||
# ساخت لینک
|
||
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
|
||
current += f"به صورت خلاصه {_input.conflict_type_detection.conflict_type}\n"
|
||
current += f"توضیحات : {_input.conflict_type_detection.explanation_of_type}\n"
|
||
|
||
return current
|
||
|
||
async def form_relation_identification(
|
||
self, _input: RuleRelation, header="نتیجه رابطه مغایرت :\n"
|
||
):
|
||
current = header
|
||
|
||
# ساخت لینک
|
||
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
|
||
current += f"به صورت خلاصه {_input.relation_identification.relation_type}\n"
|
||
current += f"توضیحات : {_input.relation_identification.reasoning}\n"
|
||
|
||
return current
|
||
|
||
async def form_evaluation(
|
||
self, _input: Evaluation, header="نتیجه نهایی بررسی مغایرت :\n"
|
||
):
|
||
current = header
|
||
|
||
# ساخت لینک
|
||
# _link = self.__make_link_qs(src=_input.db_rule.section_id)
|
||
current += f"1. آیا ارزیابی وحدت موضوع صحیح است؟ {_input.is_subject_unity_assessment_correct}\n"
|
||
current += f"2. آیا ارزیابی تشخیص نوع درست است ؟ {_input.is_conflict_detection_correct}\n"
|
||
current += f"3. آیا ارزیابی نوع درست است ؟ {_input.is_conflict_type_detection_correct}\n"
|
||
current += (
|
||
f"4. رابطه مغایرت چطور؟ {_input.is_relation_type_detection_correct}\n"
|
||
)
|
||
current += f"5. نوع رابطه ؟ {_input.valid_relation_type}\n"
|
||
current += f"6.توضیح بیشتر: {_input.comments}\n"
|
||
|
||
return current
|
||
|
||
async def from_law_writing_policy(
|
||
self, _input_dict: Dict, header: str
|
||
) -> List[str]:
|
||
f_list = [self.bold(header)]
|
||
_str = {
|
||
"analyze": "گزارش تحلیلی بندبهبند",
|
||
"strength": "بیان نقاط قوت",
|
||
"weakness": "بیان نقاط ضعف و ریسکهای تقنینی",
|
||
"conclusion_score": "جمعبندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)",
|
||
"suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی",
|
||
}
|
||
for k, v in _input_dict.items():
|
||
_title = _str[k]
|
||
_title = "*" + _title + "*"
|
||
f_list += [_title]
|
||
# f_list += ['\n']
|
||
f_list += [v]
|
||
f_list += ["\n"]
|
||
|
||
return ["\n".join(f_list)]
|
||
|
||
|
||
class RequestManager:
|
||
def __init__(
|
||
self,
|
||
host_url: str,
|
||
url_time_out=1200,
|
||
step_time_out=600,
|
||
):
|
||
if host_url.endswith('/'):
|
||
self.host_url = host_url
|
||
else:
|
||
self.host_url = host_url + '/'
|
||
|
||
self.url_time_out = url_time_out
|
||
self.step_time_out = step_time_out
|
||
TASK_URL = {
|
||
# stream
|
||
"": "/stream/chat_logical",
|
||
# none-stream
|
||
"": "/conflict/general_policy/qs_unity",
|
||
"": "/conflict/all_qanon/qs_unity",
|
||
"": "/conflict/general_policy/unity_eval",
|
||
"": "/conflict/law_writing_policy",
|
||
"": "/conflict/constitution",
|
||
"": "/rule_making",
|
||
"": "/chat",
|
||
"": "/talk",
|
||
"": "/semantic_search/chat_logical",
|
||
"": "/semantic_search/run_semantic_search",
|
||
"": "/semantic_search/run_chat",
|
||
}
|
||
|
||
async def get_result(
|
||
self,
|
||
payload,
|
||
url: str,
|
||
section_id: str = "qs_10001",
|
||
mode_type="bale",
|
||
):
|
||
if url.startswith('/'):
|
||
url = url[1:]
|
||
|
||
_url = self.host_url + url
|
||
print(f"get_result _url {_url}")
|
||
try:
|
||
async with httpx.AsyncClient(timeout=self.url_time_out) as client:
|
||
response = await client.post(url=_url, json=payload)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
result = data.get("result", "❌ پاسخی دریافت نشد")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"❌ خطای RAG:\n{str(e)}")
|
||
return "❌ ارتباط با سرور قطع میباشد"
|
||
|
||
async def stream_result(
|
||
self,
|
||
url: str,
|
||
payload: Dict,
|
||
):
|
||
"""
|
||
هر مرحله شامل:
|
||
{
|
||
step : "اسم مرحله"
|
||
data : "داده در این مرحله"
|
||
}
|
||
"""
|
||
if url.startswith('/'):
|
||
url = url[1:]
|
||
|
||
timeout = httpx.Timeout(self.step_time_out, read=self.url_time_out)
|
||
_url = self.host_url + url
|
||
|
||
async with httpx.AsyncClient(timeout=timeout) as client:
|
||
# ارسال درخواست به صورت Stream
|
||
async with client.stream("POST", url=_url, json=payload) as r:
|
||
# بررسی وضعیت پاسخ
|
||
if r.status_code != 200:
|
||
print(f"Error: {r.status_code}")
|
||
return
|
||
|
||
# خواندن خط به خط (هر خط یک JSON است که سرور Yield کرده)
|
||
async for line in r.aiter_lines():
|
||
if line.strip(): # جلوگیری از پردازش خطوط خالی
|
||
try:
|
||
# تبدیل متن JSON به دیکشنری پایتون
|
||
step_data = json.loads(line)
|
||
yield step_data
|
||
except json.JSONDecodeError:
|
||
print(f"Failed to decode: {line}")
|
||
|
||
|
||
def unique_id(prefix="wai_") -> str:
|
||
return f"{prefix}{uuid.uuid4().hex[:16]}"
|
||
|
||
|
||
def load_orjson(path: str | Path):
|
||
path = Path(path)
|
||
with path.open("rb") as f: # باید باینری باز بشه برای orjson
|
||
return orjson.loads(f.read())
|
||
|
||
|
||
def save_orjson(path, data):
|
||
with open(path, "wb") as f:
|
||
f.write(
|
||
orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
|
||
)
|
||
|
||
|
||
def split_text_chunks(text: str):
|
||
"""Split a long text into safe chunks."""
|
||
return [text[i : i + MAX_LEN] for i in range(0, len(text), MAX_LEN)]
|
||
|
||
|
||
class ElasticHelper:
|
||
"""
|
||
کلاس ElasticHelper:
|
||
نوع ورودی: بدون ورودی مستقیم در تعریف کلاس
|
||
نوع خروجی: شیء از نوع ElasticHelper
|
||
عملیات:
|
||
- متغیرهای کلاسی برای شمارش و مدیریت عملیات تعریف میکند
|
||
- مسیر پیشفرض مپینگها را تنظیم میکند
|
||
"""
|
||
|
||
counter = 0
|
||
total = 0
|
||
id = ""
|
||
path_mappings = os.getcwd() + "/repo/_other/"
|
||
|
||
def __init__(
|
||
self,
|
||
es_url="http://127.0.0.1:6900",
|
||
es_pass="",
|
||
es_user="elastic",
|
||
path_mappings="",
|
||
):
|
||
"""
|
||
نوع ورودی:
|
||
- es_url: آدرس Elasticsearch (str) - پیشفرض "http://127.0.0.1:6900"
|
||
- es_pass: رمز عبور (str) - پیشفرض خالی
|
||
- es_user: نام کاربری (str) - پیشفرض "elastic"
|
||
- path_mappings: مسیر مپینگها (str) - پیشفرض خالی
|
||
نوع خروجی: شیء ElasticHelper
|
||
عملیات:
|
||
- اتصال به Elasticsearch را برقرار میکند
|
||
- در صورت وجود رمز عبور، از احراز هویت استفاده میکند
|
||
- تا 10 بار برای اتصال مجدد تلاش میکند (هر بار 5 ثانیه انتظار)
|
||
- در صورت عدم موفقیت، پیام خطا نمایش داده میشود
|
||
"""
|
||
if path_mappings:
|
||
self.path_mappings = path_mappings
|
||
|
||
if es_pass == "":
|
||
self.es = Elasticsearch(es_url)
|
||
else:
|
||
self.es = Elasticsearch(
|
||
es_url,
|
||
basic_auth=(es_user, es_pass),
|
||
verify_certs=False,
|
||
)
|
||
# print(es_url)
|
||
# print(self.es)
|
||
|
||
self.success_connect = False
|
||
for a in range(0, 10):
|
||
try:
|
||
if not self.es.ping():
|
||
print("Elastic Connection Not ping, sleep 30 s : ", a)
|
||
sleep(5)
|
||
continue
|
||
else:
|
||
self.success_connect = True
|
||
break
|
||
|
||
except Exception as e:
|
||
break
|
||
if not self.success_connect:
|
||
print("******", "not access to elastic service")
|
||
return
|
||
|
||
self.counter = 0
|
||
self.total = 0
|
||
self.id = ""
|
||
|
||
def search(self, **params):
|
||
try:
|
||
res = self.es.search(**params)
|
||
except:
|
||
return {"hits": {"hits": []}}
|
||
return res
|
||
|
||
def get_document(self, index_name, id):
|
||
res = self.es.get(index=index_name, id=id)
|
||
return res
|
||
|
||
def exist_document(self, index_name, id):
|
||
res = self.es.exists(index=index_name, id=id)
|
||
return res
|
||
|
||
def update_index_doc(self, is_update_state, index_name_o, eid, data):
|
||
"""
|
||
نوع ورودی:
|
||
- is_update_state: تعیین عملیات (update یا index) (bool)
|
||
- index_name_o: نام اندیس (str)
|
||
- eid: شناسه سند (str)
|
||
- data: دادههای سند (dict)
|
||
نوع خروجی: پاسخ Elasticsearch (dict)
|
||
عملیات:
|
||
- اگر is_update_state=True باشد: سند را آپدیت میکند
|
||
- در غیر این صورت: سند جدید ایجاد میکند
|
||
"""
|
||
if is_update_state:
|
||
resp = self.es.update(index=index_name_o, id=eid, doc=data)
|
||
# resp = self.es.update(index=index_name_o, id=eid, body={'doc':data})
|
||
else:
|
||
resp = self.es.index(index=index_name_o, id=eid, document=data)
|
||
return resp
|
||
|
||
|
||
def make_link_qq(src, ref_text=REF_TEXT):
|
||
return f"[{ref_text}]({QQ_WEB_LINK}{src})"
|
||
|
||
|
||
def make_link_qs(src, ref_text=REF_TEXT):
|
||
return f"[{ref_text}]({QS_WEB_LINK}{src})"
|
||
|
||
|
||
|
||
async def get_from_gpl(in_dict: Dict) -> List[str]:
|
||
f_list = []
|
||
_str = {
|
||
"analyze": "گزارش تحلیلی بندبهبند",
|
||
"strength": "بیان نقاط قوت",
|
||
"weakness": "بیان نقاط ضعف و ریسکهای تقنینی",
|
||
"conclusion_score": "جمعبندی نهایی شامل میزان انطباق کلی (عالی / متوسط / ضعیف)",
|
||
"suggestions": "ارائه پیشنهادهای اصلاحی مشخص و عملی",
|
||
}
|
||
for k, v in in_dict.items():
|
||
_title = _str[k]
|
||
_title = "*" + _title + "*"
|
||
f_list += [_title]
|
||
# f_list += ['\n']
|
||
f_list += [v]
|
||
f_list += ["\n"]
|
||
|
||
return ["\n".join(f_list)]
|
||
|
||
|
||
def cer(ref: str, hyp: str) -> float:
|
||
m, n = len(ref), len(hyp)
|
||
dp = list(range(n + 1))
|
||
|
||
for i in range(1, m + 1):
|
||
prev, dp[0] = dp[0], i
|
||
for j in range(1, n + 1):
|
||
cur = dp[j]
|
||
dp[j] = min(
|
||
dp[j] + 1, # deletion
|
||
dp[j - 1] + 1, # insertion
|
||
prev + (ref[i - 1] != hyp[j - 1]), # substitution
|
||
)
|
||
prev = cur
|
||
|
||
return (dp[n] / m) * 100
|
||
|
||
|
||
import nltk
|
||
from nltk.metrics import edit_distance
|
||
|
||
|
||
def cer_ntlk(exist: str, new: str) -> float:
|
||
"""
|
||
این روش دقیقتر است، چون تعداد کاراکترهای اضافی یا کم در متن طولانی،
|
||
CER را به شکل اغراقآمیز کاهش نمیدهد، بلکه روی شباهت معنایی و واژهای تمرکز میکند.
|
||
"""
|
||
# edit distance روی کلمات
|
||
return round(float(1 - edit_distance(new, exist) / len(exist)) * 100, 2)
|
||
|
||
|
||
def wer_nltk(new: str, exist: str) -> float:
|
||
new = new.split()
|
||
exist = exist.split()
|
||
|
||
return round(float(1 - edit_distance(new, exist) / len(exist)) * 100, 2)
|
||
|
||
|
||
|
||
|
||
def normalize_persian(text: str) -> str:
|
||
# حذف کنترلکاراکترها
|
||
text = "".join(ch for ch in text if unicodedata.category(ch)[0] != "C")
|
||
|
||
# حذف فاصله بین حروف فارسی
|
||
text = re.sub(r"(?<=[آ-ی])\s+(?=[آ-ی])", "", text)
|
||
|
||
# اصلاح فاصله قبل و بعد از علائم
|
||
text = re.sub(r"\s+([،؛:؟!])", r"\1", text)
|
||
text = re.sub(r"([،؛:؟!])\s*", r"\1 ", text)
|
||
|
||
# فاصلههای چندتایی
|
||
text = re.sub(r"\s{2,}", " ", text)
|
||
|
||
return text.strip()
|
||
|
||
|
||
async def get_in_from_rule_making(_input):
|
||
|
||
print(f"_input {_input}")
|
||
o_put = "گزاره های حقوقی زیر استخراج شد:\n"
|
||
for i, item in enumerate(_input, start=1):
|
||
o_put += f'{i}. {item["rule_content"]}\n'
|
||
|
||
return o_put
|
||
|
||
|
||
async def get_in_from_title_repeated(data: List[Dict[str, str]]):
|
||
if len(data) == 0:
|
||
return ["هیچ عنوانی تکراری یافت نشد."]
|
||
|
||
chunks = []
|
||
current = "نزدیکترین عناوین مشابه عنوان قانون موارد زیر می باشد::\n\n"
|
||
|
||
for i, item in enumerate(data, start=1):
|
||
title = item.get("title", "").strip()
|
||
sec_id = item.get("id", "").strip()
|
||
score = item.get("score", "")
|
||
|
||
if not title or not sec_id:
|
||
continue
|
||
|
||
ref = make_link_qq(src=sec_id)
|
||
|
||
# بلوک کامل: عنوان + لینک — هر دو در یک بلوک غیرقابل تقسیم
|
||
# block = f"{i}. {title}(وزن {score})\n{ref}\n"
|
||
block = f"{i}. {title}\n{ref}\n"
|
||
|
||
# اگر اضافه کردن این بلوک باعث overflow شود → چانک قبلی را ذخیره و current را ریست کن
|
||
if len(current) + len(block) > 4000 and current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
current += block
|
||
|
||
# ذخیره آخرین چانک
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
return chunks
|
||
|
||
|
||
async def get_multi_qs_advance_gp(
|
||
_inputs: List[RuleRelation],
|
||
):
|
||
chunks = ["لطفا یک شماره را جهت بررسی جزئی تر مغایرت انتخاب کنید: "]
|
||
buttons = []
|
||
|
||
for i, item in enumerate(_inputs, start=1):
|
||
unity = item.subject_unity
|
||
section_id = item.db_rule.section_id
|
||
if unity:
|
||
if unity.has_subject_unity != "no":
|
||
buttons.append(
|
||
[
|
||
{
|
||
"text": f"{i} بررسی مورد ",
|
||
"callback_data": f"advanced_check_conflict_qsids:{section_id}",
|
||
}
|
||
]
|
||
)
|
||
|
||
return chunks, buttons
|
||
|
||
|
||
async def get_form_gp_p1(
|
||
_inputs: Union[List[RuleRelation], List],
|
||
):
|
||
f_result = []
|
||
if not _inputs or len(_inputs) <= 1:
|
||
f_result += ["قوانین مرتبط زیر از سیاستهای کلی نظام یافت شد : "]
|
||
for i, item in enumerate(_inputs, start=1):
|
||
link = make_link_qs(src=item.db_rule.section_id)
|
||
f_result += [f"{i}. {item.subject_unity.reasoning}. {link}"]
|
||
f_result += [
|
||
"و بعد از بررسی گزاره های حقوقی هر یک با متن شما ، موضوعات مرتبط مستقیم یافت نشد"
|
||
]
|
||
return ["\n".join(f_result)], []
|
||
|
||
# -------- 1. group by qanon_id / qanon_title
|
||
groups = defaultdict(list)
|
||
for item in _inputs:
|
||
key = item.db_rule.qanon_id or item.db_rule.qanon_title
|
||
groups[key].append(item)
|
||
|
||
# -------- 2. build output per group
|
||
for qanon_key, items in groups.items():
|
||
chunks = []
|
||
buttons = []
|
||
|
||
qanon_title = items[0].db_rule.qanon_title or "قانون نامشخص"
|
||
|
||
current = f"موضوعات مرتبط در قانون *{qanon_title}*:\n\n"
|
||
|
||
for i, item in enumerate(items, start=1):
|
||
unity = item.subject_unity
|
||
link = make_link_qs(src=item.db_rule.section_id)
|
||
|
||
lines = []
|
||
|
||
if unity:
|
||
if unity.has_subject_unity == "yes":
|
||
lines.append(f"{i}- " + unity.reasoning or "")
|
||
lines.append(link)
|
||
|
||
elif unity.has_subject_unity == "yes_under_assumptions":
|
||
lines.append(f"{i}- " + unity.reasoning or "")
|
||
lines.append("مشروط به فرض زیر :")
|
||
lines.append("\t" + unity.required_assumptions or "")
|
||
lines.append(link)
|
||
|
||
block = "\n".join(lines) + "\n\n"
|
||
|
||
if len(current) + len(block) > MAX_LEN and current.strip():
|
||
chunks.append(current.rstrip())
|
||
current = ""
|
||
|
||
current += block
|
||
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
# -------- 3. one button per law
|
||
buttons.append(
|
||
[
|
||
{
|
||
"text": f"بررسی وجود مغایرت",
|
||
"callback_data": f"advanced_check_conflict_qqids:{qanon_key}",
|
||
}
|
||
]
|
||
)
|
||
f_result.append([chunks, buttons])
|
||
|
||
return f_result
|
||
|
||
|
||
async def get_form_gp_old(_inputs: Union[List[RuleRelation], List]):
|
||
chunks = []
|
||
_button = []
|
||
|
||
print(f"_inputs {_inputs}")
|
||
if len(_inputs) > 1:
|
||
current = "نتایج اولیه مغایرت های احتمالی :\n"
|
||
|
||
for i, item in enumerate(_inputs, start=1):
|
||
|
||
# ساخت لینک
|
||
_link = make_link_qs(src=item.db_rule.section_id)
|
||
|
||
# ساخت بلوک متنی کامل مربوط به این item — بدون قطع شدن
|
||
lines = [f"{i}. {item.db_rule.qanon_title} \n{_link}"]
|
||
|
||
unity = item.subject_unity
|
||
|
||
print(f"unity.has_subject_unity {unity.has_subject_unity}")
|
||
_qs_title = item.db_rule.qanon_title + "-" + str(i)
|
||
if unity.has_subject_unity == "yes":
|
||
print(f"yes")
|
||
lines.append("توضیح:")
|
||
lines.append(unity.reasoning or "")
|
||
|
||
elif unity.has_subject_unity == "yes_under_assumptions":
|
||
print(f"yes_under_assumptions")
|
||
lines.append("توضیح:")
|
||
lines.append(unity.reasoning or "")
|
||
lines.append("توضیحات بیشتر (فرضیات لازم):")
|
||
lines.append(unity.required_assumptions or "")
|
||
|
||
block = "\n".join(lines) + "\n"
|
||
|
||
if len(current) + len(block) > MAX_LEN and current.strip():
|
||
# قبلی را ذخیره کن
|
||
chunks.append(current.rstrip())
|
||
|
||
current += block
|
||
|
||
_button.append(
|
||
[{"text": f"بررسی {_qs_title}", "callback_data": f"not_yet"}]
|
||
)
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
else:
|
||
chunks = ["هیچ مغایرتی یافت نشد."]
|
||
return chunks, _button
|
||
|
||
|
||
async def get_form_gp_advanced(_input: RuleRelation):
|
||
"""
|
||
ما در نظر میگیریم که subject_unity را داریم
|
||
"""
|
||
finall = ["نتیجه:\n"]
|
||
qs_id = _input.db_rule.section_id
|
||
button = []
|
||
if _input.relation_identification:
|
||
pass
|
||
if _input.conflict_type_detection:
|
||
pass
|
||
if _input.conflict_detection:
|
||
print("conflict_detection----------------------------")
|
||
_end = "آیا میخواهید نتیجه بررسی تزاحم را ببینید؟"
|
||
if _input.conflict_detection.has_confict == True:
|
||
# finall.append(
|
||
# 'باهم تعارض دارند !'
|
||
# )
|
||
button.append(
|
||
[
|
||
{
|
||
"text": "بررسی نوع تعارض",
|
||
"callback_data": f"advanced_check_conflict_qsids:{qs_id}",
|
||
}
|
||
]
|
||
)
|
||
finall += ["توضیحات"]
|
||
finall += [_input.conflict_detection.explanation_of_conflict]
|
||
finall += [_end]
|
||
else:
|
||
# finall.append(
|
||
# 'باهم تعارض مستقیم ندارند'
|
||
# )
|
||
finall += ["توضیحات"]
|
||
finall += [_input.conflict_detection.explanation_of_conflict]
|
||
finall = ["\n".join(finall)]
|
||
return finall, button
|
||
|
||
if _input.subject_unity:
|
||
pass
|
||
# _input.subject_unity.has_subject_unity
|
||
# _input.subject_unity.required_assumptions
|
||
# _input.subject_unity.reasoning
|
||
|
||
return _input.model_dump()
|
||
|
||
|
||
async def result_gp(text, url, effort="low") -> Dict:
|
||
|
||
print(
|
||
f"text {type(text)}\n-> {text}",
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=TIME_OUT) as client:
|
||
response = await client.post(
|
||
url,
|
||
json={
|
||
"section_content": text,
|
||
"effort": "medium",
|
||
"mode_type": "bale",
|
||
},
|
||
)
|
||
response.raise_for_status()
|
||
response = response.json()
|
||
data = response.get("result", "❌ پاسخی دریافت نشد")
|
||
if isinstance(data, str):
|
||
return data
|
||
_output = []
|
||
for item in data:
|
||
_output.append(RuleRelation.parse_obj(item))
|
||
|
||
# print('results_chat ',type(result))
|
||
return _output
|
||
|
||
except Exception as e:
|
||
print(f"❌ خطای RAG:\n{str(e)}")
|
||
return "❌ ارتباط با سرور قطع میباشد"
|
||
|
||
|
||
def extract_other_info(update: BaleUpdate) -> dict:
|
||
other_info = {}
|
||
|
||
if update.message:
|
||
user = update.message.from_user
|
||
|
||
elif update.callback_query:
|
||
user = update.callback_query.from_user
|
||
|
||
else:
|
||
return other_info # خالی برگردان اگر هیچکدام نبود
|
||
|
||
# ایمن در برابر None
|
||
other_info["username"] = user.username or ""
|
||
other_info["first_name"] = user.first_name or ""
|
||
other_info["last_name"] = getattr(user, "last_name", "") or ""
|
||
|
||
return other_info
|
||
|
||
|
||
def get_in_form(title: str, sections: list):
|
||
chunks = []
|
||
current = f"برای پرسش: {title}\n\n"
|
||
|
||
for i, data in enumerate(sections, start=1):
|
||
sec_text = data.get("content", "")
|
||
idx = data.get("id")
|
||
|
||
# ساخت ref کامل
|
||
ref = make_link_qs(src=idx)
|
||
# متن کامل آیتم
|
||
block = f"{i}: {sec_text}\n{ref}\n\n"
|
||
|
||
# اگر با اضافه شدن این آیتم از حد مجاز عبور میکنیم → شروع چانک جدید
|
||
if len(current) + len(block) > MAX_LEN:
|
||
chunks.append(current.rstrip())
|
||
current = ""
|
||
|
||
current += block
|
||
|
||
# آخرین چانک را هم اضافه کن
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
return chunks
|
||
|
||
|
||
def form_search_in_law(title: str, sections: List) -> List:
|
||
chunks = []
|
||
current = f"برای پرسش: {title}\n\n"
|
||
|
||
for i, data in enumerate(sections, start=1):
|
||
sec_text = data.get("content", "")
|
||
idx = data.get("id")
|
||
|
||
# ساخت ref کامل
|
||
ref = make_link_qs(src=idx)
|
||
# متن کامل آیتم
|
||
block = f"{i}: {sec_text}\n{ref}\n\n"
|
||
|
||
# اگر با اضافه شدن این آیتم از حد مجاز عبور میکنیم → شروع چانک جدید
|
||
if len(current) + len(block) > MAX_LEN:
|
||
chunks.append(current.rstrip())
|
||
current = ""
|
||
|
||
current += block
|
||
|
||
# آخرین چانک را هم اضافه کن
|
||
if current.strip():
|
||
chunks.append(current.rstrip())
|
||
|
||
return chunks
|
||
|
||
|
||
def format_answer_bale(answer_text: str):
|
||
"""
|
||
answer_text: متن خروجی مدل که داخلش عبارتهای مثل (منبع: qs2117427) وجود دارد
|
||
sources: مثل ['qs2117427']
|
||
"""
|
||
|
||
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
|
||
# مثلا: (qs123) یا (qs123, qs456, qs789)
|
||
pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)"
|
||
|
||
def replace_source(m):
|
||
content = m.group(1)
|
||
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
|
||
links = [make_link_qs(src=code) for code in codes]
|
||
full_match = m.group(0)
|
||
# if "منبع" in full_match:
|
||
# print(f'Found explicit source(s): {links}')
|
||
# else:
|
||
# print(f'Found implicit source(s): {links}')
|
||
return ", ".join(links) # جایگزینی همه کدها با لینکهایشان
|
||
|
||
# جایگزینی در متن
|
||
answer_text = re.sub(pattern, replace_source, answer_text)
|
||
|
||
# اگر طول کمتر از MAX_LEN بود → تمام
|
||
if len(answer_text) <= MAX_LEN:
|
||
return [answer_text]
|
||
|
||
# تقسیم متن اگر طول زیاد شد
|
||
chunks = []
|
||
current = ""
|
||
|
||
sentences = answer_text.split(". ")
|
||
for sentence in sentences:
|
||
st = sentence.strip()
|
||
if not st.endswith("."):
|
||
st += "."
|
||
|
||
if len(current) + len(st) > MAX_LEN:
|
||
chunks.append(current.strip())
|
||
current = ""
|
||
|
||
current += st + " "
|
||
|
||
if current.strip():
|
||
chunks.append(current.strip())
|
||
|
||
return chunks
|
||
|
||
|
||
def form_answer_bale(answer_text: str):
|
||
"""
|
||
answer_text: متن خروجی مدل که داخلش عبارتهای مثل (منبع: qs2117427) وجود دارد
|
||
sources: مثل ['qs2117427']
|
||
"""
|
||
|
||
# الگو برای تشخیص هر پرانتز که شامل یک یا چند کد باشد
|
||
# مثلا: (qs123) یا (qs123, qs456, qs789)
|
||
pattern = r"\((?:منبع[:: ]+)?([a-zA-Z0-9_, ]+)\)"
|
||
|
||
def replace_source(m):
|
||
content = m.group(1)
|
||
codes = [c.strip() for c in content.split(",")] # جداسازی چند کد
|
||
links = [make_link_qs(src=code) for code in codes]
|
||
full_match = m.group(0)
|
||
# if "منبع" in full_match:
|
||
# print(f'Found explicit source(s): {links}')
|
||
# else:
|
||
# print(f'Found implicit source(s): {links}')
|
||
return ", ".join(links) # جایگزینی همه کدها با لینکهایشان
|
||
|
||
# جایگزینی در متن
|
||
answer_text = re.sub(pattern, replace_source, answer_text)
|
||
|
||
# اگر طول کمتر از MAX_LEN بود → تمام
|
||
if len(answer_text) <= MAX_LEN:
|
||
return [answer_text]
|
||
|
||
# تقسیم متن اگر طول زیاد شد
|
||
chunks = []
|
||
current = ""
|
||
|
||
sentences = answer_text.split(". ")
|
||
for sentence in sentences:
|
||
st = sentence.strip()
|
||
if not st.endswith("."):
|
||
st += "."
|
||
|
||
if len(current) + len(st) > MAX_LEN:
|
||
chunks.append(current.strip())
|
||
current = ""
|
||
|
||
current += st + " "
|
||
|
||
if current.strip():
|
||
chunks.append(current.strip())
|
||
|
||
return chunks
|
||
|
||
|
||
def chunked_simple_text(answer_text):
|
||
chunks = []
|
||
current = ""
|
||
|
||
sentences = answer_text.split(". ")
|
||
for sentence in sentences:
|
||
st = sentence.strip()
|
||
if not st.endswith("."):
|
||
st += "."
|
||
|
||
if len(current) + len(st) > MAX_LEN:
|
||
chunks.append(current.strip())
|
||
current = ""
|
||
|
||
current += st + " "
|
||
|
||
if current.strip():
|
||
chunks.append(current.strip())
|
||
|
||
return chunks
|