elastic_backend_fast/app/routes/v1/elastic.py
2025-11-29 15:48:25 +03:30

224 lines
9.1 KiB
Python

from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, Request
from app.core.elastic_client_helper import ElasticClientHelper
from app.core.map_index_reader import MapIndexReader
from app.core.elastic_query_builder import ElasticQueryBuilder
from app.core.field_processor import FieldProcessor
from app.core.response_helper import ResponseHelper
from app.routes.v1.models import (
ExportToFileRequest,
SearchRequest,
InsertRequest,
UpdateByQueryRequest,
DeleteByQueryRequest,
)
from typing import Any, Dict, List, Optional
import time
from app.config.settings import get_settings, Settings
from app.lib.general_functions import is_user_permit_action
router = APIRouter(tags=["elasticsearch"])
settings: Settings = get_settings()
def get_elastic_helper(request: Request):
helper = getattr(request.app.state, "elastic_helper", None)
if helper is None:
raise RuntimeError("Elasticsearch helper not initialized")
return helper
@router.post("/indices/{type_name}/insert")
async def insert(type_name: str, payload: InsertRequest, request: Request):
if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app) :
message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update"
print(message )
raise HTTPException(status_code=400, detail=message)
reader = MapIndexReader(type_name)
helper = get_elastic_helper(request)
field_processor = FieldProcessor(reader)
validate_doc, document = field_processor.validate_document(payload.document, payload.id)
edition_doc = field_processor.edition_document(payload.document)
document = edition_doc
if not validate_doc["valid"]:
raise HTTPException(status_code=400, detail=str(validate_doc))
processed_doc = field_processor.process_joinning_document(document)
doc = processed_doc
try:
es_res = await helper.index_document(reader.get_index_name(), doc, id=payload.id, refresh="wait_for")
return {"success": True, "result": es_res}
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=400, detail=str(exc))
@router.post("/indices/{type_name}/update/{id}")
async def update(type_name: str, id: str , payload: InsertRequest, request: Request):
payload.id = id
if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app) :
message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update"
print(message )
raise HTTPException(status_code=400, detail=message)
reader = MapIndexReader(type_name)
helper = get_elastic_helper(request)
field_processor = FieldProcessor(reader)
validate_doc, document = field_processor.validate_document(payload.document, payload.id, False)
edition_doc = field_processor.edition_document(payload.document)
document = edition_doc
if not validate_doc["valid"]:
raise HTTPException(status_code=400, detail=str(validate_doc))
processed_doc = field_processor.process_joinning_document(document)
doc = processed_doc
try:
es_res = await helper.update_or_index_document(index=reader.get_index_name(), data=doc, document_id=payload.id, operation_type="update", refresh="wait_for")
return {"success": True, "result": es_res}
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=400, detail= 'ssss' + str(exc))
@router.post("/indices/{type_name}/delete/{id}")
async def delete(type_name: str, id: str , request: Request):
if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_delete", request.state.app) :
message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update"
print(message )
raise HTTPException(status_code=400, detail=message)
reader = MapIndexReader(type_name)
helper = get_elastic_helper(request)
try:
es_res = await helper.delete_document(index=reader.get_index_name(), id=id, refresh="wait_for")
return {"success": True, "result": es_res}
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=400, detail=str(exc))
@router.get("/indices/{type_name}/{doc_id}")
async def get_by_id(type_name: str, doc_id: str, request: Request):
reader = MapIndexReader(type_name)
helper = get_elastic_helper(request)
try:
es_res = await helper.get_by_id(reader.get_index_name(), doc_id)
return ResponseHelper().normalize_get_response(es_res)
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=404, detail=str(exc))
#---------------------------------------------------
#--
#-- payload --> body = [ {....}, {....},{....} ]
#--
@router.post("/indices/{type_name}/insert/multi")
async def insertMulti(type_name: str, payload: List[InsertRequest], request: Request):
if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app) :
message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update"
print(message )
raise HTTPException(status_code=400, detail=message)
reader = MapIndexReader(type_name)
helper = get_elastic_helper(request)
field_processor = FieldProcessor(reader)
list_new = []
list_errors = []
for i, item in enumerate(payload) :
id = item.id if item.id else ''
validate_doc, document = field_processor.validate_document(item.document, id)
if not validate_doc["valid"]:
validate_doc["index"] = i
validate_doc["doc"] = item
list_errors.append(validate_doc)
else:
processed_doc = field_processor.process_joinning_document(document)
if not "id" in processed_doc and id:
processed_doc["id"] = id
list_new.append(processed_doc)
try:
if list_new :
es_res = await helper.bulk_insert(reader.get_index_name(), list_new, refresh="wait_for")
return {"success": True, "validation_errors": list_errors, "result": es_res}
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=400, detail=str(exc))
#---------------------------------------------------
#--
@router.post("/indices/{type_name}/export-to-file")
async def export_to_file(type_name:str, payload: ExportToFileRequest,request:Request):
print("1 -->", time.time())
reader = MapIndexReader(type_name)
helper :ElasticClientHelper= get_elastic_helper(request)
builder = ElasticQueryBuilder()
result = await helper.export_to_file(
**dict(payload),index_name=reader.get_index_name()
)
return result
@router.post("/indices/{type_name}/search")
async def search(type_name: str, payload: SearchRequest, request: Request):
print("1 -->", time.time())
reader = MapIndexReader(type_name)
helper = get_elastic_helper(request)
builder = ElasticQueryBuilder()
print("2 -->", time.time())
# print(payload)
body = builder.build_search_query(reader, payload)
print("body -->", body)
try:
es_res = await helper.search(reader.get_index_name(), body)
print("3 -->", time.time())
collapse_field = body.get("collapse", {}).get("field") if body.get("collapse") else None
bookmark_id = payload.bookmark_id if payload.bookmark_id else ''
# print(es_res)
res = ResponseHelper(helper, reader).normalize_search_response(es_res, collapse_field, bookmark_id, payload.mode_response)
print("4 -->", time.time())
return res
except Exception as exc: # noqa: BLE001
raise HTTPException(status_code=400, detail=str(exc))
# @router.post("/indices/{type_name}/list")
# async def search(type_name: str, payload: SearchRequest, request: Request):
# reader = MapIndexReader(type_name)
# helper = get_elastic_helper(request)
# builder = ElasticQueryBuilder()
# body = builder.build_search_query(reader, payload)
# try:
# es_res = await helper.search(reader.get_index_name(), body)
# return ResponseHelper().normalize_search_response(es_res, body.get("collapse", {}).get("field") if body.get("collapse") else None)
# except Exception as exc: # noqa: BLE001
# raise HTTPException(status_code=400, detail=str(exc))
# @router.post("/indices/{type_name}/insert/{entity_type}/{data_type}/{ref_key}")
# @router.post("/indices/{type_name}/insert/favorite/{data_type}/{ref_key}")
# @router.post("/indices/{type_name}/insert/favorite/entity/{ref_key}")
# @router.post("/indices/{type_name}/insert/history/{data_type}/{ref_key}")
# @router.post("/indices/{type_name}/insert/history/{data_type}/{ref_key}")
# @router.post("/indices/{type_name}/insert/")
# async def insert1(type_name: str, payload: InsertRequest, request: Request):
# payload.document.entity_type = entity_type
# payload.document.data_type = data_type
# payload.document.ref_key = ref_key