Skip to content

Commit

Permalink
chore: lint data-process code with pylint and optimize the dockerfile
Browse files Browse the repository at this point in the history
Signed-off-by: bjwswang <[email protected]>
  • Loading branch information
bjwswang committed Nov 16, 2023
1 parent fb4985f commit 413a91e
Show file tree
Hide file tree
Showing 23 changed files with 804 additions and 523 deletions.
634 changes: 634 additions & 0 deletions .pylintrc

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,13 @@ arctl: fmt vet ## Build manager binary.
gql-gen:
@go run github.com/99designs/[email protected] generate
build-graphql-server: gql-gen
@CGO_ENABLED=0 GOOS=linux go build -o bin/graphql-server graphql-server/go-server/main.go
@CGO_ENABLED=0 GOOS=linux go build -o bin/graphql-server graphql-server/go-server/main.go


# Commands for Data-Processing
DATA_PROCESSING_IMAGE ?= kubebb/dp-base

.PHONY: docker-build-dp-base
docker-build-dp-base:
docker build -f ./data-process/Dockerfile.base -t $(DATA_PROCESSING_IMAGE):$(VERSION) ./data-process/

14 changes: 14 additions & 0 deletions data-process/Dockerfile.base
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Base image for the data-processing service (Python 3.10 runtime, CN-friendly mirrors).
FROM python:3.10.13-slim

# Container timezone; tzdata is reconfigured below to match.
ENV TZ=Asia/Shanghai

# Switch apt to the Tsinghua mirror for faster package downloads on CN networks.
RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list.d/debian.sources

# Install all OS dependencies in a single layer, skip recommended packages,
# and clean the apt cache afterwards to keep the image small.
RUN export DEBIAN_FRONTEND=noninteractive \
    && apt-get update \
    && apt-get install -y --no-install-recommends tzdata python3-distutils curl python3-pip \
    && ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && dpkg-reconfigure --frontend noninteractive tzdata \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /happy_work_space
39 changes: 37 additions & 2 deletions data-process/README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,37 @@
# Data Process
The current documentation is only available in Chinese. Please refer to the content in .zh.md for specific details.
# Data Processing

## Current Version Main Features

Data Processing ingests and processes data from sources such as MinIO, databases, and Web APIs. The file types handled include:
- txt
- json
- doc
- html
- excel
- csv
- pdf
- markdown
- ppt

### Current Text Type Processing

The text processing pipeline includes cleaning abnormal data, filtering, de-duplication, and anonymization.

## Design

![Design](../assets/data_process.drawio.png)

## Local Development
### Software Requirements

Before setting up the local data-process environment, please make sure the following software is installed:

- Python 3.10.x

### Environment Setup

Install the Python dependencies listed in the requirements.txt file.

### Running

Run the server.py file in the data_manipulation directory to start the service.
2 changes: 1 addition & 1 deletion data-process/data_manipulation/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
minio_secure = os.getenv('MINIO_SECURE', False)

# zhipuai api_key
zhipuai_api_key = os.getenv('ZHIPUAI_API_KEY', 'xxxxx')
zhipuai_api_key = os.getenv('ZHIPUAI_API_KEY', 'xxxxx')
36 changes: 17 additions & 19 deletions data-process/data_manipulation/file_handle/csv_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,13 @@
###

import csv
import ulid
import pandas as pd
import os
import logging
import os

from transform.text import (
clean_transform,
privacy_transform
)

from utils import (
date_time_utils,
file_utils
)
import pandas as pd
import ulid
from transform.text import clean_transform, privacy_transform
from utils import date_time_utils, file_utils

logger = logging.getLogger('csv_handle')

Expand All @@ -51,6 +44,8 @@
# content:
# 1) 基本功能实现
###


async def text_manipulate(opt={}):
logger.info("csv text manipulate!")

