nahj_rag/data_model.py
2026-05-17 21:33:25 +03:30

549 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
import sqlite3
import json
# ایجاد جدول (فقط یک بار اجرا می‌شود)
def create_speechs_table():
    """Create the ``speeches`` table in ``./db/nahj.db`` if it is missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so repeated calls
    leave an existing table untouched.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS speeches (
                id TEXT PRIMARY KEY,
                context_id TEXT,
                part_id TEXT,
                title TEXT,
                large_title TEXT,
                normalized_sentence TEXT,
                url TEXT,
                types TEXT,
                arabic_text TEXT,
                interpretation_links TEXT,
                ai_title TEXT,
                ai_paragraph_type TEXT
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
# درج یک لیست از داده‌ها در جدول
def insert_data(data_list):
    """Insert several rows into ``speeches``, skipping ids that already exist.

    Args:
        data_list: Iterable of mappings. Each mapping must supply every named
            placeholder of the INSERT statement; note that the keys are
            ``norm_sentence`` and the capitalised ``Interpretation_links``,
            not the column names.

    A row that violates a constraint is reported with a warning and the loop
    continues. All successful inserts are committed together at the end.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        cursor = conn.cursor()
        for data in data_list:
            try:
                cursor.execute('''
                    INSERT INTO speeches (id, context_id, part_id, title, large_title, normalized_sentence, url, types, arabic_text, interpretation_links, ai_title, ai_paragraph_type)
                    VALUES (:id, :context_id, :part_id, :title, :large_title, :norm_sentence, :url, :types, :arabic_text, :Interpretation_links, :ai_title, :ai_paragraph_type)
                ''', data)
            except sqlite3.IntegrityError:
                print(f"Warning: Data with id '{data['id']}' already exists and was skipped.")
        conn.commit()
    finally:
        # Close the connection even when an unexpected error aborts the loop.
        conn.close()
    print("Data inserted successfully!")
# درج داده ها به صورت تکی
def insert_data_to_speechs(data: dict):
    """Insert a single row into the ``speeches`` table.

    Args:
        data: Mapping that supplies every named placeholder of the INSERT
            statement (the keys are ``norm_sentence`` and the capitalised
            ``Interpretation_links``, not the column names).

    A row rejected with ``sqlite3.IntegrityError`` is skipped with a warning.
    The success message is printed only when the row was actually written.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        try:
            conn.execute('''
                INSERT INTO speeches (id, context_id, part_id, title, large_title, normalized_sentence, url, types, arabic_text, interpretation_links, ai_title, ai_paragraph_type)
                VALUES (:id, :context_id, :part_id, :title, :large_title, :norm_sentence, :url, :types, :arabic_text, :Interpretation_links, :ai_title, :ai_paragraph_type)
            ''', data)
        except sqlite3.IntegrityError:
            print(f"Warning: Data with id '{data['id']}' already exists and was skipped.")
        else:
            conn.commit()
            print("Data inserted successfully!")
    finally:
        # Close the connection on every path, including unexpected errors.
        conn.close()
def insert_data_to_central_concepts(data: dict):
    """Insert a single row into the ``central_concepts`` table.

    Args:
        data: Mapping with the keys ``id``, ``concept``, ``paragraph_effect``
            and ``part_id`` (the named placeholders of the INSERT statement).

    A row rejected with ``sqlite3.IntegrityError`` is skipped with a warning.
    The success message is printed only when the row was actually written.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        try:
            conn.execute('''
                INSERT INTO central_concepts (id, concept, paragraph_effect, part_id)
                VALUES (:id, :concept, :paragraph_effect, :part_id)
            ''', data)
        except sqlite3.IntegrityError:
            print(f"Warning: Data with id '{data['id']}' already exists and was skipped.")
        else:
            conn.commit()
            print("Data inserted successfully!")
    finally:
        # Close the connection on every path, including unexpected errors.
        conn.close()
