diff --git a/pypi/data-processing/requirements.txt b/pypi/data-processing/requirements.txt
index 8e5391ec5..15a97a6f6 100644
--- a/pypi/data-processing/requirements.txt
+++ b/pypi/data-processing/requirements.txt
@@ -26,3 +26,4 @@ python-docx==1.1.0
 bs4==0.0.1
 playwright==1.40.0
 pillow==10.2.0
+html2text==2020.1.16
diff --git a/pypi/data-processing/src/data_store_process/minio_store_process.py b/pypi/data-processing/src/data_store_process/minio_store_process.py
index cc03afca2..69cbf9e14 100644
--- a/pypi/data-processing/src/data_store_process/minio_store_process.py
+++ b/pypi/data-processing/src/data_store_process/minio_store_process.py
@@ -27,14 +27,14 @@
                               data_process_document_db_operate,
                               data_process_log_db_operate,
                               data_process_stage_log_db_operate)
-from file_handle import common_handle, pdf_handle, word_handle
+from file_handle import common_handle, pdf_handle, web_handle, word_handle
 from kube import dataset_cr
 from utils import date_time_utils, file_utils, json_utils
 
 logger = logging.getLogger(__name__)
 
 
-def text_manipulate(
+async def text_manipulate(
     req_json,
     pool,
     id,
@@ -147,7 +147,7 @@ def text_manipulate(
        file_extension = file_utils.get_file_extension(file_name)
        if file_extension in ["pdf"]:
            # 处理PDF文件
-            result = pdf_handle.text_manipulate(
+            result = pdf_handle.pdf_manipulate(
                chunk_size=req_json.get("chunk_size"),
                chunk_overlap=req_json.get("chunk_overlap"),
                file_name=file_name,
@@ -160,7 +160,19 @@
 
        elif file_extension in ["docx"]:
            # 处理.docx文件
-            result = word_handle.docx_text_manipulate(
+            result = word_handle.docx_manipulate(
+                chunk_size=req_json.get("chunk_size"),
+                chunk_overlap=req_json.get("chunk_overlap"),
+                file_name=file_name,
+                document_id=item.get("document_id"),
+                support_type=support_type,
+                conn_pool=pool,
+                task_id=id,
+                create_user=req_json["creator"],
+            )
+        elif file_extension == "web":
+            # 处理.web文件
+            result = await web_handle.web_manipulate(
                chunk_size=req_json.get("chunk_size"),
                chunk_overlap=req_json.get("chunk_overlap"),
                file_name=file_name,
@@ -987,7 +999,7 @@ def _text_manipulate_retry_for_document(document, task_info, log_id, pool, creat
    document_type = document.get("document_type")
    if document_type in ["pdf"]:
        # 处理PDF文件
-        result = pdf_handle.text_manipulate(
+        result = pdf_handle.pdf_manipulate(
            file_name=file_name,
            document_id=document.get("id"),
            support_type=support_type,
@@ -998,7 +1010,7 @@
 
    elif document_type in ["docx"]:
        # 处理.docx文件
-        result = word_handle.docx_text_manipulate(
+        result = word_handle.docx_manipulate(
            file_name=file_name,
            document_id=document.get("id"),
            support_type=support_type,
diff --git a/pypi/data-processing/src/database_operate/data_process_detail_db_operate.py b/pypi/data-processing/src/database_operate/data_process_detail_db_operate.py
index c40eddf00..29c2c80ee 100644
--- a/pypi/data-processing/src/database_operate/data_process_detail_db_operate.py
+++ b/pypi/data-processing/src/database_operate/data_process_detail_db_operate.py
@@ -422,6 +422,7 @@ def query_question_answer_list(document_id, pool):
            dptqa.question,
            dptqa.answer,
            dptdc.content,
+            dptdc.meta_info,
            dptdc.page_number
        from public.data_process_task_question_answer dptqa
        left join public.data_process_task_document_chunk dptdc
diff --git a/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py b/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
index a23fcec77..a8bc57c30 100644
--- a/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
+++ b/pypi/data-processing/src/database_operate/dp_document_image_db_operate.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 from database_clients import postgresql_pool_client
 from utils import date_time_utils
 
diff --git a/pypi/data-processing/src/document_loaders/async_playwright.py b/pypi/data-processing/src/document_loaders/async_playwright.py
new file mode 100644
index 000000000..0fd6272da
--- /dev/null
+++ b/pypi/data-processing/src/document_loaders/async_playwright.py
@@ -0,0 +1,246 @@
+# Copyright 2024 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+import traceback
+from typing import List
+
+from langchain_community.document_loaders.base import BaseLoader
+from langchain_community.document_transformers import Html2TextTransformer
+from langchain_core.documents import Document
+
+from common import log_tag_const
+
+logger = logging.getLogger(__name__)
+
+class AsyncPlaywrightLoader(BaseLoader):
+    """Scrape HTML pages from URLs using a
+    headless instance of Chromium."""
+
+    def __init__(
+        self,
+        url: str,
+        max_count: int = 100,
+        max_depth: int = 1,
+        interval_time: int = 1,
+    ):
+        """
+        Initialize the loader with the start URL and crawl settings.
+
+        Args:
+            url (str): Website url.
+            max_count (int): Maximum number of website URLs to collect.
+            max_depth (int): Website crawling depth.
+            interval_time (int): Interval time between requests (seconds).
+
+        Raises:
+            ImportError: If the required 'playwright' package is not installed.
+        """
+        self.url = url
+        self.max_count = max_count
+        self.max_depth = max_depth
+        self.interval_time = interval_time
+
+        try:
+            import playwright
+        except ImportError:
+            raise ImportError(
+                "playwright is required for AsyncPlaywrightLoader. "
+                "Please install it with `pip install playwright`."
+            )
+
+    async def ascrape_playwright(self, url: str) -> str:
+        """
+        Asynchronously scrape the content of a given URL using Playwright's async API.
+
+        Args:
+            url (str): The URL to scrape.
+
+        Returns:
+            str: The scraped HTML content or an error message if an exception occurs.
+
+        """
+        from playwright.async_api import async_playwright
+
+        logger.info("Starting scraping...")
+        results = ""
+        async with async_playwright() as p:
+            browser = await p.chromium.launch(headless=True)
+            try:
+                page = await browser.new_page()
+                await page.goto(url)
+                results = await page.content()  # Simply get the HTML content
+                logger.info("Content scraped")
+            except Exception as e:
+                results = f"Error: {e}"
+            await browser.close()
+        return results
+
+    async def load(self) -> List[Document]:
+        """
+        Load and return all Documents from the crawled URLs.
+
+        Returns:
+            List[Document]: A list of Document objects
+                containing the scraped content from each URL.
+ + """ + docs = [] + all_url = await self.get_all_url() + for url in all_url: + html_content = await self.ascrape_playwright(url) + metadata = {"source": url} + docs.append(Document(page_content=html_content, metadata=metadata)) + + html2text = Html2TextTransformer() + docs_transformed = html2text.transform_documents(docs) + return docs_transformed + + async def get_all_url(self): + """ + Retrieve the URLs for Data Extraction from the Website. + + Args: + url (str): Website url. + max_count (int): Maximum Number of Website URLs. + max_depth (int): Website Crawling Depth. + interval_time (int): Interval Time. + + """ + logger.debug( + "".join( + [ + f"{log_tag_const.WEB_CRAWLING} Get all url in a web page\n", + f" url: {self.url}" + ] + ) + ) + + all_url = [self.url] + sub_urls = [self.url] + + try: + for i in range(1, self.max_depth): + for sub_url in sub_urls: + children_urls = await self._get_children_url( + url=sub_url, + max_count=self.max_count, + url_count=len(all_url) + ) + + if children_urls.get("status") == 200: + res = children_urls.get("data") + + # 避免重复的url + unique_urls = set(all_url) + unique_urls.update(res.get("children_url")) + all_url = list(unique_urls) + + # 如果达到最大数量限制,直接返回 + if res.get("url_count") >= self.max_count: + logger.info( + "".join( + [ + f"{log_tag_const.WEB_CRAWLING} The number of URLs has reached the upper limit.\n", + f" max_count: {self.max_count}\n" + ] + ) + ) + return all_url + + sub_urls = res.get("children_url") + # 时间间隔 + logger.info(f"{log_tag_const.WEB_CRAWLING} Wait for {self.interval_time} seconds before continuing the visit.") + time.sleep(self.interval_time) + return all_url + except Exception: + logger.error( + ''.join( + [ + f"{log_tag_const.WEB_CRAWLING} Execute crawling url failure\n", + f"The tracing error is: \n{traceback.format_exc()}" + ] + ) + ) + return all_url + + async def _get_children_url(self, url, url_count): + """ + Retrieve URLs contained in the website. + + Args: + url (str): Website url. + url_count (int): URL count. 
+ + """ + logger.debug( + "".join( + [ + f"{log_tag_const.WEB_CRAWLING} Get sub url in a web page\n", + f" url: {url}\n", + f" max_count: {self.max_count}\n", + f" url_count: {url_count}" + ] + ) + ) + + try: + children_url = [] + async with async_playwright() as p: + browser = await p.chromium.launch() + context = await browser.new_context() + page = await context.new_page() + + # 在浏览器中打开网页 + await page.goto(url) + + # 提取每个 a 标签的 href 属性 + links = await page.query_selector_all('a') + for link in links: + href = await link.get_attribute('href') + # 需要抓取的url数量不得超过最大数量 + if url_count >= self.max_count: + logger.info( + "".join( + [ + f"{log_tag_const.WEB_CRAWLING} The number of URLs has reached the upper limit.\n", + f" max_count: {self.max_count}\n", + f" url_count: {url_count}" + ] + ) + ) + break + + # 获取以http开头的url 并排除已存在的url + if href: + if href.startswith("http") and href not in children_url: + children_url.append(href) + url_count += 1 + + # 关闭浏览器 + await browser.close() + data = { + "children_url": children_url, + "url_count": url_count + } + return {"status": 200, "message": "", "data": data} + except Exception: + logger.error( + ''.join( + [ + f"{log_tag_const.WEB_CRAWLING} Execute crawling url failure\n", + f"The tracing error is: \n{traceback.format_exc()}" + ] + ) + ) + return {"status": 500, "message": "获取网页中的子网页url失败", "data": ""} diff --git a/pypi/data-processing/src/document_loaders/base.py b/pypi/data-processing/src/document_loaders/base.py new file mode 100644 index 000000000..a31e22eb5 --- /dev/null +++ b/pypi/data-processing/src/document_loaders/base.py @@ -0,0 +1,28 @@ +# Copyright 2024 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import List + +from langchain_core.documents import Document + +class BaseLoader(ABC): + """Interface for Document Loader. + + The `load` method will remain as is for backwards compatibility. 
+ """ + + @abstractmethod + def load(self) -> List[Document]: + """Load data into Document objects.""" diff --git a/pypi/data-processing/src/file_handle/common_handle.py b/pypi/data-processing/src/file_handle/common_handle.py index 1f63359ad..56aaa0533 100644 --- a/pypi/data-processing/src/file_handle/common_handle.py +++ b/pypi/data-processing/src/file_handle/common_handle.py @@ -29,7 +29,7 @@ from llm_api_service.qa_provider_zhi_pu_ai_online import \ QAProviderZhiPuAIOnline from transform.text import clean_transform, privacy_transform -from utils import csv_utils, date_time_utils, file_utils +from utils import csv_utils, date_time_utils, file_utils, json_utils logger = logging.getLogger(__name__) @@ -126,11 +126,18 @@ def text_manipulate( qa_data_dict = [["q", "a", "file_name", "page_number", "chunk_content"]] for item in qa_list.get("data"): + meta_info = item.get("meta_info") + if meta_info: + meta_json = json_utils.loads(meta_info) + meta_source = meta_json.get("source") + else: + meta_source = item.get("file_name") + qa_data_dict.append( [ item.get("question"), item.get("answer"), - item.get("file_name"), + meta_source, item.get("page_number"), item.get("content"), ] diff --git a/pypi/data-processing/src/file_handle/pdf_handle.py b/pypi/data-processing/src/file_handle/pdf_handle.py index e460cf9b7..28f6e9cfc 100644 --- a/pypi/data-processing/src/file_handle/pdf_handle.py +++ b/pypi/data-processing/src/file_handle/pdf_handle.py @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) -def text_manipulate( +def pdf_manipulate( file_name, document_id, support_type, diff --git a/pypi/data-processing/src/file_handle/web_handle.py b/pypi/data-processing/src/file_handle/web_handle.py new file mode 100644 index 000000000..460fbd6a8 --- /dev/null +++ b/pypi/data-processing/src/file_handle/web_handle.py @@ -0,0 +1,139 @@ +# Copyright 2024 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import traceback + +import ujson +import ulid +from langchain.text_splitter import SpacyTextSplitter + +from common import log_tag_const +from common.config import config +from database_operate import data_process_document_chunk_db_operate +from document_loaders.async_playwright import AsyncPlaywrightLoader +from file_handle import common_handle +from utils import file_utils, json_utils + +logger = logging.getLogger(__name__) + +async def web_manipulate( + file_name, + document_id, + support_type, + conn_pool, + task_id, + create_user, + chunk_size=None, + chunk_overlap=None, +): + """Manipulate the text content from a web file. 
+
+    file_name: file name;
+    document_id: document id;
+    support_type: support type;
+    conn_pool: database connection pool;
+    task_id: data process task id;
+    create_user: creator;
+    chunk_size: chunk size;
+    chunk_overlap: chunk overlap;
+    """
+    logger.debug(f"{log_tag_const.WEB_CRAWLING} Start to manipulate the text from the web")
+
+    try:
+        temp_file_path = file_utils.get_temp_file_path()
+        file_path = temp_file_path + "original/" + file_name
+
+        # Load the web pages and split the text
+        documents = await _get_documents_by_langchain(
+            chunk_size=chunk_size, chunk_overlap=chunk_overlap, file_path=file_path
+        )
+
+        # Save all chunk info to the database
+        all_document_for_process = []
+        for document in documents:
+            chunk_id = ulid.ulid()
+            content = document.page_content.replace("\n", "")
+            chunk_insert_item = {
+                "id": chunk_id,
+                "document_id": document_id,
+                "task_id": task_id,
+                "status": "not_start",
+                "content": content,
+                "meta_info": json_utils.dumps(document.metadata),
+                "page_number": "1",
+                "creator": create_user,
+            }
+            all_document_for_process.append(chunk_insert_item)
+
+            data_process_document_chunk_db_operate.add(
+                chunk_insert_item, pool=conn_pool
+            )
+
+        response = common_handle.text_manipulate(
+            file_name=file_name,
+            all_document_for_process=all_document_for_process,
+            support_type=support_type,
+            conn_pool=conn_pool,
+            create_user=create_user,
+        )
+
+        return response
+    except Exception as ex:
+        logger.error(
+            "".join(
+                [
+                    f"{log_tag_const.WEB_CRAWLING} There is an error when manipulating ",
+                    f"the text in the web handler. \n{traceback.format_exc()}",
+                ]
+            )
+        )
+        logger.debug(f"{log_tag_const.WEB_CRAWLING} Finish manipulating the text from the web")
+        return {"status": 400, "message": str(ex), "data": traceback.format_exc()}
+
+
+async def _get_documents_by_langchain(chunk_size, chunk_overlap, file_path):
+    # Split the text.
+    if chunk_size is None:
+        chunk_size = config.knowledge_chunk_size
+
+    if chunk_overlap is None:
+        chunk_overlap = config.knowledge_chunk_overlap
+
+    with open(file_path, "r", encoding="utf-8") as file:
+        # Read the file content
+        file_content = file.read()
+
+    web_content = ujson.loads(file_content)
+    url = web_content.get("url")
+    interval_time = web_content.get("interval_time")
+    max_depth = web_content.get("max_depth")
+    max_count = web_content.get("max_count")
+
+    loader = AsyncPlaywrightLoader(
+        url=url,
+        max_count=max_count,
+        max_depth=max_depth,
+        interval_time=interval_time,
+    )
+    docs = await loader.load()
+
+    text_splitter = SpacyTextSplitter(
+        separator="\n\n",
+        pipeline="zh_core_web_sm",
+        chunk_size=int(chunk_size),
+        chunk_overlap=int(chunk_overlap),
+    )
+    documents = text_splitter.split_documents(docs)
+
+    return documents
\ No newline at end of file
diff --git a/pypi/data-processing/src/file_handle/word_handle.py b/pypi/data-processing/src/file_handle/word_handle.py
index 0076cc3bc..5251a1299 100644
--- a/pypi/data-processing/src/file_handle/word_handle.py
+++ b/pypi/data-processing/src/file_handle/word_handle.py
@@ -28,7 +28,7 @@
 logger = logging.getLogger(__name__)
 
 
-def docx_text_manipulate(
+def docx_manipulate(
     file_name,
     document_id,
     support_type,
diff --git a/pypi/data-processing/src/llm_api_service/qa_provider_zhi_pu_ai_online.py b/pypi/data-processing/src/llm_api_service/qa_provider_zhi_pu_ai_online.py
index f987bb6e9..27ef76c0b 100644
--- a/pypi/data-processing/src/llm_api_service/qa_provider_zhi_pu_ai_online.py
+++ b/pypi/data-processing/src/llm_api_service/qa_provider_zhi_pu_ai_online.py
@@ -90,7 +90,7 @@ def generate_qa_list(
                result = self.__format_response_to_qa_list(response)
                if len(result) > 0:
                    break
-            
+
            logger.warn(
                f"failed to get QA list, wait for {wait_seconds} seconds and retry"
            )
diff --git a/pypi/data-processing/src/service/data_process_service.py b/pypi/data-processing/src/service/data_process_service.py
index 5c895654a..c70c5413d 100644
--- a/pypi/data-processing/src/service/data_process_service.py
+++ b/pypi/data-processing/src/service/data_process_service.py
@@ -73,7 +73,7 @@ def add(req_json, pool):
    try:
 
        async def async_text_manipulate(req_json, pool, id):
-            minio_store_process.text_manipulate(req_json, pool=pool, id=id)
+            await minio_store_process.text_manipulate(req_json, pool=pool, id=id)
 
        def execute_text_manipulate_task(loop):
            asyncio.set_event_loop(loop)
diff --git a/pypi/data-processing/src/utils/json_utils.py b/pypi/data-processing/src/utils/json_utils.py
index 131772c0b..962a0d410 100644
--- a/pypi/data-processing/src/utils/json_utils.py
+++ b/pypi/data-processing/src/utils/json_utils.py
@@ -40,3 +40,10 @@ def dumps(
        escape_forward_slashes=escape_forward_slashes,
    )
 
+def loads(
+    data,
+):
+    return ujson.loads(
+        data,
+    )
+
diff --git a/pypi/data-processing/src/utils/web_url_utils.py b/pypi/data-processing/src/utils/web_url_utils.py
index 8e2265163..fd118c4a7 100644
--- a/pypi/data-processing/src/utils/web_url_utils.py
+++ b/pypi/data-processing/src/utils/web_url_utils.py
@@ -530,7 +530,7 @@ def filter_image(url, resource_types, exclude_img_info):
            )
            return False
        # 如果format无法获取,则默认为JPEG格式
-        format = image.format or 'JPEG' 
+        format = image.format or 'JPEG'
        if format.lower() not in resource_types:
            logger.debug(f"{log_tag_const.WEB_CRAWLING} Not within the range of resource types to be crawled")
            return False