From 9e487326f5841e639f02a7ad6f12cb62111cca8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E7=AB=8B=E5=9B=BD?= Date: Wed, 6 Dec 2023 15:28:34 +0800 Subject: [PATCH] feat:add the openai support and remove async in the most functions. --- data-processing/.gitignore | 6 +- data-processing/Dockerfile | 5 + data-processing/README.md | 98 ++- .../data_manipulation/common/config.py | 273 +++++++- .../data_manipulation/common/log_tag_const.py | 7 + data-processing/data_manipulation/config.yml | 36 + .../controller/data_process_controller.py | 86 ++- .../data_store_clients/minio_store_client.py | 41 +- .../data_store_process/minio_store_process.py | 87 ++- .../postgresql_pool_client.py | 11 +- .../data_process_db_operate.py | 81 ++- .../data_process_detail_db_operate.py | 146 +++- .../file_handle/csv_handle.py | 151 ++-- .../file_handle/pdf_handle.py | 661 +++++++++++++----- .../data_manipulation/kube/client.py | 10 +- .../data_manipulation/kube/dataset_cr.py | 21 +- .../llm_api_service/base_qa_provider.py | 37 + .../llm_api_service/qa_provider_open_ai.py | 143 ++++ .../qa_provider_zhi_pu_ai_online.py | 103 +++ .../bai_chuan_2_prompt.py} | 7 + .../llm_prompt_template/open_ai_prompt.py | 29 + .../zhi_pu_ai_prompt.py} | 53 +- data-processing/data_manipulation/server.py | 54 +- .../service/data_process_service.py | 504 ++++++++++--- .../transform/text/clean_transform.py | 306 ++++++-- .../transform/text/duplicates_transform.py | 14 +- .../transform/text/filtration_transform.py | 23 +- .../transform/text/privacy_transform.py | 452 +++++++++++- .../transform/text/support_type.py | 268 ++++--- .../data_manipulation/utils/class_utils.py | 34 + .../data_manipulation/utils/csv_utils.py | 54 ++ .../utils/date_time_utils.py | 13 +- .../data_manipulation/utils/file_utils.py | 50 +- .../data_manipulation/utils/json_utils.py | 79 ++- .../data_manipulation/utils/log_utils.py | 7 +- .../data_manipulation/utils/pdf_utils.py | 34 + .../utils/{sanic_utls.py => sanic_utils.py} | 5 +- data-processing/database/base.sql | 95 +++ data-processing/requirements.txt | 8 +- 39 files changed, 3122 insertions(+), 970 deletions(-) create mode 100644 data-processing/data_manipulation/config.yml create mode 100644 data-processing/data_manipulation/llm_api_service/base_qa_provider.py create mode 100644 data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py create mode 100644 data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py rename data-processing/data_manipulation/{llm_api_service/gpt_3_dot_5_service.py => llm_prompt_template/bai_chuan_2_prompt.py} (71%) create mode 100644 data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py rename data-processing/data_manipulation/{llm_api_service/zhi_pu_ai_service.py => llm_prompt_template/zhi_pu_ai_prompt.py} (51%) create mode 100644 data-processing/data_manipulation/utils/class_utils.py create mode 100644 data-processing/data_manipulation/utils/csv_utils.py create mode 100644 data-processing/data_manipulation/utils/pdf_utils.py rename data-processing/data_manipulation/utils/{sanic_utls.py => sanic_utils.py} (95%) create mode 100644 data-processing/database/base.sql diff --git a/data-processing/.gitignore b/data-processing/.gitignore index 87bb27161..650de6cc2 100644 --- a/data-processing/.gitignore +++ b/data-processing/.gitignore @@ -2,8 +2,8 @@ __pycache__ .ipynb_checkpoints -mock_data +data_manipulation/mock_data -log +data_manipulation/log -file_handle/temp_file \ No newline at end of file 
+data_manipulation/file_handle/temp_file
\ No newline at end of file
diff --git a/data-processing/Dockerfile b/data-processing/Dockerfile
index 2ee8b3da2..86ad0ae59 100644
--- a/data-processing/Dockerfile
+++ b/data-processing/Dockerfile
@@ -23,6 +23,11 @@ ENV MINIO_API_URL=localhost:9000
 ENV MINIO_SECURE=False
 ENV MINIO_DATASET_PREFIX=dataset
 
+ENV LLM_USE_TYPE=xxxxx
+ENV LLM_QA_RETRY_COUNT=xxxxx
+ENV OPEN_AI_DEFAULT_KEY=xxxxx
+ENV OPEN_AI_DEFAULT_BASE_URL=xxxxx
+ENV OPEN_AI_DEFAULT_MODEL=xxxxx
 ENV ZHIPUAI_API_KEY=xxxxx
 
 ENV KNOWLEDGE_CHUNK_SIZE=500
diff --git a/data-processing/README.md b/data-processing/README.md
index e248bafb9..37f21fc1e 100644
--- a/data-processing/README.md
+++ b/data-processing/README.md
@@ -34,4 +34,100 @@ Install the Python dependencies in the requirements.txt file
 
 ### Running
 
-Run the server.py file in the data_manipulation directory
\ No newline at end of file
+Run the server.py file in the data_manipulation directory
+
+# isort
+isort is a tool for sorting imports alphabetically within your Python code. It helps maintain a consistent and clean import order.
+
+## install
+```shell
+pip install isort
+```
+
+## isort a file
+```shell
+isort server.py
+```
+
+## isort a directory
+```shell
+isort data_manipulation
+```
+
+
+# config.yml
+## dev phase
+An example config.yml for the dev phase looks like the following:
+```yaml
+minio:
+  access_key: '${MINIO_ACCESSKEY: hpU4SCmj5jixxx}'
+  secret_key: '${MINIO_SECRETKEY: xxx}'
+  api_url: '${MINIO_API_URL: 172.22.96.136.nip.io}'
+  secure: '${MINIO_SECURE: True}'
+  dataset_prefix: '${MINIO_DATASET_PREFIX: dataset}'
+
+zhipuai:
+  api_key: '${ZHIPUAI_API_KEY: 871772ac03fcb9db9d4ce7b1e6eea27.VZZVy0mCox0WrzAG}'
+
+llm:
+  use_type: '${LLM_USE_TYPE: zhipuai_online}' # zhipuai_online or open_ai
+  qa_retry_count: '${LLM_QA_RETRY_COUNT: 100}'
+
+open_ai:
+  key: '${OPEN_AI_DEFAULT_KEY: fake}'
+  base_url: '${OPEN_AI_DEFAULT_BASE_URL: http://172.22.96.167.nip.io/v1/}'
+  model: '${OPEN_AI_DEFAULT_MODEL_NAME: cb219b5f-8f3e-49e1-8d5b-f0c6da481186}'
+
+knowledge:
+  chunk_size: '${KNOWLEDGE_CHUNK_SIZE: 500}'
+  chunk_overlap: '${KNOWLEDGE_CHUNK_OVERLAP: 50}'
+
+backendPg:
+  host: '${PG_HOST: localhost}'
+  port: '${PG_PORT: 5432}'
+  user: '${PG_USER: postgres}'
+  password: '${PG_PASSWORD: 123456}'
+  database: '${PG_DATABASE: arcadia}'
+```
+
+Take \${MINIO_ACCESSKEY: hpU4SCmj5jixxx} as an example:
+
+MINIO_ACCESSKEY is the environment variable name.
+
+hpU4SCmj5jixxx is the default value used if the environment variable is not set.
+
+
+## release phase
+An example config.yml for the release phase looks like the following:
+```yaml
+minio:
+  access_key: hpU4SCmj5jixxx
+  secret_key: xxx
+  api_url: 172.22.96.136.nip.io
+  secure: True
+  dataset_prefix: dataset
+
+zhipuai:
+  api_key: 871772ac03fcb9db9d4ce7b1e6eea27.VZZVy0mCox0WrzAG
+
+llm:
+  use_type: zhipuai_online # zhipuai_online or open_ai
+  qa_retry_count: 100
+
+open_ai:
+  key: fake
+  base_url: http://172.22.96.167.nip.io/v1/
+  model: cb219b5f-8f3e-49e1-8d5b-f0c6da481186
+
+knowledge:
+  chunk_size: 500
+  chunk_overlap: 50
+
+backendPg:
+  host: localhost
+  port: 5432
+  user: admin
+  password: 123456
+  database: arcadia
+```
+In Kubernetes, you can mount a ConfigMap as the /arcadia_app/data_manipulation/config.yml file.
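A note on the placeholder convention: values of the form \${ENV_NAME: default} are resolved at startup by the rewritten config loader in common/config.py (shown in the next hunk). A minimal sketch of that resolution rule, with `resolve_placeholder` as a hypothetical helper name:

```python
import os

def resolve_placeholder(value):
    """Resolve a '${ENV_NAME: default}' placeholder against the environment."""
    if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
        env_name, _, default_value = value[2:-1].partition(':')
        # Fall back to the default when the environment variable is unset.
        return os.getenv(env_name.strip(), default_value.strip())
    return value

# Prints 'dataset' unless MINIO_DATASET_PREFIX is set in the environment.
print(resolve_placeholder('${MINIO_DATASET_PREFIX: dataset}'))
```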
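The loader itself is a process-wide singleton built on a `Singleton` metaclass imported from `utils/class_utils.py` (created by this patch but not shown in this excerpt). Assuming the conventional shape of such a metaclass, a minimal sketch:

```python
class Singleton(type):
    """Metaclass that hands back one shared instance per class."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Create the instance on first use, then reuse it for every later call.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
```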
diff --git a/data-processing/data_manipulation/common/config.py b/data-processing/data_manipulation/common/config.py index b11a2722c..4951efda0 100644 --- a/data-processing/data_manipulation/common/config.py +++ b/data-processing/data_manipulation/common/config.py @@ -12,26 +12,261 @@ # See the License for the specific language governing permissions and # limitations under the License. + +import logging import os +import traceback + +import yaml +from utils.class_utils import Singleton + +from . import log_tag_const + +logger = logging.getLogger(__name__) + + +class Config(metaclass=Singleton): + """Configuration class to store the env values.""" + + def __init__(self, yaml_file_path='config.yml'): + logger.debug(f"{log_tag_const.CONFIG} start to load config file.") + yaml_data = self.__get_default_yaml_data() + try: + with open(yaml_file_path, 'r') as file: + # load yaml data + yaml_data = yaml.safe_load(file) + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.CONFIG} There is an error when load the config " + f"(file path = {yaml_file_path}). \n" + f"{traceback.format_exc()}" + ])) + logger.debug(''.join([ + f"{log_tag_const.CONFIG} The content is config.\n", + f"{yaml_data}\n", + ])) + + self.__set_property_value(yaml_data) + + + + def __get_default_yaml_data(self): + """Get the default yaml data.""" + return { + 'minio': {}, + 'zhipuai': {}, + 'llm': { + 'open_ai': {} + }, + 'knowledge': {}, + 'backendPg': {} + } + + def __set_property_value(self, yaml_data): + """设置属性的值""" + # minio access key + self.minio_access_key = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='minio', + key='access_key', + env_name='MINIO_ACCESSKEY', + default_value='hpU4SCmj5jiU7IP5' + ) + # minio secret key + self.minio_secret_key = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='minio', + key='secret_key', + env_name='MINIO_SECRETKEY', + default_value='7AUewBESqvKijdnNskm8nU6emTZ3rG8F' + ) + # minio api url + self.minio_api_url = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='minio', + key='api_url', + env_name='MINIO_API_URL', + default_value='kubeagi-minio.172.22.96.136.nip.io' + ) + # minio secure + # if use HTTP, secure = False; + # if use HTTPS, secure = True; + self.minio_secure = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='minio', + key='secure', + env_name='MINIO_SECURE', + default_value=True + ) + # minio data set prefix + self.minio_dataset_prefix = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='minio', + key='dataset_prefix', + env_name='MINIO_DATASET_PREFIX', + default_value='dataset' + ) + + # zhi pu ai + # api key + self.zhipuai_api_key = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='zhipuai', + key='api_key', + env_name='ZHIPUAI_API_KEY', + default_value='871772ac03fcb9db9d4ce7b1e6eea210.VZZVy0mCox0WrzQI' + ) + + # llm + # use type such as zhipuai_online or open_ai + self.llm_use_type = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='llm', + key='use_type', + env_name='LLM_USE_TYPE', + default_value='zhipuai_online' + ) + + self.llm_qa_retry_count = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='llm', + key='qa_retry_count', + env_name='LLM_QA_RETRY_COUNT', + default_value=100 + ) + + # open ai + # key + self.open_ai_default_key = self.__get_value_by_key_in_yaml( + yaml_data, + parent_key='open_ai', + key='key', + env_name='OPEN_AI_DEFAULT_KEY', + default_value='happy' + ) + # base url + self.open_ai_default_base_url = self.__get_value_by_key_in_yaml( + yaml_data, + 
parent_key='open_ai',
+            key='base_url',
+            env_name='OPEN_AI_DEFAULT_BASE_URL',
+            default_value='http://arcadia-fastchat.172.22.96.167.nip.io/v1'
+        )
+        # default model
+        self.open_ai_default_model = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='open_ai',
+            key='model',
+            env_name='OPEN_AI_DEFAULT_MODEL',
+            default_value='baichuan2-7b-worker-baichuan-sample-playground'
+        )
+
+        # knowledge
+        # chunk size
+        self.knowledge_chunk_size = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='knowledge',
+            key='chunk_size',
+            env_name='KNOWLEDGE_CHUNK_SIZE',
+            default_value=500
+        )
+        # chunk overlap
+        self.knowledge_chunk_overlap = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='knowledge',
+            key='chunk_overlap',
+            env_name='KNOWLEDGE_CHUNK_OVERLAP',
+            default_value=50
+        )
+
+        # backend PostgreSQL
+        # host
+        self.pg_host = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='backendPg',
+            key='host',
+            env_name='PG_HOST',
+            default_value='localhost'
+        )
+        # port
+        self.pg_port = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='backendPg',
+            key='port',
+            env_name='PG_PORT',
+            default_value=5432
+        )
+        # user
+        self.pg_user = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='backendPg',
+            key='user',
+            env_name='PG_USER',
+            default_value='postgres'
+        )
+        # password
+        self.pg_password = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='backendPg',
+            key='password',
+            env_name='PG_PASSWORD',
+            default_value='123456'
+        )
+        # database name
+        self.pg_database = self.__get_value_by_key_in_yaml(
+            yaml_data,
+            parent_key='backendPg',
+            key='database',
+            env_name='PG_DATABASE',
+            default_value='arcadia'
+        )
+
+
+    def __get_value_by_key_in_yaml(
+        self,
+        config_json,
+        parent_key,
+        key,
+        env_name,
+        default_value
+    ):
+        """Get the value by key in the yaml file.
+
+        Parameters
+        ----------
+        config_json
+            the config json
+        parent_key
+            the parent key.
+        key
+            the key under the parent key.
+        env_name
+            the environment variable name.
+        default_value
+            the default value.
+ """ + value = config_json[parent_key].get(key) + if value is None: + value = os.getenv(env_name, default_value) + else: + if value.startswith('${'): + values_in_yaml = value.split(': ') + if len(values_in_yaml) == 2: + env_name_in_yaml = values_in_yaml[0].strip()[2:] + default_value_in_yaml = values_in_yaml[1].strip()[:-1] + + value = os.getenv(env_name_in_yaml, default_value_in_yaml) + + return value + + + +config = Config() + + + + + + + -minio_access_key = os.getenv('MINIO_ACCESSKEY', 'minioadmin') -minio_secret_key = os.getenv('MINIO_SECRETKEY', 'minioadmin') -minio_api_url = os.getenv('MINIO_API_URL', 'localhost:9000') -# 如果使用HTTP,将secure设置为False;如果使用HTTPS,将其设置为True -minio_secure = os.getenv('MINIO_SECURE', True) -minio_dataset_prefix = os.getenv('MINIO_DATASET_PREFIX', 'dataset') - -# zhipuai api_key -zhipuai_api_key = os.getenv('ZHIPUAI_API_KEY', 'xxxxxx') - -knowledge_chunk_size = os.getenv('KNOWLEDGE_CHUNK_SIZE', 500) -knowledge_chunk_overlap = os.getenv('KNOWLEDGE_CHUNK_OVERLAP', 50) - -# pg数据库 -pg_host = os.getenv('PG_HOST', 'localhost') -pg_port = os.getenv('PG_PORT', 5432) -pg_user = os.getenv('PG_USER', 'postgres') -pg_password = os.getenv('PG_PASSWORD', 'xxxxxx') -pg_database = os.getenv('PG_DATABASE', 'data_process') diff --git a/data-processing/data_manipulation/common/log_tag_const.py b/data-processing/data_manipulation/common/log_tag_const.py index 06175cd39..7ddb22966 100644 --- a/data-processing/data_manipulation/common/log_tag_const.py +++ b/data-processing/data_manipulation/common/log_tag_const.py @@ -25,8 +25,15 @@ DATA_PROCESS_DETAIL = "Data Process Detail" PDF_HANDLE = "PDF Handle" +CSV_HANDLE = "CSV Handle" QA_SPLIT = "Question Answer Split" +CLEAN_TRANSFORM = "Clean Transform" +PRIVACY_TRANSFORM = "Privacy Transform" + THREADING = "Threading" +ZHI_PU_AI = "Zhi Pu AI" +OPEN_AI = "Open AI" +CONFIG = "Config" diff --git a/data-processing/data_manipulation/config.yml b/data-processing/data_manipulation/config.yml new file mode 100644 index 000000000..a224b8128 --- /dev/null +++ b/data-processing/data_manipulation/config.yml @@ -0,0 +1,36 @@ +minio: + access_key: '${MINIO_ACCESSKEY: hpU4SCmj5jiU7IP5}' + secret_key: '${MINIO_SECRETKEY: 7AUewBESqvKijdnNskm8nU6emTZ3rG8F}' + api_url: '${MINIO_API_URL: kubeagi-minio.172.22.96.136.nip.io}' + secure: '${MINIO_SECURE: True}' + dataset_prefix: '${MINIO_DATASET_PREFIX: dataset}' + +zhipuai: + api_key: '${ZHIPUAI_API_KEY: 871772ac03fcb9db9d4ce7b1e6eea210.VZZVy0mCox0WrzQI}' + +llm: + use_type: '${LLM_USE_TYPE: open_ai}' # zhi_pu_online or open_ai + qa_retry_count: '${LLM_QA_RETRY_COUNT: 100}' + +open_ai: + key: '${OPEN_AI_DEFAULT_KEY: fake}' + base_url: '${OPEN_AI_DEFAULT_BASE_URL: http://arcadia-fastchat.172.22.96.167.nip.io/v1}' + model: '${OPEN_AI_DEFAULT_MODEL_NAME: 3d407a8b-90ab-43c4-9fc0-87e533368570}' + + + +knowledge: + chunk_size: '${KNOWLEDGE_CHUNK_SIZE: 500}' + chunk_overlap: '${KNOWLEDGE_CHUNK_OVERLAP: 50}' + +backendPg: + host: '${PG_HOST: localhost}' + port: '${PG_PORT: 5432}' + user: '${PG_USER: postgres}' + password: '${PG_PASSWORD: 123456}' + database: '${PG_DATABASE: data_process}' + + + + + diff --git a/data-processing/data_manipulation/controller/data_process_controller.py b/data-processing/data_manipulation/controller/data_process_controller.py index c4cd33f48..19a33b527 100644 --- a/data-processing/data_manipulation/controller/data_process_controller.py +++ b/data-processing/data_manipulation/controller/data_process_controller.py @@ -13,55 +13,85 @@ # limitations under the License. 
+from file_handle import pdf_handle from sanic import Blueprint from sanic.response import json - from service import data_process_service from transform.text import support_type - # Create a separate router (Blueprint) data_process = Blueprint("data_process", url_prefix="/") @data_process.route('list-by-page', methods=['POST']) async def list_by_page(request): - res = await data_process_service.list_by_page(request.json, { - 'pool': request.app.config['conn_pool'] - }) + res = data_process_service.list_by_page( + request.json, + pool=request.app.config['conn_pool'] + ) return json(res) @data_process.route('list-by-count', methods=['POST']) async def list_by_count(request): - res = await data_process_service.list_by_count(request.json, { - 'pool': request.app.config['conn_pool'] - }) + res = data_process_service.list_by_count( + request.json, + pool=request.app.config['conn_pool'] + ) return json(res) @data_process.route('add', methods=['POST']) async def add(request): - res = await data_process_service.add(request.json, { - 'pool': request.app.config['conn_pool'], - 'sanic_app': app - }) + """Add a new data process task. + + example for request.json + { + "name": "小T_test_0201", + "file_type": "text", + "pre_data_set_name": "dataset1", + "pre_data_set_version": "v2", + "post_data_set_name": "dataset1", + "post_data_set_version": "v2", + "version_data_set_name": "dataset1-v2", + "bucket_name": "system-tce", + "file_names": [ + { + "name": "数据处理文件_小T.pdf" + } + ], + "data_process_config_info": [] + } + """ + res = data_process_service.add( + request.json, + pool=request.app.config['conn_pool'] + ) return json(res) @data_process.route('delete-by-id', methods=['POST']) async def delete_by_id(request): - res = await data_process_service.delete_by_id(request.json, { - 'pool': request.app.config['conn_pool'] - }) + res = data_process_service.delete_by_id( + request.json, + pool=request.app.config['conn_pool'] + ) return json(res) @data_process.route('info-by-id', methods=['POST']) async def info_by_id(request): - res = await data_process_service.info_by_id(request.json, { - 'pool': request.app.config['conn_pool'] - }) + """Get the detail info by id. 
+ + example for request.json + { + "id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + """ + res = data_process_service.info_by_id( + request.json, + pool=request.app.config['conn_pool'] + ) return json(res) @@ -71,5 +101,23 @@ async def text_process_type(request): return json({ 'status': 200, 'message': '', - 'data': support_type.support_types + 'data': support_type.get_default_support_types() + }) + +@data_process.route('test', methods=['POST']) +async def test(request): + """Get the support type for transforming the text content.""" + res = pdf_handle.test({ + 'support_type_map': { + 'remove_invisible_characters': 1 + }, + 'data': '“一户一表、水表出户、抄表到户”是指一个家庭用户安装一个计量水表,计量水表安装在住宅的公共部位,供水企业抄表到户,按户计量收费。', + 'file_name': '222', + 'task_id': '111', + 'conn_pool': request.app.config['conn_pool'] + }) + return json({ + 'status': 200, + 'message': '', + 'data': res }) diff --git a/data-processing/data_manipulation/data_store_clients/minio_store_client.py b/data-processing/data_manipulation/data_store_clients/minio_store_client.py index 9a4aac100..37d9c5ecd 100644 --- a/data-processing/data_manipulation/data_store_clients/minio_store_client.py +++ b/data-processing/data_manipulation/data_store_clients/minio_store_client.py @@ -15,21 +15,19 @@ import logging import os import traceback -import urllib3 -import traceback +import urllib3 +from common import log_tag_const +from common.config import config from minio import Minio from minio.commonconfig import Tags from minio.error import S3Error - -from common import config, log_tag_const from utils import file_utils - logger = logging.getLogger(__name__) -async def get_minio_client(): +def get_minio_client(): """Get a new minio client.""" return Minio( config.minio_api_url, @@ -48,20 +46,20 @@ async def get_minio_client(): ) -async def download(minio_client, opt={}): +def download( + minio_client, + folder_prefix, + bucket_name, + file_name, +): """Download a file. minio_client: minio client; - - opt is a dictionary object. It has the following keys: folder_prefix: folder prefix; bucket_name: bucket name; file_name: file name; """ - folder_prefix = opt['folder_prefix'] - bucket_name = opt['bucket_name'] - file_name = opt['file_name'] - file_path = await file_utils.get_temp_file_path() + file_path = file_utils.get_temp_file_path() # 如果文件夹不存在,则创建 directory_path = file_path + 'original' @@ -77,21 +75,22 @@ async def download(minio_client, opt={}): ) -async def upload_files_to_minio_with_tags(minio_client, opt={}): +def upload_files_to_minio_with_tags( + minio_client, + local_folder, + minio_bucket, + minio_prefix, + support_type, + data_volumes_file, +): """Upload the files to minio with tags - opt is a dictionary object. 
It has the following keys: local_folder: local folder; minio_bucket: bucket name; minio_prefix: folder prefix; support_type: support type data_volumes_file: data volumes file """ - local_folder=opt['local_folder'] - minio_bucket=opt['minio_bucket'] - minio_prefix=opt['minio_prefix'] - support_type=opt['support_type'] - data_volumes_file=opt['data_volumes_file'] logger.debug(f"{log_tag_const.MINIO} 上传文件到minio中 {data_volumes_file}") @@ -121,7 +120,7 @@ async def upload_files_to_minio_with_tags(minio_client, opt={}): ) # 删除本地文件 - await file_utils.delete_file(local_file_path) + file_utils.delete_file(local_file_path) except S3Error as ex: logger.error(''.join([ f"{log_tag_const.MINIO} Error uploading {minio_object_name} ", diff --git a/data-processing/data_manipulation/data_store_process/minio_store_process.py b/data-processing/data_manipulation/data_store_process/minio_store_process.py index 334d445df..1cee6ca5f 100644 --- a/data-processing/data_manipulation/data_store_process/minio_store_process.py +++ b/data-processing/data_manipulation/data_store_process/minio_store_process.py @@ -18,19 +18,26 @@ import os import pandas as pd - -from common import config, log_tag_const +from common import log_tag_const +from common.config import config from data_store_clients import minio_store_client from database_operate import data_process_db_operate from file_handle import csv_handle, pdf_handle from kube import dataset_cr from utils import file_utils - logger = logging.getLogger(__name__) -async def text_manipulate(req_json, opt={}): +def text_manipulate( + req_json, + pool, + id, +): + """Manipulate the text content. + + req_json is a dictionary object. + """ bucket_name = req_json['bucket_name'] support_type = req_json['data_process_config_info'] @@ -46,15 +53,16 @@ async def text_manipulate(req_json, opt={}): ]) # get a minio client - minio_client = await minio_store_client.get_minio_client() + minio_client = minio_store_client.get_minio_client() # 将文件都下载到本地 for file_name in file_names: - await minio_store_client.download(minio_client, { - 'bucket_name': bucket_name, - 'folder_prefix': folder_prefix, - 'file_name': file_name['name'] - }) + minio_store_client.download( + minio_client, + bucket_name=bucket_name, + folder_prefix=folder_prefix, + file_name=file_name['name'] + ) # 文件处理 # 存放每个文件对应的数据量 @@ -65,55 +73,60 @@ async def text_manipulate(req_json, opt={}): file_extension = file_name.split('.')[-1].lower() if file_extension in ['csv']: # 处理CSV文件 - result = await csv_handle.text_manipulate({ + result = csv_handle.text_manipulate({ 'file_name': file_name, 'support_type': support_type }) elif file_extension in ['pdf']: # 处理PDF文件 - result = await pdf_handle.text_manipulate(req_json, { - 'file_name': file_name, - 'support_type': support_type, - 'conn_pool': opt['pool'], - 'task_id': opt['id'] - }) + result = pdf_handle.text_manipulate( + chunk_size=req_json.get('chunk_size'), + chunk_overlap=req_json.get('chunk_overlap'), + file_name=file_name, + support_type=support_type, + conn_pool=pool, + task_id=id, + create_user=req_json['creator'] + ) data_volumes_file.append(result['data']) # 将清洗后的文件上传到MinIO中 # 上传final文件夹下的文件,并添加tag - file_path = await file_utils.get_temp_file_path() - await minio_store_client.upload_files_to_minio_with_tags(minio_client, { - 'minio_client': minio_client, - 'local_folder': file_path + 'final', - 'minio_bucket': bucket_name, - 'minio_prefix': folder_prefix, - 'support_type': support_type, - 'data_volumes_file': data_volumes_file - }) + file_path = 
file_utils.get_temp_file_path() + minio_store_client.upload_files_to_minio_with_tags( + minio_client=minio_client, + local_folder=file_path + 'final', + minio_bucket=bucket_name, + minio_prefix=folder_prefix, + support_type=support_type, + data_volumes_file=data_volumes_file + ) # 将本地临时文件删除 for item in file_names: - remove_file_path = await file_utils.get_temp_file_path() + remove_file_path = file_utils.get_temp_file_path() local_file_path = remove_file_path + 'original/' + item['name'] - await file_utils.delete_file(local_file_path) + file_utils.delete_file(local_file_path) # 数据库更新任务状态 update_params = { - 'id': opt['id'], + 'id': id, 'status': 'process_complete', + 'create_user': req_json['creator'] } - await data_process_db_operate.update_status_by_id(update_params, { - 'pool': opt['pool'] - }) + data_process_db_operate.update_status_by_id( + update_params, + pool=pool + ) # 更新数据集CR状态 - await dataset_cr.update_dataset_k8s_cr({ - 'bucket_name': req_json['bucket_name'], - 'version_data_set_name': req_json['version_data_set_name'], - 'reason': 'process_complete' - }) + dataset_cr.update_dataset_k8s_cr( + bucket_name=req_json['bucket_name'], + version_data_set_name=req_json['version_data_set_name'], + reason='process_complete' + ) return { 'status': 200, diff --git a/data-processing/data_manipulation/database_clients/postgresql_pool_client.py b/data-processing/data_manipulation/database_clients/postgresql_pool_client.py index f30d78f6e..a6ba3c464 100644 --- a/data-processing/data_manipulation/database_clients/postgresql_pool_client.py +++ b/data-processing/data_manipulation/database_clients/postgresql_pool_client.py @@ -15,11 +15,9 @@ import logging import traceback -from dbutils.pooled_db import PooledDB import psycopg2.extras - from common import log_tag_const - +from dbutils.pooled_db import PooledDB logger = logging.getLogger(__name__) @@ -54,7 +52,7 @@ def get_connection_from_pool(pool): -async def execute_query(pool, sql, params={}): +def execute_query(pool, sql, params={}): """Execute a query with the parameters.""" error = '' data = [] @@ -93,7 +91,7 @@ async def execute_query(pool, sql, params={}): } -async def execute_count_query(pool, sql, params={}): +def execute_count_query(pool, sql, params={}): """Execute a count query with the parameters.""" error = '' data = None @@ -102,7 +100,6 @@ async def execute_count_query(pool, sql, params={}): with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(sql, params) data = cursor.fetchone()[0] - except Exception as ex: error = str(ex) data = None @@ -126,7 +123,7 @@ async def execute_count_query(pool, sql, params={}): } -async def execute_update(pool, sql, params={}): +def execute_update(pool, sql, params={}): """Execute a update with the parameters.""" error = '' data = None diff --git a/data-processing/data_manipulation/database_operate/data_process_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_db_operate.py index a9f721997..0ebee14ae 100644 --- a/data-processing/data_manipulation/database_operate/data_process_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_db_operate.py @@ -15,17 +15,19 @@ import ujson import ulid -from sanic.response import json - from database_clients import postgresql_pool_client +from sanic.response import json from utils import date_time_utils -async def list_by_page(req_json, opt={}): + +def list_by_page( + req_json, + pool +): """Get the list data for data processing by page""" - pool = opt['pool'] - params = { 
'keyword': '%' + req_json['keyword'] + '%', + 'namespace': req_json['namespace'], 'pageIndex': int(req_json['pageIndex']), 'pageSize': int(req_json['pageSize']) } @@ -35,6 +37,7 @@ async def list_by_page(req_json, opt={}): id, name, status, + namespace, pre_data_set_name, pre_data_set_version, post_data_set_name, @@ -43,21 +46,24 @@ async def list_by_page(req_json, opt={}): from public.data_process_task where - name like %(keyword)s + name like %(keyword)s and + namespace = %(namespace)s order by start_datetime desc limit %(pageSize)s offset %(pageIndex)s """.strip() - res = await postgresql_pool_client.execute_query(pool, sql, params) + res = postgresql_pool_client.execute_query(pool, sql, params) return res -async def list_by_count(req_json, opt={}): +def list_by_count( + req_json, + pool +): """Get count for the list data processing with page""" - pool = opt['pool'] - params = { - 'keyword': '%' + req_json['keyword'] + '%' + 'keyword': '%' + req_json['keyword'] + '%', + 'namespace': req_json['namespace'] } sql = """ @@ -66,17 +72,19 @@ async def list_by_count(req_json, opt={}): from public.data_process_task where - name like %(keyword)s + name like %(keyword)s and + namespace = %(namespace)s """.strip() - res = await postgresql_pool_client.execute_count_query(pool, sql, params) + res = postgresql_pool_client.execute_count_query(pool, sql, params) return res -async def delete_by_id(req_json, opt={}): +def delete_by_id( + req_json, + pool +): """Delete a record with id""" - pool = opt['pool'] - params = { 'id': req_json['id'] } @@ -87,23 +95,26 @@ async def delete_by_id(req_json, opt={}): id = %(id)s """.strip() - res = await postgresql_pool_client.execute_update(pool, sql, params) + res = postgresql_pool_client.execute_update(pool, sql, params) return res -async def add(req_json, opt={}): +def add( + req_json, + pool, + id +): """Add a new record""" - pool = opt['pool'] - now = date_time_utils.now_str() - user = 'admin' + user = req_json['creator'] program = '数据处理任务-新增' params = { - 'id': opt['id'], + 'id': id, 'name': req_json['name'], 'file_type': req_json['file_type'], 'status': 'processing', + 'namespace': req_json['namespace'], 'pre_data_set_name': req_json['pre_data_set_name'], 'pre_data_set_version': req_json['pre_data_set_version'], 'file_names': ujson.dumps(req_json['file_names']), @@ -125,6 +136,7 @@ async def add(req_json, opt={}): name, file_type, status, + namespace, pre_data_set_name, pre_data_set_version, file_names, @@ -133,8 +145,8 @@ async def add(req_json, opt={}): data_process_config_info, start_datetime, create_datetime, - create_user, create_program, + create_user, update_datetime, update_program, update_user @@ -144,6 +156,7 @@ async def add(req_json, opt={}): %(name)s, %(file_type)s, %(status)s, + %(namespace)s, %(pre_data_set_name)s, %(pre_data_set_version)s, %(file_names)s, @@ -160,16 +173,17 @@ async def add(req_json, opt={}): ) """.strip() - res = await postgresql_pool_client.execute_update(pool, sql, params) + res = postgresql_pool_client.execute_update(pool, sql, params) return res -async def update_status_by_id(req_json, opt={}): +def update_status_by_id( + req_json, + pool +): """Update the status with id""" - pool = opt['pool'] - now = date_time_utils.now_str() - user = 'admin' + user = req_json['create_user'] program = '修改任务状态' params = { @@ -192,14 +206,15 @@ async def update_status_by_id(req_json, opt={}): id = %(id)s """.strip() - res = await postgresql_pool_client.execute_update(pool, sql, params) + res = postgresql_pool_client.execute_update(pool, 
sql, params) return res -async def info_by_id(req_json, opt={}): +def info_by_id( + req_json, + pool +): """info with id""" - pool = opt['pool'] - params = { 'id': req_json['id'] } @@ -226,5 +241,5 @@ async def info_by_id(req_json, opt={}): id = %(id)s """.strip() - res = await postgresql_pool_client.execute_query(pool, sql, params) + res = postgresql_pool_client.execute_query(pool, sql, params) return res \ No newline at end of file diff --git a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py index 3af0df962..5cb548d4b 100644 --- a/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py +++ b/data-processing/data_manipulation/database_operate/data_process_detail_db_operate.py @@ -14,17 +14,17 @@ import ulid - from database_clients import postgresql_pool_client from utils import date_time_utils -async def insert_transform_info(req_json, opt={}): +def insert_transform_info( + req_json, + pool +): """Insert a transform info""" - pool = opt['pool'] - now = date_time_utils.now_str() - user = 'admin' + user = req_json['create_user'] program = '数据处理任务详情-新增' params = { @@ -33,7 +33,7 @@ async def insert_transform_info(req_json, opt={}): 'file_name': req_json['file_name'], 'transform_type': req_json['transform_type'], 'pre_content': req_json['pre_content'], - 'post_content': req_json['pre_content'], + 'post_content': req_json['post_content'], 'create_datetime': now, 'create_user': user, 'create_program': program, @@ -73,16 +73,17 @@ async def insert_transform_info(req_json, opt={}): ) """.strip() - res = await postgresql_pool_client.execute_update(pool, sql, params) + res = postgresql_pool_client.execute_update(pool, sql, params) return res -async def insert_question_answer_info(req_json, opt={}): +def insert_question_answer_info( + req_json, + pool +): """Insert a question answer info""" - pool = opt['pool'] - now = date_time_utils.now_str() - user = 'admin' + user = req_json['create_user'] program = '数据处理任务问题和答案-新增' params = { @@ -128,16 +129,61 @@ async def insert_question_answer_info(req_json, opt={}): ) """.strip() - res = await postgresql_pool_client.execute_update(pool, sql, params) + res = postgresql_pool_client.execute_update(pool, sql, params) return res -async def transform_list_by_task_id(req_json,opt={}): - """Get list for the transform info with task id""" - pool = opt['pool'] +def list_file_name_for_transform( + req_json, + pool +): + """List file name for tansform in the task detail. + + req_json is a dictionary object. for example: + { + "task_id": "01HGWBE48DT3ADE9ZKA62SW4WS", + "transform_type": "remove_invisible_characters" + } + pool: databasec connection pool; + """ + params = { + 'task_id': req_json['task_id'], + 'transform_type': req_json['transform_type'], + } + + sql = """ + select + file_name + from public.data_process_task_detail + where + task_id = %(task_id)s and + transform_type = %(transform_type)s + group by file_name + """.strip() + res = postgresql_pool_client.execute_query(pool, sql, params) + return res + + +def top_n_list_transform_for_preview( + req_json, + pool +): + """List transform info with task id, file name and + tansform type for preview. + + req_json is a dictionary object. 
for example:
+    {
+        "task_id": "01HGWBE48DT3ADE9ZKA62SW4WS",
+        "file_name": "MyFile.pdf",
+        "transform_type": "remove_invisible_characters"
+    }
+    pool: database connection pool;
+    """
     params = {
-        'task_id': req_json['task_id']
+        'task_id': req_json['task_id'],
+        'file_name': req_json['file_name'],
+        'transform_type': req_json['transform_type']
     }
 
     sql = """
         select
           id,
           task_id,
           file_name,
           transform_type,
           pre_content,
-          post_content
+          post_content,
+          update_datetime
         from
           public.data_process_task_detail
         where
-          task_id = %(task_id)s
+          task_id = %(task_id)s and
+          file_name = %(file_name)s and
+          transform_type = %(transform_type)s
+        order by update_datetime desc
+        limit 10
     """.strip()
 
-    res = await postgresql_pool_client.execute_sql(pool,sql,params)
+    res = postgresql_pool_client.execute_query(pool, sql, params)
     return res
 
+
+def list_file_name_in_qa_by_task_id(
+    req_json,
+    pool
+):
+    """List file name in question answer with task id.
+
+    req_json is a dictionary object. for example:
+    {
+        "task_id": "01HGWBE48DT3ADE9ZKA62SW4WS"
+    }
+    pool: database connection pool;
+    """
+    params = {
+        'task_id': req_json['task_id']
+    }
 
-async def question_answer_info_by_task_id(req_json,opt={}):
-    """ question answer info with task id"""
-    pool = opt['pool']
+    sql = """
+        select
+          file_name
+        from public.data_process_task_question_answer
+        where
+          task_id = %(task_id)s
+        group by file_name
+    """.strip()
+    res = postgresql_pool_client.execute_query(pool, sql, params)
+    return res
+
+
+def top_n_list_qa_for_preview(
+    req_json,
+    pool
+):
+    """List question answer info with task id for preview.
+
+    req_json is a dictionary object. for example:
+    {
+        "task_id": "01HGWBE48DT3ADE9ZKA62SW4WS",
+        "file_name": "MyFile.pdf"
+    }
+    pool: database connection pool;
+    """
     params = {
-        'task_id': req_json['task_id']
+        'task_id': req_json['task_id'],
+        'file_name': req_json['file_name']
     }
 
     sql = """
         select
           id,
           task_id,
           file_name,
           question,
           answer
         from
           public.data_process_task_question_answer
         where
-          task_id = %(task_id)s
+          task_id = %(task_id)s and
+          file_name = %(file_name)s
+        order by update_datetime desc
+        limit 10
     """.strip()
 
-    res = await postgresql_pool_client.execute_sql(pool,sql,params)
-    return res
-
+    res = postgresql_pool_client.execute_query(pool, sql, params)
+    return res
\ No newline at end of file
diff --git a/data-processing/data_manipulation/file_handle/csv_handle.py b/data-processing/data_manipulation/file_handle/csv_handle.py
index d54aa577f..071be31f8 100644
--- a/data-processing/data_manipulation/file_handle/csv_handle.py
+++ b/data-processing/data_manipulation/file_handle/csv_handle.py
@@ -13,44 +13,43 @@
 # limitations under the License.
 
 
-import asyncio
-import csv
 import logging
-import os
+import traceback
 
 import pandas as pd
 import ulid
-
+from common import log_tag_const
 from transform.text import clean_transform, privacy_transform
-from utils import date_time_utils, file_utils
+from utils import csv_utils, date_time_utils, file_utils
 
-logger = logging.getLogger('csv_handle')
+logger = logging.getLogger(__name__)
 
 
-async def text_manipulate(opt={}):
-    logger.info("csv text manipulate!")
+def text_manipulate(
+    file_name,
+    support_type
+):
+    """Manipulate the text content.
+
+    file_name: file name;
+    support_type: support type;
+
+    process logic:
+    When processing a piece of data, if any step (e.g. removing invisible
+    characters) fails, processing stops immediately and the whole file is
+    treated as failed.
     """
-    数据处理逻辑:
-    处理某条数据时,如果某个方式(比如:去除不可见字符)处理失败了,则直接结束,不在处理,整个文件都视作处理失败
-
-    """
-
     try:
-        file_name = opt['file_name']
-        support_type = opt['support_type']
+        logger.debug(f"{log_tag_const.CSV_HANDLE} Start to manipulate text in csv file.")
 
-        csv_file_path = await file_utils.get_temp_file_path()
+        csv_file_path = file_utils.get_temp_file_path()
         file_path = csv_file_path + 'original/' + file_name
 
         # Read the content of the CSV file
         data = pd.read_csv(file_path)
-
-        logger.info("start text manipulate!")
         text_data = data['prompt']
 
         # Clean the data
-        clean_result = await data_clean({
-            'support_type': support_type,
-            'file_name': file_name,
-            'data': text_data
-        })
+        clean_result = _data_clean(
+            support_type=support_type,
+            file_name=file_name,
+            data=text_data
+        )
 
         if clean_result['status'] != 200:
             return clean_result
         else:
             text_data = clean_result['data']
 
         # Save the cleaned file as the final phase
-        new_file_name = await file_utils.get_file_name({
+        new_file_name = file_utils.get_file_name({
             'file_name': file_name,
             'handle_name': 'final'
         })
 
-        await save_csv({
-            'file_name': new_file_name,
-            'phase_value': 'final',
-            'data': text_data
-        })
+        csv_utils.save_csv(
+            file_name=new_file_name,
+            phase_value='final',
+            data=text_data
+        )
 
+        logger.debug(f"{log_tag_const.CSV_HANDLE} Finish manipulating text in csv file.")
+
         return {
             'status': 200,
             'message': '',
             'data': ''
         }
     except Exception as ex:
+        logger.error(''.join([
+            f"{log_tag_const.CSV_HANDLE} There is an error when manipulating the text ",
+            f"in a csv file. \n{traceback.format_exc()}"
+        ]))
         return {
             'status': 400,
             'message': '',
             'data': ''
         }
 
 
-###
-# 数据异常清洗
-# @author: wangxinbiao
-# @date: 2023-11-08 09:32:01
-# modify history
-# ==== 2023-11-08 09:32:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-
-
-async def data_clean(opt={}):
-    logger.info("csv text data clean start!")
-    support_type = opt['support_type']
-    data = opt['data']
+def _data_clean(
+    support_type,
+    data,
+    file_name
+):
+    """Clean the data.
+
+    support_type: support type;
+    data: text content;
+    """
+    logger.debug(f"{log_tag_const.CSV_HANDLE} Start to clean data in csv.")
 
     # Remove invisible characters
     if 'remove_invisible_characters' in support_type:
         clean_data = []
         for item in data:
-            result = await remove_invisible_characters({
-                'text': item
-            })
+            result = clean_transform.remove_invisible_characters(
+                text=item
+            )
 
             data.insert(0, ['prompt'])
 
     # Save the file as the middle phase
-    file_name = await file_utils.get_file_name({
-        'file_name': opt['file_name'],
+    file_name = file_utils.get_file_name({
+        'file_name': file_name,
         'handle_name': 'middle'
     })
 
-    await save_csv({
-        'file_name': file_name,
-        'phase_value': 'middle',
-        'data': data
-    })
+    csv_utils.save_csv(
+        file_name=file_name,
+        phase_value='middle',
+        data=data
+    )
 
-    logger.info("csv text data clean stop!")
+
+    logger.debug(f"{log_tag_const.CSV_HANDLE} Finish cleaning data in csv.")
 
     return {
         'status': 200,
         'message': '',
         'data': data
     }
 
 
-###
-# 去除不可见字符
-# @author: wangxinbiao
-# @date: 2023-11-02 14:42:01
-# modify history
-# ==== 2023-11-02 14:42:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-
-
-async def remove_invisible_characters(opt={}):
-    return await clean_transform.remove_invisible_characters({
-        'text': opt['text']
-    })
-
-###
-# 去除邮箱地址
-# @author: wangxinbiao
-# @date: 2023-11-02 14:42:01
-# modify history
-# ==== 2023-11-02 14:42:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-
-
-async def remove_email(opt={}):
-    return await privacy_transform.remove_email({
-        'text': opt['text']
-    })
-
-###
-# 将数据存到CSV中
-# @author: wangxinbiao
-# @date: 2023-11-02 14:42:01
-# modify history
-# ==== 2023-11-02 14:42:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-
-
-async def save_csv(opt={}):
-    file_name = opt['file_name']
-    phase_value = opt['phase_value']
-    data = opt['data']
-
-    csv_file_path = await file_utils.get_temp_file_path()
-
-    # 如果文件夹不存在,则创建
-    directory_path = csv_file_path + phase_value
-    if not os.path.exists(directory_path):
-        os.makedirs(directory_path)
-
-    file_path = directory_path + '/' + file_name
-
-    with open(file_path, 'w', newline='') as file:
-        writer = csv.writer(file)
-        writer.writerows(data)
-
-    return file_path
+
 
diff --git a/data-processing/data_manipulation/file_handle/pdf_handle.py b/data-processing/data_manipulation/file_handle/pdf_handle.py
index db3987c50..423d248b6 100644
--- a/data-processing/data_manipulation/file_handle/pdf_handle.py
+++ b/data-processing/data_manipulation/file_handle/pdf_handle.py
@@ -15,81 +15,96 @@
 
 import logging
 import os
+import traceback
+
 import pandas as pd
-from langchain.document_loaders import PyPDFLoader
-from langchain.text_splitter import SpacyTextSplitter
-from pypdf import PdfReader
 import ulid
-
-from common import config, log_tag_const
+from common import log_tag_const
+from common.config import config
 from database_operate import data_process_detail_db_operate
-from file_handle import csv_handle
-from llm_api_service import zhi_pu_ai_service
+from langchain.text_splitter import SpacyTextSplitter
+from llm_api_service.qa_provider_open_ai import QAProviderOpenAI
+from llm_api_service.qa_provider_zhi_pu_ai_online import \
+    QAProviderZhiPuAIOnline
 from transform.text import clean_transform, privacy_transform
-from utils import file_utils
-
-
+from utils import csv_utils, file_utils, pdf_utils
 
 logger = logging.getLogger(__name__)
 
 
-async def text_manipulate(req_json, opt={}):
+def text_manipulate(
+    file_name,
+    support_type,
+    conn_pool,
+    task_id,
+    create_user,
+    chunk_size,
+    chunk_overlap
+):
+    
"""Manipulate the text content from a pdf file. + + file_name: file name; + support_type: support type; + conn_pool: database connection pool; + task_id: data process task id; + chunk_size: chunk size; + chunk_overlap: chunk overlap; + """ + logger.debug(f"{log_tag_const.PDF_HANDLE} Start to manipulate the text in pdf") try: - - file_name = opt['file_name'] - support_type = opt['support_type'] - conn_pool = opt['conn_pool'] # database connectionn pool - - # 数据量 - object_count = 0 - object_name = '' - - pdf_file_path = await file_utils.get_temp_file_path() + pdf_file_path = file_utils.get_temp_file_path() file_path = pdf_file_path + 'original/' + file_name + # step 1 + # Get the content from the pdf fild. + content = pdf_utils.get_content(file_path) + logger.debug(f"{log_tag_const.PDF_HANDLE} The pdf content is\n {content}") - # 获取PDF文件的内容 - content = await get_content({ - "file_path": file_path - }) - - logger.info("start text manipulate!") - - # 数据清洗 - clean_result = await data_clean({ - 'support_type': support_type, - 'file_name': file_name, - 'data': content - }) - - if clean_result['status'] != 200: - return clean_result - else: + support_type_map = _convert_support_type_to_map(support_type) + + # step 2 + # Clean the data such as removing invisible characters. + clean_result = _data_clean( + support_type_map=support_type_map, + file_name=file_name, + data=content, + conn_pool=conn_pool, + task_id=task_id, + create_user=create_user + ) + + if clean_result['status'] == 200: content = clean_result['data'] - # 去隐私 - clean_result = await privacy_erosion({ - 'support_type': support_type, - 'file_name': file_name, - 'data': content - }) - - if clean_result['status'] != 200: - return clean_result - else: + # step 3 + # Remove the privacy info such as removing email. + clean_result = _remove_privacy_info( + support_type_map=support_type_map, + file_name=file_name, + data=content, + conn_pool=conn_pool, + task_id=task_id, + create_user=create_user + ) + + if clean_result['status'] == 200: content = clean_result['data'] - # QA拆分 - if any(d.get('type') == 'qa_split' for d in support_type): - qa_data = await generate_QA(req_json, { - 'support_type': support_type, - 'data': content - }) + + # 数据量 + object_count = 0 + object_name = '' + if support_type_map.get('qa_split'): + logger.debug(f"{log_tag_const.QA_SPLIT} Start to split QA.") - # qa_data = [] + qa_data = _generate_qa_list( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + data=content + ) logger.debug(f"{log_tag_const.QA_SPLIT} The QA data is: \n{qa_data}\n") @@ -99,39 +114,33 @@ async def text_manipulate(req_json, opt={}): continue qa_insert_item = { 'id': ulid.ulid(), - 'task_id': opt['task_id'], + 'task_id': task_id, 'file_name': file_name, 'question': qa_data[i][0], - 'answer': qa_data[i][1] + 'answer': qa_data[i][1], + 'create_user': create_user } - await data_process_detail_db_operate.insert_question_answer_info( - qa_insert_item, { - 'pool': opt['conn_pool'] - } + data_process_detail_db_operate.insert_question_answer_info( + qa_insert_item, + pool=conn_pool ) - - - # 将生成的QA数据保存为CSV文件 - new_file_name = await file_utils.get_file_name({ - 'file_name': file_name, - 'handle_name': 'final' - }) - - file_name_without_extension = file_name.rsplit('.', 1)[0] - - await csv_handle.save_csv({ - 'file_name': file_name_without_extension + '.csv', - 'phase_value': 'final', - 'data': qa_data - }) + # Save the csv file. 
+ file_name_without_extension = file_name.rsplit('.', 1)[0] + '_final' + csv_utils.save_csv( + file_name=file_name_without_extension + '.csv', + phase_value='final', + data=qa_data + ) object_name = file_name_without_extension + '.csv' - # 减 1 是为了去除表头 object_count = len(qa_data) - 1 - + + logger.debug(f"{log_tag_const.QA_SPLIT} Finish splitting QA.") + + logger.debug(f"{log_tag_const.PDF_HANDLE} Finish manipulating the text in pdf") return { 'status': 200, 'message': '', @@ -141,51 +150,181 @@ async def text_manipulate(req_json, opt={}): } } except Exception as ex: - logger.error(str(ex)) + logger.error(''.join([ + f"{log_tag_const.PDF_HANDLE} There is an error when manipulate ", + f"the text in pdf handler. \n{traceback.format_exc()}" + ])) + logger.debug(f"{log_tag_const.PDF_HANDLE} Finish manipulating the text in pdf") return { 'status': 400, - 'message': '', - 'data': '' + 'message': str(ex), + 'data': traceback.format_exc() } +def _data_clean( + support_type_map, + data, + task_id, + file_name, + create_user, + conn_pool +): + """Clean the data. + + support_type_map: example + { + "qa_split": 1, + "remove_invisible_characters": 1, + "space_standardization": 1, + "remove_email": 1 + } + data: data; + file_name: file name; + conn_pool: database connection pool; + task_id: data process task id; + """ + # remove invisible characters + if support_type_map.get('remove_invisible_characters'): + result = clean_transform.remove_invisible_characters( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_invisible_characters', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # process for space standardization + if support_type_map.get('space_standardization'): + result = clean_transform.space_standardization( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'space_standardization', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # process for remove garbled text + if support_type_map.get('remove_garbled_text'): + result = clean_transform.remove_garbled_text( + text=data + ) + if result['status'] == 200: + if result['data']['found'] > 0: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_garbled_text', + 'pre_content': data, + 'post_content': result['data']['text'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] -async def data_clean(opt={}): - logger.info("pdf text data clean start!") - support_type = opt['support_type'] - data = opt['data'] - # 去除不可见字符 - if any(d.get('type') == 'remove_invisible_characters' for d in support_type): - result = await clean_transform.remove_invisible_characters({ - 'text': data - }) + # 
process for Traditional Chinese to Simplified Chinese + if support_type_map.get('traditional_to_simplified'): + result = clean_transform.traditional_to_simplified( + text=data + ) + if result['status'] == 200: + if result['data']['found'] > 0: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'traditional_to_simplified', + 'pre_content': data, + 'post_content': result['data']['text'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] - if result['status'] != 200: - return { - 'status': 400, - 'message': '去除不可见字符失败', - 'data': '' - } - - data = result['data'] + + # process for clean html code in text samples + if support_type_map.get('remove_html_tag'): + result = clean_transform.remove_html_tag( + text=data + ) + if result['status'] == 200: + if result['data']['found'] > 0: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_html_tag', + 'pre_content': data, + 'post_content': result['data']['text'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] - # 空格处理 - if any(d.get('type') == 'space_standardization' for d in support_type): - result = await clean_transform.space_standardization({ - 'text': data - }) - - if result['status'] != 200: - return { - 'status': 400, - 'message': '空格处理失败', - 'data': '' - } - - data = result['data'] - logger.info("pdf text data clean stop!") + # process for remove emojis + if support_type_map.get('remove_emojis'): + result = clean_transform.remove_emojis( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_emojis', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] return { 'status': 200, @@ -194,24 +333,171 @@ async def data_clean(opt={}): } -async def privacy_erosion(opt={}): - logger.info("pdf text privacy erosion start!") - support_type = opt['support_type'] - data = opt['data'] - - # 去邮箱 - if any(d.get('type') == 'remove_email' for d in support_type): - result = await privacy_transform.remove_email({ - 'text': data - }) - - if result['status'] != 200: - return result +def _remove_privacy_info( + support_type_map, + data, + task_id, + file_name, + create_user, + conn_pool +): + """"Remove the privacy info such as removing email. 
+ + support_type_map: example + { + "qa_split": 1, + "remove_invisible_characters": 1, + "space_standardization": 1, + "remove_email": 1 + } + data: data; + file_name: file name; + conn_pool: database connection pool; + task_id: data process task id; + """ + # remove email + if support_type_map.get('remove_email'): + result = privacy_transform.remove_email( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_email', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] - data = result['data'] - - logger.info("pdf text privacy erosion stop!") + # remove ip addresses + if support_type_map.get('remove_ip_address'): + result = privacy_transform.remove_ip_address( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_ip_address', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # remove number + if support_type_map.get('remove_number'): + # remove phone + result = privacy_transform.remove_phone( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_number', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # remove id card + result = privacy_transform.remove_id_card( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_number', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # remove weixin + result = privacy_transform.remove_weixin( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_number', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # remove bank card + result = privacy_transform.remove_bank_card( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + 
for item in clean_data:
+                    task_detail_item = {
+                        'id': ulid.ulid(),
+                        'task_id': task_id,
+                        'file_name': file_name,
+                        'transform_type': 'remove_number',
+                        'pre_content': item['pre_content'],
+                        'post_content': item['post_content'],
+                        'create_user': create_user
+                    }
+                    data_process_detail_db_operate.insert_transform_info(
+                        task_detail_item,
+                        pool=conn_pool
+                    )
+            data = result['data']['text']
+
     return {
         'status': 200,
         'message': '',
        'data': data
     }
 
 
-async def get_content(opt={}):
-    file_path = opt['file_path']
-
-    reader = PdfReader(file_path)
-    number_of_pages = len(reader.pages)
-    pages = reader.pages
-    content = ""
-    for page in pages:
-        content += page.extract_text()
-
-    return content
-
-
-async def generate_QA(req_json, opt={}):
-    logger.info("pdf text generate qa start!")
+def _generate_qa_list(
+    chunk_size,
+    chunk_overlap,
+    data
+):
+    """Generate the Question and Answer list.
 
-    # 文本分段
-    chunk_size = config.knowledge_chunk_size
-    if "chunk_size" in req_json:
-        chunk_size = req_json['chunk_size']
-
-    chunk_overlap = config.knowledge_chunk_overlap
-    if "chunk_overlap" in req_json:
-        chunk_overlap = req_json['chunk_overlap']
-
-    separator = "\n\n"
+    chunk_size: chunk size;
+    chunk_overlap: chunk overlap;
+    data: the text used to generate QA;
+    """
+    # step 1
+    # Split the text.
+    if chunk_size is None:
+        chunk_size = config.knowledge_chunk_size
+
+    if chunk_overlap is None:
+        chunk_overlap = config.knowledge_chunk_overlap
 
     text_splitter = SpacyTextSplitter(
-        separator=separator,
+        separator="\n\n",
         pipeline="zh_core_web_sm",
         chunk_size=int(chunk_size),
-        chunk_overlap=int(chunk_overlap),
+        chunk_overlap=int(chunk_overlap)
     )
-    texts = text_splitter.split_text(opt['data'])
+    texts = text_splitter.split_text(data)
+
+    logger.debug(''.join([
+        f"original text is: \n{data}\n",
+        f"split text is: \n{texts}\n"
+    ]))
+
 
-    # 生成QA
+    # step 2
+    # Generate the QA list.
     qa_list = [['q', 'a']]
-    await zhi_pu_ai_service.init_service({
-        'api_key': config.zhipuai_api_key
-    })
-    for item in texts:
-        text = item.replace("\n", "")
-        data = await zhi_pu_ai_service.generate_qa({
-            'text': text
-        })
-
-        qa_list.extend(data)
-
-    logger.info("pdf text generate qa stop!")
+    if config.llm_use_type == 'open_ai':
+        qa_provider = QAProviderOpenAI()
+        for item in texts:
+            text = item.replace("\n", "")
+            data = qa_provider.generate_qa_list(text)
+            qa_list.extend(data)
+    elif config.llm_use_type == 'zhipuai_online':
+        qa_provider = QAProviderZhiPuAIOnline()
+        for item in texts:
+            text = item.replace("\n", "")
+            data = qa_provider.generate_qa_list(text)
+            qa_list.extend(data)
 
     return qa_list
 
 
-async def document_chunk(req_json, opt={}):
-
-    separator = "\n\n"
+def _convert_support_type_to_map(supprt_type):
+    """Convert support type to map.
+ + support_type: support type list + example + [ + { + "type": "qa_split" + }, + { + "type": "remove_invisible_characters" + }, + { + "type": "space_standardization" + }, + { + "type": "remove_email" + } + ] + """ + result = {} + for item in supprt_type: + result[item['type']] = 1 - text_splitter = SpacyTextSplitter( - separator=separator, - pipeline="zh_core_web_sm", - chunk_size=opt['chunk_size'], - chunk_overlap=opt['chunk_overlap'] - ) - texts = text_splitter.split_text(opt['data']) - - return texts \ No newline at end of file + return result \ No newline at end of file diff --git a/data-processing/data_manipulation/kube/client.py b/data-processing/data_manipulation/kube/client.py index 1468182ab..f7503d46b 100644 --- a/data-processing/data_manipulation/kube/client.py +++ b/data-processing/data_manipulation/kube/client.py @@ -13,17 +13,17 @@ # limitations under the License. -from kubernetes import client, config -from kubernetes.client import CustomObjectsApi import logging import os import traceback from common import log_tag_const -from .custom_resources import (arcadia_resource_datasets, - arcadia_resource_datasources, - arcadia_resource_versioneddatasets) +from kubernetes import client, config +from kubernetes.client import CustomObjectsApi +from .custom_resources import (arcadia_resource_datasets, + arcadia_resource_datasources, + arcadia_resource_versioneddatasets) logger = logging.getLogger(__name__) diff --git a/data-processing/data_manipulation/kube/dataset_cr.py b/data-processing/data_manipulation/kube/dataset_cr.py index b5089952d..01dd7bc11 100644 --- a/data-processing/data_manipulation/kube/dataset_cr.py +++ b/data-processing/data_manipulation/kube/dataset_cr.py @@ -14,16 +14,19 @@ import logging -from . import client from utils import date_time_utils +from . import client logger = logging.getLogger(__name__) -async def update_dataset_k8s_cr(opt={}): +def update_dataset_k8s_cr( + bucket_name, + version_data_set_name, + reason +): """ Update the condition info for the dataset. - opt is a dictionary object. It has the following keys: bucket_name: bucket name; version_data_set_name: version dataset name; reason: the update reason; @@ -32,8 +35,8 @@ async def update_dataset_k8s_cr(opt={}): kube = client.KubeEnv() one_cr_datasets = kube.get_versioneddatasets_status( - opt['bucket_name'], - opt['version_data_set_name'] + bucket_name, + version_data_set_name ) conditions = one_cr_datasets['status']['conditions'] @@ -51,21 +54,21 @@ async def update_dataset_k8s_cr(opt={}): if found_index is None: conditions.append({ 'lastTransitionTime': now_utc_str, - 'reason': opt['reason'], + 'reason': reason, 'status': "True", "type": "DataProcessing" }) else: conditions[found_index] = { 'lastTransitionTime': now_utc_str, - 'reason': opt['reason'], + 'reason': reason, 'status': "True", "type": "DataProcessing" } kube.patch_versioneddatasets_status( - opt['bucket_name'], - opt['version_data_set_name'], + bucket_name, + version_data_set_name, { 'status': { 'conditions': conditions diff --git a/data-processing/data_manipulation/llm_api_service/base_qa_provider.py b/data-processing/data_manipulation/llm_api_service/base_qa_provider.py new file mode 100644 index 000000000..02450eba4 --- /dev/null +++ b/data-processing/data_manipulation/llm_api_service/base_qa_provider.py @@ -0,0 +1,37 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
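For reference, the list-to-map conversion just shown is easy to exercise on its own. A minimal standalone sketch (the function name and sample data below are illustrative, mirroring `_convert_support_type_to_map` and the `support_type_map` example in the docstring above):

```python
# Minimal sketch: build a {type: 1} lookup map from a support-type list and
# use truthy lookups to gate transforms, as the file handlers do above.
def convert_support_type_to_map(support_type):
    """Build a {type: 1} lookup map from a support type list."""
    result = {}
    for item in support_type:
        result[item['type']] = 1
    return result


if __name__ == '__main__':
    support_type = [
        {"type": "qa_split"},
        {"type": "remove_email"}
    ]
    support_type_map = convert_support_type_to_map(support_type)

    if support_type_map.get('remove_email'):
        print('remove_email is enabled')
```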
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + +from abc import ABC, abstractmethod + + +class BaseQAProvider(ABC): + """The Base class for the QA provider.""" + @abstractmethod + def generate_qa_list( + self, + text, + prompt_template=None + ): + """Generate the QA list. + + Parameters + ---------- + text + use the text to generate QA list + prompt_template + the prompt template + """ + diff --git a/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py b/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py new file mode 100644 index 000000000..5e571f1a1 --- /dev/null +++ b/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py @@ -0,0 +1,143 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import re +import time +import traceback + +from common import log_tag_const +from common.config import config +from langchain.chains import LLMChain +from langchain.llms import OpenAI +from langchain.prompts import PromptTemplate +from llm_prompt_template import open_ai_prompt + +from .base_qa_provider import BaseQAProvider + +logger = logging.getLogger(__name__) + +class QAProviderOpenAI(BaseQAProvider): + """The QA provider is used by open ai.""" + + def __init__( + self, + api_key=None, + base_url=None, + model=None + ): + if api_key is None: + api_key = config.open_ai_default_key + if base_url is None: + base_url = config.open_ai_default_base_url + if model is None: + model = config.open_ai_default_model + + self.llm = OpenAI( + openai_api_key=api_key, + base_url=base_url, + model=model + ) + + def generate_qa_list( + self, + text, + prompt_template=None + ): + """Generate the QA list. 
+
+        Parameters
+        ----------
+        text
+            use the text to generate QA list
+        prompt_template
+            the prompt template
+        """
+        if prompt_template is None:
+            prompt_template = open_ai_prompt.get_default_prompt_template()
+
+        prompt = PromptTemplate(
+            template=prompt_template,
+            input_variables=["text"]
+        )
+        llm_chain = LLMChain(
+            prompt=prompt,
+            llm=self.llm
+        )
+
+        result = []
+        invoke_count = 0
+        while True:
+            try:
+                response = llm_chain.run(text)
+                result = self.__get_qa_list_from_response(response)
+                if len(result) > 0 or invoke_count > int(config.llm_qa_retry_count):
+                    logger.debug(''.join([
+                        f"{log_tag_const.OPEN_AI} The QA list is \n",
+                        f"\n{result}\n"
+                    ]))
+                    break
+                time.sleep(2)  # sleep 2 seconds before the next attempt
+                invoke_count += 1
+            except Exception as ex:
+                result = []
+                logger.error(''.join([
+                    f"{log_tag_const.OPEN_AI} Cannot access the open ai service.\n",
+                    f"The tracing error is: \n{traceback.format_exc()}\n"
+                ]))
+                # Count failed calls as well, so a persistent outage cannot
+                # keep this loop running forever.
+                if invoke_count > int(config.llm_qa_retry_count):
+                    break
+                time.sleep(2)
+                invoke_count += 1
+
+        return result
+
+
+    def __get_qa_list_from_response(
+        self,
+        response
+    ):
+        """Get the QA list from the response.
+
+        Notice: There are some problems in the local OpenAI service.
+        Sometimes it cannot return a correct question and answer list.
+
+        Parameters
+        ----------
+        response
+            the response from the open ai service
+        """
+        pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)')
+
+        # Strip the escaped newline characters before matching.
+        response_text = response.replace('\\n', '')
+        matches = pattern.findall(response_text)
+
+        result = []
+        for match in matches:
+            q = match[1]
+            a = match[4]
+            if q and a:
+                a = re.sub(r'[\n]', '', a).strip()
+                result.append([q, a])
+
+        return result
diff --git a/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py b/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py
new file mode 100644
index 000000000..bea67f492
--- /dev/null
+++ b/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py
@@ -0,0 +1,103 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import re
+import traceback
+
+import zhipuai
+from common import log_tag_const
+from common.config import config
+from llm_prompt_template import zhi_pu_ai_prompt
+
+from .base_qa_provider import BaseQAProvider
+
+logger = logging.getLogger(__name__)
+
+
+class QAProviderZhiPuAIOnline(BaseQAProvider):
+    """The QA provider backed by the ZhiPuAI online service."""
+
+    def __init__(self, api_key=None):
+        if api_key is None:
+            api_key = config.zhipuai_api_key
+        zhipuai.api_key = api_key
+
+
+    def generate_qa_list(
+        self,
+        text,
+        prompt_template=None
+    ):
+        """Generate the QA list.
+
+        Parameters
+        ----------
+        text
+            use the text to generate QA list
+        prompt_template
+            the prompt template
+        """
+        if prompt_template is None:
+            prompt_template = zhi_pu_ai_prompt.get_default_prompt_template()
+
+        content = prompt_template.format(
+            text=text
+        )
+
+        result = []
+        try:
+            response = zhipuai.model_api.invoke(
+                model="chatglm_6b",
+                prompt=[{"role": "user", "content": content}],
+                top_p=0.7,
+                temperature=0.9,
+            )
+            if response['success']:
+                result = self.__format_response_to_qa_list(response)
+            else:
+                logger.error(''.join([
+                    f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n",
+                    f"The error is: \n{response['msg']}\n"
+                ]))
+        except Exception as ex:
+            result = []
+            logger.error(''.join([
+                f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n",
+                f"The tracing error is: \n{traceback.format_exc()}\n"
+            ]))
+
+        return result
+
+
+    def __format_response_to_qa_list(self, response):
+        """Format the response to the QA list."""
+        text = response['data']['choices'][0]['content']
+
+        pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)')
+        # Strip the escaped newline characters before matching.
+        text = text.replace('\\n', '')
+        matches = pattern.findall(text)
+
+        result = []
+        for match in matches:
+            q = match[1]
+            a = match[4]
+            if q and a:
+                result.append([q, a])
+
+        return result
+    
\ No newline at end of file
diff --git a/data-processing/data_manipulation/llm_api_service/gpt_3_dot_5_service.py b/data-processing/data_manipulation/llm_prompt_template/bai_chuan_2_prompt.py
similarity index 71%
rename from data-processing/data_manipulation/llm_api_service/gpt_3_dot_5_service.py
rename to data-processing/data_manipulation/llm_prompt_template/bai_chuan_2_prompt.py
index dc7c996e1..a0dd122c2 100644
--- a/data-processing/data_manipulation/llm_api_service/gpt_3_dot_5_service.py
+++ b/data-processing/data_manipulation/llm_prompt_template/bai_chuan_2_prompt.py
@@ -13,3 +13,10 @@
 # limitations under the License.
 
 
+def get_default_prompt_template():
+    prompt_template = """
+        {text}
+
+        将上述内容提出最多 25 个问题。给出每个问题的答案。每个问题必须有答案。
+    """
+    return prompt_template
\ No newline at end of file
diff --git a/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py b/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py
new file mode 100644
index 000000000..2c20894f6
--- /dev/null
+++ b/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py
@@ -0,0 +1,29 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
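Both providers recover QA pairs from raw model output with the same `Q\d+: ... A\d+:` regular expression. A standalone sketch of that extraction (the sample response text below is made up):

```python
import re

# The same pattern used by the providers' response parsers: capture each
# "Qn: ... An: ..." pair, stopping at the next "Q" or at end of text.
pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)')

sample_response = (
    "Q1: 什么是一户一表?"
    "A1: 一个家庭用户安装一个计量水表。"
    "Q2: 谁负责抄表?"
    "A2: 供水企业抄表到户。"
)

qa_list = []
for match in pattern.findall(sample_response):
    question, answer = match[1], match[4]
    if question and answer:
        qa_list.append([question, answer.strip()])

print(qa_list)
# [['什么是一户一表?', '一个家庭用户安装一个计量水表。'],
#  ['谁负责抄表?', '供水企业抄表到户。']]
```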
+ + +def get_default_prompt_template(): + prompt_template = """ + {text} + + 请将上述内容按照问题、答案成对的方式,提出问题,并给出每个问题的答案,每个问题必须有问题和对应的答案,并严格按照以下方式展示: + Q1: 问题。 + A1: 答案。 + Q2: + A2: + …… + 严格按照QA的方式进行展示。 + """ + + return prompt_template \ No newline at end of file diff --git a/data-processing/data_manipulation/llm_api_service/zhi_pu_ai_service.py b/data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py similarity index 51% rename from data-processing/data_manipulation/llm_api_service/zhi_pu_ai_service.py rename to data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py index d76474250..be9e46dd4 100644 --- a/data-processing/data_manipulation/llm_api_service/zhi_pu_ai_service.py +++ b/data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py @@ -13,21 +13,8 @@ # limitations under the License. -import re -import zhipuai - -from common import config - - -async def init_service(opt={}): - """Initialize the ZhiPuAI service.""" - zhipuai.api_key = opt['api_key'] - - -async def generate_qa(opt={}): - """Generate the questions and answers.""" - text = opt['text'] - content = """ +def get_default_prompt_template(): + prompt_template = """ 我会给你一段文本,它们可能包含多个主题内容,学习它们,并整理学习成果,要求为: 1. 提出最多 25 个问题。 2. 给出每个问题的答案。 @@ -40,37 +27,7 @@ async def generate_qa(opt={}): A2: …… - 我的文本: + 我的文本:{text} """ - - content = content + text - - response = zhipuai.model_api.invoke( - model="chatglm_6b", - prompt=[{"role": "user", "content": content}], - top_p=0.7, - temperature=0.9, - ) - - # # 格式化后的QA对 - result = await _formatSplitText(response['data']['choices'][0]['content']) - - return result - - -async def _formatSplitText(text): - - pattern = re.compile(r'Q\d+:(\s*)(.*?)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)') - - # 移除换行符 - text = text.replace('\\n', '') - matches = pattern.findall(text) - - result = [] - for match in matches: - q = match[1] - a = match[4] - if q and a: - result.append([q, a]) - - return result + + return prompt_template \ No newline at end of file diff --git a/data-processing/data_manipulation/server.py b/data-processing/data_manipulation/server.py index 70a2d2974..dac6e5959 100644 --- a/data-processing/data_manipulation/server.py +++ b/data-processing/data_manipulation/server.py @@ -18,27 +18,26 @@ import time import psycopg2 +from common import log_tag_const +from common.config import config +from controller import data_process_controller +from database_clients import postgresql_pool_client from sanic import Sanic from sanic.response import json from sanic_cors import CORS - -from common import config, log_tag_const -from controller import data_process_controller -from utils import log_utils, sanic_utls -from database_clients import postgresql_pool_client - +from utils import log_utils, sanic_utils # Initialize the log config -log_utils.init_config({ - 'source_type': 'manipulate_server', - 'log_dir': "log" -}) +log_utils.init_config( + source_type='manipulate_server', + log_dir="log" +) logger = logging.getLogger('manipulate_server') app = Sanic(name='data_manipulate') CORS(app) -app.error_handler = sanic_utls.CustomErrorHandler() +app.error_handler = sanic_utils.CustomErrorHandler() @app.middleware('request') @@ -76,6 +75,39 @@ async def shutdown_web_server(app, loop): +@app.route('test-config', methods=['POST']) +async def test_config(request): + from common.config import config + + data = { + 'minio_access_key': config.minio_access_key, + 'minio_secret_key': config.minio_secret_key, + 'minio_api_url': config.minio_api_url, + 'minio_secure': 
config.minio_secure, + 'minio_dataset_prefix': config.minio_dataset_prefix, + 'zhipuai_api_key': config.zhipuai_api_key, + 'llm_use_type': config.llm_use_type, + 'open_ai_default_key': config.open_ai_default_key, + 'open_ai_default_base_url': config.open_ai_default_base_url, + 'open_ai_default_model': config.open_ai_default_model, + 'knowledge_chunk_size': config.knowledge_chunk_size, + 'knowledge_chunk_overlap': config.knowledge_chunk_overlap, + 'pg_host': config.pg_host, + 'pg_port': config.pg_port, + 'pg_user': config.pg_user, + 'pg_password': config.pg_password, + 'pg_database': config.pg_database + + } + + return json({ + 'status': 200, + 'message': '', + 'data': data + }) + + + def _create_database_connection(): """Create a database connection.""" return psycopg2.connect( diff --git a/data-processing/data_manipulation/service/data_process_service.py b/data-processing/data_manipulation/service/data_process_service.py index dbfd69e29..aaf61a3d6 100644 --- a/data-processing/data_manipulation/service/data_process_service.py +++ b/data-processing/data_manipulation/service/data_process_service.py @@ -15,12 +15,13 @@ import asyncio import logging -import ulid import traceback +import ulid from common import log_tag_const from data_store_process import minio_store_process -from database_operate import data_process_db_operate +from database_operate import (data_process_db_operate, + data_process_detail_db_operate) from kube import dataset_cr from parallel import thread_parallel from utils import date_time_utils @@ -28,37 +29,78 @@ logger = logging.getLogger(__name__) -async def list_by_page(req_json, opt={}): +def list_by_page( + req_json, + pool +): """Get the list data for data processing by page""" - return await data_process_db_operate.list_by_page(req_json, opt) + return data_process_db_operate.list_by_page(req_json, pool=pool) -async def list_by_count(req_json, opt={}): +def list_by_count( + req_json, + pool +): """Get count for the list data processing with page""" - return await data_process_db_operate.list_by_count(req_json, opt) + return data_process_db_operate.list_by_count(req_json, pool=pool) + +def add( + req_json, + pool +): + """Add a new data process task. + + req_json is a dictionary object. for example: + { + "name": "小T_test_0201", + "file_type": "text", + "pre_data_set_name": "dataset1", + "pre_data_set_version": "v2", + "post_data_set_name": "dataset1", + "post_data_set_version": "v2", + "version_data_set_name": "dataset1-v2", + "bucket_name": "system-tce", + "file_names": [ + { + "name": "数据处理文件_小T.pdf" + } + ], + "data_process_config_info": [] + } -async def add(req_json, opt={}): - """Add a new data process""" + pool: database connection pool. 
+ """ id = ulid.ulid() - opt['id'] = id - res = await data_process_db_operate.add(req_json, opt) + res = data_process_db_operate.add( + req_json, + pool=pool, + id=id + ) if res['status'] == 200: # update the dataset status - update_dataset = await dataset_cr.update_dataset_k8s_cr({ - 'bucket_name': req_json['bucket_name'], - 'version_data_set_name': req_json['version_data_set_name'], - 'reason': 'processing' - }) + update_dataset = dataset_cr.update_dataset_k8s_cr( + bucket_name=req_json['bucket_name'], + version_data_set_name=req_json['version_data_set_name'], + reason='processing' + ) if update_dataset['status'] != 200: return update_dataset try: + + async def async_text_manipulate( + req_json, + pool, + id + ): + minio_store_process.text_manipulate(req_json, pool=pool, id=id) + def execute_text_manipulate_task(loop): asyncio.set_event_loop(loop) - loop.run_until_complete(minio_store_process.text_manipulate(req_json, opt)) + loop.run_until_complete(async_text_manipulate(req_json, pool=pool, id=id)) thread_parallel.run_async_background_task( execute_text_manipulate_task, @@ -76,43 +118,62 @@ def execute_text_manipulate_task(loop): return res -async def delete_by_id(req_json, opt={}): +def delete_by_id( + req_json, + pool +): """Delete a record with id""" - return await data_process_db_operate.delete_by_id(req_json, opt) + return data_process_db_operate.delete_by_id(req_json, pool=pool) -async def info_by_id(req_json, opt={}): - """Get a detail info with id""" +def info_by_id( + req_json, + pool +): + """Get a detail info with id. + + req_json is a dictionary object. for example: + { + "id": "01HGWBE48DT3ADE9ZKA62SW4WS" + } + """ id = req_json['id'] data = _get_default_data_for_detail() - data['id'] = id + _get_and_set_basic_detail_info( + data, + task_id=id, + conn_pool=pool + ) - # Get the detail info for the data processs task - detail_info_params = { - 'id': id - } - detail_info_res = await data_process_db_operate.info_by_id(detail_info_params, { - 'pool': opt['pool'] - }) - logger.debug(f"{log_tag_const.DATA_PROCESS_DETAIL} The defail info is: \n{detail_info_res}") + if data['id'] == '': + return { + 'status': 200, + 'message': '', + 'data': data + } - if detail_info_res['status'] == 200 and len(detail_info_res['data']) > 0: - detail_info_data = detail_info_res['data'][0] - data['name'] = detail_info_data['name'] - data['status'] = detail_info_data['status'] - data['file_type'] = detail_info_data['file_type'] - data['file_num'] = 0 if detail_info_data['file_names'] is None else len(detail_info_data['file_names']) - data['pre_dataset_name'] = detail_info_data['pre_data_set_name'] - data['pre_dataset_version'] = detail_info_data['pre_data_set_version'] - data['post_dataset_name'] = detail_info_data['post_data_set_name'] - data['post_dataset_version'] = detail_info_data['post_data_set_version'] - data['start_time'] = detail_info_data['start_datetime'] - data['end_time'] = detail_info_data['end_datetime'] + process_cofig_map = _convert_config_info_to_map(data.get('data_process_config_info')) + + config_map_for_result = {} + _set_basic_info_for_config_map_for_result(config_map_for_result, process_cofig_map) + + _set_children_info_for_config_map_for_result( + config_map_for_result, + process_cofig_map, + task_id=id, + conn_pool=pool + ) + + # convert the conig resule from map to list + config_list_for_result = [] + for value in config_map_for_result.values(): + config_list_for_result.append(value) + + data['config'] = config_list_for_result 
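The background wiring in `add` above boils down to running a coroutine on a private event loop in a separate thread, so the HTTP request that created the task can return immediately. A self-contained sketch — `run_in_background` here is an illustrative stand-in for `thread_parallel.run_async_background_task`, whose internals are not shown in this patch:

```python
import asyncio
import threading


def run_in_background(coro_factory, name):
    """Run a coroutine on its own event loop inside a daemon thread."""
    def runner():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(coro_factory())
        loop.close()

    thread = threading.Thread(target=runner, name=name, daemon=True)
    thread.start()
    return thread


async def fake_text_manipulate():
    # Placeholder for minio_store_process.text_manipulate(...)
    await asyncio.sleep(0.1)
    print('text manipulate finished')


if __name__ == '__main__':
    t = run_in_background(fake_text_manipulate, 'data-process-01')
    t.join()
```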
logger.debug(f"{log_tag_const.DATA_PROCESS_DETAIL} The response data is: \n{data}") - return { 'status': 200, 'message': '', @@ -120,76 +181,305 @@ async def info_by_id(req_json, opt={}): } - def _get_default_data_for_detail(): """Get the data for the detail""" return { - "id": '', - "name": "数据处理任务1", - "status": "processing", - "file_type": "text", - "pre_dataset_name": "def", - "pre_dataset_version": "v1", - "post_dataset_name": "def", - "post_dataset_version": "v1", - "file_num": 20, - "start_time": date_time_utils.now_str(), - "end_time": date_time_utils.now_str(), - "config": [ + "id": "", + "name": "", + "status": "", + "file_type": "", + "pre_dataset_name": "", + "pre_dataset_version": "", + "post_dataset_name": "", + "post_dataset_version": "", + "file_num": 0, + "start_time": '', + "end_time": '', + "create_user": '', + "data_process_config_info": [], + "config": [] + } + + +def _get_and_set_basic_detail_info( + from_result, + task_id, + conn_pool +): + """Get and set the basic detail info. + + from_result: the from result, it's content will be changed; + + task_id: task id; + conn_pool: database connection pool + """ + # step 1 + # Get the detail info from the database. + detail_info_params = { + 'id': task_id + } + detail_info_res = data_process_db_operate.info_by_id( + detail_info_params, + pool=conn_pool + ) + if detail_info_res['status'] == 200 and len(detail_info_res['data']) > 0: + detail_info_data = detail_info_res['data'][0] + + file_num = 0 + if detail_info_data.get('file_names'): + file_num = len(detail_info_data['file_names']) + + from_result['id'] = task_id + from_result['name'] = detail_info_data['name'] + from_result['status'] = detail_info_data['status'] + from_result['file_type'] = detail_info_data['file_type'] + from_result['file_num'] = file_num + from_result['pre_dataset_name'] = detail_info_data['pre_data_set_name'] + from_result['pre_dataset_version'] = detail_info_data['pre_data_set_version'] + from_result['post_dataset_name'] = detail_info_data['post_data_set_name'] + from_result['post_dataset_version'] = detail_info_data['post_data_set_version'] + from_result['start_time'] = detail_info_data['start_datetime'] + from_result['end_time'] = detail_info_data['end_datetime'] + from_result['creator'] = detail_info_data['create_user'] + from_result['data_process_config_info'] = detail_info_data['data_process_config_info'] + else: + from_result['id'] = '' + + +def _convert_config_info_to_map(config_info_list): + """Convert the config info to map. 
+
+    config_info_list: a list for example
+    [
+        {
+            "type": "qa_split"
+        },
+        {
+            "type": "remove_invisible_characters"
+        },
+        {
+            "type": "space_standardization"
+        },
+        {
+            "type": "remove_email"
+        }
+    ]
+    """
+    result = {}
+    for item in config_info_list:
+        result[item['type']] = 1
+
+    return result
+
+
+def _set_basic_info_for_config_map_for_result(
+    from_result,
+    process_config_map
+):
+    """Set basic info for the config map for result.
+
+    from_result: the result dict; its content will be changed.
+    process_config_map: process config map
+    """
+    # chunk processing
+    if process_config_map.get('qa_split'):
+        if from_result.get('chunk_processing') is None:
+            from_result['chunk_processing'] = {
+                'name': 'chunk_processing',
+                'description': '拆分处理',
+                'status': 'succeed',
+                'children': []
+            }
+
+    # data clean
+    if process_config_map.get('remove_invisible_characters') or \
+       process_config_map.get('space_standardization') or \
+       process_config_map.get('traditional_to_simplified') or \
+       process_config_map.get('remove_html_tag'):
+        if from_result.get('clean') is None:
+            from_result['clean'] = {
+                'name': 'clean',
+                'description': '异常清洗配置',
+                'status': 'succeed',
+                'children': []
+            }
+
+    # remove privacy
+    if process_config_map.get('remove_email'):
+        if from_result.get('privacy') is None:
+            from_result['privacy'] = {
+                'name': 'privacy',
+                'description': '数据隐私处理',
+                'status': 'succeed',
+                'children': []
+            }
+
+
+def _set_children_info_for_config_map_for_result(
+    from_result,
+    process_cofig_map,
+    task_id,
+    conn_pool
+):
+    """Set the child list for the config result.
+
+    from_result: the result dict; its content will be changed.
+ process_config_map: process config map; + task_id: task id, + conn_pool: database connection pool + """ + # insert the qa list + if process_cofig_map.get('qa_split'): + from_result['chunk_processing']['children'].append({ + 'name': 'qa_split', + 'enable': 'true', + 'zh_name': 'QA拆分', + 'description': '根据文件中的文章与图表标题,自动将文件做 QA 拆分处理。', + 'preview': _get_qa_list_preview( + task_id=task_id, + conn_pool=conn_pool + ) + }) + + # remove invisible characters + if process_cofig_map.get('remove_invisible_characters'): + from_result['clean']['children'].append({ + 'name': 'remove_invisible_characters', + 'enable': 'true', + 'zh_name': '移除不可见字符', + 'description': '移除ASCII中的一些不可见字符, 如0-32 和127-160这两个范围', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='remove_invisible_characters', + conn_pool=conn_pool + ) + }) + + # space standardization + if process_cofig_map.get('space_standardization'): + from_result['clean']['children'].append({ + 'name': 'space_standardization', + 'enable': 'true', + 'zh_name': '空格处理', + 'description': '将不同的unicode空格比如u2008, 转成正常的空格', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='space_standardization', + conn_pool=conn_pool + ) + }) + + # remove email + if process_cofig_map.get('remove_email'): + from_result['privacy']['children'].append({ + 'name': 'remove_email', + 'enable': 'true', + 'zh_name': '去除Email', + 'description': '去除email地址', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='remove_email', + conn_pool=conn_pool + ) + }) + + + +def _get_transform_preview_list( + task_id, + transform_type, + conn_pool +): + """"Get transofm preview list. + + task_id: task id; + transform_type: transform type + conn_pool: database connection pool; + """ + transform_preview = [] + # step 1 + # list file name in transform + list_file_name_params = { + 'task_id': task_id, + 'transform_type': transform_type + } + list_file_name_res = data_process_detail_db_operate.list_file_name_for_transform( + list_file_name_params, + pool=conn_pool + ) + if list_file_name_res['status'] == 200: + for item in list_file_name_res['data']: + transform_preview.append({ + 'file_name': item['file_name'], + 'content': [] + }) + # step 2 + # iterate the transform preview + for item in transform_preview: + list_transform_params = { + 'task_id': task_id, + 'transform_type': transform_type, + 'file_name': item['file_name'] + } + list_transform_res = data_process_detail_db_operate.top_n_list_transform_for_preview( + list_transform_params, + pool=conn_pool + ) + if list_transform_res['status'] == 200: + for item_transform in list_transform_res['data']: + item['content'].append({ + 'pre': item_transform['pre_content'], + 'post': item_transform['post_content'] + }) + + return transform_preview + + +def _get_qa_list_preview( + task_id, + conn_pool +): + """Get the QA list preview. 
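The two preview helpers (`_get_transform_preview_list` above and `_get_qa_list_preview` below) both assemble the same per-file structure. A minimal sketch with in-memory rows instead of database results (sample values are illustrative):

```python
# Group flat detail rows into the preview shape the API returns:
# [{'file_name': ..., 'content': [{'pre': ..., 'post': ...}, ...]}, ...]
rows = [
    {'file_name': 'a.pdf', 'pre_content': '原文句子', 'post_content': '清洗后句子'},
]

preview = []
for row in rows:
    entry = next((p for p in preview if p['file_name'] == row['file_name']), None)
    if entry is None:
        entry = {'file_name': row['file_name'], 'content': []}
        preview.append(entry)
    entry['content'].append({
        'pre': row['pre_content'],
        'post': row['post_content']
    })

print(preview)
# [{'file_name': 'a.pdf',
#   'content': [{'pre': '原文句子', 'post': '清洗后句子'}]}]
```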
+ + task_id: task od; + conn_pool: database connection pool + """ + qa_list_preview = [] + # step 1 + # list file name in QA + list_file_name_params = { + 'task_id': task_id + } + list_file_name_res = data_process_detail_db_operate.list_file_name_in_qa_by_task_id( + list_file_name_params, + pool=conn_pool + ) + if list_file_name_res['status'] == 200: + for item in list_file_name_res['data']: + qa_list_preview.append({ + 'file_name': item['file_name'], + 'content': [] + }) + + # step 2 + # iterate the QA list preview + for item in qa_list_preview: + list_qa_params = { + 'task_id': task_id, + 'file_name': item['file_name'] + } + list_qa_res = data_process_detail_db_operate.top_n_list_qa_for_preview( + list_qa_params, + pool=conn_pool + ) + if list_qa_res['status'] == 200: + for item_qa in list_qa_res['data']: + item['content'].append({ + 'pre': item_qa['question'], + 'post': item_qa['answer'] + }) + + return qa_list_preview + + + diff --git a/data-processing/data_manipulation/transform/text/clean_transform.py b/data-processing/data_manipulation/transform/text/clean_transform.py index ba739427c..e661e2dbf 100644 --- a/data-processing/data_manipulation/transform/text/clean_transform.py +++ b/data-processing/data_manipulation/transform/text/clean_transform.py @@ -13,77 +13,287 @@ # limitations under the License. -### -# 异常清洗 -# @author: wangxinbiao -# @date: 2023-11-01 10:44:01 -# modify history -# ==== 2023-11-01 10:44:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### - +import logging import re +import traceback + +import ftfy +import opencc +from common import log_tag_const, special_characters +from selectolax.parser import HTMLParser -from common import special_characters +logger = logging.getLogger(__name__) -### -# 去除不可见字符 -# @author: wangxinbiao -# @date: 2023-11-02 14:42:01 -# modify history -# ==== 2023-11-02 14:42:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### -async def remove_invisible_characters(opt={}): - text = opt['text'] +def remove_invisible_characters(text): + """remove invisible characters. + + text: text; + usage + input: + “一户一表、水表出户、抄表到户”是指一个家庭用户安装一个计量水表,计量水表安装在住宅的公共部位,供水企业抄表到户,按户计量收费。 + output: + “一户一表、水表出户、抄表到户”是指一个家庭用户安装一个计量水表,计量水表安装在住宅的公共部位,供水企业抄表到户,按户计量收费。 + """ try: - clean_text = re.sub( - r'[\x00-\x1F\x7F-\x9F\xAD\r\n\t\b\x0B\x1C\x1D\x1E]', '', text) + pattern = r'[\x00-\x1F\x7F-\x9F\xAD\r\n\t\b\x0B\x1C\x1D\x1E]' + find_pattern = r'[^,。!?,.!?]*[\x00-\x1F\x7F-\x9F\xAD\r\n\t\b\x0B\x1C\x1D\x1E][^,。!?,.!?]*' + + clean_text = re.sub(pattern, '', text) + + clean_data = _find_clean_data( + text=text, + pattern=pattern, + find_pattern=find_pattern + ) return { 'status': 200, 'message': '', - 'data': clean_text + 'data': { + 'clean_data': clean_data, + 'text': clean_text + } } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.CLEAN_TRANSFORM} Execute removing invisible characters failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + +def space_standardization(text): + """space standardization. 
+ + text: text; + + usage: + input: + 第一条 灭火是指国家综合性消防救援队、专职消防队依法承担的火灾扑救工作。 + + output: + 第一条 灭火是指国家综合性消防救援队、专职消防队依法承担的火灾扑救工作。 + """ + try: + various_whitespaces = special_characters.VARIOUS_WHITESPACES + pattern = '|'.join(re.escape(value) for value in various_whitespaces) + find_pattern = '|'.join(f'[^,。!?,.!?]*{re.escape(value)}[^,。!?,.!?]*' for value in various_whitespaces) + + clean_text = re.sub(pattern, ' ', text) + + clean_data = _find_clean_data( + text=text, + pattern=pattern, + find_pattern=find_pattern + ) + + return { + 'status': 200, + 'message': '', + 'data': { + 'clean_data': clean_data, + 'text': clean_text + } + } except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.CLEAN_TRANSFORM} Executing space standardization failed.\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) return { 'status': 400, - 'message': '去除不可见字符失败:' + str(ex), - 'data': '' + 'message': str(ex), + 'data': traceback.format_exc() } -### -# 空格处理 -# @author: wangxinbiao -# @date: 2023-11-20 16:53:01 -# modify history -# ==== 2023-11-20 16:53:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### -async def space_standardization(opt={}): - text = opt['text'] +def remove_garbled_text(text): + """remove garbled text. + text: text; + usage: + input: + 江苏省滨海县人民法院民事判决书(2015)滨滩商初字第0014号原告孟庆连,男,49岁,居民。委托代理人王成庭,滨海县滨淮法律服务所法律工作者。 — like this one. + + output: + 江苏省滨海县人民法院民事判决书(2015)滨滩商初字第0014号原告孟庆连,男,49岁,居民。委托代理人王成庭,滨海县滨淮法律服务所法律工作者。 — like this one. + + """ try: - clean_text = ''.join([ - char if char not in special_characters.VARIOUS_WHITESPACES else ' ' for char in text - ]) + clean_text = ftfy.fix_text(text) return { 'status': 200, 'message': '', - 'data': clean_text + 'data': { + 'found': 0, + 'text': clean_text + } } except Exception as ex: + error = str(ex) + logger.error(''.join([ + f"{log_tag_const.CLEAN_TRANSFORM} Executing space standardization failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + return { 'status': 400, - 'message': '空格处理失败:' + str(ex), - 'data': '' - } \ No newline at end of file + 'message': error, + 'data': traceback.format_exc() + } + +def traditional_to_simplified(text): + """Traditional Chinese to Simplified Chinese. + + text: text; + + usage: + input: + 風暴帶來的暫停使消防員和其他緊急反應人員得以進入禁區進行結構破壞評估。 + + output: + 风暴带来的暂停使消防员和其他紧急反应人员得以进入禁区进行结构破坏评估。 + """ + try: + clean_text = opencc.OpenCC('t2s').convert(text) + + return { + 'status': 200, + 'message': '', + 'data': { + 'found': 0, + 'text': clean_text + } + } + except Exception as ex: + error = str(ex) + logger.error(''.join([ + f"{log_tag_const.CLEAN_TRANSFORM} Executing Traditional Chinese to Simplified Chinese failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + + return { + 'status': 400, + 'message': error, + 'data': traceback.format_exc() + } + + +def remove_html_tag(text): + """clean html code in text samples. + + text: text; + + usage: + input: +
<div>朗播 SAT 学员成绩单分析报告</div>
+
+        output:
+            朗播 SAT 学员成绩单分析报告
+    """
+    try:
+        # Convert common list tags to plain-text markers before stripping
+        # the remaining tags with the HTML parser.
+        text = text.replace('<li>', '\n*')
+        text = text.replace('</li>', '')
+        text = text.replace('<ol>', '\n*')
+        text = text.replace('</ol>', '')
+        parser = HTMLParser(text)
+
+        clean_text = parser.text()
+
+        return {
+            'status': 200,
+            'message': '',
+            'data': {
+                'found': 0,
+                'text': clean_text
+            }
+        }
+    except Exception as ex:
+        error = str(ex)
+        logger.error(''.join([
+            f"{log_tag_const.CLEAN_TRANSFORM} Executing remove html tag failed\n",
+            f"The tracing error is: \n{traceback.format_exc()}\n"
+        ]))
+
+        return {
+            'status': 400,
+            'message': error,
+            'data': traceback.format_exc()
+        }
+
+
+def remove_emojis(text):
+    """remove emojis.
+
+    text: text;
+
+    usage:
+        input:
+            这是一段带有表情符号😊的文本。
+
+        output:
+            这是一段带有表情符号的文本。
+    """
+    try:
+        emojis = special_characters.EMOJI
+        pattern = '|'.join(re.escape(value) for value in emojis)
+        find_pattern = '|'.join(f'[^,。!?,.!?]*{re.escape(value)}[^,。!?,.!?]*' for value in emojis)
+
+        clean_text = re.sub(pattern, '', text)
+
+        clean_data = _find_clean_data(
+            text=text,
+            pattern=pattern,
+            find_pattern=find_pattern
+        )
+
+        return {
+            'status': 200,
+            'message': '',
+            'data': {
+                'clean_data': clean_data,
+                'text': clean_text
+            }
+        }
+
+    except Exception as ex:
+        error = str(ex)
+        logger.error(''.join([
+            f"{log_tag_const.CLEAN_TRANSFORM} Executing remove emojis failed\n",
+            f"The tracing error is: \n{traceback.format_exc()}\n"
+        ]))
+
+        return {
+            'status': 400,
+            'message': error,
+            'data': traceback.format_exc()
+        }
+
+def _find_clean_data(
+    text,
+    pattern,
+    find_pattern
+):
+    """find clean data for pre_content and post_content.
+
+    text: text;
+    pattern: the regex used to remove the matched characters;
+    find_pattern: the regex used to locate the sentence that contains a match;
+    """
+    clean_data = []
+
+    sentences = re.findall(find_pattern, text)
+    for sentence in sentences:
+        post_content = re.sub(pattern, '', sentence)
+        clean_data.append({
+            'pre_content': sentence,
+            'post_content': post_content
+        })
+
+    return clean_data
diff --git a/data-processing/data_manipulation/transform/text/duplicates_transform.py b/data-processing/data_manipulation/transform/text/duplicates_transform.py
index 91739e76e..cdef7bcea 100644
--- a/data-processing/data_manipulation/transform/text/duplicates_transform.py
+++ b/data-processing/data_manipulation/transform/text/duplicates_transform.py
@@ -13,15 +13,5 @@
 # limitations under the License.
 
 
-###
-# 去重
-# @author: wangxinbiao
-# @date: 2023-11-02 14:42:01
-# modify history
-# ==== 2023-11-02 14:42:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-async def remove_duplicates(opt={}):
-    return opt['text']
+def remove_duplicates(text):
+    return text
diff --git a/data-processing/data_manipulation/transform/text/filtration_transform.py b/data-processing/data_manipulation/transform/text/filtration_transform.py
index b67f27f16..ef9b1ee9b 100644
--- a/data-processing/data_manipulation/transform/text/filtration_transform.py
+++ b/data-processing/data_manipulation/transform/text/filtration_transform.py
@@ -13,29 +13,8 @@
 # limitations under the License.
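A hedged usage sketch for the transform modules above — assuming it runs from the data_manipulation directory so the package imports resolve; the sample sentence is made up:

```python
# Chain two transforms the way the file handlers do: each returns
# {'status', 'message', 'data': {'clean_data', 'text'}}, and the cleaned
# text of one step feeds the next.
from transform.text import clean_transform, privacy_transform

text = '联系邮箱:someone@example.com,\x0b请尽快回复。'

result = clean_transform.remove_invisible_characters(text)
if result['status'] == 200:
    text = result['data']['text']

result = privacy_transform.remove_email(text)
if result['status'] == 200:
    for item in result['data']['clean_data']:
        print(item['pre_content'], '->', item['post_content'])
    text = result['data']['text']

print(text)  # 联系邮箱:xxxxxx,请尽快回复。
```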
-### -# 过滤 -# @author: wangxinbiao -# @date: 2023-11-01 10:44:01 -# modify history -# ==== 2023-11-01 10:44:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### - import re -### -# 检查文档的词数目 -# @author: wangxinbiao -# @date: 2023-11-02 14:42:01 -# modify history -# ==== 2023-11-02 14:42:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### -async def word_count(opt={}): +def word_count(opt={}): return 49 diff --git a/data-processing/data_manipulation/transform/text/privacy_transform.py b/data-processing/data_manipulation/transform/text/privacy_transform.py index ff6bcfd2a..d046d857d 100644 --- a/data-processing/data_manipulation/transform/text/privacy_transform.py +++ b/data-processing/data_manipulation/transform/text/privacy_transform.py @@ -13,49 +13,439 @@ # limitations under the License. -### -# 去隐私 -# @author: wangxinbiao -# @date: 2023-11-01 10:44:01 -# modify history -# ==== 2023-11-01 10:44:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### - +import logging import re +import traceback + +from common import log_tag_const, special_characters + +logger = logging.getLogger(__name__) + + +def remove_email( + text, + replace_string=None +): + """Replace email info with the user defined string. + + text: text; + replace_string: the text is used to replace the email info; + + usage: + input: + 如果需要可以联系官方邮箱:172817631@qq.com马上申请为你开通 + + output: + 如果需要可以联系官方邮箱:xxxxxx马上申请为你开通 + """ + try: + if replace_string is None: + replace_string = 'xxxxxx' + + pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' + find_pattern = r'[^,。!?,.!?]*[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}[^,。!?,.!?]*' + + clean_text = re.sub(pattern, replace_string, text) + + clean_data = _find_clean_data( + text=text, + pattern=pattern, + find_pattern=find_pattern, + replace_string=replace_string + ) + return { + 'status': 200, + 'message': '', + 'data': { + 'clean_data': clean_data, + 'text': clean_text + } + } + except Exception as ex: + logger.error(''.join([ + f"{log_tag_const.CLEAN_TRANSFORM} Execute removing email.\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + return { + 'status': 400, + 'message': str(ex), + 'data': traceback.format_exc() + } + +def remove_ip_address( + text, + replace_string=None +): + """the ip addresses are replaced with xxxxxx. 
+ + text: text; + replace_string: the text is used to replace the email info; + + usage: + input: + 服务器登陆ip为192.168.255.255 + + output: + 服务器登陆ip为xxxxxx + """ + try: + if replace_string is None: + replace_string = 'xxxxxx' + + pattern = ''.join([ + r'((?:(?:1[0-9][0-9]\.)|(?:2[0-4][0-9]\.)|', + r'(?:25[0-5]\.)|(?:[1-9][0-9]\.)|(?:[0-9]\.))', + r'{3}(?:(?:1[0-9][0-9])|(?:2[0-4][0-9])|', + r'(?:25[0-5])|(?:[1-9][0-9])|(?:[0-9]))|', + r'([\da-fA-F]{1,4}:){7}[\da-fA-F]{1,4})' + ]) + + find_pattern = ''.join([ + r'([^,。!?,.!?]*)', + pattern, + r'([^,。!?,.!?]*)' + ]) + + clean_text = re.sub(pattern=pattern, + repl=replace_string, + string=text) + + clean_data = [] + sentences = re.findall(find_pattern, text) + for sentence in sentences: + sentence = ''.join([ + sentence[0], + sentence[1], + sentence[3] + ]) + post_content = re.sub(pattern, replace_string, sentence) + clean_data.append({ + 'pre_content': sentence, + 'post_content': post_content + }) + return { + 'status': 200, + 'message': '', + 'data': { + 'clean_data': clean_data, + 'text': clean_text + } + } + + except Exception as ex: + error = str(ex) + logger.error(''.join([ + f"{log_tag_const.PRIVACY_TRANSFORM} Executing remove email failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) -### -# 去除邮箱地址 -# @author: wangxinbiao -# @date: 2023-11-02 14:42:01 -# modify history -# ==== 2023-11-02 14:42:01 ==== -# author: wangxinbiao -# content: -# 1) 基本功能实现 -### -async def remove_email(opt={}): - text = opt['text'] + return { + 'status': 400, + 'message': error, + 'data': traceback.format_exc() + } +def remove_phone( + text, + replace_string=None +): + """the phone are replaced with xxxxxx. + + text: text; + replace_string: the text is used to replace the email info; + + usage: + input: + 12345678910, 我的手机号是: 18617261536,我的座机号是: 029-1234567 + + output: + 12345678910, 我的手机号是: xxxxxx,我的座机号是: 029-1234567 + """ try: - email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' + if replace_string is None: + replace_string = 'xxxxxx' + + pattern = r'((\+|00)86)?(1)((3[\d])|(4[5,6,7,9])|(5[0-3,5-9])|(6[5-7])|(7[0-8])|(8[\d])|(9[1,8,9]))(\d{8})(?![0-9])' + find_pattern = ''.join([ + r'([^,。!?,.!?]*)', + pattern, + r'([^,。!?,.!?]*)' + ]) + + clean_text = re.sub(pattern=pattern, + repl=replace_string, + string=text) - # 将邮箱地址替换为 "PI:EMAIL" - replacement_text = "PI:EMAIL" + clean_data = [] + sentences = re.findall(find_pattern, text) + for sentence in sentences: + sentence = ''.join([ + sentence[0], + sentence[3], + sentence[4], + sentence[12], + sentence[13] + ]) + post_content = re.sub(pattern, replace_string, sentence) + clean_data.append({ + 'pre_content': sentence, + 'post_content': post_content + }) - clean_text = re.sub(email_pattern, replacement_text, text) return { 'status': 200, 'message': '', - 'data': clean_text + 'data': { + 'clean_data': clean_data, + 'text': clean_text + } } except Exception as ex: + error = str(ex) + logger.error(''.join([ + f"{log_tag_const.PRIVACY_TRANSFORM} Executing remove phone failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + return { 'status': 400, - 'message': '去除邮箱地址失败:' + str(ex), - 'data': '' - } \ No newline at end of file + 'message': error, + 'data': traceback.format_exc() + } + +def remove_id_card( + text, + replace_string=None +): + """the phone are replaced with xxxxxx. 
+ + text: text; + replace_string: the text is used to replace the email info; + + usage: + input: + 身份证号1:123451230112121234,身份证号2:12345123011212123x,位身份证号3:123456780009876 + + output: + 身份证号1:xxxxxx,身份证号2:xxxxxx,位身份证号3:xxxxxx + """ + try: + if replace_string is None: + replace_string = 'xxxxxx' + + id_card_regex = [ + r'\b([1-9]\d{5}[1-9]\d{3})((0\d)|(1[0-2]))(([0|1|2]\d)|(3[0-1]))(\d{3}[0-9Xx])(?![0-9])', + r'\b([1-9]\d{7})((0\d)|(1[0-2]))(([0-2][1-9])|(3[0-1]))(\d{2}[0-9Xx])(?![0-9])' + ] + + clean_data = [] + for regex_exp in id_card_regex: + find_pattern = ''.join([ + r'([^,。!?,.!?]*)', + regex_exp, + r'([^,。!?,.!?]*)' + ]) + + sentences = re.findall(find_pattern, text) + + text = re.sub(pattern=regex_exp, + repl=replace_string, + string=text) + + for sentence in sentences: + sentence = ''.join([ + sentence[0], + sentence[1], + sentence[2], + sentence[5], + sentence[8], + sentence[9] + ]) + post_content = re.sub(regex_exp, replace_string, sentence) + clean_data.append({ + 'pre_content': sentence, + 'post_content': post_content + }) + + return { + 'status': 200, + 'message': '', + 'data': { + 'clean_data': clean_data, + 'text': text + } + } + + except Exception as ex: + error = str(ex) + logger.error(''.join([ + f"{log_tag_const.PRIVACY_TRANSFORM} Executing remove id card failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + + return { + 'status': 400, + 'message': error, + 'data': traceback.format_exc() + } + +def remove_weixin( + text, + replace_string=None +): + """the weixin are replaced with xxxxxx. + + text: text; + replace_string: the text is used to replace the email info; + + usage: + input: + 我的微信号:qw123456 + + output: + 我的xxxxxx + """ + try: + if replace_string is None: + replace_string = 'xxxxxx' + weixin_regex = [ + r'vxin[:|:][a-zA-Z0-9{3,20}]+', + r'vx[:|:][a-zA-Z0-9{3,20}]+', + r'VX[:|:][a-zA-Z0-9{3,20}]+', + r'Vxin[:|:][a-zA-Z0-9{3,20}]+', + r'wx[:|:][a-zA-Z0-9{3,20}]+', + r'WX[:|:][a-zA-Z0-9{3,20}]+', + r'wei xin[:|:][a-zA-Z0-9{3,20}]+', + r'weixin[:|:][a-zA-Z0-9{3,20}]+', + r'微信[:|:][a-zA-Z0-9{3,20}]+', + r'微信号[:|:][a-zA-Z0-9{3,20}]+', + r'薇信[:|:][a-zA-Z0-9{3,20}]+', + r'薇信号[:|:][a-zA-Z0-9{3,20}]+', + r'v信[:|:][a-zA-Z0-9{3,20}]+', + r'V信[:|:][a-zA-Z0-9{3,20}]+' + ] + + clean_data = [] + for regex_exp in weixin_regex: + find_pattern = ''.join([ + r'[^,。!?,.!?]*', + regex_exp, + r'[^,。!?,.!?]*' + ]) + sentences = re.findall(regex_exp, text) + + text = re.sub(pattern=regex_exp, + repl=replace_string, + string=text) + + for sentence in sentences: + post_content = re.sub(regex_exp, replace_string, sentence) + clean_data.append({ + 'pre_content': sentence, + 'post_content': post_content + }) + + return { + 'status': 200, + 'message': '', + 'data': { + 'clean_data': clean_data, + 'text': text + } + } + + except Exception as ex: + error = str(ex) + logger.error(''.join([ + f"{log_tag_const.PRIVACY_TRANSFORM} Executing remove id card failed\n", + f"The tracing error is: \n{traceback.format_exc()}\n" + ])) + + return { + 'status': 400, + 'message': error, + 'data': traceback.format_exc() + } + +def remove_bank_card( + text, + replace_string=None +): + """the remove bank card are replaced with xxxxxx. 
+
+    text: text;
+    replace_string: the text used to replace the matched bank card numbers;
+
+    usage:
+        input:
+            银行卡号1:1234567890123456,银行卡号2:12345678901234567,银行卡号3:1234567890123456789
+
+        output:
+            银行卡号1:xxxxxx,银行卡号2:12345678901234567,银行卡号3:xxxxxx
+    """
+    try:
+        if replace_string is None:
+            replace_string = 'xxxxxx'
+
+        pattern = r'\b([1-9]{1})(\d{15}|\d{18})(?![0-9])'
+        find_pattern = r'([^,。!?,.!?]*)\b([1-9]{1})(\d{15}|\d{18})((?![0-9])[^,。!?,.!?]*)'
+
+        clean_text = re.sub(pattern=pattern,
+                            repl=replace_string,
+                            string=text)
+
+        clean_data = _find_clean_data(
+            text=text,
+            pattern=pattern,
+            find_pattern=find_pattern,
+            replace_string=replace_string
+        )
+        return {
+            'status': 200,
+            'message': '',
+            'data': {
+                'clean_data': clean_data,
+                'text': clean_text
+            }
+        }
+
+    except Exception as ex:
+        error = str(ex)
+        logger.error(''.join([
+            f"{log_tag_const.PRIVACY_TRANSFORM} Executing remove bank card failed\n",
+            f"The tracing error is: \n{traceback.format_exc()}\n"
+        ]))
+
+        return {
+            'status': 400,
+            'message': error,
+            'data': traceback.format_exc()
+        }
+
+def _find_clean_data(
+    text,
+    replace_string,
+    pattern,
+    find_pattern
+):
+    """find clean data for pre_content and post_content.
+
+    text: text;
+    pattern: the regex used to match the privacy info;
+    find_pattern: the regex used to locate the sentence that contains a match;
+    replace_string: the text used to replace the matched privacy info
+    """
+    clean_data = []
+
+    sentences = re.findall(find_pattern, text)
+    for sentence in sentences:
+        post_content = re.sub(pattern, replace_string, sentence)
+        clean_data.append({
+            'pre_content': sentence,
+            'post_content': post_content
+        })
+
+    return clean_data
diff --git a/data-processing/data_manipulation/transform/text/support_type.py b/data-processing/data_manipulation/transform/text/support_type.py
index e06ec9f9d..464a344aa 100644
--- a/data-processing/data_manipulation/transform/text/support_type.py
+++ b/data-processing/data_manipulation/transform/text/support_type.py
@@ -13,142 +13,134 @@
 # limitations under the License.
 
 
-###
-# 数据处理支持的类型
-# @author: wangxinbiao
-# @date: 2023-11-02 14:25:01
-# modify history
-# ==== 2023-11-02 14:25:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-support_types = [
-    {
-        'name': 'chunk_processing',
-        'description': '拆分处理',
-        'children': [
-            {
-                'name': 'qa_split',
-                'enable': 'true',
-                'zh_name': 'QA拆分',
-                'description': '根据文件中的文章与图表标题,自动将文件做 QA 拆分处理。'
-            },
-            {
-                'name': 'document_chunk',
-                'enable': 'false',
-                'zh_name': '文本分段',
-                'description': ''
-            }
-        ]
-    },
-    {
-        'name': 'clean',
-        'description': '异常清洗配置',
-        'children': [
-            {
-                'name': 'remove_invisible_characters',
-                'enable': 'true',
-                'zh_name': '移除不可见字符',
-                'description': '移除ASCII中的一些不可见字符, 如0-32 和127-160这两个范围'
-            },
-            {
-                'name': 'space_standardization',
-                'enable': 'true',
-                'zh_name': '空格处理',
-                'description': '将不同的unicode空格比如u2008, 转成正常的空格'
-            },
-            {
-                'name': 'remove_garbled_text',
-                'enable': 'false',
-                'zh_name': '去除乱码',
-                'description': '去除乱码和无意义的unicode'
-            },
-            {
-                'name': 'traditional_to_simplified',
-                'enable': 'false',
-                'zh_name': '繁转简',
-                'description': '繁体转简体,如“不經意,妳的笑容”清洗成“不经意,你的笑容”'
-            },
-            {
-                'name': 'remove_html_tag',
-                'enable': 'false',
-                'zh_name': '去除网页标识符',
-                'description': '移除文档中的html标签, 如<html>,<dev>,<p>等'
-            },
-            {
-                'name': 'remove_emojis',
-                'enable': 'false',
-                'zh_name': '去除表情',
-                'description': '去除文档中的表情,如‘🐰’, ‘🧑🏼’等'
-            }
-        ]
-    },
-    {
-        'name': 'filtration',
-        'description': '数据过滤配置',
-        'children': [
-            {
-                'name': 'character_duplication_rate',
-                'enable': 'false',
-                'zh_name': '字重复率过滤',
-                'description': '如果字重复率太高,意味着文档中重复的字太多,文档会被过滤掉'
-            },
-            {
-                'name': 'word_duplication_rate',
-                'enable': 'false',
-                'zh_name': '词重复率过滤',
-                'description': '如果词重复率太高,意味着文档中重复的词太多,文档会被过滤掉'
-            },
-            {
-                'name': 'special_character_rate',
-                'enable': 'false',
-                'zh_name': '特殊字符串率',
-                'description': '如果特殊字符率太高,意味着文档中特殊字符太多,文档会被过滤掉'
-            },
-            {
-                'name': 'pornography_violence_word_rate',
-                'enable': 'false',
-                'zh_name': '色情暴力词率',
-                'description': '如果色情暴力词率太高,文档会被过滤掉'
-            }
-        ]
-    },
-    {
-        'name': 'duplicates',
-        'description': '数据去重配置',
-        'children': [
-            {
-                'name': 'simhash',
-                'enable': 'false',
-                'zh_name': 'Simhash',
-                'description': '根据海明距离计算文档相似度, 相似度<=海明距离,认为两个文档相似。(范围:4-6)'
-            }
-        ]
-    },
-    {
-        'name': 'privacy_erosion',
-        'description': '数据隐私配置',
-        'children': [
-            {
-                'name': 'remove_email',
-                'enable': 'true',
-                'zh_name': '去除Email',
-                'description': '去除email地址'
-            },
-            {
-                'name': 'remove_ip_address',
-                'enable': 'false',
-                'zh_name': '去除IP地址',
-                'description': '去除IPv4 或者 IPv6 地址'
-            },
-            {
-                'name': 'remove_number',
-                'enable': 'false',
-                'zh_name': '去除数字',
-                'description': '去除数字和字母数字标识符,如电话号码、信用卡号、十六进制散列等,同时跳过年份和简单数字的实例'
-            }
-        ]
-    }
-]
+def get_default_support_types():
+    """Get the default support types."""
+    return [
+        {
+            'name': 'chunk_processing',
+            'description': '拆分处理',
+            'children': [
+                {
+                    'name': 'qa_split',
+                    'enable': 'true',
+                    'zh_name': 'QA拆分',
+                    'description': '根据文件中的文章与图表标题,自动将文件做 QA 拆分处理。'
+                },
+                {
+                    'name': 'document_chunk',
+                    'enable': 'false',
+                    'zh_name': '文本分段',
+                    'description': ''
+                }
+            ]
+        },
+        {
+            'name': 'clean',
+            'description': '异常清洗配置',
+            'children': [
+                {
+                    'name': 'remove_invisible_characters',
+                    'enable': 'true',
+                    'zh_name': '移除不可见字符',
+                    'description': '移除ASCII中的一些不可见字符, 如0-32 和127-160这两个范围'
+                },
+                {
+                    'name': 'space_standardization',
+                    'enable': 'true',
+                    'zh_name': '空格处理',
+                    'description': '将不同的unicode空格比如u2008, 转成正常的空格'
+                },
+                {
+                    'name': 'remove_garbled_text',
+                    'enable': 'false',
+                    'zh_name': '去除乱码',
+                    'description': '去除乱码和无意义的unicode'
+                },
+                {
+                    'name': 'traditional_to_simplified',
+                    'enable': 'false',
+                    'zh_name': '繁转简',
+                    'description': '繁体转简体,如“不經意,妳的笑容”清洗成“不经意,你的笑容”'
+                },
+                {
+                    'name': 'remove_html_tag',
+                    'enable': 'false',
+                    'zh_name': '去除网页标识符',
+                    'description': '移除文档中的html标签, 如<html>,<dev>,<p>等'
+                },
+                {
+                    'name': 'remove_emojis',
+                    'enable': 'false',
+                    'zh_name': '去除表情',
+                    'description': '去除文档中的表情,如‘🐰’, ‘🧑🏼’等'
+                }
+            ]
+        },
+        {
+            'name': 'filtration',
+            'description': '数据过滤配置',
+            'children': [
+                {
+                    'name': 'character_duplication_rate',
+                    'enable': 'false',
+                    'zh_name': '字重复率过滤',
+                    'description': '如果字重复率太高,意味着文档中重复的字太多,文档会被过滤掉'
+                },
+                {
+                    'name': 'word_duplication_rate',
+                    'enable': 'false',
+                    'zh_name': '词重复率过滤',
+                    'description': '如果词重复率太高,意味着文档中重复的词太多,文档会被过滤掉'
+                },
+                {
+                    'name': 'special_character_rate',
+                    'enable': 'false',
+                    'zh_name': '特殊字符串率',
+                    'description': '如果特殊字符率太高,意味着文档中特殊字符太多,文档会被过滤掉'
+                },
+                {
+                    'name': 'pornography_violence_word_rate',
+                    'enable': 'false',
+                    'zh_name': '色情暴力词率',
+                    'description': '如果色情暴力词率太高,文档会被过滤掉'
+                }
+            ]
+        },
+        {
+            'name': 'duplicates',
+            'description': '数据去重配置',
+            'children': [
+                {
+                    'name': 'simhash',
+                    'enable': 'false',
+                    'zh_name': 'Simhash',
+                    'description': '根据海明距离计算文档相似度, 相似度<=海明距离,认为两个文档相似。(范围:4-6)'
+                }
+            ]
+        },
+        {
+            'name': 'privacy_erosion',
+            'description': '数据隐私配置',
+            'children': [
+                {
+                    'name': 'remove_email',
+                    'enable': 'true',
+                    'zh_name': '去除Email',
+                    'description': '去除email地址'
+                },
+                {
+                    'name': 'remove_ip_address',
+                    'enable': 'false',
+                    'zh_name': '去除IP地址',
+                    'description': '去除IPv4 或者 IPv6 地址'
+                },
+                {
+                    'name': 'remove_number',
+                    'enable': 'false',
+                    'zh_name': '去除数字',
+                    'description': '去除数字和字母数字标识符,如电话号码、信用卡号、十六进制散列等,同时跳过年份和简单数字的实例'
+                }
+            ]
+        }
+    ]
diff --git a/data-processing/data_manipulation/utils/class_utils.py b/data-processing/data_manipulation/utils/class_utils.py
new file mode 100644
index 000000000..8388f814e
--- /dev/null
+++ b/data-processing/data_manipulation/utils/class_utils.py
@@ -0,0 +1,34 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import abc
+from typing import Any
+
+
+class Singleton(abc.ABCMeta, type):
+    """Singleton metaclass for ensuring only one instance of a class"""
+
+    _instances = {}
+
+    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
+        """Call method for the singleton metaclass"""
+        if cls not in cls._instances:
+            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
+        return cls._instances[cls]
+
+
+class AbstractSingleton(abc.ABC, metaclass=Singleton):
+    """Abstract singleton class for ensuring only one instance of a class"""
+    pass
diff --git a/data-processing/data_manipulation/utils/csv_utils.py b/data-processing/data_manipulation/utils/csv_utils.py
new file mode 100644
index 000000000..da5da0eb2
--- /dev/null
+++ b/data-processing/data_manipulation/utils/csv_utils.py
@@ -0,0 +1,54 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
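The `Singleton` metaclass in class_utils.py guarantees at most one instance per class. A quick usage sketch (the `ConnectionHolder` consumer class below is illustrative):

```python
import abc
from typing import Any


class Singleton(abc.ABCMeta, type):
    """Singleton metaclass, as defined in utils/class_utils.py."""

    _instances = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # Create the instance on first call, then always return it.
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class ConnectionHolder(metaclass=Singleton):
    """Illustrative consumer: every instantiation returns the same object."""

    def __init__(self):
        self.created = True


assert ConnectionHolder() is ConnectionHolder()
```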
diff --git a/data-processing/data_manipulation/utils/csv_utils.py b/data-processing/data_manipulation/utils/csv_utils.py
new file mode 100644
index 000000000..da5da0eb2
--- /dev/null
+++ b/data-processing/data_manipulation/utils/csv_utils.py
@@ -0,0 +1,54 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import csv
+import logging
+import os
+
+from common import log_tag_const
+from utils import file_utils
+
+logger = logging.getLogger(__name__)
+
+
+def save_csv(
+    file_name,
+    phase_value,
+    data
+):
+    """Save rows of data to a csv file.
+
+    file_name: name of the csv file;
+    phase_value: processing phase, used as the output sub-directory
+    """
+    csv_file_path = file_utils.get_temp_file_path()
+
+    # Create the directory if it does not exist
+    directory_path = csv_file_path + phase_value
+    if not os.path.exists(directory_path):
+        os.makedirs(directory_path)
+
+    file_path = directory_path + '/' + file_name
+
+    logger.debug(''.join([
+        f"{log_tag_const.CSV_HANDLE} Save a csv file.\n",
+        f"file path: {file_path}"
+    ]))
+
+    with open(file_path, 'w', newline='') as file:
+        writer = csv.writer(file)
+        writer.writerows(data)
+
+    return file_path
+
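`save_csv` writes under `file_handle/temp_file/<phase_value>/`, creating that directory on first use, and returns the resulting path. An illustrative call (the file name and rows are made up):

```python
from utils import csv_utils

# Writes file_handle/temp_file/qa_split/example_qa.csv and returns its path.
file_path = csv_utils.save_csv(
    file_name='example_qa.csv',
    phase_value='qa_split',
    data=[
        ['question', 'answer'],
        ['What is KubeAGI?', 'A data processing platform.'],
    ],
)
```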
diff --git a/data-processing/data_manipulation/utils/date_time_utils.py b/data-processing/data_manipulation/utils/date_time_utils.py
index d961efc76..64772df2a 100644
--- a/data-processing/data_manipulation/utils/date_time_utils.py
+++ b/data-processing/data_manipulation/utils/date_time_utils.py
@@ -14,6 +14,7 @@
 
 import datetime
+
 import pytz
 
 
@@ -21,7 +22,7 @@ def now_str():
     return f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S.%f}"
 
 
-def now_utc_str(opt={}):
+def now_utc_str():
     return datetime.datetime.now(pytz.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
 
 
@@ -41,9 +42,14 @@ def timestamp_to_str_second(timestamp):
     return f"{datetime.datetime.fromtimestamp(timestamp):%Y-%m-%d %H:%M:%S}"
 
 
-def chage_datetime_fromat(opt={}):
+def chage_datetime_fromat(
+    date_time,
+    from_format,
+    to_format=None
+):
     my_date_time = datetime.datetime.strptime(
-        opt['date_time'],
-        opt['from_format'])
+        date_time,
+        from_format
+    )
 
-    return my_date_time.strftime(opt.get('to_format', '%Y-%m-%d %H:%M:%S'))
+    return my_date_time.strftime(to_format or '%Y-%m-%d %H:%M:%S')
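Note that the return statement is updated along with the signature: the original hunk left it reading `opt.get('to_format', ...)` after the `opt` dict was removed, which would raise a `NameError`. With the explicit `to_format` argument the call looks like this (illustrative values; the function keeps its historical spelling `chage_datetime_fromat`):

```python
from utils import date_time_utils

# Convert an ISO-8601 UTC timestamp to the default output format.
result = date_time_utils.chage_datetime_fromat(
    '2023-12-06T15:28:34Z',
    '%Y-%m-%dT%H:%M:%SZ'
)
# result == '2023-12-06 15:28:34'
```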
diff --git a/data-processing/data_manipulation/utils/file_utils.py b/data-processing/data_manipulation/utils/file_utils.py
index 16bcee21e..2478d881d 100644
--- a/data-processing/data_manipulation/utils/file_utils.py
+++ b/data-processing/data_manipulation/utils/file_utils.py
@@ -12,63 +12,31 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-###
-# 文件工具类
-###
-
 import os
 
-###
-# 生成文件名称
-# @author: wangxinbiao
-# @date: 2023-11-09 10:14:01
-# modify history
-# ==== 2023-11-09 10:14:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-async def get_file_name(opt={}):
-    file_name = opt['file_name']
-    handle_name = opt['handle_name']
-
+def get_file_name(
+    file_name,
+    handle_name
+):
+    """Build the new file name as <name>_<handle_name>.<extension>."""
     file_extension = file_name.split('.')[-1].lower()
     file_name_without_extension = file_name.rsplit('.', 1)[0]
 
     return file_name_without_extension + '_' + handle_name + '.' + file_extension
 
-###
-# 获取临时文件路径
-# @author: wangxinbiao
-# @date: 2023-11-09 10:14:01
-# modify history
-# ==== 2023-11-09 10:14:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-
-async def get_temp_file_path():
+def get_temp_file_path():
+    """Get the temp file path."""
     current_directory = os.getcwd()
     csv_file_path = os.path.join(current_directory, 'file_handle/temp_file/')
     return csv_file_path
 
-###
-# 删除文件
-# @author: wangxinbiao
-# @date: 2023-11-09 10:14:01
-# modify history
-# ==== 2023-11-09 10:14:01 ====
-# author: wangxinbiao
-# content:
-# 1) 基本功能实现
-###
-
-async def delete_file(file_path):
+def delete_file(file_path):
+    """Delete the file at the given path."""
     os.remove(file_path)
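These helpers are now plain synchronous functions, so callers no longer need `await`. A short illustration of `get_file_name` (values made up):

```python
from utils import file_utils

# The handle name is inserted before the lower-cased extension.
name = file_utils.get_file_name('Report.PDF', 'clean')
# name == 'Report_clean.pdf'
```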
pg_catalog."default", + create_user character varying(32) COLLATE pg_catalog."default", + create_program character varying(64) COLLATE pg_catalog."default", + update_datetime character varying(32) COLLATE pg_catalog."default", + update_user character varying(32) COLLATE pg_catalog."default", + update_program character varying(64) COLLATE pg_catalog."default", + namespace character varying(64) COLLATE pg_catalog."default", + CONSTRAINT data_process_task_pkey PRIMARY KEY (id) +) + +TABLESPACE pg_default; + +-- Table: public.data_process_task_detail + +-- DROP TABLE IF EXISTS public.data_process_task_detail; + +CREATE TABLE IF NOT EXISTS public.data_process_task_detail +( + id character varying(32) COLLATE pg_catalog."default" NOT NULL, + task_id character varying(32) COLLATE pg_catalog."default", + file_name character varying(512) COLLATE pg_catalog."default", + transform_type character varying(64) COLLATE pg_catalog."default", + pre_content text COLLATE pg_catalog."default", + post_content text COLLATE pg_catalog."default", + create_datetime character varying(32) COLLATE pg_catalog."default", + create_user character varying(32) COLLATE pg_catalog."default", + create_program character varying(64) COLLATE pg_catalog."default", + update_datetime character varying(32) COLLATE pg_catalog."default", + update_user character varying(32) COLLATE pg_catalog."default", + update_program character varying(32) COLLATE pg_catalog."default", + CONSTRAINT data_process_detail_pkey PRIMARY KEY (id) +) + +COMMENT ON TABLE public.data_process_task_detail IS '数据处理详情'; +COMMENT ON COLUMN public.data_process_task_detail.id IS '主键'; +COMMENT ON COLUMN public.data_process_task_detail.task_id IS '任务Id'; +COMMENT ON COLUMN public.data_process_task_detail.file_name IS '文件名称'; +COMMENT ON COLUMN public.data_process_task_detail.transform_type IS '转换类型'; +COMMENT ON COLUMN public.data_process_task_detail.pre_content IS '处理前的内容'; +COMMENT ON COLUMN public.data_process_task_detail.post_content IS '处理后的内容'; +COMMENT ON COLUMN public.data_process_task_detail.create_datetime IS '创建时间'; +COMMENT ON COLUMN public.data_process_task_detail.create_user IS '创建用户'; +COMMENT ON COLUMN public.data_process_task_detail.create_program IS '创建程序'; +COMMENT ON COLUMN public.data_process_task_detail.update_datetime IS '更新时间'; +COMMENT ON COLUMN public.data_process_task_detail.update_user IS '更新用户'; +COMMENT ON COLUMN public.data_process_task_detail.update_program IS '更新程序'; + +CREATE TABLE public.data_process_task_question_answer ( + id varchar(32) NOT NULL, -- 主键 + task_id varchar(32) NULL, -- 任务Id + file_name varchar(512) NULL, -- 文件名称 + question text NULL, -- 问题 + answer text NULL, -- 答案 + create_datetime varchar(32) NULL, -- 创建时间 + create_user varchar(32) NULL, -- 创建用户 + create_program varchar(64) NULL, -- 创建程序 + update_datetime varchar(32) NULL, -- 更新时间 + update_user varchar(32) NULL, -- 更新用户 + update_program varchar(32) NULL, -- 更新程序 + CONSTRAINT data_process_task_question_answer_pkey PRIMARY KEY (id) +); +COMMENT ON TABLE public.data_process_task_question_answer IS '数据处理问题答案'; + +-- Column comments + +COMMENT ON COLUMN public.data_process_task_question_answer.id IS '主键'; +COMMENT ON COLUMN public.data_process_task_question_answer.task_id IS '任务Id'; +COMMENT ON COLUMN public.data_process_task_question_answer.file_name IS '文件名称'; +COMMENT ON COLUMN public.data_process_task_question_answer.question IS '问题'; +COMMENT ON COLUMN public.data_process_task_question_answer.answer IS '答案'; +COMMENT ON COLUMN 
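`get_content` concatenates the extracted text of every page via pypdf's `PdfReader`. Illustrative usage (the path is made up):

```python
from utils import pdf_utils

# Extract the full text of a PDF as one string.
text = pdf_utils.get_content('file_handle/temp_file/example.pdf')
print(text[:200])
```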
diff --git a/data-processing/data_manipulation/utils/sanic_utls.py b/data-processing/data_manipulation/utils/sanic_utils.py
similarity index 95%
rename from data-processing/data_manipulation/utils/sanic_utls.py
rename to data-processing/data_manipulation/utils/sanic_utils.py
index 73613c149..5c910c006 100644
--- a/data-processing/data_manipulation/utils/sanic_utls.py
+++ b/data-processing/data_manipulation/utils/sanic_utils.py
@@ -16,11 +16,10 @@
 import logging
 import traceback
 
+from common import log_tag_const
 from sanic.handlers import ErrorHandler
 from sanic.response import json
 
-from common import log_tag_const
-
 logger = logging.getLogger(__name__)
 
 
@@ -32,7 +31,7 @@ def default(self, request, exception):
             f"{log_tag_const.WEB_SERVER_ERROR} The url has a error.\n",
             f"url: {request.url}\n",
             f"status code: {status_code} \n",
-            f"{exception} \n{traceback.format_exc()}"
+            f"error trace: \n{traceback.format_exc()}"
         ]))
         return json({
             'status': status_code,
diff --git a/data-processing/database/base.sql b/data-processing/database/base.sql
new file mode 100644
index 000000000..5cf262d52
--- /dev/null
+++ b/data-processing/database/base.sql
@@ -0,0 +1,96 @@
+-- Table: public.data_process_task
+
+-- DROP TABLE IF EXISTS public.data_process_task;
+
+CREATE TABLE IF NOT EXISTS public.data_process_task
+(
+    id character varying(32) COLLATE pg_catalog."default" NOT NULL,
+    name character varying(64) COLLATE pg_catalog."default",
+    file_type character varying(32) COLLATE pg_catalog."default",
+    status character varying(32) COLLATE pg_catalog."default",
+    pre_data_set_name character varying(32) COLLATE pg_catalog."default",
+    pre_data_set_version character varying(32) COLLATE pg_catalog."default",
+    file_names jsonb,
+    post_data_set_name character varying(32) COLLATE pg_catalog."default",
+    post_data_set_version character varying(32) COLLATE pg_catalog."default",
+    data_process_config_info jsonb,
+    start_datetime character varying(32) COLLATE pg_catalog."default",
+    end_datetime character varying(32) COLLATE pg_catalog."default",
+    create_datetime character varying(32) COLLATE pg_catalog."default",
+    create_user character varying(32) COLLATE pg_catalog."default",
+    create_program character varying(64) COLLATE pg_catalog."default",
+    update_datetime character varying(32) COLLATE pg_catalog."default",
+    update_user character varying(32) COLLATE pg_catalog."default",
+    update_program character varying(64) COLLATE pg_catalog."default",
+    namespace character varying(64) COLLATE pg_catalog."default",
+    CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
+)
+
+TABLESPACE pg_default;
+
+-- Table: public.data_process_task_detail
+
+-- DROP TABLE IF EXISTS public.data_process_task_detail;
+
+CREATE TABLE IF NOT EXISTS public.data_process_task_detail
+(
+    id character varying(32) COLLATE pg_catalog."default" NOT NULL,
+    task_id character varying(32) COLLATE pg_catalog."default",
+    file_name character varying(512) COLLATE pg_catalog."default",
+    transform_type character varying(64) COLLATE pg_catalog."default",
+    pre_content text COLLATE pg_catalog."default",
+    post_content text COLLATE pg_catalog."default",
+    create_datetime character varying(32) COLLATE pg_catalog."default",
+    create_user character varying(32) COLLATE pg_catalog."default",
+    create_program character varying(64) COLLATE pg_catalog."default",
+    update_datetime character varying(32) COLLATE pg_catalog."default",
+    update_user character varying(32) COLLATE pg_catalog."default",
+    update_program character varying(32) COLLATE pg_catalog."default",
+    CONSTRAINT data_process_detail_pkey PRIMARY KEY (id)
+)
+
+TABLESPACE pg_default;
+
+COMMENT ON TABLE public.data_process_task_detail IS '数据处理详情';
+COMMENT ON COLUMN public.data_process_task_detail.id IS '主键';
+COMMENT ON COLUMN public.data_process_task_detail.task_id IS '任务Id';
+COMMENT ON COLUMN public.data_process_task_detail.file_name IS '文件名称';
+COMMENT ON COLUMN public.data_process_task_detail.transform_type IS '转换类型';
+COMMENT ON COLUMN public.data_process_task_detail.pre_content IS '处理前的内容';
+COMMENT ON COLUMN public.data_process_task_detail.post_content IS '处理后的内容';
+COMMENT ON COLUMN public.data_process_task_detail.create_datetime IS '创建时间';
+COMMENT ON COLUMN public.data_process_task_detail.create_user IS '创建用户';
+COMMENT ON COLUMN public.data_process_task_detail.create_program IS '创建程序';
+COMMENT ON COLUMN public.data_process_task_detail.update_datetime IS '更新时间';
+COMMENT ON COLUMN public.data_process_task_detail.update_user IS '更新用户';
+COMMENT ON COLUMN public.data_process_task_detail.update_program IS '更新程序';
+
+CREATE TABLE public.data_process_task_question_answer (
+    id varchar(32) NOT NULL, -- 主键
+    task_id varchar(32) NULL, -- 任务Id
+    file_name varchar(512) NULL, -- 文件名称
+    question text NULL, -- 问题
+    answer text NULL, -- 答案
+    create_datetime varchar(32) NULL, -- 创建时间
+    create_user varchar(32) NULL, -- 创建用户
+    create_program varchar(64) NULL, -- 创建程序
+    update_datetime varchar(32) NULL, -- 更新时间
+    update_user varchar(32) NULL, -- 更新用户
+    update_program varchar(32) NULL, -- 更新程序
+    CONSTRAINT data_process_task_question_answer_pkey PRIMARY KEY (id)
+);
+COMMENT ON TABLE public.data_process_task_question_answer IS '数据处理问题答案';
+
+-- Column comments
+
+COMMENT ON COLUMN public.data_process_task_question_answer.id IS '主键';
+COMMENT ON COLUMN public.data_process_task_question_answer.task_id IS '任务Id';
+COMMENT ON COLUMN public.data_process_task_question_answer.file_name IS '文件名称';
+COMMENT ON COLUMN public.data_process_task_question_answer.question IS '问题';
+COMMENT ON COLUMN public.data_process_task_question_answer.answer IS '答案';
+COMMENT ON COLUMN public.data_process_task_question_answer.create_datetime IS '创建时间';
+COMMENT ON COLUMN public.data_process_task_question_answer.create_user IS '创建用户';
+COMMENT ON COLUMN public.data_process_task_question_answer.create_program IS '创建程序';
+COMMENT ON COLUMN public.data_process_task_question_answer.update_datetime IS '更新时间';
+COMMENT ON COLUMN public.data_process_task_question_answer.update_user IS '更新用户';
+COMMENT ON COLUMN public.data_process_task_question_answer.update_program IS '更新程序';
diff --git a/data-processing/requirements.txt b/data-processing/requirements.txt
index 4de33787a..a22e3b2fc 100644
--- a/data-processing/requirements.txt
+++ b/data-processing/requirements.txt
@@ -15,8 +15,8 @@ psycopg2_binary==2.9.9
 kubernetes==25.3.0
 duckdb==0.9.2
 DBUtils==3.0.3
+pyyaml==6.0.1
 
-jupyterlab==3.6.1
-ipython==8.12.0
-
-
+opencc==0.2
+opencc-python-reimplemented==0.1.7
+selectolax==0.3.17