from __future__ import annotations from fastapi import APIRouter, Depends, HTTPException, Request from app.core.elastic_client_helper import ElasticClientHelper from app.core.map_index_reader import MapIndexReader from app.core.elastic_query_builder import ElasticQueryBuilder from app.core.field_processor import FieldProcessor from app.core.response_helper import ResponseHelper from app.routes.v1.models import ( ExportToFileRequest, SearchRequest, InsertRequest, UpdateByQueryRequest, DeleteByQueryRequest, ) from typing import Any, Dict, List, Optional import time from app.config.settings import get_settings, Settings from app.lib.general_functions import is_user_permit_action router = APIRouter(tags=["elasticsearch"]) settings: Settings = get_settings() def get_elastic_helper(request: Request): helper = getattr(request.app.state, "elastic_helper", None) if helper is None: raise RuntimeError("Elasticsearch helper not initialized") return helper @router.post("/indices/{type_name}/insert") async def insert(type_name: str, payload: InsertRequest, request: Request): if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app) : message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update" print(message ) raise HTTPException(status_code=400, detail=message) reader = MapIndexReader(type_name) helper = get_elastic_helper(request) field_processor = FieldProcessor(reader) validate_doc, document = field_processor.validate_document(payload.document, payload.id) edition_doc = field_processor.edition_document(payload.document) document = edition_doc if not validate_doc["valid"]: raise HTTPException(status_code=400, detail=str(validate_doc)) processed_doc = field_processor.process_joinning_document(document) doc = processed_doc try: es_res = await helper.index_document(reader.get_index_name(), doc, id=payload.id, refresh="wait_for") return {"success": True, "result": es_res} except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=400, detail=str(exc)) @router.post("/indices/{type_name}/update/{id}") async def update(type_name: str, id: str , payload: InsertRequest, request: Request): payload.id = id if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app) : message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update" print(message ) raise HTTPException(status_code=400, detail=message) reader = MapIndexReader(type_name) helper = get_elastic_helper(request) field_processor = FieldProcessor(reader) validate_doc, document = field_processor.validate_document(payload.document, payload.id, False) edition_doc = field_processor.edition_document(payload.document) document = edition_doc if not validate_doc["valid"]: raise HTTPException(status_code=400, detail=str(validate_doc)) processed_doc = field_processor.process_joinning_document(document) doc = processed_doc try: es_res = await helper.update_or_index_document(index=reader.get_index_name(), data=doc, document_id=payload.id, operation_type="update", refresh="wait_for") return {"success": True, "result": es_res} except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=400, detail= 'ssss' + str(exc)) @router.post("/indices/{type_name}/delete/{id}") async def delete(type_name: str, id: str , request: Request): if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_delete", request.state.app) : message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update" print(message ) raise HTTPException(status_code=400, detail=message) reader = MapIndexReader(type_name) helper = get_elastic_helper(request) try: es_res = await helper.delete_document(index=reader.get_index_name(), id=id, refresh="wait_for") return {"success": True, "result": es_res} except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=400, detail=str(exc)) @router.get("/indices/{type_name}/{doc_id}") async def get_by_id(type_name: str, doc_id: str, request: Request): reader = MapIndexReader(type_name) helper = get_elastic_helper(request) try: es_res = await helper.get_by_id(reader.get_index_name(), doc_id) return ResponseHelper().normalize_get_response(es_res) except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=404, detail=str(exc)) #--------------------------------------------------- #-- #-- payload --> body = [ {....}, {....},{....} ] #-- @router.post("/indices/{type_name}/insert/multi") async def insertMulti(type_name: str, payload: List[InsertRequest], request: Request): if not is_user_permit_action(request.state.user_id, f"{type_name}_update", f"{type_name}_insert", request.state.app) : message = "------ > not access " + str(request.state.user_id) + f" {type_name}_update" print(message ) raise HTTPException(status_code=400, detail=message) reader = MapIndexReader(type_name) helper = get_elastic_helper(request) field_processor = FieldProcessor(reader) list_new = [] list_errors = [] for i, item in enumerate(payload) : id = item.id if item.id else '' validate_doc, document = field_processor.validate_document(item.document, id) if not validate_doc["valid"]: validate_doc["index"] = i validate_doc["doc"] = item list_errors.append(validate_doc) else: processed_doc = field_processor.process_joinning_document(document) if not "id" in processed_doc and id: processed_doc["id"] = id list_new.append(processed_doc) try: if list_new : es_res = await helper.bulk_insert(reader.get_index_name(), list_new, refresh="wait_for") return {"success": True, "validation_errors": list_errors, "result": es_res} except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=400, detail=str(exc)) #--------------------------------------------------- #-- @router.post("/indices/{type_name}/export-to-file") async def export_to_file(type_name:str, payload: ExportToFileRequest,request:Request): print("1 -->", time.time()) reader = MapIndexReader(type_name) helper :ElasticClientHelper= get_elastic_helper(request) builder = ElasticQueryBuilder() result = await helper.export_to_file( **dict(payload),index_name=reader.get_index_name() ) return result @router.post("/indices/{type_name}/search") async def search(type_name: str, payload: SearchRequest, request: Request): print("1 -->", time.time()) reader = MapIndexReader(type_name) helper = get_elastic_helper(request) builder = ElasticQueryBuilder() print("2 -->", time.time()) # print(payload) body = builder.build_search_query(reader, payload) print("body -->", body) try: es_res = await helper.search(reader.get_index_name(), body) print("3 -->", time.time()) collapse_field = body.get("collapse", {}).get("field") if body.get("collapse") else None bookmark_id = payload.bookmark_id if payload.bookmark_id else '' # print(es_res) res = ResponseHelper(helper, reader).normalize_search_response(es_res, collapse_field, bookmark_id, payload.mode_response) print("4 -->", time.time()) return res except Exception as exc: # noqa: BLE001 raise HTTPException(status_code=400, detail=str(exc)) # @router.post("/indices/{type_name}/list") # async def search(type_name: str, payload: SearchRequest, request: Request): # reader = MapIndexReader(type_name) # helper = get_elastic_helper(request) # builder = ElasticQueryBuilder() # body = builder.build_search_query(reader, payload) # try: # es_res = await helper.search(reader.get_index_name(), body) # return ResponseHelper().normalize_search_response(es_res, body.get("collapse", {}).get("field") if body.get("collapse") else None) # except Exception as exc: # noqa: BLE001 # raise HTTPException(status_code=400, detail=str(exc)) # @router.post("/indices/{type_name}/insert/{entity_type}/{data_type}/{ref_key}") # @router.post("/indices/{type_name}/insert/favorite/{data_type}/{ref_key}") # @router.post("/indices/{type_name}/insert/favorite/entity/{ref_key}") # @router.post("/indices/{type_name}/insert/history/{data_type}/{ref_key}") # @router.post("/indices/{type_name}/insert/history/{data_type}/{ref_key}") # @router.post("/indices/{type_name}/insert/") # async def insert1(type_name: str, payload: InsertRequest, request: Request): # payload.document.entity_type = entity_type # payload.document.data_type = data_type # payload.document.ref_key = ref_key