diff --git a/dbgpt/_private/config.py b/dbgpt/_private/config.py
index 03ec7b079..35c424029 100644
--- a/dbgpt/_private/config.py
+++ b/dbgpt/_private/config.py
@@ -339,13 +339,6 @@ def __init__(self) -> None:
             os.getenv("SCHEDULER_ENABLED", "True").lower() == "true"
         )
 
-        # OSS conifg
-        self.OSS_SUPPORTED = os.getenv("OSS_SUPPORTED", "false").lower() == "true"
-        self.OSS_BUCKET = os.getenv("OSS_BUCKET", "")
-        self.OSS_ACCESS_KEY_ID = os.getenv("OSS_ACCESS_KEY_ID", "")
-        self.OSS_ACCESS_KEY_SECRET = os.getenv("OSS_ACCESS_KEY_SECRET", "")
-        self.OSS_ENDPOINT = os.getenv("OSS_ENDPOINT", "")
-
     @property
     def local_db_manager(self) -> "ConnectorManager":
         from dbgpt.datasource.manages import ConnectorManager
diff --git a/dbgpt/serve/evaluate/api/endpoints.py b/dbgpt/serve/evaluate/api/endpoints.py
index 168f3814e..a7b893c23 100644
--- a/dbgpt/serve/evaluate/api/endpoints.py
+++ b/dbgpt/serve/evaluate/api/endpoints.py
@@ -2,8 +2,7 @@
 from functools import cache
 from typing import List, Optional
 
-from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
-from fastapi.responses import FileResponse, StreamingResponse
+from fastapi import APIRouter, Depends, HTTPException
 from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer
 
 from dbgpt.component import ComponentType, SystemApp
@@ -11,19 +10,9 @@
 from dbgpt.model.cluster import BaseModelController, WorkerManager, WorkerManagerFactory
 from dbgpt.rag.evaluation.answer import AnswerRelevancyMetric
 from dbgpt.serve.core import Result
-from dbgpt.serve.evaluate.api.schemas import (
-    DatasetServeRequest,
-    DatasetServeResponse,
-    EvaluateServeRequest,
-    EvaluateServeResponse,
-)
-from dbgpt.serve.evaluate.config import (
-    SERVE_DATASET_SERVICE_COMPONENT_NAME,
-    SERVE_SERVICE_COMPONENT_NAME,
-)
+from dbgpt.serve.evaluate.api.schemas import EvaluateServeRequest, EvaluateServeResponse
+from dbgpt.serve.evaluate.config import SERVE_SERVICE_COMPONENT_NAME
 from dbgpt.serve.evaluate.service.service import Service
-from dbgpt.serve.evaluate.service.service_dataset import DatasetService
-from dbgpt.util import PaginationResult
 
 from ...prompt.service.service import Service as PromptService
 
@@ -44,12 +33,6 @@ def get_prompt_service() -> PromptService:
     return global_system_app.get_component("dbgpt_serve_prompt_service", PromptService)
 
 
