224 lines
9.1 KiB
Python
224 lines
9.1 KiB
Python
from __future__ import annotations

import logging
import time
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Request

from app.core.elastic_client_helper import ElasticClientHelper
from app.core.map_index_reader import MapIndexReader
from app.core.elastic_query_builder import ElasticQueryBuilder
from app.core.field_processor import FieldProcessor
from app.core.response_helper import ResponseHelper
from app.routes.v1.models import (
    ExportToFileRequest,
    SearchRequest,
    InsertRequest,
    UpdateByQueryRequest,
    DeleteByQueryRequest,
)
from app.config.settings import get_settings, Settings
from app.lib.general_functions import is_user_permit_action
|
|
|
|
|
|
# Router that every endpoint in this module registers on; its routes carry
# the "elasticsearch" tag.
router = APIRouter(
    tags=["elasticsearch"],
)

# Application settings, resolved once when this module is imported.
settings: Settings = get_settings()
|
|
|
|
|
|
def get_elastic_helper(request: Request):
    """Return the Elasticsearch helper stored on the application state.

    Looks up ``request.app.state.elastic_helper``.

    Raises:
        RuntimeError: if that attribute is missing or is ``None``.
    """
    app_state = request.app.state
    elastic_helper = getattr(app_state, "elastic_helper", None)
    if elastic_helper is not None:
        return elastic_helper
    raise RuntimeError("Elasticsearch helper not initialized")
|
|
|
|
@router.post("/indices/{type_name}/insert")
async def insert(type_name: str, payload: InsertRequest, request: Request):
    """Validate one document and index it into the index mapped to ``type_name``.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` and the permission names.
        payload: Request body; its ``document`` and ``id`` are used.
        request: Incoming request. ``request.state.user_id`` and
            ``request.state.app`` feed the permission check.

    Returns:
        ``{"success": True, "result": <value returned by index_document>}``.

    Raises:
        HTTPException: status 400 when the permission check fails, when the
            document fails validation, or when indexing raises.
    """
    user_id = request.state.user_id
    if not is_user_permit_action(user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app):
        message = "------ > not access " + str(user_id) + f" {type_name}_update"
        logging.getLogger(__name__).warning("%s", message)
        # NOTE(review): 403 would describe a permission failure better; 400 is
        # kept so existing clients see the same status code.
        raise HTTPException(status_code=400, detail=message)

    reader = MapIndexReader(type_name)
    helper = get_elastic_helper(request)
    field_processor = FieldProcessor(reader)

    # Reject invalid input before doing any transformation work on it.
    # NOTE(review): the document returned by validate_document is discarded
    # (as before) in favour of edition_document(payload.document) -- confirm
    # that this is intended.
    validate_doc, _validated = field_processor.validate_document(payload.document, payload.id)
    if not validate_doc["valid"]:
        raise HTTPException(status_code=400, detail=str(validate_doc))

    document = field_processor.edition_document(payload.document)
    doc = field_processor.process_joinning_document(document)

    try:
        es_res = await helper.index_document(
            reader.get_index_name(), doc, id=payload.id, refresh="wait_for"
        )
    except Exception as exc:  # noqa: BLE001
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True, "result": es_res}
|
|
|
|
|
|
@router.post("/indices/{type_name}/update/{id}")
async def update(type_name: str, id: str, payload: InsertRequest, request: Request):
    """Validate a document and apply it as an update to the document ``id``.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` and the permission names.
        id: Document id from the URL path; it overwrites ``payload.id``.
        payload: Request body; its ``document`` is validated and sent.
        request: Incoming request. ``request.state.user_id`` and
            ``request.state.app`` feed the permission check.

    Returns:
        ``{"success": True, "result": <value returned by
        update_or_index_document>}``.

    Raises:
        HTTPException: status 400 when the permission check fails, when the
            document fails validation, or when the update raises.
    """
    # The path parameter is authoritative; any id in the body is replaced.
    payload.id = id

    user_id = request.state.user_id
    if not is_user_permit_action(user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app):
        message = "------ > not access " + str(user_id) + f" {type_name}_update"
        logging.getLogger(__name__).warning("%s", message)
        # NOTE(review): 403 would describe a permission failure better; 400 is
        # kept so existing clients see the same status code.
        raise HTTPException(status_code=400, detail=message)

    reader = MapIndexReader(type_name)
    helper = get_elastic_helper(request)
    field_processor = FieldProcessor(reader)

    # Reject invalid input before doing any transformation work on it.
    # The third positional argument (False) is passed through unchanged;
    # its meaning is defined by FieldProcessor.validate_document.
    validate_doc, _validated = field_processor.validate_document(payload.document, payload.id, False)
    if not validate_doc["valid"]:
        raise HTTPException(status_code=400, detail=str(validate_doc))

    document = field_processor.edition_document(payload.document)
    doc = field_processor.process_joinning_document(document)

    try:
        es_res = await helper.update_or_index_document(
            index=reader.get_index_name(),
            data=doc,
            document_id=payload.id,
            operation_type="update",
            refresh="wait_for",
        )
    except Exception as exc:  # noqa: BLE001
        # The stray 'ssss' debug prefix was removed from the client-visible
        # detail; the cause is chained for server-side diagnostics.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True, "result": es_res}