def insert_data_to_persons(data: dict):
    """Insert a single row into the ``persons`` table.

    Args:
        data: Mapping with the keys ``id``, ``person`` and ``part_id`` (the
            named placeholders of the INSERT statement).

    A row rejected with ``sqlite3.IntegrityError`` is skipped with a warning.
    The success message is printed only when the row was actually written.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        try:
            conn.execute('''
                INSERT INTO persons (id, person, part_id)
                VALUES (:id, :person, :part_id)
            ''', data)
        except sqlite3.IntegrityError:
            print(f"Warning: Data with id '{data['id']}' already exists and was skipped.")
        else:
            conn.commit()
            print("Data inserted successfully!")
    finally:
        # Close the connection on every path, including unexpected errors.
        conn.close()
def insert_data_to_rules(data: dict):
    """Insert a single row into the ``rules`` table.

    Args:
        data: Mapping with the keys ``id``, ``rule``, ``type`` and
            ``part_id`` (the named placeholders of the INSERT statement).

    A row rejected with ``sqlite3.IntegrityError`` is skipped with a warning.
    The success message is printed only when the row was actually written.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        try:
            conn.execute('''
                INSERT INTO rules (id, rule, type, part_id)
                VALUES (:id, :rule, :type, :part_id)
            ''', data)
        except sqlite3.IntegrityError:
            print(f"Warning: Data with id '{data['id']}' already exists and was skipped.")
        else:
            conn.commit()
            print("Data inserted successfully!")
    finally:
        # Close the connection on every path, including unexpected errors.
        conn.close()
# خواندن داده بر اساس id
def get_data_by_id(record_id):
    """Return the ``speeches`` row whose ``id`` equals *record_id*.

    Returns:
        The row as a tuple of column values (``SELECT *`` order), or ``None``
        when no row matches.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        cursor = conn.execute('SELECT * FROM speeches WHERE id = ?', (record_id,))
        return cursor.fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()
# خواندن داده بر اساس part_id
def get_data_by_part_id(part_id):
    """Return every ``speeches`` row whose ``part_id`` equals *part_id*.

    Returns:
        A list of row tuples (``SELECT *`` order); empty when nothing matches.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        cursor = conn.execute('SELECT * FROM speeches WHERE part_id = ?', (part_id,))
        return cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
# خواندن داده بر اساس context_id
def get_data_by_context_id(context_id):
    """Return every ``speeches`` row whose ``context_id`` equals *context_id*.

    Returns:
        A list of row tuples (``SELECT *`` order); empty when nothing matches.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        cursor = conn.execute('SELECT * FROM speeches WHERE context_id = ?', (context_id,))
        return cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
# خواندن تمام داده‌ها
def get_all_data():
    """Return every row of ``speeches`` as a list of dictionaries.

    Returns:
        One ``{column_name: value}`` dict per row, with the column names
        taken from the cursor description of ``SELECT *``.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        cursor = conn.execute('SELECT * FROM speeches')
        # The first item of each description entry is the column name.
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query fails.
        conn.close()
