nahj_rag/data_model.py
2026-02-23 11:49:43 +00:00

417 lines
14 KiB
Python

from datetime import datetime
import sqlite3
import json
# ایجاد جدول (فقط یک بار اجرا می‌شود)
def create_speechs_table():
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
# ایجاد جدول اگر وجود نداشته باشد
cursor.execute('''
CREATE TABLE IF NOT EXISTS speeches (
id TEXT PRIMARY KEY,
context_id TEXT,
part_id TEXT,
title TEXT,
large_title TEXT,
normalized_sentence TEXT,
url TEXT,
types TEXT,
arabic_text TEXT,
interpretation_links TEXT
)
''')
conn.commit()
conn.close()
# درج یک لیست از داده‌ها در جدول
def insert_data(data_list):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
for data in data_list:
try:
cursor.execute('''
INSERT INTO speeches (id, context_id, part_id, title, large_title, normalized_sentence, url, types, arabic_text, interpretation_links)
VALUES (:id, :context_id, :part_id, :title, :large_title, :normalized_sentence, :url, :types, :arabic_text, :Interpretation_links)
''', data)
except sqlite3.IntegrityError:
print(f"Warning: Data with id '{data['id']}' already exists and was skipped.")
conn.commit()
conn.close()
print("Data inserted successfully!")
# خواندن داده بر اساس id
def get_data_by_id(record_id):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM speeches WHERE id = ?', (record_id,))
result = cursor.fetchone()
conn.close()
return result
# خواندن داده بر اساس part_id
def get_data_by_part_id(part_id):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM speeches WHERE part_id = ?', (part_id,))
result = cursor.fetchall()
conn.close()
return result
# خواندن داده بر اساس context_id
def get_data_by_context_id(context_id):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM speeches WHERE context_id = ?', (context_id,))
result = cursor.fetchall()
conn.close()
return result
# خواندن تمام داده‌ها
def get_all_data():
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
# اجرای کوئری برای دریافت همه داده‌ها
cursor.execute('SELECT * FROM speeches')
result = cursor.fetchall()
# دریافت نام ستون‌ها
columns = [desc[0] for desc in cursor.description]
conn.close()
# تبدیل داده‌ها به دیکشنری (کلید-مقدار)
data_as_dict = [dict(zip(columns, row)) for row in result]
return data_as_dict
# متد ایجاد جدول چت
def create_chat_table():
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
# ایجاد جدول اگر وجود نداشته باشد
cursor.execute('''
CREATE TABLE IF NOT EXISTS chat (
id TEXT PRIMARY KEY,
title TEXT,
user_id TEXT,
user_query TEXT,
model_key TEXT,
retrived_passage TEXT,
retrived_ref_ids TEXT,
prompt_type TEXT,
retrived_duration TEXT,
llm_duration TEXT,
full_duration TEXT,
time_create TEXT,
used_ref_ids TEXT,
prompt_answer TEXT,
status BOOLEAN
)
''')
conn.commit()
conn.close()
print("Chat table created successfully!")
# متد ذخیره یک داده تکی در جدول چت
def insert_chat_message(chat_obj):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
# تبدیل لیست‌ها یا دیکشنری‌ها به JSON برای ذخیره در جدول
chat_obj['retrived_ref_ids'] = json.dumps(chat_obj['retrived_ref_ids'])
chat_obj['used_ref_ids'] = json.dumps(chat_obj['used_ref_ids'])
try:
cursor.execute('''
INSERT INTO chat (id, title, user_id, user_query, model_key, retrived_passage, retrived_ref_ids, prompt_type, retrived_duration, llm_duration, full_duration, time_create, used_ref_ids, prompt_answer, status)
VALUES (:id, :title, :user_id, :user_query, :model_key, :retrived_passage, :retrived_ref_ids, :prompt_type, :retrived_duration, :llm_duration, :full_duration, :time_create, :used_ref_ids, :prompt_answer, :status)
''', chat_obj)
conn.commit()
print(f"Chat message with id '{chat_obj['id']}' inserted successfully!")
except sqlite3.IntegrityError:
print(f"Warning: Chat message with id '{chat_obj['id']}' already exists!")
conn.close()
# متد بازیابی داده بر اساس آی‌دی
def get_chat_message_by_id(chat_id):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM chat WHERE id = ?', (chat_id,))
result = cursor.fetchone()
conn.close()
if result:
# تبدیل ستون‌های JSON به نوع اصلی (لیست یا دیکشنری)
result_dict = {
"id": result[0],
"title": result[1],
"user_id": result[2],
"user_query": result[3],
"model_key": result[4],
"retrived_passage": result[5],
"retrived_ref_ids": json.loads(result[6]),
"prompt_type": result[7],
"retrived_duration": result[8],
"llm_duration": result[9],
"full_duration": result[10],
"time_create": result[11],
"used_ref_ids": json.loads(result[12]),
"prompt_answer": result[13],
"status": bool(result[14])
}
return result_dict
return None
# متد بازیابی چت‌ها بر اساس user_id
def get_chats_by_user_id(user_id):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM chat WHERE user_id = ?', (user_id,))
results = cursor.fetchall()
conn.close()
# تبدیل هر رکورد به دیکشنری و مدیریت ستون‌های JSON
chats = []
for result in results:
chat = {
"id": result[0],
"title": result[1],
"user_id": result[2],
"user_query": result[3],
"model_key": result[4],
"retrived_passage": result[5],
"retrived_ref_ids": json.loads(result[6]),
"prompt_type": result[7],
"retrived_duration": result[8],
"llm_duration": result[9],
"full_duration": result[10],
"time_create": result[11],
"used_ref_ids": json.loads(result[12]),
"prompt_answer": result[13],
"status": bool(result[14])
}
chats.append(chat)
return chats
def insert_error(query, error_message):
conn = sqlite3.connect('./db/nahj.db') # اتصال به دیتابیس
cursor = conn.cursor()
try:
# درج داده در جدول `error`
cursor.execute('''
INSERT INTO error (query, error_message)
VALUES (?, ?)
''', (query, error_message))
conn.commit() # ذخیره تغییرات
print("Error inserted successfully.")
except Exception as e:
print(f"Failed to insert error: {e}")
finally:
conn.close() # بستن اتصال
# اضافه کردن یک رکورد جدید
def add_credit(remained_credit, date=None):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
if date is None:
date = datetime.now().strftime('%Y-%m-%d')
try:
cursor.execute('''
INSERT INTO credit (remained_credit, date)
VALUES (?, ?)
''', (remained_credit, date))
conn.commit()
print("Record added successfully.")
except Exception as e:
print(f"Failed to add record: {e}")
finally:
conn.close()
# بازخوانی آخرین رکورد
def get_last_credit():
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
try:
cursor.execute('''
SELECT * FROM credit
ORDER BY id DESC
LIMIT 1
''')
last_record = cursor.fetchone()
conn.close()
if last_record:
return {"id": last_record[0], "remained_credit": last_record[1], "date": last_record[2]}
else:
return None
except Exception as e:
print(f"Failed to fetch last record: {e}")
conn.close()
return None
def insert_request(update_id, username=None, text=None, answer=None, message_id=None, user_id=None, is_bot=None, date_value=None, chat_id=None, req_type=None, first_name=None, last_name=None, is_active=None):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
if date_value:
date_value = datetime.fromtimestamp(date_value).strftime('%Y-%m-%d %H:%M:%S')
try:
# درج رکورد جدید در جدول `requests`
cursor.execute('''
INSERT INTO requests (update_id, username, text, answer, message_id, user_id, is_bot, date, chat_id, type, first_name, last_name, is_active)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (update_id, username, text, answer, message_id, user_id, is_bot, date_value, chat_id, req_type, first_name, last_name, is_active))
conn.commit()
# print("Record inserted successfully.")
except Exception as e:
print(f"Failed to insert record: {e}")
finally:
conn.close()
def update_request(update_id, username=None, text=None, answer=None, message_id=None, user_id=None, is_bot=None,
date_value=None, chat_id=None, req_type=None, first_name=None, last_name=None, is_active=None):
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
try:
# لیست ستون‌هایی که به‌روزرسانی می‌شوند
update_fields = []
update_values = []
if username is not None:
update_fields.append("username = ?")
update_values.append(username)
if text is not None:
update_fields.append("text = ?")
update_values.append(text)
if answer is not None:
update_fields.append("answer = ?")
update_values.append(answer)
if message_id is not None:
update_fields.append("message_id = ?")
update_values.append(message_id)
if user_id is not None:
update_fields.append("user_id = ?")
update_values.append(user_id)
if is_bot is not None:
update_fields.append("is_bot = ?")
update_values.append(is_bot)
if date_value is not None:
update_fields.append("date = ?")
update_values.append(date_value)
if chat_id is not None:
update_fields.append("chat_id = ?")
update_values.append(chat_id)
if req_type is not None:
update_fields.append("type = ?")
update_values.append(req_type)
if first_name is not None:
update_fields.append("first_name = ?")
update_values.append(first_name)
if last_name is not None:
update_fields.append("last_name = ?")
update_values.append(last_name)
if is_active is not None:
update_fields.append("is_active = ?")
update_values.append(is_active)
# افزودن مقدار `update_id` برای شرط WHERE
update_values.append(update_id)
# اجرای کوئری آپدیت
cursor.execute(f'''
UPDATE requests
SET {', '.join(update_fields)}
WHERE update_id = ?
''', update_values)
conn.commit()
# print("Record updated successfully.")
except Exception as e:
print(f"Failed to update record: {e}")
finally:
conn.close()
def get_last_request():
conn = sqlite3.connect('./db/nahj.db')
cursor = conn.cursor()
try:
# بازخوانی آخرین رکورد بر اساس بیشترین مقدار update_id
cursor.execute('''
SELECT * FROM requests
ORDER BY update_id DESC
LIMIT 1
''')
last_record = cursor.fetchone()
conn.close()
if last_record:
return {
"update_id": last_record[0],
"username": last_record[1],
"text": last_record[2],
"answer": last_record[3],
"message_id": last_record[4],
"user_id": last_record[5],
"is_bot": last_record[6],
"date": last_record[7],
"chat_id": last_record[8],
"type": last_record[9],
"first_name": last_record[10],
"last_name": last_record[11],
"is_active": last_record[12]
}
else:
return None # اگر جدولی خالی باشد
except Exception as e:
print(f"Failed to fetch last record: {e}")
conn.close()
return None
def create_tables():
create_speechs_table()
create_chat_table()
# مثال استفاده
if __name__ == "__main__":
pass
# ایجاد جدول (فقط بار اول اجرا می‌شود)
#create_tables()
# درج داده‌ها
# with open('./data-faiss/faiss_index_nahj_metadata.json', 'r', encoding='utf-8') as file:
# data = json.load(file)
# insert_data(data)