|
|
|
|
@router.post("/indices/{type_name}/delete/{id}")
async def delete(type_name: str, id: str, request: Request):
    """Delete the document ``id`` from the index mapped to ``type_name``.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` and the permission names.
        id: Document id from the URL path.
        request: Incoming request. ``request.state.user_id`` and
            ``request.state.app`` feed the permission check.

    Returns:
        ``{"success": True, "result": <value returned by delete_document>}``.

    Raises:
        HTTPException: status 400 when the permission check fails or when the
            delete call raises.
    """
    user_id = request.state.user_id
    if not is_user_permit_action(user_id, f"{type_name}_update", f"{type_name}_delete", request.state.app):
        message = "------ > not access " + str(user_id) + f" {type_name}_update"
        logging.getLogger(__name__).warning("%s", message)
        # NOTE(review): 403 would describe a permission failure better; 400 is
        # kept so existing clients see the same status code.
        raise HTTPException(status_code=400, detail=message)

    reader = MapIndexReader(type_name)
    helper = get_elastic_helper(request)

    try:
        es_res = await helper.delete_document(
            index=reader.get_index_name(), id=id, refresh="wait_for"
        )
    except Exception as exc:  # noqa: BLE001
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True, "result": es_res}
|
|
|
|
|
|
@router.get("/indices/{type_name}/{doc_id}")
async def get_by_id(type_name: str, doc_id: str, request: Request):
    """Fetch one document by id and return it in normalized form.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` that supplies the index name.
        doc_id: Document id from the URL path.
        request: Incoming request; used to obtain the Elasticsearch helper.

    Returns:
        Whatever ``ResponseHelper().normalize_get_response`` produces for the
        raw lookup result.

    Raises:
        HTTPException: status 404 for any exception raised by the lookup or
            by the normalization step.
    """
    index_reader = MapIndexReader(type_name)
    es_helper = get_elastic_helper(request)
    try:
        raw_result = await es_helper.get_by_id(index_reader.get_index_name(), doc_id)
        normalizer = ResponseHelper()
        return normalizer.normalize_get_response(raw_result)
    except Exception as exc:  # noqa: BLE001
        # NOTE(review): every failure is reported as 404, including ones that
        # may not mean "not found" -- confirm this is intended.
        failure_detail = str(exc)
        raise HTTPException(status_code=404, detail=failure_detail)
|
|
|
|
|
|
|
|
#---------------------------------------------------
|
|
#--
|
|
#-- payload --> the request body is a JSON array of insert requests: [ {...}, {...}, {...} ]
|
|
#--
|
|
@router.post("/indices/{type_name}/insert/multi")
async def insertMulti(type_name: str, payload: List[InsertRequest], request: Request):
    """Validate a list of documents and bulk-insert the valid ones.

    Each item is validated on its own. Invalid items are collected, with
    their position and the original item attached, and are not inserted.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` and the permission names.
        payload: Request body, a list of insert requests.
        request: Incoming request. ``request.state.user_id`` and
            ``request.state.app`` feed the permission check.

    Returns:
        ``{"success": True, "validation_errors": [...], "result": ...}``.
        ``result`` is the value returned by ``bulk_insert``, or ``None`` when
        no item passed validation and nothing was sent to Elasticsearch.

    Raises:
        HTTPException: status 400 when the permission check fails or when the
            bulk insert raises.
    """
    user_id = request.state.user_id
    if not is_user_permit_action(user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app):
        message = "------ > not access " + str(user_id) + f" {type_name}_update"
        logging.getLogger(__name__).warning("%s", message)
        # NOTE(review): 403 would describe a permission failure better; 400 is
        # kept so existing clients see the same status code.
        raise HTTPException(status_code=400, detail=message)

    reader = MapIndexReader(type_name)
    helper = get_elastic_helper(request)
    field_processor = FieldProcessor(reader)

    list_new = []
    list_errors = []
    for i, item in enumerate(payload):
        # Named doc_id so the builtin ``id`` is not shadowed.
        doc_id = item.id if item.id else ''
        validate_doc, document = field_processor.validate_document(item.document, doc_id)
        if not validate_doc["valid"]:
            validate_doc["index"] = i
            validate_doc["doc"] = item
            list_errors.append(validate_doc)
            continue

        # NOTE(review): unlike the single-document insert route, no
        # edition_document step is applied here -- confirm this is intended.
        processed_doc = field_processor.process_joinning_document(document)
        if "id" not in processed_doc and doc_id:
            processed_doc["id"] = doc_id
        list_new.append(processed_doc)

    # Stays None when nothing is valid, so the validation errors are still
    # returned to the caller instead of being lost.
    es_res = None
    if list_new:
        try:
            es_res = await helper.bulk_insert(reader.get_index_name(), list_new, refresh="wait_for")
        except Exception as exc:  # noqa: BLE001
            # Chain the cause so the original traceback survives in server logs.
            raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True, "validation_errors": list_errors, "result": es_res}