# متد ایجاد جدول چت
def create_chat_table():
    """Create the ``chat`` table in ``./db/nahj.db`` if it is missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so repeated calls
    leave an existing table untouched. A confirmation line is printed after
    the statement has been committed.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS chat (
                id TEXT PRIMARY KEY,
                title TEXT,
                user_id TEXT,
                user_query TEXT,
                model_key TEXT,
                retrived_passage TEXT,
                retrived_ref_ids TEXT,
                prompt_type TEXT,
                retrived_duration TEXT,
                llm_duration TEXT,
                full_duration TEXT,
                time_create TEXT,
                used_ref_ids TEXT,
                prompt_answer TEXT,
                status BOOLEAN
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
    print("Chat table created successfully!")
# متد ذخیره یک داده تکی در جدول چت
def insert_chat_message(chat_obj):
    """Insert one chat record into the ``chat`` table.

    Args:
        chat_obj: Mapping that supplies every named placeholder of the INSERT
            statement. ``retrived_ref_ids`` and ``used_ref_ids`` are stored
            as JSON text.

    The caller's mapping is left unchanged: the JSON encoding is applied to a
    shallow copy, so the same object can be reused or inserted again without
    its lists having been replaced by strings. A record rejected with
    ``sqlite3.IntegrityError`` is reported with a warning instead of raising.
    """
    row = dict(chat_obj)
    row['retrived_ref_ids'] = json.dumps(chat_obj['retrived_ref_ids'])
    row['used_ref_ids'] = json.dumps(chat_obj['used_ref_ids'])

    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute('''
            INSERT INTO chat (id, title, user_id, user_query, model_key, retrived_passage, retrived_ref_ids, prompt_type, retrived_duration, llm_duration, full_duration, time_create, used_ref_ids, prompt_answer, status)
            VALUES (:id, :title, :user_id, :user_query, :model_key, :retrived_passage, :retrived_ref_ids, :prompt_type, :retrived_duration, :llm_duration, :full_duration, :time_create, :used_ref_ids, :prompt_answer, :status)
        ''', row)
        conn.commit()
        print(f"Chat message with id '{chat_obj['id']}' inserted successfully!")
    except sqlite3.IntegrityError:
        print(f"Warning: Chat message with id '{chat_obj['id']}' already exists!")
    finally:
        # Close the connection on every path, including unexpected errors.
        conn.close()
# متد بازیابی داده بر اساس آی‌دی
def get_chat_message_by_id(chat_id):
    """Return the chat record with the given id as a dictionary.

    Columns are read by position from ``SELECT *``, so the table must keep
    the column order used in the mapping below.

    Returns:
        A dict keyed by column name, or ``None`` when no record matches.
        ``retrived_ref_ids`` and ``used_ref_ids`` are decoded from JSON; a
        NULL column yields ``None`` instead of raising ``TypeError``.
        ``status`` is converted to ``bool``.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        result = conn.execute('SELECT * FROM chat WHERE id = ?', (chat_id,)).fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    if result is None:
        return None
    return {
        "id": result[0],
        "title": result[1],
        "user_id": result[2],
        "user_query": result[3],
        "model_key": result[4],
        "retrived_passage": result[5],
        "retrived_ref_ids": json.loads(result[6]) if result[6] is not None else None,
        "prompt_type": result[7],
        "retrived_duration": result[8],
        "llm_duration": result[9],
        "full_duration": result[10],
        "time_create": result[11],
        "used_ref_ids": json.loads(result[12]) if result[12] is not None else None,
        "prompt_answer": result[13],
        "status": bool(result[14]),
    }
