Merge pull request kubeagi#648 from wangxinbiao/main
feat: Splitting website data
bjwswang authored Jan 29, 2024
2 parents ddfcce3 + fe1190d commit a57e6b5
Showing 14 changed files with 454 additions and 14 deletions.
1 change: 1 addition & 0 deletions pypi/data-processing/requirements.txt
@@ -26,3 +26,4 @@ python-docx==1.1.0
bs4==0.0.1
playwright==1.40.0
pillow==10.2.0
html2text==2020.1.16
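Note: html2text is pulled in because the new web loader (added below in document_loaders/async_playwright.py) relies on LangChain's Html2TextTransformer, which wraps the html2text package to turn scraped HTML into plain text. A minimal sketch of that conversion, assuming langchain_community is already installed as it is used elsewhere in this diff:

# Illustrative sketch, not part of this commit.
from langchain_community.document_transformers import Html2TextTransformer
from langchain_core.documents import Document

html_doc = Document(
    page_content="<html><body><h1>Title</h1><p>Hello <b>world</b></p></body></html>",
    metadata={"source": "https://example.com"},
)

# transform_documents returns new Documents whose page_content is plain, markdown-like text.
plain_docs = Html2TextTransformer().transform_documents([html_doc])
print(plain_docs[0].page_content)  # roughly: "# Title\n\nHello **world**"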
24 changes: 18 additions & 6 deletions pypi/data-processing/src/data_store_process/minio_store_process.py
@@ -27,14 +27,14 @@
data_process_document_db_operate,
data_process_log_db_operate,
data_process_stage_log_db_operate)
from file_handle import common_handle, pdf_handle, word_handle
from file_handle import common_handle, pdf_handle, web_handle, word_handle
from kube import dataset_cr
from utils import date_time_utils, file_utils, json_utils

logger = logging.getLogger(__name__)


def text_manipulate(
async def text_manipulate(
req_json,
pool,
id,
@@ -147,7 +147,7 @@ def text_manipulate(
file_extension = file_utils.get_file_extension(file_name)
if file_extension in ["pdf"]:
# Handle PDF files
result = pdf_handle.text_manipulate(
result = pdf_handle.pdf_manipulate(
chunk_size=req_json.get("chunk_size"),
chunk_overlap=req_json.get("chunk_overlap"),
file_name=file_name,
@@ -160,7 +160,19 @@ def text_manipulate(

elif file_extension in ["docx"]:
# Handle .docx files
result = word_handle.docx_text_manipulate(
result = word_handle.docx_manipulate(
chunk_size=req_json.get("chunk_size"),
chunk_overlap=req_json.get("chunk_overlap"),
file_name=file_name,
document_id=item.get("document_id"),
support_type=support_type,
conn_pool=pool,
task_id=id,
create_user=req_json["creator"],
)
elif file_extension == "web":
# Handle web files
result = await web_handle.web_manipulate(
chunk_size=req_json.get("chunk_size"),
chunk_overlap=req_json.get("chunk_overlap"),
file_name=file_name,
@@ -987,7 +999,7 @@ def _text_manipulate_retry_for_document(document, task_info, log_id, pool, creat
document_type = document.get("document_type")
if document_type in ["pdf"]:
# Handle PDF files
result = pdf_handle.text_manipulate(
result = pdf_handle.pdf_manipulate(
file_name=file_name,
document_id=document.get("id"),
support_type=support_type,
@@ -998,7 +1010,7 @@ def _text_manipulate_retry_for_document(document, task_info, log_id, pool, creat

elif document_type in ["docx"]:
# Handle .docx files
result = word_handle.docx_text_manipulate(
result = word_handle.docx_manipulate(
file_name=file_name,
document_id=document.get("id"),
support_type=support_type,
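Because text_manipulate is now declared with async def, any caller of this entry point has to await it or drive it through an event loop. A hedged sketch of a hypothetical caller (the surrounding scheduler code is not part of this diff):

# Hypothetical caller, shown only to illustrate the sync -> async change.
import asyncio

from data_store_process import minio_store_process

async def run_data_process(req_json, pool, task_id):
    # `await` is now required because text_manipulate is a coroutine
    return await minio_store_process.text_manipulate(req_json, pool=pool, id=task_id)

# e.g. asyncio.run(run_data_process(req_json, pool, task_id))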
@@ -422,6 +422,7 @@ def query_question_answer_list(document_id, pool):
dptqa.question,
dptqa.answer,
dptdc.content,
dptdc.meta_info,
dptdc.page_number
from public.data_process_task_question_answer dptqa
left join public.data_process_task_document_chunk dptdc
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from database_clients import postgresql_pool_client
from utils import date_time_utils

246 changes: 246 additions & 0 deletions pypi/data-processing/src/document_loaders/async_playwright.py
@@ -0,0 +1,246 @@
# Copyright 2024 KubeAGI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import traceback
from typing import List

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.document_transformers import Html2TextTransformer
from langchain_core.documents import Document

from common import log_tag_const

logger = logging.getLogger(__name__)

class AsyncPlaywrightLoader(BaseLoader):
"""Scrape HTML pages from URLs using a
headless instance of the Chromium."""

def __init__(
self,
url: str,
max_count: int = 100,
max_depth: int = 1,
interval_time: int = 1,
):
"""
Initialize the loader with a list of URL paths.
Args:
url (str): Website url.
max_count (int): Maximum Number of Website URLs.
max_depth (int): Website Crawling Depth.
interval_time (int): Interval Time.
Raises:
ImportError: If the required 'playwright' package is not installed.
"""
self.url = url
self.max_count = max_count
self.max_depth = max_depth
self.interval_time = interval_time

try:
import playwright
except ImportError:
raise ImportError(
"playwright is required for AsyncPlaywrightLoader. "
"Please install it with `pip install playwright`."
)

async def ascrape_playwright(self, url: str) -> str:
"""
Asynchronously scrape the content of a given URL using Playwright's async API.
Args:
url (str): The URL to scrape.
Returns:
str: The scraped HTML content or an error message if an exception occurs.
"""
from playwright.async_api import async_playwright

logger.info("Starting scraping...")
results = ""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
try:
page = await browser.new_page()
await page.goto(url)
results = await page.content() # Simply get the HTML content
logger.info("Content scraped")
except Exception as e:
results = f"Error: {e}"
await browser.close()
return results

async def load(self) -> List[Document]:
"""
Load and return all Documents from the provided URLs.
Returns:
List[Document]: A list of Document objects
containing the scraped content from each URL.
"""
docs = []
all_url = await self.get_all_url()
for url in all_url:
html_content = await self.ascrape_playwright(url)
metadata = {"source": url}
docs.append(Document(page_content=html_content, metadata=metadata))

html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs)
return docs_transformed

async def get_all_url(self):
"""
Retrieve the URLs for Data Extraction from the Website.
Args:
url (str): Website url.
max_count (int): Maximum Number of Website URLs.
max_depth (int): Website Crawling Depth.
interval_time (int): Interval Time.
"""
logger.debug(
"".join(
[
f"{log_tag_const.WEB_CRAWLING} Get all url in a web page\n",
f" url: {self.url}"
]
)
)

all_url = [self.url]
sub_urls = [self.url]

try:
for i in range(1, self.max_depth):
for sub_url in sub_urls:
children_urls = await self._get_children_url(
url=sub_url,
url_count=len(all_url)
)

if children_urls.get("status") == 200:
res = children_urls.get("data")

# Avoid duplicate urls
unique_urls = set(all_url)
unique_urls.update(res.get("children_url"))
all_url = list(unique_urls)

# If the maximum count has been reached, return immediately
if res.get("url_count") >= self.max_count:
logger.info(
"".join(
[
f"{log_tag_const.WEB_CRAWLING} The number of URLs has reached the upper limit.\n",
f" max_count: {self.max_count}\n"
]
)
)
return all_url

sub_urls = res.get("children_url")
# Pause between visits
logger.info(f"{log_tag_const.WEB_CRAWLING} Wait for {self.interval_time} seconds before continuing the visit.")
await asyncio.sleep(self.interval_time)
return all_url
except Exception:
logger.error(
''.join(
[
f"{log_tag_const.WEB_CRAWLING} Execute crawling url failure\n",
f"The tracing error is: \n{traceback.format_exc()}"
]
)
)
return all_url

async def _get_children_url(self, url, url_count):
"""
Retrieve URLs contained in the website.
Args:
url (str): Website url.
url_count (int): URL count.
"""
logger.debug(
"".join(
[
f"{log_tag_const.WEB_CRAWLING} Get sub url in a web page\n",
f" url: {url}\n",
f" max_count: {self.max_count}\n",
f" url_count: {url_count}"
]
)
)

try:
from playwright.async_api import async_playwright

children_url = []
async with async_playwright() as p:
browser = await p.chromium.launch()
context = await browser.new_context()
page = await context.new_page()

# Open the web page in the browser
await page.goto(url)

# Extract the href attribute of each <a> tag
links = await page.query_selector_all('a')
for link in links:
href = await link.get_attribute('href')
# The number of crawled urls must not exceed the maximum
if url_count >= self.max_count:
logger.info(
"".join(
[
f"{log_tag_const.WEB_CRAWLING} The number of URLs has reached the upper limit.\n",
f" max_count: {self.max_count}\n",
f" url_count: {url_count}"
]
)
)
break

# Keep urls that start with http and skip ones already collected
if href:
if href.startswith("http") and href not in children_url:
children_url.append(href)
url_count += 1

# Close the browser
await browser.close()
data = {
"children_url": children_url,
"url_count": url_count
}
return {"status": 200, "message": "", "data": data}
except Exception:
logger.error(
''.join(
[
f"{log_tag_const.WEB_CRAWLING} Execute crawling url failure\n",
f"The tracing error is: \n{traceback.format_exc()}"
]
)
)
return {"status": 500, "message": "获取网页中的子网页url失败", "data": ""}
28 changes: 28 additions & 0 deletions pypi/data-processing/src/document_loaders/base.py
@@ -0,0 +1,28 @@
# Copyright 2024 KubeAGI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import List

from langchain_core.documents import Document

class BaseLoader(ABC):
"""Interface for Document Loader.
The `load` method will remain as is for backwards compatibility.
"""

@abstractmethod
def load(self) -> List[Document]:
"""Load data into Document objects."""
11 changes: 9 additions & 2 deletions pypi/data-processing/src/file_handle/common_handle.py
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@
from llm_api_service.qa_provider_zhi_pu_ai_online import \
QAProviderZhiPuAIOnline
from transform.text import clean_transform, privacy_transform
from utils import csv_utils, date_time_utils, file_utils
from utils import csv_utils, date_time_utils, file_utils, json_utils

logger = logging.getLogger(__name__)

@@ -126,11 +126,18 @@ def text_manipulate(

qa_data_dict = [["q", "a", "file_name", "page_number", "chunk_content"]]
for item in qa_list.get("data"):
meta_info = item.get("meta_info")
if meta_info:
meta_json = json_utils.loads(meta_info)
meta_source = meta_json.get("source")
else:
meta_source = item.get("file_name")

qa_data_dict.append(
[
item.get("question"),
item.get("answer"),
item.get("file_name"),
meta_source,
item.get("page_number"),
item.get("content"),
]
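The meta_info branch above pairs with the metadata written by the web loader (Document metadata {"source": url}, exposed through the dptdc.meta_info column added to the query earlier in this diff). A small sketch of the fallback, assuming json_utils.loads behaves like the standard json.loads:

# Sketch of the source-selection logic above; json_utils.loads is assumed
# to be a thin wrapper over json.loads.
import json

def resolve_source(item):
    meta_info = item.get("meta_info")
    if meta_info:
        # web documents carry {"source": "<url>"} from AsyncPlaywrightLoader
        return json.loads(meta_info).get("source")
    # pdf/docx chunks have no meta_info, so fall back to the uploaded file name
    return item.get("file_name")

print(resolve_source({"meta_info": '{"source": "https://example.com"}'}))  # the url
print(resolve_source({"file_name": "report.pdf", "meta_info": None}))      # report.pdf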
2 changes: 1 addition & 1 deletion pypi/data-processing/src/file_handle/pdf_handle.py
@@ -29,7 +29,7 @@
logger = logging.getLogger(__name__)


def text_manipulate(
def pdf_manipulate(
file_name,
document_id,
support_type,