|
|
|
|
|
|
#---------------------------------------------------
|
|
#--
|
|
@router.post("/indices/{type_name}/export-to-file")
async def export_to_file(type_name: str, payload: ExportToFileRequest, request: Request):
    """Run the helper's export-to-file operation for the index mapped to ``type_name``.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` that supplies the index name.
        payload: Request body; its fields are forwarded as keyword arguments.
        request: Incoming request; used to obtain the Elasticsearch helper.

    Returns:
        Whatever ``helper.export_to_file`` returns.
    """
    logging.getLogger(__name__).debug("export_to_file requested for %s", type_name)
    reader = MapIndexReader(type_name)
    helper: ElasticClientHelper = get_elastic_helper(request)
    # dict(payload) expands the request fields into keyword arguments
    # (presumably a pydantic model, where this yields field/value pairs --
    # TODO confirm); index_name is added on top of them.
    return await helper.export_to_file(
        **dict(payload), index_name=reader.get_index_name()
    )
|
|
|
|
|
|
|
|
|
|
@router.post("/indices/{type_name}/search")
async def search(type_name: str, payload: SearchRequest, request: Request):
    """Build a search query from ``payload``, run it, and normalize the result.

    Args:
        type_name: Type name from the URL path; used to build the
            ``MapIndexReader`` that supplies the index name.
        payload: Search request. It is passed to the query builder, and its
            ``bookmark_id`` and ``mode_response`` are passed to the
            normalizer.
        request: Incoming request; used to obtain the Elasticsearch helper.

    Returns:
        Whatever ``ResponseHelper.normalize_search_response`` produces.

    Raises:
        HTTPException: status 400 when the search call or the normalization
            step raises. Errors raised while building the query are not
            wrapped.
    """
    logger = logging.getLogger(__name__)
    started = time.perf_counter()

    reader = MapIndexReader(type_name)
    helper = get_elastic_helper(request)
    builder = ElasticQueryBuilder()
    body = builder.build_search_query(reader, payload)
    # Debug level only: the query body can be large and may hold user input.
    logger.debug("search body for %s: %s", type_name, body)

    try:
        es_res = await helper.search(reader.get_index_name(), body)
        queried = time.perf_counter()

        # None when the query has no (or an empty) "collapse" section.
        collapse_field = (body.get("collapse") or {}).get("field")
        bookmark_id = payload.bookmark_id or ''
        res = ResponseHelper(helper, reader).normalize_search_response(
            es_res, collapse_field, bookmark_id, payload.mode_response
        )
        logger.debug(
            "search for %s: query %.3fs, total %.3fs",
            type_name,
            queried - started,
            time.perf_counter() - started,
        )
        return res
    except Exception as exc:  # noqa: BLE001
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
|
|
|
|
|
|
# @router.post("/indices/{type_name}/list")
|
|
# async def search(type_name: str, payload: SearchRequest, request: Request):
|
|
# reader = MapIndexReader(type_name)
|
|
# helper = get_elastic_helper(request)
|
|
# builder = ElasticQueryBuilder()
|
|
|
|
# body = builder.build_search_query(reader, payload)
|
|
|
|
# try:
|
|
# es_res = await helper.search(reader.get_index_name(), body)
|
|
# return ResponseHelper().normalize_search_response(es_res, body.get("collapse", {}).get("field") if body.get("collapse") else None)
|
|
# except Exception as exc: # noqa: BLE001
|
|
# raise HTTPException(status_code=400, detail=str(exc))
|
|
|
|
|
|
|
|
|
|
# @router.post("/indices/{type_name}/insert/{entity_type}/{data_type}/{ref_key}")
|
|
# @router.post("/indices/{type_name}/insert/favorite/{data_type}/{ref_key}")
|
|
# @router.post("/indices/{type_name}/insert/favorite/entity/{ref_key}")
|
|
# @router.post("/indices/{type_name}/insert/history/{data_type}/{ref_key}")
|
|
# @router.post("/indices/{type_name}/insert/history/{data_type}/{ref_key}")
|
|
# @router.post("/indices/{type_name}/insert/")
|
|
# async def insert1(type_name: str, payload: InsertRequest, request: Request):
|
|
# payload.document.entity_type = entity_type
|
|
# payload.document.data_type = data_type
|
|
# payload.document.ref_key = ref_key
|
|
|