-def get_dataset_service() -> DatasetService:
-    return global_system_app.get_component(
-        SERVE_DATASET_SERVICE_COMPONENT_NAME, DatasetService
-    )
-
-
 def get_worker_manager() -> WorkerManager:
     worker_manager = global_system_app.get_component(
         ComponentType.WORKER_MANAGER_FACTORY, WorkerManagerFactory
@@ -141,48 +124,6 @@ async def get_scenes():
     return Result.succ(scene_list)
 
 
-@router.get("/storage/types")
-async def get_storage_types():
-    storage_list = [{"oss": "oss存储"}, {"db": "db直接存储"}]
-
-    return Result.succ(storage_list)
-
-
-@router.get("/metrics")
-async def get_metrics(
-    scene_key: str,
-    scene_value: str,
-    prompt_service: PromptService = Depends(get_prompt_service),
-    controller: WorkerManager = Depends(get_model_controller),
-):
-    metrics = metric_manage.all_metric_infos()
-    for metric in metrics:
-        if metric["name"] == AnswerRelevancyMetric.name:
-            types = set()
-            models = await controller.get_all_instances(healthy_only=True)
-            for model in models:
-                worker_name, worker_type = model.model_name.split("@")
-                if worker_type == "llm" and worker_name not in [
-                    "codegpt_proxyllm",
-                    "text2sql_proxyllm",
-                ]:
-                    types.add(worker_name)
-            metric["params"] = {
-                "prompts": prompt_service.get_list({}),
-                "models": list(types),
-            }
-
-    return Result.succ(metrics)
-
-
-@router.post("/start")
-async def evaluation_start(
-    request: EvaluateServeRequest,
-    service: Service = Depends(get_service),
-) -> Result:
-    return Result.succ(await service.new_evaluation(request))
-
-
 @router.post("/evaluation")
 async def evaluation(
     request: EvaluateServeRequest,
@@ -207,260 +148,8 @@ async def evaluation(
     )
 
 
-@router.get(
-    "/evaluations",
-    response_model=Result[PaginationResult[EvaluateServeResponse]],
-)
-async def list_evaluations(
-    filter_param: Optional[str] = Query(default=None, description="filter param"),
-    sys_code: Optional[str] = Query(default=None, description="system code"),
-    page: int = Query(default=1, description="current page"),
-    page_size: int = Query(default=100, description="page size"),
-    service: Service = Depends(get_service),
-) -> Result[PaginationResult[EvaluateServeResponse]]:
-    """
-    query evaluations
-    Args:
-        filter_param (str): The query filter param
-        page (int): The page index
-        page_size (int): The page size
-    Returns:
-        ServerResponse: The response
-    """
-    try:
-        return Result.succ(
-            service.get_list_by_page(
-                {},
-                page,
-                page_size,
-            )
-        )
-
-    except Exception as e:
-        logger.exception("查询评测记录异常!")
-        return Result.failed(msg=str(e), err_code="E0205")
-
-
-@router.get("/evaluation/detail/show")
-async def show_evaluation_detail(
-    evaluate_code: Optional[str] = Query(default=None, description="evaluate code"),
-    service: Service = Depends(get_service),
-) -> Result:
-    """Show evaluation result detail
-
-    Args:
-        evaluate_code(str): The evaluation code
-        service (Service): The service
-    Returns:
-        ServerResponse: The response
-    """
-
-    logger.info(f"show_evaluation_detail:{evaluate_code}")
-    try:
-        return Result.succ(await service.get_evaluation_dicts(evaluate_code))
-
-    except Exception as e:
-        logger.exception(f"show_evaluation_detail exception:{evaluate_code}")
-        return Result.failed(msg=str(e), err_code="E0213")
-
-
-@router.get("/evaluation/result/download")
-async def download_evaluation_result(
-    evaluate_code: Optional[str] = Query(default=None, description="evaluate code"),
-    service: Service = Depends(get_service),
-):
-    logger.info(f"download_evaluation_result:{evaluate_code}")
-    try:
-        file_name, stream = await service.get_evaluation_file_stream(evaluate_code)
-
-        from urllib.parse import quote
-
-        encoded_filename = quote(file_name)
-
-        headers = {
-            "Content-Disposition": f"attachment; filename*=utf-8''{encoded_filename}"
-        }
-
-        return StreamingResponse(
-            stream,
-            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
-            headers=headers,
-        )
-    except Exception as e:
-        return Result.failed(msg=str(e), err_code="E0213")
-
-
-@router.delete("/evaluation", response_model=Result[bool])
-async def delete_evaluation(
-    evaluation_code: str,
-    service: Service = Depends(get_service),
-) -> Result:
-    """Create a new Space entity
-
-    Args:
-        request (SpaceServeRequest): The request
-        service (Service): The service
-    Returns:
-        ServerResponse: The response
-    """
-
-    request = EvaluateServeRequest(
-        evaluate_code=evaluation_code,
-    )
-    return Result.succ(service.delete(request))
-
-
-@router.get("/datasets", response_model=Result[PaginationResult[DatasetServeResponse]])
-async def list_datasets(
-    filter_param: Optional[str] = Query(default=None, description="filter param"),
-    sys_code: Optional[str] = Query(default=None, description="system code"),
-    page: int = Query(default=1, description="current page"),
-    page_size: int = Query(default=100, description="page size"),
-    service: DatasetService = Depends(get_dataset_service),
-) -> Result[PaginationResult[DatasetServeResponse]]:
-    """
-    query evaluations
-    Args:
-        filter_param (str): The query filter param
-        page (int): The page index
-        page_size (int): The page size
-    Returns:
-        ServerResponse: The response
-    """
-    try:
-        return Result.succ(
-            service.get_list_by_page(
-                {},
-                page,
-                page_size,
-            )
-        )
-
-    except Exception as e:
-        logger.exception("查询评测数据集列表异常!")
-        return Result.failed(msg=str(e), err_code="E0211")
-
-
-@router.post(
-    "/dataset/upload/content",
-    response_model=Result,
-)
-async def upload_dataset(
-    dataset_name: str = Form(...),
-    members: str = Form(...),
-    content: str = Form(...),
-    service: DatasetService = Depends(get_dataset_service),
-) -> Result:
-    try:
-        return Result.succ(
-            await service.upload_content_dataset(content, dataset_name, members)
-        )
-    except Exception as e:
-        logger.exception(str(e))
-        return Result.failed(msg=str(e), err_code="E0202")
-
-
-@router.post(
-    "/dataset/upload/file",
-    response_model=Result,
-)
-async def upload_dataset(
-    dataset_name: str = Form(...),
-    members: str = Form(...),
-    doc_file: UploadFile = File(...),
-    service: DatasetService = Depends(get_dataset_service),
-) -> Result:
-    """Upload the evaluate dataset
-
-    Args:
-        doc_file (doc_file): The dataset file
-        service (Service): The service
-    Returns:
-        ServerResponse: The response
-    """
-    try:
-        return Result.succ(
-            await service.upload_file_dataset(doc_file, dataset_name, members)
-        )
-    except Exception as e:
-        logger.exception(str(e))
-        return Result.failed(msg=str(e), err_code="E0201")
-
-
-@router.get("/dataset/download")
-async def download_dataset(
-    code: Optional[str] = Query(default=None, description="evaluate code"),
-    service: DatasetService = Depends(get_dataset_service),
-) -> Result:
-    """Download the evaluate dataset
-
-    Args:
-        code (str): The dataset code
-        service (Service): The service
-    Returns:
-        ServerResponse: The response
-    """
-    try:
-        file_name, stream = await service.get_dataset_stream(code)
-
-        from urllib.parse import quote
-
-        encoded_filename = quote(file_name)  # 使用 URL 编码
-
-        headers = {
-            "Content-Disposition": f"attachment; filename*=utf-8''{encoded_filename}"
-        }
-
-        return StreamingResponse(
-            stream,
-            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
-            headers=headers,
-        )
-    except Exception as e:
-        return Result.failed(msg=str(e), err_code="E0203")
-
-
-@router.delete("/dataset")
-async def delete_dataset(
-    code: Optional[str] = Query(default=None, description="evaluate code"),
-    service: DatasetService = Depends(get_dataset_service),
-) -> Result:
-    """Create a new Space entity
-
-    Args:
-        request (SpaceServeRequest): The request
-        service (Service): The service
-    Returns:
-        ServerResponse: The response
-    """
-
-    try:
-
-        return Result.succ(service.delete(DatasetServeRequest(code=code)))
-    except Exception as e:
-        return Result.failed(msg=str(e), err_code="E0204")
-
-
-@router.post("/dataset/members/update")
-async def update_members(
-    request: DatasetServeRequest,
-    service: DatasetService = Depends(get_dataset_service),
-) -> Result:
-    """Create a new Space entity
-
-    Args:
-        request (SpaceServeRequest): The request
-        service (Service): The service
-    Returns:
-        ServerResponse: The response
-    """
-
-    return Result.succ(service.update_members(request.code, request.members))
-
-
 def init_endpoints(system_app: SystemApp) -> None:
     """Initialize the endpoints"""
     global global_system_app
-    system_app.register(DatasetService)
     system_app.register(Service)
     global_system_app = system_app
the endpoints""" global global_system_app - system_app.register(DatasetService) system_app.register(Service) global_system_app = system_app diff --git a/dbgpt/serve/evaluate/config.py b/dbgpt/serve/evaluate/config.py index 60bdb750a..45c86f43b 100644 --- a/dbgpt/serve/evaluate/config.py +++ b/dbgpt/serve/evaluate/config.py @@ -8,7 +8,6 @@ SERVE_APP_NAME_HUMP = "dbgpt_serve_evaluate" SERVE_CONFIG_KEY_PREFIX = "dbgpt.serve.evaluate." SERVE_SERVICE_COMPONENT_NAME = f"{SERVE_APP_NAME}_service" -SERVE_DATASET_SERVICE_COMPONENT_NAME = f"{SERVE_APP_NAME}_dataset_service" # Database table name SERVER_APP_TABLE_NAME = "dbgpt_serve_evaluate" diff --git a/dbgpt/serve/evaluate/serve.py b/dbgpt/serve/evaluate/serve.py index 9f6e78a16..6c9a7f224 100644 --- a/dbgpt/serve/evaluate/serve.py +++ b/dbgpt/serve/evaluate/serve.py @@ -8,13 +8,7 @@ from dbgpt.storage.metadata import DatabaseManager from .api.endpoints import init_endpoints, router -from .config import ( - APP_NAME, - SERVE_APP_NAME, - SERVE_APP_NAME_HUMP, - SERVE_CONFIG_KEY_PREFIX, - ServeConfig, -) +from .config import APP_NAME, SERVE_APP_NAME, SERVE_APP_NAME_HUMP logger = logging.getLogger(__name__) diff --git a/dbgpt/serve/evaluate/service/service.py b/dbgpt/serve/evaluate/service/service.py index cc04ede0e..2e94a2621 100644 --- a/dbgpt/serve/evaluate/service/service.py +++ b/dbgpt/serve/evaluate/service/service.py @@ -5,17 +5,11 @@ from concurrent.futures import ThreadPoolExecutor from typing import List, Optional -import chardet -import pandas as pd - from dbgpt._private.config import Config -from dbgpt.agent.core.schema import Status from dbgpt.component import ComponentType, SystemApp from dbgpt.configs.model_config import EMBEDDING_MODEL_CONFIG from dbgpt.core.interface.evaluation import ( EVALUATE_FILE_COL_ANSWER, - EVALUATE_FILE_COL_PREDICTION, - EVALUATE_FILE_COL_PREDICTION_COST, EvaluationResult, metric_manage, ) @@ -25,12 +19,10 @@ from dbgpt.rag.evaluation import RetrieverEvaluator from dbgpt.rag.evaluation.answer import AnswerRelevancyMetric from dbgpt.rag.evaluation.retriever import RetrieverSimilarityMetric -from dbgpt.serve.core import BaseService, Result +from dbgpt.serve.core import BaseService from dbgpt.serve.rag.operators.knowledge_space import SpaceRetrieverOperator from dbgpt.storage.metadata import BaseDao from dbgpt.storage.vector_store.base import VectorStoreConfig -from dbgpt.util.oss_utils import a_get_object, a_put_object -from dbgpt.util.pagination_utils import PaginationResult from ...agent.agents.controller import multi_agents from ...agent.evaluation.evaluation import AgentEvaluator, AgentOutputOperator @@ -38,22 +30,9 @@ from ...prompt.service.service import Service as PromptService from ...rag.connector import VectorStoreConnector from ...rag.service.service import Service as RagService -from ..api.schemas import ( - DatasetServeRequest, - DatasetServeResponse, - DatasetStorageType, - EvaluateServeRequest, - EvaluateServeResponse, - EvaluationScene, -) -from ..config import ( - SERVE_CONFIG_KEY_PREFIX, - SERVE_DATASET_SERVICE_COMPONENT_NAME, - SERVE_SERVICE_COMPONENT_NAME, - ServeConfig, -) +from ..api.schemas import EvaluateServeRequest, EvaluateServeResponse, EvaluationScene +from ..config import SERVE_CONFIG_KEY_PREFIX, SERVE_SERVICE_COMPONENT_NAME, ServeConfig from ..models.models import ServeDao, ServeEntity -from .service_dataset import DatasetService logger = logging.getLogger(__name__) @@ -61,12 +40,6 @@ executor = ThreadPoolExecutor(max_workers=5) -def get_dataset_service(system_app) -> 
DatasetService: - return system_app.get_component( - SERVE_DATASET_SERVICE_COMPONENT_NAME, DatasetService - ) - - def get_rag_service(system_app) -> RagService: return system_app.get_component("dbgpt_rag_service", RagService) @@ -85,7 +58,6 @@ def __init__(self, system_app: SystemApp, dao: Optional[ServeDao] = None): self._serve_config: ServeConfig = None self._dao: ServeDao = dao super().__init__(system_app) - self.dataset_service = get_dataset_service(system_app) self.rag_service = get_rag_service(system_app) self.prompt_service = get_prompt_service(system_app) @@ -212,320 +184,3 @@ async def run_evaluation( dataset=datasets, metrics=metrics, parallel_num=parallel_num ) return results - - async def _evaluation_executor(self, request, dataset_info, evaluation_record): - ( - datasets_info, - datasets_content, - ) = await self.dataset_service.get_dataset_json_record(dataset_info) - err_msg = None - results = None - try: - self.dao.update( - {"evaluate_code": evaluation_record.evaluate_code}, - EvaluateServeRequest( - state=Status.RUNNING.value, storage_type=datasets_info.storage_type - ), - ) - results: List[List[EvaluationResult]] = await self.run_evaluation( - request.scene_key, - request.scene_value, - datasets_content, - request.context, - request.evaluate_metrics, - request.parallel_num, - ) - except Exception as e: - logger.exception("run_evaluation exception!") - status = Status.FAILED.value - err_msg = "evaluate error:" + str(e) - update_request = EvaluateServeRequest() - try: - if results: - datasets_map = {d["query"]: d for d in datasets_content} - - total_prediction_cost = 0 - metric_score_map = {} - metric_valid_count_map = {} - temps = [] - for result in results: - query = result[0].query - dataset_item = datasets_map.get(query) - if dataset_item: - dataset_item.update( - {EVALUATE_FILE_COL_PREDICTION: result[0].prediction} - ) - dataset_item.update( - { - EVALUATE_FILE_COL_PREDICTION_COST: result[ - 0 - ].prediction_cost - } - ) - if result[0].feedback: - dataset_item.update({"feedback": result[0].feedback}) - for item in result: - # 指标有效计数 - vaild_count = 1 - if item.passing: - if item.metric_name in metric_valid_count_map: - vaild_count = ( - metric_valid_count_map[item.metric_name] + 1 - ) - - metric_valid_count_map[item.metric_name] = vaild_count - - # metric total score - total_score = item.score - if item.metric_name in metric_score_map: - total_score = ( - metric_score_map[item.metric_name] + item.score - ) - metric_score_map[item.metric_name] = total_score - - dataset_item.update({item.metric_name: item.score}) - temps.append(item.dict()) - metric_average_score_map = {} - - for k, v in metric_score_map.items(): - logger.info(f"evaluate total nums:{k}:{ metric_valid_count_map[k]}") - average_score = v / metric_valid_count_map[k] - metric_average_score_map[k] = average_score - - print(json.dumps(temps)) - - # evaluate result - if DatasetStorageType.OSS.value == datasets_info.storage_type: - result_df = pd.DataFrame(datasets_map.values()) - if datasets_info.file_type.endswith( - ".xlsx" - ) or datasets_info.file_type.endswith(".xls"): - file_stream = io.BytesIO() - with pd.ExcelWriter(file_stream, engine="xlsxwriter") as writer: - result_df.to_excel(writer, index=False) - - file_stream.seek(0) - file_string = file_stream.getvalue() - elif datasets_info.file_type.endswith(".csv"): - file_string = result_df.to_csv( - index=False, encoding="utf-8-sig" - ) - - else: - logger.warn(f"not support file type{datasets_info.file_type}") - result_oss_key = 
f"{evaluation_record.evaluate_code}_{datasets_info.name}(结果){datasets_info.file_type}" - await a_put_object(oss_key=result_oss_key, data=file_string) - update_request.result = result_oss_key - else: - update_request.result = json.dumps(datasets_map.values()) - - update_request.average_score = json.dumps(metric_average_score_map) - status = Status.COMPLETE.value - except Exception as e: - logger.exception("evaluate service error.") - status = Status.FAILED.value - err_msg = "evaluate service error: " + str(e) - update_request.state = status - update_request.log_info = err_msg - self.dao.update( - {"evaluate_code": evaluation_record.evaluate_code}, update_request - ) - - def _check_permissions(self, response, user_id, user_name): - if response and response.user_id == user_id: - return True - raise ValueError(f"你没有当前评测记录{response.evaluate_code}的权限!") - - async def get_evaluation_file_stream(self, evaluate_code: str): - logger.info(f"get_evaluation_file_stream:{evaluate_code}") - - evaluation = self.get(EvaluateServeRequest(evaluate_code=evaluate_code)) - if evaluation: - if Status.COMPLETE.value == evaluation.state: - - if evaluation.storage_type == "oss": - dataset_bytes = await a_get_object(oss_key=evaluation.result) - return evaluation.result, io.BytesIO(dataset_bytes) - else: - datasets_dicts = json.loads(evaluation.result) - datasets_df = pd.DataFrame(datasets_dicts) - - file_string = datasets_df.to_csv(index=False, encoding="utf-8-sig") - - return f"{evaluation.evaluate_code}.csv", io.BytesIO(file_string) - else: - raise ValueError("evaluation have not complete yet.") - else: - raise ValueError(f"unknown evaluation record[{evaluate_code}]") - - async def get_evaluation_dicts(self, evaluate_code: str): - logger.info(f"get_evaluation_file_stream:{evaluate_code}") - - evaluation = self.get(EvaluateServeRequest(evaluate_code=evaluate_code)) - if evaluation: - if Status.COMPLETE.value == evaluation.state: - if evaluation.storage_type == "oss": - file_content = await a_get_object(oss_key=evaluation.result) - result = chardet.detect(file_content) - encoding = result["encoding"] - if evaluation.result.endswith( - ".xlsx" - ) or evaluation.result.endswith(".xls"): - df_tmp = pd.read_excel( - io.BytesIO(file_content), index_col=False - ) - elif evaluation.result.endswith(".csv"): - df_tmp = pd.read_csv( - io.BytesIO(file_content), - index_col=False, - encoding=encoding, - ) - else: - raise ValueError( - f"evaluate do not support {evaluation.result}." 
- ) - - return df_tmp.to_dict(orient="records") - else: - datasets_dicts = json.loads(evaluation.result) - return datasets_dicts - else: - raise ValueError("evaluation have not complete yet.") - else: - raise ValueError(f"unknown evaluation record[{evaluate_code}]") - - def run_slow_task(self, async_func, *args): - try: - try: - loop = asyncio.get_event_loop() - except RuntimeError as e: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - if loop.is_running(): - future = asyncio.run_coroutine_threadsafe(async_func(*args), loop) - else: - loop.run_until_complete(async_func(*args)) - - except Exception as e: - logger.error("evaluation run error", e) - - async def new_evaluation(self, request: EvaluateServeRequest) -> Result: - logger.info(f"new_evaluation:{request}") - - """New evaluation - - Args: - request (EvaluateServeRequest): The request - - Returns: - EvaluateServeResponse: The response - """ - - dataset_info: DatasetServeResponse = self.dataset_service.get( - DatasetServeRequest(code=request.datasets) - ) - request.datasets_name = dataset_info.name - - request.state = Status.TODO.value - new_evaluation = self.create(request) - - executor.submit( - self.run_slow_task, - self._evaluation_executor, - request, - dataset_info, - new_evaluation, - ) - - # asyncio.create_task(self.run_slow_bound_task(request, dataset_info, new_evaluation)) - return new_evaluation - - def create(self, request: EvaluateServeRequest) -> EvaluateServeResponse: - """Create a new Evaluate entity - - Args: - request (EvaluateServeRequest): The request - - Returns: - EvaluateServeResponse: The response - """ - - if not request.user_name: - request.user_name = self.config.default_user - if not request.sys_code: - request.sys_code = self.config.default_sys_code - return super().create(request) - - def update(self, request: EvaluateServeRequest) -> EvaluateServeResponse: - """Update a Evaluate entity - - Args: - request (EvaluateServeRequest): The request - - Returns: - EvaluateServeResponse: The response - """ - # Build the query request from the request - query_request = { - "prompt_code": request.prompt_code, - "sys_code": request.sys_code, - } - return self.dao.update(query_request, update_request=request) - - def get(self, request: EvaluateServeRequest) -> Optional[EvaluateServeResponse]: - """Get a Evaluate entity - - Args: - request (EvaluateServeRequest): The request - - Returns: - EvaluateServeResponse: The response - """ - # TODO: implement your own logic here - # Build the query request from the request - query_request = request - return self.dao.get_one(query_request) - - def delete(self, request: EvaluateServeRequest) -> None: - """Delete a Evaluate entity - - Args: - request (EvaluateServeRequest): The request - """ - # Build the query request from the request - query_request = { - "evaluate_code": request.evaluate_code, - "user_id": request.user_id, - } - self.dao.delete(query_request) - - def get_list(self, request: EvaluateServeRequest) -> List[EvaluateServeResponse]: - """Get a list of Evaluate entities - - Args: - request (EvaluateServeRequest): The request - - Returns: - List[EvaluateServeResponse]: The response - """ - # Build the query request from the request - query_request = request - return self.dao.get_list(query_request) - - def get_list_by_page( - self, request: EvaluateServeRequest, page: int, page_size: int - ) -> PaginationResult[EvaluateServeResponse]: - """Get a list of Evaluate entities by page - - Args: - request (EvaluateServeRequest): The request - page (int): The 
page number - page_size (int): The page size - - Returns: - List[EvaluateServeResponse]: The response - """ - query_request = request - return self.dao.get_list_page( - query_request, page, page_size, ServeEntity.id.name - ) diff --git a/dbgpt/serve/evaluate/service/service_dataset.py b/dbgpt/serve/evaluate/service/service_dataset.py deleted file mode 100644 index e5d935824..000000000 --- a/dbgpt/serve/evaluate/service/service_dataset.py +++ /dev/null @@ -1,340 +0,0 @@ -import io -import json -import logging -import os -import uuid -from typing import List, Optional - -import chardet -import pandas as pd -from fastapi import UploadFile - -from dbgpt._private.config import Config -from dbgpt.component import SystemApp -from dbgpt.core.interface.evaluation import ( - EVALUATE_FILE_COL_ANSWER, - EVALUATE_FILE_COL_QUESTION, -) -from dbgpt.serve.core import BaseService -from dbgpt.storage.metadata import BaseDao -from dbgpt.util.oss_utils import a_delete_object, a_get_object, a_put_object -from dbgpt.util.pagination_utils import PaginationResult - -from ..api.schemas import DatasetServeRequest, DatasetServeResponse, DatasetStorageType -from ..config import ( - SERVE_CONFIG_KEY_PREFIX, - SERVE_DATASET_SERVICE_COMPONENT_NAME, - ServeConfig, -) -from ..models.models_dataset import DatasetServeDao, DatasetServeEntity - -logger = logging.getLogger(__name__) - -CFG = Config() - - -class DatasetService( - BaseService[DatasetServeEntity, DatasetServeRequest, DatasetServeResponse] -): - """The service class for Evaluate""" - - name = SERVE_DATASET_SERVICE_COMPONENT_NAME - - def __init__(self, system_app: SystemApp, dao: Optional[DatasetServeDao] = None): - self._system_app = None - self._serve_config: ServeConfig = None - self._dao: DatasetServeDao = dao - super().__init__(system_app) - - def init_app(self, system_app: SystemApp) -> None: - """Initialize the service - - Args: - system_app (SystemApp): The system app - """ - self._serve_config = ServeConfig.from_app_config( - system_app.config, SERVE_CONFIG_KEY_PREFIX - ) - self._dao = self._dao or DatasetServeDao(self._serve_config) - self._system_app = system_app - - @property - def dao( - self, - ) -> BaseDao[DatasetServeEntity, DatasetServeRequest, DatasetServeResponse]: - """Returns the internal DAO.""" - return self._dao - - @property - def config(self) -> ServeConfig: - """Returns the internal ServeConfig.""" - return self._serve_config - - async def upload_content_dataset( - self, - content: str, - dataset_name: Optional[str] = None, - members: Optional[str] = None, - ): - logger.info(f"upload_content_dataset:{dataset_name},{members}") - try: - datasets_dicts = json.loads(content) - datasets_df = pd.DataFrame(datasets_dicts) - - if EVALUATE_FILE_COL_QUESTION not in datasets_df.columns: - raise ValueError( - f"cannot be recognized and columns are missing " - f"{EVALUATE_FILE_COL_QUESTION}" - ) - - have_answer = False - if EVALUATE_FILE_COL_ANSWER in datasets_df.columns: - have_answer = True - dataset_code = str(uuid.uuid1()) - request: DatasetServeRequest = DatasetServeRequest( - code=dataset_code, - name=dataset_name, - file_type=".csv", - storage_type="db", - storage_position=content, - datasets_count=len(datasets_df), - have_answer=have_answer, - members=members, - ) - return self.create(request) - except Exception as e: - logger.exception("data upload failed") - raise ValueError("data upload failed" + str(e)) - - async def upload_file_dataset( - self, - file: UploadFile, - dataset_name: Optional[str] = None, - members: Optional[str] = None, - 
-        user_id: Optional[str] = None,
-        user_name: Optional[str] = None,
-    ):
-        logger.info(f"upload_file_dataset:{file.filename},{members},{user_name}")
-
-        dataset_code = str(uuid.uuid1())
-        file_content = await file.read()
-        file_name = file.filename
-        extension = os.path.splitext(file_name)[1]
-        try:
-            result = chardet.detect(file_content)
-            encoding = result["encoding"]
-            confidence = result["confidence"]
-
-            df = None
-            # read excel file
-            if file_name.endswith(".xlsx") or file_name.endswith(".xls"):
-                df_tmp = pd.read_excel(io.BytesIO(file_content), index_col=False)
-            elif file_name.endswith(".csv"):
-                df_tmp = pd.read_csv(
-                    io.BytesIO(file_content),
-                    index_col=False,
-                    encoding=encoding,
-                )
-            else:
-                raise ValueError(f"do not support file type {file.filename}.")
-
-            oss_key = f"{dataset_code}_{file_name}"
-            await a_put_object(oss_key=oss_key, data=file_content)
-            if EVALUATE_FILE_COL_QUESTION not in df_tmp.columns:
-                raise ValueError(f"evaluate column {EVALUATE_FILE_COL_QUESTION}")
-
-            have_answer = False
-            if EVALUATE_FILE_COL_ANSWER in df_tmp.columns:
-                have_answer = True
-
-            request: DatasetServeRequest = DatasetServeRequest(
-                code=dataset_code,
-                name=dataset_name,
-                file_type=extension,
-                storage_type="oss",
-                storage_position=oss_key,
-                datasets_count=len(df_tmp),
-                have_answer=have_answer,
-                members=members,
-                user_name=user_name,
-                user_id=user_id,
-            )
-            return self.create(request)
-        except Exception as e:
-            logger.exception("evaluation upload error")
-            raise ValueError("evaluation upload error" + str(e))
-
-    async def get_dataset_stream(self, code: str):
-        logger.info(f"get_dataset_stream:{code}")
-        dataset_info: DatasetServeRequest = self.get(DatasetServeRequest(code=code))
-        if dataset_info:
-            file_name = f"{dataset_info.name}{dataset_info.file_type}"
-            if dataset_info.storage_type == "oss":
-                dataset_bytes = await a_get_object(
-                    oss_key=dataset_info.storage_position
-                )
-                return file_name, io.BytesIO(dataset_bytes)
-            elif dataset_info.storage_type == "db":
-                datasets_dicts = json.loads(dataset_info.storage_position)
-                datasets_df = pd.DataFrame(datasets_dicts)
-                if dataset_info.file_type.endswith(
-                    ".xlsx"
-                ) or dataset_info.file_type.endswith(".xls"):
-                    file_stream = io.BytesIO()
-                    datasets_df.to_excel(file_stream, index=False, encoding="utf-8-sig")
-                    file_stream.seek(0)
-                    file_string = file_stream.getvalue()
-                elif dataset_info.file_type.endswith(".csv"):
-                    file_string = datasets_df.to_csv(index=False, encoding="utf-8-sig")
-
-                return file_name, io.BytesIO(file_string)
-            else:
-                raise ValueError("do not support dataset type")
-        else:
-            raise ValueError(f"unknown data[{code}]")
-
-    async def get_dataset_json_record(
-        self, dataset_info: DatasetServeResponse
-    ) -> (DatasetServeRequest, List[dict]):
-        logger.info(f"get_dataset_json_record:{dataset_info.name}")
-        if dataset_info:
-            if dataset_info.storage_type == DatasetStorageType.OSS.value:
-                file_content = await a_get_object(oss_key=dataset_info.storage_position)
-                result = chardet.detect(file_content)
-                encoding = result["encoding"]
-                if dataset_info.file_type.endswith(
-                    ".xlsx"
-                ) or dataset_info.file_type.endswith(".xls"):
-                    df_tmp = pd.read_excel(io.BytesIO(file_content), index_col=False)
-                elif dataset_info.file_type.endswith(".csv"):
-                    df_tmp = pd.read_csv(
-                        io.BytesIO(file_content),
-                        index_col=False,
-                        encoding=encoding,
-                    )
-                else:
-                    raise ValueError(
-                        f"Evaluate does not support the current file "
-                        f"type {dataset_info.file_type}."
-                    )
-
-                return dataset_info, df_tmp.to_dict(orient="records")
-            elif dataset_info.storage_type == DatasetStorageType.DB.value:
-                return dataset_info, json.loads(dataset_info.storage_position)
-            else:
-                raise ValueError("Dataset storage type not yet supported")
-        else:
-            raise ValueError(f"unknown data info")
-
-    def _check_permissions(
-        self, dataset_info: DatasetServeRequest, user_id: str, user_name: str
-    ):
-        if dataset_info and dataset_info.user_id != user_id:
-            if dataset_info.members and user_name:
-                if user_name not in dataset_info.members:
-                    return
-            raise ValueError("你不是数据集成员或拥有者,无法删除!")
-
-    async def delete_dataset(self, code: str, user_id: str, user_name: str):
-        dataset_info: DatasetServeRequest = self.get(DatasetServeRequest(code=code))
-        self._check_permissions(dataset_info, user_id, user_name)
-        if dataset_info:
-            if dataset_info.storage_type == "oss":
-                await a_delete_object(oss_key=dataset_info.storage_position)
-
-        request = DatasetServeRequest(code=code)
-        self.delete(request)
-
-    def create(self, request: DatasetServeRequest) -> DatasetServeResponse:
-        """Create a new Dataset entity
-
-        Args:
-            request (DatasetServeRequest): The request
-
-        Returns:
-            DatasetServeResponse: The response
-        """
-
-        if not request.user_name:
-            request.user_name = self.config.default_user
-        if not request.sys_code:
-            request.sys_code = self.config.default_sys_code
-        return super().create(request)
-
-    def update(self, request: DatasetServeRequest) -> DatasetServeResponse:
-        """Update a Dataset entity
-
-        Args:
-            request (DatasetServeRequest): The request
-
-        Returns:
-            DatasetServeResponse: The response
-        """
-        # Build the query request from the request
-        query_request = {
-            "prompt_code": request.prompt_code,
-            "sys_code": request.sys_code,
-        }
-        return self.dao.update(query_request, update_request=request)
-
-    def get(self, request: DatasetServeRequest) -> Optional[DatasetServeResponse]:
-        """Get a Evaluate entity
-
-        Args:
-            request (DatasetServeRequest): The request
-
-        Returns:
-            DatasetServeResponse: The response
-        """
-        query_request = request
-        return self.dao.get_one(query_request)
-
-    def delete(self, request: DatasetServeRequest) -> None:
-        """Delete a Evaluate entity
-
-        Args:
-            request (DatasetServeRequest): The request
-        """
-        # Build the query request from the request
-        query_request = {
-            "code": request.code,
-            "user_id": request.user_id,
-        }
-        self.dao.delete(query_request)
-
-    def get_list(self, request: DatasetServeRequest) -> List[DatasetServeResponse]:
-        """Get a list of Evaluate entities
-
-        Args:
-            request (DatasetServeRequest): The request
-
-        Returns:
-            List[DatasetServeResponse]: The response
-        """
-
-        # Build the query request from the request
-        query_request = request
-        return self.dao.get_list(query_request)
-
-    def get_list_by_page(
-        self, request: DatasetServeRequest, page: int, page_size: int
-    ) -> PaginationResult[DatasetServeResponse]:
-        """Get a list of Dataset entities by page
-
-        Args:
-            request (DatasetServeRequest): The request
-            page (int): The page number
-            page_size (int): The page size
-
-        Returns:
-            List[DatasetServeResponse]: The response
-        """
-        query_request = request
-        return self.dao.get_list_page(
-            query_request, page, page_size, DatasetServeEntity.id.name
-        )
-
-    def update_members(self, code, members):
-        if code is None:
-            raise Exception("code can not be None when update members.")
-        return self._dao.update({"code": code}, DatasetServeRequest(members=members))
diff --git a/dbgpt/util/oss_utils.py b/dbgpt/util/oss_utils.py
deleted file mode 100644
index 8400b88bb..000000000
--- a/dbgpt/util/oss_utils.py
+++ /dev/null
@@ -1,191 +0,0 @@
-import logging
-import os
-import uuid
-from datetime import datetime
-
-import oss2
-from oss2.credentials import EnvironmentVariableCredentialsProvider
-
-from dbgpt._private.config import Config
-
-logger = logging.getLogger(__name__)
-CFG = Config()
-
-os.environ["OSS_ACCESS_KEY_ID"] = CFG.OSS_ACCESS_KEY_ID
-os.environ["OSS_ACCESS_KEY_SECRET"] = CFG.OSS_ACCESS_KEY_SECRET
-
-
-def generate_oss_key(filename: str):
-    """
-    generate oss key by file name, timestamp + filename + uuid(32)
-    """
-    timestamp_millis = int(datetime.now().timestamp() * 1000)
-
-    unique_id = uuid.uuid4().hex
-
-    unique_filename = f"{timestamp_millis}_{filename}_{unique_id}"
-
-    return unique_filename
-
-
-def put_obj_from_file(oss_key: str, local_file_path, bucket_name: str):
-    """
-    upload local file to oss, you should give the oss file path, local file path and bucket,
-    ensure oss_file_path is unique address and not exist, otherwise upload will raise exception.
-
-    oss_file_path and local_file_path should contain postfix of file such as '.txt'
-
-    params:
-        oss_key: generate a unique key for current file
-        local_file_path -- file to upload
-        bucket_name
-    """
-    try:
-        auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-        bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, bucket_name)
-        resp = bucket.put_object_from_file(oss_key, local_file_path)
-        return resp is not None and resp.status == 200
-    except Exception as ex:
-        err_info = f"upload file failed: {str(ex)}"
-        logger.error(err_info)
-        raise err_info
-
-
-def put_object(oss_key: str, data, bucket_name: str):
-    """
-    upload local file to oss, you should give the oss file path, local file path and bucket,
-    ensure oss_file_path is unique address and not exist, otherwise upload will raise exception.
-
-    oss_file_path and local_file_path should contain postfix of file such as '.txt'
-
-    params:
-        oss_key: generate a unique key for current file
-        local_file_path -- file to upload
-        bucket_name
-    """
-    try:
-        auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-        bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, bucket_name)
-        resp = bucket.put_object(oss_key, data)
-        return resp is not None and resp.status == 200
-    except Exception as ex:
-        err_info = f"upload file failed: {str(ex)}"
-        logger.error(err_info)
-        raise err_info
-
-
-def get_object_to_file(oss_key: str, local_file_path, bucket: str):
-    """
-    download file from oss
-    params:
-        oss_key -- oss_key file to download
-        local_file_path -- local file path
-        bucket_name
-    """
-    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-    bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, bucket)
-    resp = bucket.get_object_to_file(oss_key, local_file_path)
-    return resp is not None and resp.status == 200
-
-
-def get_object(oss_key: str, bucket: str):
-    """
-    download file from oss
-    params:
-        oss_key -- oss_key file to download
-        bucket_name
-    """
-    logger.info(f"get file object!{oss_key}")
-    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-    bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, bucket)
-    resp = bucket.get_object(oss_key)
-    if resp is not None and resp.status == 200:
-        return resp.read()
-
-
-async def a_put_object(oss_key: str, data):
-    """
-    async upload local file to oss, you should give the oss file path, local file path and bucket,
-    ensure oss_file_path is unique address and not exist, otherwise upload will raise exception.
-
-    oss_file_path and local_file_path should contain postfix of file such as '.txt'
-
-    params:
-        oss_key: generate a unique key for current file
-        local_file_path -- file to upload
-        bucket_name
-    """
-    try:
-        auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-        bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, CFG.OSS_BUCKET)
-        resp = bucket.put_object(oss_key, data)
-        return resp is not None and resp.status == 200
-    except Exception as ex:
-        err_info = f"async upload file failed: {str(ex)}"
-        logger.error(err_info)
-        raise err_info
-
-
-async def a_get_object(oss_key: str):
-    """
-    async download file from oss
-    params:
-        oss_key -- oss_key file to download
-        bucket_name
-    """
-    logger.info(f"async get file object!{oss_key}")
-    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-    bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, CFG.OSS_BUCKET)
-    resp = bucket.get_object(oss_key)
-    if resp is not None and resp.status == 200:
-        return resp.read()
-
-
-async def a_delete_object(oss_key: str):
-    """
-    async delete oss file by oss_key.
-
-    params:
-        oss_key -- file key to delete
-        bucket_name
-    """
-    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-    bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, CFG.OSS_BUCKET)
-    resp = bucket.delete_object(oss_key)
-    if resp is not None and resp.status == 200:
-        return True
-    else:
-        return False
-
-
-def delete_object(oss_key: str, bucket_name: str):
-    """
-    delete oss file by oss_key.
-
-    params:
-        oss_key -- file key to delete
-        bucket_name
-    """
-    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-    bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, bucket_name)
-    resp = bucket.delete_object(oss_key)
-    if resp is not None and resp.status == 200:
-        return True
-    else:
-        return False
-
-
-def get_download_url_with_timeout(
-    oss_file_key: str, time_seconds: int, bucket_name: str
-):
-    """
-    get download url with timeout
-    params:
-        oss_file_key: osskey
-        time_seconds: valid seconds.
-        bucket_name: bucket
-    """
-    auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
-    bucket = oss2.Bucket(auth, CFG.OSS_ENDPOINT, bucket_name)
-    download_url = bucket.sign_url("GET", oss_file_key, time_seconds)
-    return download_url