# متد بازیابی چت‌ها بر اساس user_id
def get_chats_by_user_id(user_id):
    """Return all chat records that belong to *user_id*.

    Columns are read by position from ``SELECT *``, so the table must keep
    the column order used in the mapping below.

    Returns:
        A list of dicts keyed by column name; empty when the user has no
        records. ``retrived_ref_ids`` and ``used_ref_ids`` are decoded from
        JSON; a NULL column yields ``None`` instead of raising ``TypeError``.
        ``status`` is converted to ``bool``.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        results = conn.execute('SELECT * FROM chat WHERE user_id = ?', (user_id,)).fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    return [
        {
            "id": result[0],
            "title": result[1],
            "user_id": result[2],
            "user_query": result[3],
            "model_key": result[4],
            "retrived_passage": result[5],
            "retrived_ref_ids": json.loads(result[6]) if result[6] is not None else None,
            "prompt_type": result[7],
            "retrived_duration": result[8],
            "llm_duration": result[9],
            "full_duration": result[10],
            "time_create": result[11],
            "used_ref_ids": json.loads(result[12]) if result[12] is not None else None,
            "prompt_answer": result[13],
            "status": bool(result[14]),
        }
        for result in results
    ]
def insert_error(query, error_message):
    """Record a failed query and its error text in the ``error`` table.

    Best effort: any exception raised while inserting is printed instead of
    being propagated, and the connection is closed in every case.
    """
    db = sqlite3.connect('./db/nahj.db')
    params = (query, error_message)
    try:
        db.execute('''
            INSERT INTO error (query, error_message)
            VALUES (?, ?)
        ''', params)
        db.commit()
        print("Error inserted successfully.")
    except Exception as exc:
        print(f"Failed to insert error: {exc}")
    finally:
        db.close()
# اضافه کردن یک رکورد جدید
def add_credit(remained_credit, date=None):
    """Append a record to the ``credit`` table.

    Args:
        remained_credit: Value stored in the ``remained_credit`` column.
        date: Value stored in the ``date`` column; when ``None`` the current
            local date formatted as ``YYYY-MM-DD`` is used.

    Best effort: an exception raised while inserting is printed instead of
    being propagated, and the connection is closed in every case.
    """
    db = sqlite3.connect('./db/nahj.db')
    entry_date = datetime.now().strftime('%Y-%m-%d') if date is None else date
    try:
        db.execute('''
            INSERT INTO credit (remained_credit, date)
            VALUES (?, ?)
        ''', (remained_credit, entry_date))
        db.commit()
        print("Record added successfully.")
    except Exception as exc:
        print(f"Failed to add record: {exc}")
    finally:
        db.close()
# بازخوانی آخرین رکورد
def get_last_credit():
    """Fetch the ``credit`` record with the highest ``id``.

    Returns:
        A dict with the keys ``id``, ``remained_credit`` and ``date`` built
        from the first three columns of the row, or ``None`` when the table
        is empty or the query fails (the failure is printed).
    """
    db = sqlite3.connect('./db/nahj.db')
    try:
        row = db.execute('''
            SELECT * FROM credit
            ORDER BY id DESC
            LIMIT 1
        ''').fetchone()
        if not row:
            return None
        return {"id": row[0], "remained_credit": row[1], "date": row[2]}
    except Exception as exc:
        print(f"Failed to fetch last record: {exc}")
        return None
    finally:
        db.close()
def insert_request(update_id, username=None, text=None, answer=None, message_id=None, user_id=None, is_bot=None, date_value=None, chat_id=None, req_type=None, first_name=None, last_name=None, is_active=None):
    """Insert a new record into the ``requests`` table.

    A truthy ``date_value`` is interpreted as a Unix timestamp and stored as
    a local-time ``YYYY-MM-DD HH:MM:SS`` string; a falsy value is stored
    unchanged. ``req_type`` is written to the ``type`` column.

    Best effort: an exception raised while inserting is printed instead of
    being propagated, and the connection is closed in every case.
    """
    db = sqlite3.connect('./db/nahj.db')
    stored_date = date_value
    if stored_date:
        stored_date = datetime.fromtimestamp(stored_date).strftime('%Y-%m-%d %H:%M:%S')
    params = (
        update_id, username, text, answer, message_id, user_id, is_bot,
        stored_date, chat_id, req_type, first_name, last_name, is_active,
    )
    try:
        db.execute('''
            INSERT INTO requests (update_id, username, text, answer, message_id, user_id, is_bot, date, chat_id, type, first_name, last_name, is_active)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', params)
        db.commit()
    except Exception as exc:
        print(f"Failed to insert record: {exc}")
    finally:
        db.close()
def update_request(update_id, username=None, text=None, answer=None, message_id=None, user_id=None, is_bot=None,
                   date_value=None, chat_id=None, req_type=None, first_name=None, last_name=None, is_active=None):
    """Update selected columns of the ``requests`` row identified by *update_id*.

    Only arguments that are not ``None`` are written, so a column cannot be
    reset to NULL through this function. ``date_value`` goes to the ``date``
    column and ``req_type`` to the ``type`` column.

    When no column value is supplied the function returns without touching
    the database; an UPDATE with an empty ``SET`` clause would be a SQL
    syntax error.

    Best effort: an exception raised while updating is printed instead of
    being propagated, and the connection is closed in every case.
    """
    # NOTE(review): unlike insert_request, date_value is stored as given and
    # is not converted from a Unix timestamp - confirm this is intended.
    candidates = (
        ("username", username),
        ("text", text),
        ("answer", answer),
        ("message_id", message_id),
        ("user_id", user_id),
        ("is_bot", is_bot),
        ("date", date_value),
        ("chat_id", chat_id),
        ("type", req_type),
        ("first_name", first_name),
        ("last_name", last_name),
        ("is_active", is_active),
    )
    changes = [(column, value) for column, value in candidates if value is not None]
    if not changes:
        return

    # Column names come from the fixed tuple above, never from caller input,
    # so interpolating them into the statement is safe; values stay bound.
    set_clause = ', '.join(f"{column} = ?" for column, _ in changes)
    update_values = [value for _, value in changes]
    update_values.append(update_id)

    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute(f'''
            UPDATE requests
            SET {set_clause}
            WHERE update_id = ?
        ''', update_values)
        conn.commit()
    except Exception as e:
        print(f"Failed to update record: {e}")
    finally:
        conn.close()
def get_last_request():
    """Fetch the ``requests`` record with the highest ``update_id``.

    Returns:
        A dict built from the first thirteen columns of the row, keyed by
        the names listed below, or ``None`` when the table is empty or the
        query fails (the failure is printed).
    """
    field_names = (
        "update_id",
        "username",
        "text",
        "answer",
        "message_id",
        "user_id",
        "is_bot",
        "date",
        "chat_id",
        "type",
        "first_name",
        "last_name",
        "is_active",
    )
    db = sqlite3.connect('./db/nahj.db')
    try:
        row = db.execute('''
            SELECT * FROM requests
            ORDER BY update_id DESC
            LIMIT 1
        ''').fetchone()
        if not row:
            # The table has no records.
            return None
        # Index by position so a row with too few columns still raises
        # IndexError, which is reported by the handler below.
        return {name: row[position] for position, name in enumerate(field_names)}
    except Exception as exc:
        print(f"Failed to fetch last record: {exc}")
        return None
    finally:
        db.close()
def create_central_concepts_table():
    """Create the ``central_concepts`` table in ``./db/nahj.db`` if missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so repeated calls
    leave an existing table untouched. A confirmation line is printed after
    the statement has been committed.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS central_concepts (
                id TEXT PRIMARY KEY,
                concept TEXT,
                paragraph_effect REAL,
                part_id TEXT
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
    print("central_concepts table created successfully!")
def create_persons_table():
    """Create the ``persons`` table in ``./db/nahj.db`` if it is missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so repeated calls
    leave an existing table untouched. A confirmation line is printed after
    the statement has been committed.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS persons (
                id TEXT PRIMARY KEY,
                person TEXT,
                part_id TEXT
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
    print("persons table created successfully!")
def create_rules_table():
    """Create the ``rules`` table in ``./db/nahj.db`` if it is missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so repeated calls
    leave an existing table untouched. A confirmation line is printed after
    the statement has been committed.
    """
    conn = sqlite3.connect('./db/nahj.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS rules (
                id TEXT PRIMARY KEY,
                rule TEXT,
                type TEXT,
                part_id TEXT
            )
        ''')
        conn.commit()
    finally:
        # Release the connection even when the DDL statement fails.
        conn.close()
    print("rules table created successfully!")
def create_tables():
    """Run every table-creation helper defined in this module, in order."""
    for build_table in (
        create_speechs_table,
        create_chat_table,
        create_rules_table,
        create_persons_table,
        create_central_concepts_table,
    ):
        build_table()
# مثال استفاده
if __name__ == "__main__":
    # Table creation only needs to run the first time:
    # create_tables()

    # Load the metadata records from the JSON file and insert them.
    with open('./data-faiss/faiss_index_nahj_metadata.json', 'r', encoding='utf-8') as metadata_file:
        records = json.load(metadata_file)
    insert_data(records)