from __future__ import annotations

import re
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, validator

# NOTE: these models use the pydantic v1 API (@validator, class Config).


class SearchRequest(BaseModel):
    track_total_hits: Optional[bool] = True
    mode_response: str = "elastic"  # "normal" or "elastic"
    query: Optional[str] = ""
    search_type: Optional[str] = "normal"  # "normal", "phrase" or "and"
    filters: Optional[Dict[str, Any]] = None  # e.g. {"f_ud": 2, "f_cd": "435234234"}
    sort: Optional[List[str]] = None  # e.g. ["time_edit:desc", "lastTitle"]
    from_: int = Field(0, ge=0, alias="from")
    size: int = Field(10, ge=0, le=10000)
    collapse_field: Optional[str] = ""
    bookmark_id: Optional[str] = ""
    highlight: Optional[Dict[str, Any]] = None

    # Enhanced fields for new backend.json properties
    export_mode: bool = False
    validate_fields: bool = True
    use_field_boosts: bool = True
    aggregation_fields: Optional[List[str]] = None
    advanced_search_tags: Optional[List[str]] = None
    include_metadata: bool = False
    field_type_filter: Optional[str] = None
    search_after: Optional[List[Any]] = None
    search_fields: Optional[List[str]] = None
    default_search_field: str = "_all"
    include_fields: Optional[List[str]] = None
    exclude_fields: Optional[List[str]] = None

    class Config:
        # pydantic v1 spelling of "populate_by_name": accept both "from" and "from_"
        allow_population_by_field_name = True


class InsertRequest(BaseModel):
    id: Optional[str] = None
    document: Dict[str, Any]


class UpdateByQueryRequest(BaseModel):
    filters: Optional[Dict[str, Any]] = None
    set_fields: Optional[Dict[str, Any]] = None
    script: Optional[Dict[str, Any]] = None


class DeleteByQueryRequest(BaseModel):
    filters: Optional[Dict[str, Any]] = None
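
# Illustrative only (an assumption, not part of the API surface): a minimal
# sketch of how a caller might build a SearchRequest from a JSON payload.
# The payload values are placeholders; "from" is the wire alias for the
# reserved-word-safe attribute "from_".
def _example_search_request() -> SearchRequest:
    payload = {
        "query": "error timeout",    # placeholder query text
        "search_type": "phrase",
        "filters": {"f_ud": 2},      # placeholder filter values
        "sort": ["time_edit:desc"],
        "from": 0,                   # alias resolves to .from_
        "size": 20,
    }
    return SearchRequest.parse_obj(payload)  # pydantic v1 parsing entry point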
class ExportToFileRequest(BaseModel):
    path_back: str = Field(
        ...,
        description="Backup directory path",
        example="/backup/data",
    )
    out_name: str = Field(
        "",
        description="Output file name (defaults to the index name)",
        example="backup_2024",
        max_length=255,
    )
    body: Optional[Dict[str, Any]] = Field(
        None,
        description="Optional search query body",
        example={"query": {"match_all": {}}},
    )
    fields: Optional[List[str]] = Field(
        None,
        description="List of fields to include (None means include all)",
        example=["title", "content", "timestamp"],
    )
    chunk_size: int = Field(
        1000,
        description="Number of documents per chunk",
        ge=100,
        le=10000,
        example=1000,
    )
    scroll_timeout: str = Field(
        "5m",
        description="Scroll timeout for Elasticsearch",
        regex="^[0-9]+[smh]$",
        example="5m",
    )
    max_documents: Optional[int] = Field(
        None,
        description="Maximum number of documents to export",
        ge=1,
        le=10_000_000,
        example=50000,
    )
    delay_between_chunks: float = Field(
        0.1,
        description="Delay between processing chunks, in seconds",
        ge=0.0,
        le=10.0,
        example=0.1,
    )
    to_zip: bool = Field(
        False,
        description="Whether to compress the output to a ZIP file",
        example=True,
    )

    @validator("path_back")
    def validate_path_back(cls, v):
        """Validate the backup directory path."""
        if not v:
            raise ValueError("path_back cannot be empty")
        # Reject path traversal and shell/glob characters
        invalid_chars = ["..", "~", "*", "?", '"', "<", ">", "|"]
        if any(char in v for char in invalid_chars):
            raise ValueError(f"Invalid characters in path: {v}")
        return v

    @validator("out_name")
    def validate_out_name(cls, v):
        """Validate the output file name."""
        if not v:
            # An empty name is allowed: the exporter substitutes the
            # index name downstream (see the field description above).
            return v
        # Reject path separators and characters invalid in file names
        invalid_chars = ["/", "\\", ":", "*", "?", '"', "<", ">", "|"]
        if any(char in v for char in invalid_chars):
            raise ValueError(f"Invalid characters in output name: {v}")
        return v

    @validator("chunk_size")
    def validate_chunk_size(cls, v):
        """Validate chunk size (mirrors the Field ge/le constraints)."""
        if v < 100:
            raise ValueError("chunk_size must be at least 100")
        if v > 10000:
            raise ValueError("chunk_size cannot exceed 10000")
        return v

    @validator("scroll_timeout")
    def validate_scroll_timeout(cls, v):
        """Validate the scroll timeout format and bounds."""
        if not re.match(r"^[0-9]+[smh]$", v):
            raise ValueError(
                "scroll_timeout must be in format [number][s|m|h] (e.g. 5m, 30s, 1h)"
            )
        # Extract number and unit, then cap each unit at a sane maximum
        num, unit = int(v[:-1]), v[-1]
        if unit == "s" and num > 3600:
            raise ValueError("Scroll timeout in seconds cannot exceed 3600 (1 hour)")
        if unit == "m" and num > 60:
            raise ValueError("Scroll timeout in minutes cannot exceed 60 (1 hour)")
        if unit == "h" and num > 24:
            raise ValueError("Scroll timeout in hours cannot exceed 24 (1 day)")
        return v

    @validator("max_documents")
    def validate_max_documents(cls, v):
        """Validate the maximum document count."""
        if v is not None and v <= 0:
            raise ValueError("max_documents must be positive")
        return v

    @validator("delay_between_chunks")
    def validate_delay_between_chunks(cls, v):
        """Validate the delay between chunks (mirrors the Field ge/le constraints)."""
        if v < 0:
            raise ValueError("delay_between_chunks cannot be negative")
        if v > 10.0:
            raise ValueError("delay_between_chunks cannot exceed 10 seconds")
        return v

    @validator("fields")
    def validate_fields(cls, v):
        """Validate the export field list."""
        if v is not None:
            if len(v) == 0:
                raise ValueError("fields list cannot be empty (use None to include all)")
            # Check for duplicate fields
            if len(v) != len(set(v)):
                raise ValueError("fields list contains duplicates")
            # Validate each field name
            for field in v:
                if not field or not field.strip():
                    raise ValueError("field name cannot be empty")
                if len(field) > 255:
                    raise ValueError(f"field name too long: {field}")
        return v

    class Config:
        schema_extra = {
            "example": {
                "path_back": "/backup/data",
                "out_name": "backup_2024",
                "body": {"query": {"match_all": {}}},
                "fields": ["title", "content", "timestamp"],
                "chunk_size": 1000,
                "scroll_timeout": "5m",
                "max_documents": 50000,
                "delay_between_chunks": 0.1,
                "to_zip": True,
            }
        }
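
# Illustrative only (an assumption, not part of the API surface): a minimal
# sketch showing ExportToFileRequest validation in action. All values are
# placeholders; an invalid scroll_timeout such as "5x" would fail both the
# Field regex and validate_scroll_timeout with a ValidationError.
def _example_export_request() -> ExportToFileRequest:
    return ExportToFileRequest(
        path_back="/backup/data",  # must not contain "..", "~", or glob chars
        out_name="backup_2024",    # an empty string defers to the index name
        chunk_size=1000,           # accepted range: 100..10000
        scroll_timeout="5m",       # [number][s|m|h], capped at 3600s / 60m / 24h
        max_documents=50000,
        to_zip=True,
    )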