# elastic_backend_fast/app/routes/v1/models.py
from __future__ import annotations

import os
import re
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, validator

class SearchRequest(BaseModel):
    track_total_hits: Optional[bool] = True
    mode_response: str = "elastic"  # "normal" or "elastic"
    query: Optional[str] = ''
    search_type: Optional[str] = "normal"  # "normal", "phrase", "and"
    filters: Optional[Dict[str, Any]] = None  # e.g. {"f_ud": 2, "f_cd": "435234234"}
    sort: Optional[List[str]] = None  # e.g. ["time_edit:desc", "lastTitle"]
    from_: int = Field(0, ge=0, alias="from")
    size: int = Field(10, ge=0, le=10000)
    collapse_field: Optional[str] = ''
    bookmark_id: Optional[str] = ''
    highlight: Optional[Dict[str, Any]] = None
    # Enhanced fields for new backend.json properties
    export_mode: bool = False
    validate_fields: bool = True
    use_field_boosts: bool = True
    aggregation_fields: Optional[List[str]] = None
    advanced_search_tags: Optional[List[str]] = None
    include_metadata: bool = False
    field_type_filter: Optional[str] = None
    search_after: Optional[List[Any]] = None
    search_fields: Optional[List[str]] = None
    default_search_field: str = "_all"
    include_fields: Optional[List[str]] = None
    exclude_fields: Optional[List[str]] = None

    class Config:
        # Pydantic v1 spelling; lets both the "from" alias and the
        # from_ field name populate the model.
        allow_population_by_field_name = True
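
# A minimal usage sketch (assuming Pydantic v1, to match the @validator
# usage below). Pagination comes in through the "from" alias:
#
#     req = SearchRequest.parse_obj({
#         "query": "error",
#         "search_type": "phrase",
#         "from": 20,
#         "size": 50,
#     })
#     assert req.from_ == 20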

class InsertRequest(BaseModel):
    id: Optional[str] = None
    document: Dict[str, Any]

class UpdateByQueryRequest(BaseModel):
    filters: Optional[Dict[str, Any]] = None
    set_fields: Optional[Dict[str, Any]] = None
    script: Optional[Dict[str, Any]] = None
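
# Sketch of the two update styles this model can carry (how the route
# translates them into the final Elasticsearch update-by-query body is an
# assumption, not shown in this module):
#
#     # Declarative: set literal values on every matching document.
#     UpdateByQueryRequest(filters={"f_ud": 2}, set_fields={"status": "archived"})
#
#     # Scripted: pass a raw Elasticsearch script body instead.
#     UpdateByQueryRequest(filters={"f_ud": 2},
#                          script={"source": "ctx._source.counter += 1"})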

class DeleteByQueryRequest(BaseModel):
    filters: Optional[Dict[str, Any]] = None

class ExportToFileRequest(BaseModel):
    path_back: str = Field(
        ...,
        description="Backup directory path",
        example="/backup/data"
    )
    out_name: str = Field(
        "",
        description="Output file name (defaults to the index name when empty)",
        example="backup_2024",
        max_length=255
    )
    body: Optional[Dict[str, Any]] = Field(
        None,
        description="Optional search query body",
        example={"query": {"match_all": {}}}
    )
    fields: Optional[List[str]] = Field(
        None,
        description="List of fields to include (None means include all)",
        example=["title", "content", "timestamp"]
    )
    chunk_size: int = Field(
        1000,
        description="Number of documents per chunk",
        ge=100,
        le=10000,
        example=1000
    )
    scroll_timeout: str = Field(
        "5m",
        description="Scroll timeout for Elasticsearch",
        regex="^[0-9]+[smh]$",  # Pydantic v1 keyword is `regex`, not `pattern`
        example="5m"
    )
    max_documents: Optional[int] = Field(
        None,
        description="Maximum number of documents to export",
        ge=1,
        le=10000000,
        example=50000
    )
    delay_between_chunks: float = Field(
        0.1,
        description="Delay between processing chunks in seconds",
        ge=0.0,
        le=10.0,
        example=0.1
    )
    to_zip: bool = Field(
        False,
        description="Whether to compress output to a ZIP file",
        example=True
    )

    @validator('path_back')
    def validate_path_back(cls, v):
        """Validate backup directory path"""
        if not v:
            raise ValueError('path_back cannot be empty')
        # Reject path traversal and glob/shell metacharacters
        invalid_chars = ['..', '~', '*', '?', '"', '<', '>', '|']
        if any(char in v for char in invalid_chars):
            raise ValueError(f'Invalid characters in path: {v}')
        return v
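    # e.g. "/backup/data" passes, while "../etc" or "/tmp/*" raise ValueError.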

    @validator('out_name')
    def validate_out_name(cls, v):
        """Validate output file name"""
        if not v:
            # Empty: the export route falls back to the index name.
            return v
        # Reject characters that are invalid in file names
        invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
        if any(char in v for char in invalid_chars):
            raise ValueError(f'Invalid characters in output name: {v}')
        return v

    @validator('chunk_size')
    def validate_chunk_size(cls, v):
        """Validate chunk size"""
        if v < 100:
            raise ValueError('chunk_size must be at least 100')
        if v > 10000:
            raise ValueError('chunk_size cannot exceed 10000')
        return v

    @validator('scroll_timeout')
    def validate_scroll_timeout(cls, v):
        """Validate scroll timeout format"""
        if not re.match(r'^[0-9]+[smh]$', v):
            raise ValueError('scroll_timeout must be in format: [number][s|m|h] (e.g., 5m, 30s, 1h)')
        # Extract number and unit, then cap the timeout
        num = int(v[:-1])
        unit = v[-1]
        if unit == 's' and num > 3600:  # 1 hour
            raise ValueError('Scroll timeout in seconds cannot exceed 3600 (1 hour)')
        elif unit == 'm' and num > 60:  # 1 hour
            raise ValueError('Scroll timeout in minutes cannot exceed 60 (1 hour)')
        elif unit == 'h' and num > 24:  # 1 day
            raise ValueError('Scroll timeout in hours cannot exceed 24 (1 day)')
        return v
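    # e.g. "30s", "5m", "1h" pass; "5" (no unit), "90m", and "2d" raise.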

    @validator('max_documents')
    def validate_max_documents(cls, v):
        """Validate maximum documents"""
        if v is not None and v <= 0:
            raise ValueError('max_documents must be positive')
        return v

    @validator('delay_between_chunks')
    def validate_delay_between_chunks(cls, v):
        """Validate delay between chunks"""
        if v < 0:
            raise ValueError('delay_between_chunks cannot be negative')
        if v > 10.0:
            raise ValueError('delay_between_chunks cannot exceed 10 seconds')
        return v

    @validator('fields')
    def validate_fields(cls, v):
        """Validate fields list"""
        if v is not None:
            if len(v) == 0:
                raise ValueError('fields list cannot be empty')
            # Check for duplicate fields
            if len(v) != len(set(v)):
                raise ValueError('fields list contains duplicates')
            # Validate each field name
            for field in v:
                if not field or not field.strip():
                    raise ValueError('field name cannot be empty')
                if len(field) > 255:
                    raise ValueError(f'field name too long: {field}')
        return v

    class Config:
        schema_extra = {
            "example": {
                "path_back": "/backup/data",
                "out_name": "backup_2024",
                "body": {"query": {"match_all": {}}},
                "fields": ["title", "content", "timestamp"],
                "chunk_size": 1000,
                "scroll_timeout": "5m",
                "max_documents": 50000,
                "delay_between_chunks": 0.1,
                "to_zip": True
            }
        }
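
# End-to-end sketch (assumes Pydantic v1): a valid request builds with
# defaults for everything but path_back, while a traversal path is rejected
# by validate_path_back:
#
#     from pydantic import ValidationError
#
#     ok = ExportToFileRequest(path_back="/backup/data", to_zip=True)
#
#     try:
#         ExportToFileRequest(path_back="../etc")
#     except ValidationError as exc:
#         print(exc.errors()[0]["msg"])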