Expand All @@ -59,7 +54,7 @@ async def text_manipulate(opt={}):
处理某条数据时,如果某个方式(比如:去除不可见字符)处理失败了,则直接结束,不在处理,整个文件都视作处理失败
"""

try:
file_name = opt['file_name']
support_type = opt['support_type']
Expand Down Expand Up @@ -87,7 +82,6 @@ async def text_manipulate(opt={}):

text_data = clean_result['data']


# 将清洗后的文件保存为final
new_file_name = await file_utils.get_file_name({
'file_name': file_name,
Expand Down Expand Up @@ -159,7 +153,7 @@ async def data_clean(opt={}):
})

logger.info("csv text data clean stop!")

return {
'status': 200,
'message': '',
Expand All @@ -179,8 +173,8 @@ async def data_clean(opt={}):
###
async def remove_invisible_characters(opt={}):
return await clean_transform.remove_invisible_characters({
'text': opt['text']
})
'text': opt['text']
})

###
# 去除邮箱地址
Expand All @@ -192,10 +186,12 @@ async def remove_invisible_characters(opt={}):
# content:
# 1) 基本功能实现
###


async def remove_email(opt={}):
return await privacy_transform.remove_email({
'text': opt['text']
})
'text': opt['text']
})

###
# 将数据存到CSV中
Expand All @@ -207,6 +203,8 @@ async def remove_email(opt={}):
# content:
# 1) 基本功能实现
###


async def save_csv(opt={}):
file_name = opt['file_name']
phase_value = opt['phase_value']
Expand Down
42 changes: 15 additions & 27 deletions data-process/data_manipulation/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,22 @@
# 1) 基本功能实现
###

from sanic import Sanic
from sanic.response import json, text
from sanic_cors import CORS, cross_origin
from sanic.exceptions import NotFound

import asyncio
import aiohttp

import sys

import logging


from service import (
minio_store_process_service
)

from transform.text import (
support_type
)

from utils import (
log_utils
)
from sanic import Sanic
from sanic.response import json
from sanic_cors import CORS
from service import minio_store_process_service
from transform.text import support_type
from utils import log_utils

###
# 初始化日志配置
###
log_utils.init_config({
'source_type': 'manipulate_server'
'source_type': 'manipulate_server',
'log_dir': "log"
})


Expand All @@ -62,7 +48,7 @@
app = Sanic(name='data_manipulate')
CORS(app)

app.config['REQUEST_MAX_SIZE'] = 1024 * 1024 * 1024 # 1G
app.config['REQUEST_MAX_SIZE'] = 1024 * 1024 * 1024 # 1G
app.config['REQUEST_TIMEOUT'] = 60 * 60 * 60
app.config['RESPONSE_TIMEOUT'] = 60 * 60 * 60
app.config['KEEP_ALIVE_TIMEOUT'] = 60 * 60 * 60
Expand All @@ -77,6 +63,8 @@
# content:
# 1) 基本功能实现
###


@app.route('text-manipulate', methods=['POST'])
async def text_manipulate(request):
"""
Expand All @@ -87,7 +75,7 @@ async def text_manipulate(request):
file_path: 文本路径
Returns:
"""

await asyncio.create_task(
Expand All @@ -110,13 +98,14 @@ async def text_manipulate(request):
# content:
# 1) 基本功能实现
###


@app.route('text-process-type', methods=['POST'])
async def text_process_type(request):
"""
获取数据处理支持的类型
Args:
Returns:
json: 支持的类型
Expand All @@ -127,11 +116,10 @@ async def text_process_type(request):
'message': '',
'data': support_type.support_types
})


if __name__ == '__main__':
app.run(host='0.0.0.0',
port=28888,
access_log=True,
debug=True,
workers=2)
workers=2)
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,17 @@
# 1) 基本功能实现
###

from sanic.response import json, raw
from minio import Minio
from minio.commonconfig import Tags
from minio.error import S3Error
import pandas as pd
import io
import os

import logging
import os

from file_handle import (
csv_handle
)

from utils import (
minio_utils,
file_utils
)
import pandas as pd
from file_handle import csv_handle
from minio import Minio
from minio.commonconfig import Tags
from minio.error import S3Error
from sanic.response import json, raw
from utils import file_utils, minio_utils

logger = logging.getLogger('minio_store_process_service')

Expand All @@ -54,6 +47,8 @@
# content:
# 1) 基本功能实现
###


async def text_manipulate(request):

request_json = request.json
Expand All @@ -63,7 +58,7 @@ async def text_manipulate(request):

# create minio client
minio_client = await minio_utils.create_client()

# 查询存储桶下的所有对象
objects = minio_client.list_objects(bucket_name, prefix=folder_prefix)

Expand All @@ -81,9 +76,9 @@ async def text_manipulate(request):
if file_extension in ['csv']:
# 处理CSV文件
result = await csv_handle.text_manipulate({
'file_name': item,
'support_type': support_type
})
'file_name': item,
'support_type': support_type
})

# 将清洗后的文件上传到MinIO中
# 上传middle文件夹下的文件,并添加tag
Expand Down Expand Up @@ -112,7 +107,7 @@ async def text_manipulate(request):
for item in file_names:
remove_file_path = await file_utils.get_temp_file_path()
await file_utils.delete_file(remove_file_path + 'original/' + item)

return json({
'status': 200,
'message': '',
Expand All @@ -129,6 +124,8 @@ async def text_manipulate(request):
# content:
# 1) 基本功能实现
###


async def download(opt={}):
objects = opt['objects']
minio_client = opt['minio_client']
Expand Down Expand Up @@ -160,17 +157,21 @@ async def download(opt={}):
# content:
# 1) 基本功能实现
###


async def upload_files_to_minio_with_tags(minio_client, local_folder, minio_bucket, minio_prefix="", tags=None):
for root, dirs, files in os.walk(local_folder):
for file in files:
local_file_path = os.path.join(root, file)
minio_object_name = os.path.join(minio_prefix, os.path.relpath(local_file_path, local_folder))

minio_object_name = os.path.join(
minio_prefix, os.path.relpath(local_file_path, local_folder))

try:
minio_client.fput_object(minio_bucket, minio_object_name, local_file_path, tags=tags)

minio_client.fput_object(
minio_bucket, minio_object_name, local_file_path, tags=tags)

# 删除本地文件
await file_utils.delete_file(local_file_path)
except S3Error as e:
logger.error(f"Error uploading {minio_object_name} to {minio_bucket}: {e}")

logger.error(
f"Error uploading {minio_object_name} to {minio_bucket}: {e}")
Loading

0 comments on commit 413a91e

Please sign in to comment.