diff --git a/data-processing/.gitignore b/data-processing/.gitignore index 700634c4f..87bb27161 100644 --- a/data-processing/.gitignore +++ b/data-processing/.gitignore @@ -4,4 +4,6 @@ __pycache__ mock_data -log \ No newline at end of file +log + +file_handle/temp_file \ No newline at end of file diff --git a/data-processing/Dockerfile b/data-processing/Dockerfile new file mode 100644 index 000000000..29c021762 --- /dev/null +++ b/data-processing/Dockerfile @@ -0,0 +1,47 @@ +FROM python:3.10.13-slim + +ENV TZ=Asia/Shanghai + +RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list.d/debian.sources + +RUN export DEBIAN_FRONTEND=noninteractive \ + && apt-get update \ + && apt-get install -y tzdata \ + && ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ + && dpkg-reconfigure --frontend noninteractive tzdata \ + && apt-get install -y python3-distutils curl python3-pip \ + && apt-get install -y wget + +RUN wget https://github.com/explosion/spacy-models/releases/download/zh_core_web_sm-3.5.0/zh_core_web_sm-3.5.0-py3-none-any.whl -O /tmp/zh_core_web_sm-3.5.0-py3-none-any.whl \ + && pip3 install /tmp/zh_core_web_sm-3.5.0-py3-none-any.whl -i https://pypi.org/simple \ + && rm /tmp/zh_core_web_sm-3.5.0-py3-none-any.whl + + +ENV MINIO_ACCESSKEY=minio_accesskey +ENV MINIO_SECRETKEY=minio_secretkey +ENV MINIO_API_URL=localhost:9000 +ENV MINIO_SECURE=False + +ENV ZHIPUAI_API_KEY=xxxxx + +ENV KNOWLEDGE_CHUNK_SIZE=500 +ENV KNOWLEDGE_CHUNK_OVERLAP=50 + +ENV PG_HOST=localhost +ENV PG_PORT=5432 +ENV PG_USER=postgres +ENV PG_PASSWORD=xxxxx +ENV PG_DATABASE=data_process + +EXPOSE 28888 + +ADD . /arcadia_app/ +WORKDIR /arcadia_app + +RUN chmod 777 /arcadia_app/entrypoint.sh + +RUN pip install -r requirements.txt + +ENTRYPOINT ["./entrypoint.sh"] + + diff --git a/data-processing/Dockerfile.base b/data-processing/Dockerfile.base deleted file mode 100644 index 4ab63694d..000000000 --- a/data-processing/Dockerfile.base +++ /dev/null @@ -1,14 +0,0 @@ -FROM python:3.10.13-slim - -ENV TZ=Asia/Shanghai - -RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list.d/debian.sources - -RUN export DEBIAN_FRONTEND=noninteractive \ - && apt-get update \ - && apt-get install -y tzdata \ - && ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ - && dpkg-reconfigure --frontend noninteractive tzdata \ - && apt-get install -y python3-distutils curl python3-pip - -WORKDIR /happy_work_space \ No newline at end of file diff --git a/data-processing/data_manipulation/common/config.py b/data-processing/data_manipulation/common/config.py index 1a0755b77..953d9159e 100644 --- a/data-processing/data_manipulation/common/config.py +++ b/data-processing/data_manipulation/common/config.py @@ -21,7 +21,14 @@ minio_secure = os.getenv('MINIO_SECURE', False) # zhipuai api_key -zhipuai_api_key = os.getenv('ZHIPUAI_API_KEY', 'xxxxx') +zhipuai_api_key = os.getenv('ZHIPUAI_API_KEY', 'xxxxxx') knowledge_chunk_size = os.getenv("KNOWLEDGE_CHUNK_SIZE", 500) -knowledge_chunk_overlap = os.getenv("KNOWLEDGE_CHUNK_OVERLAP", 50) \ No newline at end of file +knowledge_chunk_overlap = os.getenv("KNOWLEDGE_CHUNK_OVERLAP", 50) + +# pg数据库 +pg_host = os.getenv("PG_HOST", "localhost") +pg_port = os.getenv("PG_PORT", 5432) +pg_user = os.getenv("PG_USER", "postgres") +pg_password = os.getenv("PG_PASSWORD", "xxxxx") +pg_database = os.getenv("PG_DATABASE", "data_process") \ No newline at end of file diff --git a/data-processing/data_manipulation/common/special_characters.py 
b/data-processing/data_manipulation/common/special_characters.py new file mode 100644 index 000000000..2c0d70795 --- /dev/null +++ b/data-processing/data_manipulation/common/special_characters.py @@ -0,0 +1,39 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import string + +import emoji + +# special characters +MAIN_SPECIAL_CHARACTERS = string.punctuation + string.digits \ + + string.whitespace +OTHER_SPECIAL_CHARACTERS = ( + "’ “— ™ – •‘œ    ˜ ‚ƒ„’“”–ー一▬…✦�­£​•€«»°·═" + "×士^˘⇓↓↑←→()§″′´¿−±∈¢ø‚„½¼¾¹²³―⁃,ˌ¸‹›ʺˈʻ¦‐⠀‰……‑≤≥‖" + "◆●■►▼▲▴∆▻¡★☆✱ːº。¯˜¥ɪ≈†上ン:∼⁄・♡✓⊕․.⋅÷1‟;،、¨ाাी्े◦˚" + "゜ʼ≖ʼ¤ッツシ℃√!【】‿∞➤~πه۩☛₨➩☻๑٪♥ıॽ《‘©﴿٬?▷Г♫∟™ª₪®「—❖" + "」﴾》" +) +EMOJI = list(emoji.EMOJI_DATA.keys()) +SPECIAL_CHARACTERS = set(MAIN_SPECIAL_CHARACTERS + OTHER_SPECIAL_CHARACTERS) +SPECIAL_CHARACTERS.update(EMOJI) + +# various whitespaces for whitespace normalization +# whitespaces in unicode can be found here: +# https://en.wikipedia.org/wiki/Whitespace_character +VARIOUS_WHITESPACES = { + ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', + ' ', ' ', ' ', ' ', '​', '‌', '‍', '⁠', '', '„' +} \ No newline at end of file diff --git a/data-processing/data_manipulation/db/data_process_task.py b/data-processing/data_manipulation/db/data_process_task.py new file mode 100644 index 000000000..84cb4e53e --- /dev/null +++ b/data-processing/data_manipulation/db/data_process_task.py @@ -0,0 +1,207 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +### + # 数据处理任务 + # @author: wangxinbiao + # @date: 2023-11-21 13:57:01 + # modify history + # ==== 2023-11-21 13:57:01 ==== + # author: wangxinbiao + # content: + # 1) 基本功能实现 + ### + + import ulid + import ujson + + from datetime import datetime + from utils import pg_utils + from sanic.response import json + + + async def list_by_page(request, opt={}): + conn = opt['conn'] + + req_json = request.json + + params = { + 'keyword': '%' + req_json['keyword'] + '%', + 'page': int(req_json['page']), + 'pageSize': int(req_json['pageSize']) + } + + sql = """ + select + id, + name, + status, + pre_data_set_name, + pre_data_set_version, + post_data_set_name, + post_data_set_version, + start_datetime + from + public.data_process_task + where + name like %(keyword)s + limit %(pageSize)s offset %(page)s + """.strip() + + res = await pg_utils.execute_sql(conn,sql,params) + return json(res) + + + async def list_by_count(request, opt={}): + conn = opt['conn'] + + req_json = request.json + + params = { + 'keyword': '%' + req_json['keyword'] + '%' + } + + sql = """ + select + count(*) + from + public.data_process_task + where + name like %(keyword)s + """.strip() + + res = await pg_utils.execute_count_sql(conn,sql,params) + return json(res) + + + async def add(request, opt={}): + conn = opt['conn'] + + req_json = request.json + + now = datetime.now() + user = 'admin' + program = '数据处理任务-新增' + + params = { + 'id': opt['id'], + 'name': req_json['name'], + 'file_type': req_json['file_type'], + 'status': 'processing', + 'pre_data_set_name': req_json['pre_data_set_name'], + 'pre_data_set_version': req_json['pre_data_set_version'], + 'file_names': ujson.dumps(req_json['file_names']), + 'post_data_set_name': req_json['post_data_set_name'], + 'post_data_set_version': req_json['post_data_set_version'], + 'data_process_config_info': ujson.dumps(req_json['data_process_config_info']), + 'start_datetime': now, + 'create_datetime': now, + 'create_user': user, + 'create_program': program, + 'update_datetime': now, + 'update_user': user, + 'update_program': program + } + + sql = """ + insert into public.data_process_task ( + id, + name, + file_type, + status, + pre_data_set_name, + pre_data_set_version, + file_names, + post_data_set_name, + post_data_set_version, + data_process_config_info, + start_datetime, + create_datetime, + create_user, + create_program, + update_datetime, + update_user, + update_program + ) + values ( + %(id)s, + %(name)s, + %(file_type)s, + %(status)s, + %(pre_data_set_name)s, + %(pre_data_set_version)s, + %(file_names)s, + %(post_data_set_name)s, + %(post_data_set_version)s, + %(data_process_config_info)s, + %(start_datetime)s, + %(create_datetime)s, + %(create_user)s, + %(create_program)s, + %(update_datetime)s, + %(update_user)s, + %(update_program)s + ) + """.strip() + + return await pg_utils.execute_insert_sql(conn,sql,params) + + + async def delete_by_id(request, opt={}): + conn = opt['conn'] + + req_json = request.json + + params = { + 'id': req_json['id'] + } + + sql = """ + delete from public.data_process_task + where + id = %(id)s + """.strip() + + res = await pg_utils.execute_delete_sql(conn,sql,params) + return json(res) + + + async def update_status_by_id(opt={}): + conn = opt['conn'] + + now = datetime.now() + user = 'admin' + program = '修改任务状态' + + params = { + 'id': opt['id'], + 'status': opt['status'], + 'update_datetime': now, + 'update_program': program, + 'update_user': user + } + + sql = """ + UPDATE public.data_process_task set + status = %(status)s, + update_datetime = %(update_datetime)s, + 
update_program = %(update_program)s, + update_user = %(update_user)s + WHERE + id = %(id)s + """.strip() + + res = await pg_utils.execute_update_sql(conn,sql,params) + return json(res) diff --git a/data-processing/data_manipulation/file_handle/csv_handle.py b/data-processing/data_manipulation/file_handle/csv_handle.py index d2c78f38c..163656b23 100644 --- a/data-processing/data_manipulation/file_handle/csv_handle.py +++ b/data-processing/data_manipulation/file_handle/csv_handle.py @@ -27,6 +27,8 @@ import logging import os +import asyncio + import pandas as pd import ulid from transform.text import clean_transform, privacy_transform diff --git a/data-processing/data_manipulation/file_handle/pdf_handle.py b/data-processing/data_manipulation/file_handle/pdf_handle.py index 7739aa68d..16f31636f 100644 --- a/data-processing/data_manipulation/file_handle/pdf_handle.py +++ b/data-processing/data_manipulation/file_handle/pdf_handle.py @@ -54,12 +54,6 @@ async def text_manipulate(request, opt={}): logger.info("pdf text manipulate!") - """ - 数据处理逻辑: - 处理某条数据时,如果某个方式(比如:去除不可见字符)处理失败了,则直接结束,不在处理,整个文件都视作处理失败 - - """ - try: file_name = opt['file_name'] @@ -85,14 +79,23 @@ async def text_manipulate(request, opt={}): if clean_result['status'] != 200: return clean_result - - content = clean_result['data'] + else: + content = clean_result['data'] # 去隐私 - + clean_result = await privacy_erosion({ + 'support_type': support_type, + 'file_name': file_name, + 'data': content + }) + + if clean_result['status'] != 200: + return clean_result + else: + content = clean_result['data'] # QA拆分 - if 'qa_split' in support_type: + if any(d.get('type') == 'qa_split' for d in support_type): qa_data = await generate_QA(request, { 'support_type': support_type, 'data': content @@ -142,7 +145,7 @@ async def data_clean(opt={}): data = opt['data'] # 去除不可见字符 - if 'remove_invisible_characters' in support_type: + if any(d.get('type') == 'remove_invisible_characters' for d in support_type): result = await clean_transform.remove_invisible_characters({ 'text': data }) @@ -155,6 +158,21 @@ async def data_clean(opt={}): } data = result['data'] + + # 空格处理 + if any(d.get('type') == 'space_standardization' for d in support_type): + result = await clean_transform.space_standardization({ + 'text': data + }) + + if result['status'] != 200: + return { + 'status': 400, + 'message': '空格处理失败', + 'data': '' + } + + data = result['data'] logger.info("pdf text data clean stop!") @@ -165,6 +183,46 @@ async def data_clean(opt={}): } +### +# 去隐私 +# @author: wangxinbiao +# @date: 2023-11-17 16:14:01 +# modify history +# ==== 2023-11-17 16:14:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +async def privacy_erosion(opt={}): + logger.info("pdf text privacy erosion start!") + support_type = opt['support_type'] + data = opt['data'] + + # 去邮箱 + if any(d.get('type') == 'remove_email' for d in support_type): + result = await privacy_transform.remove_email({ + 'text': data + }) + + if result['status'] != 200: + return { + 'status': 400, + 'message': '去邮箱', + 'data': '' + } + + data = result['data'] + + logger.info("pdf text privacy erosion stop!") + + return { + 'status': 200, + 'message': '', + 'data': data + } + ### # 获取PDF内容 # @author: wangxinbiao @@ -235,3 +293,29 @@ async def generate_QA(request, opt={}): qa_list.extend(data) return qa_list + +### +# 文本分段 +# @author: wangxinbiao +# @date: 2023-11-17 16:14:01 +# modify history +# ==== 2023-11-17 16:14:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +async def 
document_chunk(request, opt={}): + + separator = "\n\n" + + text_splitter = SpacyTextSplitter( + separator=separator, + pipeline="zh_core_web_sm", + chunk_size=opt['chunk_size'], + chunk_overlap=opt['chunk_overlap'] + ) + texts = text_splitter.split_text(opt['data']) + + return texts \ No newline at end of file diff --git a/data-processing/data_manipulation/server.py b/data-processing/data_manipulation/server.py index 9f886cb6a..4442e9654 100644 --- a/data-processing/data_manipulation/server.py +++ b/data-processing/data_manipulation/server.py @@ -26,11 +26,13 @@ import asyncio import logging +import psycopg2 +from common import config from sanic import Sanic from sanic.response import json from sanic_cors import CORS -from service import minio_store_process_service +from service import minio_store_process_service, data_process_service from transform.text import support_type from utils import log_utils @@ -53,6 +55,83 @@ app.config['RESPONSE_TIMEOUT'] = 60 * 60 * 60 app.config['KEEP_ALIVE_TIMEOUT'] = 60 * 60 * 60 +@app.listener('before_server_start') +async def init_web_server(app, loop): + app.config['conn'] = get_connection() + + +### +# 分页查询列表 +# @author: wangxinbiao +# @date: 2023-11-21 11:31:01 +# modify history +# ==== 2023-11-21 11:31:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +@app.route('list-by-page', methods=['POST']) +async def list_by_page(request): + return await data_process_service.list_by_page(request, { + 'conn': app.config['conn'] + }) + +### +# 列表总记录数 +# @author: wangxinbiao +# @date: 2023-11-21 15:45:01 +# modify history +# ==== 2023-11-21 15:45:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +@app.route('list-by-count', methods=['POST']) +async def list_by_count(request): + return await data_process_service.list_by_count(request, { + 'conn': app.config['conn'] + }) + +### +# 新增 +# @author: wangxinbiao +# @date: 2023-11-21 15:45:01 +# modify history +# ==== 2023-11-21 15:45:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +@app.route('add', methods=['POST']) +async def add(request): + return await data_process_service.add(request, { + 'conn': app.config['conn'] + }) + +### +# 删除 +# @author: wangxinbiao +# @date: 2023-11-21 15:45:01 +# modify history +# ==== 2023-11-21 15:45:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +@app.route('delete-by-id', methods=['POST']) +async def delete_by_id(request): + return await data_process_service.delete_by_id(request, { + 'conn': app.config['conn'] + }) + ### # 文本数据处理 # @author: wangxinbiao @@ -79,7 +158,7 @@ async def text_manipulate(request): """ - await asyncio.create_task( + asyncio.create_task( minio_store_process_service.text_manipulate(request) ) @@ -118,6 +197,39 @@ async def text_process_type(request): 'data': support_type.support_types }) + +### +# 数据库链接 +# @author: wangxinbiao +# @date: 2023-11-02 14:42:01 +# modify history +# ==== 2023-11-02 14:42:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +def get_connection(): + ''' + 获取postgresql连接 + :param host: + :param port: + :param user: + :param password: + :param database: + :return: + ''' + conn = psycopg2.connect(database=config.pg_database, user=config.pg_user, password=config.pg_password, host=config.pg_host, port=config.pg_port) + + # while True: + # cur = conn.cursor() + # cur.execute("SELECT 1") + # cur.close() + # time.sleep(3600) # 每隔5分钟发送一次查询 + + return conn + if __name__ == '__main__': app.run(host='0.0.0.0', port=28888, diff --git 
a/data-processing/data_manipulation/service/data_process_service.py b/data-processing/data_manipulation/service/data_process_service.py new file mode 100644 index 000000000..efbeb0e5f --- /dev/null +++ b/data-processing/data_manipulation/service/data_process_service.py @@ -0,0 +1,107 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +### +# 数据处理 +# @author: wangxinbiao +# @date: 2023-11-21 11:35:01 +# modify history +# ==== 2023-11-21 11:35:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + +import asyncio +import logging +import ulid + +from db import data_process_task +from sanic.response import json +from service import minio_store_process_service + +logger = logging.getLogger('data_process_service') + +### +# 分页查询列表 +# @author: wangxinbiao +# @date: 2023-11-21 11:31:01 +# modify history +# ==== 2023-11-21 11:31:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +async def list_by_page(request, opt={}): + return await data_process_task.list_by_page(request, opt) + +### +# 查询列表总记录数 +# @author: wangxinbiao +# @date: 2023-11-21 15:45:01 +# modify history +# ==== 2023-11-21 15:45:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +async def list_by_count(request, opt={}): + return await data_process_task.list_by_count(request, opt) + +### +# 新增 +# @author: wangxinbiao +# @date: 2023-11-21 15:45:01 +# modify history +# ==== 2023-11-21 15:45:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +async def add(request, opt={}): + id = ulid.ulid() + opt['id'] = id + res = await data_process_task.add(request, opt) + + if res['status'] == 200: + # 进行数据处理 + asyncio.create_task( + minio_store_process_service.text_manipulate(request, opt) + ) + + return json(res) + + +### +# 删除 +# @author: wangxinbiao +# @date: 2023-11-21 17:47:01 +# modify history +# ==== 2023-11-21 17:47:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### + + +async def delete_by_id(request, opt={}): + + return await data_process_task.delete_by_id(request, opt) + diff --git a/data-processing/data_manipulation/service/minio_store_process_service.py b/data-processing/data_manipulation/service/minio_store_process_service.py index 478cb2f0f..c54e1555d 100644 --- a/data-processing/data_manipulation/service/minio_store_process_service.py +++ b/data-processing/data_manipulation/service/minio_store_process_service.py @@ -26,8 +26,9 @@ import io import logging import os - import pandas as pd + +from db import data_process_task from file_handle import csv_handle, pdf_handle from minio import Minio from minio.commonconfig import Tags @@ -49,41 +50,41 @@ ### -async def text_manipulate(request): +async def text_manipulate(request, opt={}): request_json = request.json bucket_name = request_json['bucket_name'] - support_type = request_json['type'] - folder_prefix = request_json['folder_prefix'] + support_type = request_json['data_process_config_info'] + file_names = request_json['file_names'] + folder_prefix = '/' + request_json['pre_data_set_name'] + '/' + 
request_json['pre_data_set_version'] # create minio client minio_client = await minio_utils.create_client() - # 查询存储桶下的所有对象 - objects = minio_client.list_objects(bucket_name, prefix=folder_prefix) - # 将文件都下载到本地 - file_names = await download({ - 'minio_client': minio_client, - 'bucket_name': bucket_name, - 'folder_prefix': folder_prefix, - 'objects': objects - }) + for file_name in file_names: + await download({ + 'minio_client': minio_client, + 'bucket_name': bucket_name, + 'folder_prefix': folder_prefix, + 'file_name': file_name['name'] + }) # 文件处理 for item in file_names: - file_extension = item.split('.')[-1].lower() + file_name = item['name'] + file_extension = file_name.split('.')[-1].lower() if file_extension in ['csv']: # 处理CSV文件 result = await csv_handle.text_manipulate({ - 'file_name': item, + 'file_name': file_name, 'support_type': support_type }) elif file_extension in ['pdf']: # 处理PDF文件 result = await pdf_handle.text_manipulate(request, { - 'file_name': item, + 'file_name': file_name, 'support_type': support_type }) @@ -115,6 +116,13 @@ async def text_manipulate(request): remove_file_path = await file_utils.get_temp_file_path() await file_utils.delete_file(remove_file_path + 'original/' + item) + # 数据库更新任务状态 + await data_process_task.update_status_by_id({ + 'id': opt['id'], + 'status': 'process_complete', + 'conn': opt['conn'] + }) + return json({ 'status': 200, 'message': '', @@ -134,27 +142,23 @@ async def text_manipulate(request): async def download(opt={}): - objects = opt['objects'] + folder_prefix = opt['folder_prefix'] minio_client = opt['minio_client'] bucket_name = opt['bucket_name'] - folder_prefix = opt['folder_prefix'] - file_names = [] - for obj in objects: - file_name = obj.object_name[len(folder_prefix):] + file_name = opt['file_name'] + + file_path = await file_utils.get_temp_file_path() - csv_file_path = await file_utils.get_temp_file_path() + # 如果文件夹不存在,则创建 + directory_path = file_path + 'original' + if not os.path.exists(directory_path): + os.makedirs(directory_path) - # 如果文件夹不存在,则创建 - directory_path = csv_file_path + 'original' - if not os.path.exists(directory_path): - os.makedirs(directory_path) + file_path = directory_path + '/' + file_name - file_path = directory_path + '/' + file_name + minio_client.fget_object(bucket_name, folder_prefix + '/' + file_name, file_path) - minio_client.fget_object(bucket_name, obj.object_name, file_path) - file_names.append(file_name) - return file_names ### # 文件上传至MinIO,添加Tags diff --git a/data-processing/data_manipulation/transform/text/clean_transform.py b/data-processing/data_manipulation/transform/text/clean_transform.py index 1fa729891..088e5005b 100644 --- a/data-processing/data_manipulation/transform/text/clean_transform.py +++ b/data-processing/data_manipulation/transform/text/clean_transform.py @@ -25,7 +25,7 @@ ### import re - +from common import special_characters ### # 去除不可见字符 @@ -55,3 +55,33 @@ async def remove_invisible_characters(opt={}): 'message': '去除不可见字符失败:' + str(ex), 'data': '' } + +### +# 空格处理 +# @author: wangxinbiao +# @date: 2023-11-20 16:53:01 +# modify history +# ==== 2023-11-20 16:53:01 ==== +# author: wangxinbiao +# content: +# 1) 基本功能实现 +### +async def space_standardization(opt={}): + text = opt['text'] + + try: + clean_text = ''.join([ + char if char not in special_characters.VARIOUS_WHITESPACES else ' ' for char in text + ]) + return { + 'status': 200, + 'message': '', + 'data': clean_text + } + + except Exception as ex: + return { + 'status': 400, + 'message': '空格处理失败:' + str(ex), + 'data': 
'' + } \ No newline at end of file diff --git a/data-processing/data_manipulation/utils/pg_utils.py b/data-processing/data_manipulation/utils/pg_utils.py new file mode 100644 index 000000000..0050f1bf8 --- /dev/null +++ b/data-processing/data_manipulation/utils/pg_utils.py @@ -0,0 +1,197 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +### +# pg数据库工具类 +### + +import psycopg2.extras + +async def execute_sql(conn,sql,record_to_select): + ''' + 执行sql语句 + :param conn: + :param sql: + :return: + ''' + error = '' + data = [] + try: + cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + cursor.execute(sql,record_to_select) + result = cursor.fetchall() + for row in result: + dataItem = {} + for item in row.keys(): + dataItem[item] = row[item] + data.append(dataItem) + + conn.commit() + cursor.close() + except Exception as ex: + print('查询', ex) + conn.rollback() + error = str(ex) + data = None + + if len(error) > 0: + return { + 'status': 400, + 'message': error, + 'data': None + } + + return { + 'status': 200, + 'message': '', + "data": data + } + +async def execute_count_sql(conn,sql,record_to_select_count): + ''' + 执行count sql语句 + :param conn: + :param sql: + :return: + ''' + error = '' + number_count = 0 + try: + cursor = conn.cursor() + cursor.execute(sql,record_to_select_count) + result = cursor.fetchall() + for row in result: + number_count = row + + conn.commit() + cursor.close() + except Exception as ex: + print('mysql 查询', ex) + conn.rollback() + error = str(ex) + data = None + + if len(error) > 0: + return { + 'status': 400, + 'message': error, + 'data': None + } + return { + 'status': 200, + 'message': '', + "data": number_count[0] + } + +async def execute_insert_sql(conn,sql,record_to_insert): + ''' + 执行insert sql语句 + :param conn: + :param sql: + :return: + ''' + error = '' + try: + cursor = conn.cursor() + cursor.execute(sql,record_to_insert) + + conn.commit() + cursor.close() + except Exception as ex: + print('insert 失败', ex) + conn.rollback() + error = str(ex) + data = None + + if len(error) > 0: + return { + 'status': 400, + 'message': error, + 'data': None + } + + return { + 'status': 200, + 'message': '新增成功', + "data": None + } + +async def execute_update_sql(conn,sql,record_to_update): + ''' + 执行 update sql语句 + :param conn: + :param sql: + :return: + ''' + error = '' + number_count = 0 + try: + cursor = conn.cursor() + cursor.execute(sql,record_to_update) + + conn.commit() + cursor.close() + except Exception as ex: + print('update 失败', ex) + conn.rollback() + error = str(ex) + data = None + + if len(error) > 0: + return { + 'status': 400, + 'message': error, + 'data': None + } + + return { + 'status': 200, + 'message': '编辑成功', + "data": None + } + +async def execute_delete_sql(conn,sql,record_to_delete): + ''' + 执行 delete sql语句 + :param conn: + :param sql: + :return: + ''' + error = '' + number_count = 0 + try: + cursor = conn.cursor() + cursor.execute(sql,record_to_delete) + + conn.commit() + cursor.close() + except Exception as ex: + 
print('delete 失败', ex) + conn.rollback() + error = str(ex) + data = None + + if len(error) > 0: + return { + 'status': 400, + 'message': error, + 'data': None + } + + return { + 'status': 200, + 'message': '删除成功', + "data": None + } diff --git a/data-processing/entrypoint.sh b/data-processing/entrypoint.sh new file mode 100755 index 000000000..c5e1cdee8 --- /dev/null +++ b/data-processing/entrypoint.sh @@ -0,0 +1,2 @@ +#!/bin/sh +python /arcadia_app/data_manipulation/server.py \ No newline at end of file diff --git a/data-processing/requirements.txt b/data-processing/requirements.txt index 0cde39b26..801ec66c3 100644 --- a/data-processing/requirements.txt +++ b/data-processing/requirements.txt @@ -8,4 +8,7 @@ minio==7.1.17 zhipuai==1.0.7 langchain==0.0.336 spacy==3.5.4 -pypdf==3.17.1 \ No newline at end of file +pypdf==3.17.1 +emoji==2.2.0 +ftfy==6.1.1 +psycopg2_binary==2.9.9 \ No newline at end of file
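
Note: the SQL in db/data_process_task.py reads from and writes to public.data_process_task, but this change does not ship a migration for that table. The sketch below shows one way the table could be bootstrapped through psycopg2 (which this change adds to requirements.txt); every column name comes from the INSERT/SELECT statements above, while every column type is an assumption, not a schema defined anywhere in this change.

# Hypothetical bootstrap script; column types are guesses based on the fields
# used in db/data_process_task.py (ULID string ids, JSON stored via ujson.dumps).
import psycopg2

conn = psycopg2.connect(
    database="data_process", user="postgres",
    password="xxxxx", host="localhost", port=5432,
)

create_table_sql = """
create table if not exists public.data_process_task (
    id                        varchar(64) primary key,
    name                      varchar(255),
    file_type                 varchar(32),
    status                    varchar(32),
    pre_data_set_name         varchar(255),
    pre_data_set_version      varchar(64),
    file_names                text,
    post_data_set_name        varchar(255),
    post_data_set_version     varchar(64),
    data_process_config_info  text,
    start_datetime            timestamp,
    create_datetime           timestamp,
    create_user               varchar(64),
    create_program            varchar(128),
    update_datetime           timestamp,
    update_user               varchar(64),
    update_program            varchar(128)
)
"""

with conn.cursor() as cursor:
    cursor.execute(create_table_sql)
conn.commit()
conn.close()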
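
Note: the new Sanic routes in server.py are thin wrappers around data_process_service. Below is a minimal client-side sketch of how the add and list-by-page endpoints might be exercised, assuming the service runs on its default port 28888; all field values are placeholders, and the requests library is used only for illustration (it is not part of requirements.txt).

# Hypothetical client sketch; payload shape inferred from data_process_task.add()
# and minio_store_process_service.text_manipulate().
import requests

BASE_URL = "http://localhost:28888"

payload = {
    "name": "demo-task",
    "file_type": "text",
    "bucket_name": "demo-bucket",               # read by minio_store_process_service
    "pre_data_set_name": "dataset-a",
    "pre_data_set_version": "v1",
    "post_data_set_name": "dataset-a-processed",
    "post_data_set_version": "v1",
    "file_names": [{"name": "example.pdf"}],    # list of {"name": ...} dicts
    "data_process_config_info": [               # entries are checked via d.get('type')
        {"type": "remove_invisible_characters"},
        {"type": "space_standardization"},
        {"type": "remove_email"},
        {"type": "qa_split"},
    ],
}

print(requests.post(f"{BASE_URL}/add", json=payload, timeout=30).json())

# "page" and "pageSize" are passed straight into OFFSET/LIMIT by
# data_process_task.list_by_page, so "page" behaves as a row offset here.
print(requests.post(
    f"{BASE_URL}/list-by-page",
    json={"keyword": "demo", "page": 0, "pageSize": 10},
    timeout=30,
).json())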