From 5c7b4f586b6505a83a0dcbf797780833fe9b2480 Mon Sep 17 00:00:00 2001
From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com>
Date: Tue, 26 Sep 2023 19:24:21 -0400
Subject: [PATCH] Roman/azure cognitive embeddings (#1524)

### Description
This PR is two-fold:

**Embeddings:**
* Embeddings are incorporated into the SharePoint source connector, which will now call out to OpenAI and create embeddings if the flag is passed in and the API key is provided.

**Writing vector content (embeddings) to an Azure Cognitive Search index:**
* The schema for the index expected to exist in Azure has been updated to include the vector field type, and a test script has been added that exercises the new content produced by the SharePoint connector, pushing the embedding content to the index.

Some important notes about other changes in here:
* The embedding code had to be updated to patch the `to_dict` method on elements so that `embeddings` is included in the dict output when it has been set. The code originally attached the embedding content to the elements, but it was lost when `to_dict` was called to save the content as JSON.
---
 CHANGELOG.md | 1 +
 docs/requirements.txt | 17 ++-
 .../azure_cognitive_sample_index_schema.json | 22 +++-
 requirements/base.txt | 2 +-
 requirements/build.txt | 2 +-
 requirements/constraints.in | 2 +
 requirements/dev.txt | 2 +-
 requirements/extra-pdf-image.txt | 2 +-
 requirements/huggingface.txt | 2 +-
 requirements/ingest-azure.txt | 2 +-
 requirements/ingest-delta-table.txt | 2 +-
 requirements/ingest-notion.txt | 20 +--
 requirements/ingest-openai.in | 5 +
 requirements/ingest-openai.txt | 115 ++++++++++++++++++
 setup.py | 1 +
 .../files/azure_cognitive_index_schema.json | 26 +++-
 .../test-ingest-azure-cognitive-search.sh | 46 ++++---
 .../test-ingest-sharepoint-embed-cog-index.sh | 113 +++++++++++++++++
 unstructured/embed/openai.py | 23 +++-
 .../ingest/cli/cmds/azure_cognitive_search.py | 33 +++--
 unstructured/ingest/cli/cmds/sharepoint.py | 14 ++-
 unstructured/ingest/cli/interfaces.py | 45 ++++++-
 .../connector/azure_cognitive_search.py | 1 -
 unstructured/ingest/connector/registry.py | 6 +-
 unstructured/ingest/connector/sharepoint.py | 25 ++--
 unstructured/ingest/interfaces.py | 35 ++++--
 unstructured/ingest/runner/__init__.py | 6 +-
 unstructured/ingest/runner/base_runner.py | 19 +++
 unstructured/ingest/runner/sharepoint.py | 103 ++++++++--------
 unstructured/ingest/runner/writers.py | 2 +
 unstructured/staging/base.py | 2 +-
 31 files changed, 569 insertions(+), 127 deletions(-)
 create mode 100644 requirements/ingest-openai.in
 create mode 100644 requirements/ingest-openai.txt
 create mode 100755 test_unstructured_ingest/test-ingest-sharepoint-embed-cog-index.sh
 create mode 100644 unstructured/ingest/runner/base_runner.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b634d276f7..81a04a1890 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
* **Improves salesforce partitioning** Partitions Salesforce data as xml instead of text for improved detail and flexibility. Partitions htmlbody instead of textbody for Salesforce emails. Importance: Allows all Salesforce fields to be ingested and gives Salesforce emails more detailed partitioning.
* **Add document level language detection functionality.** Introduces the "auto" default for the languages param, which then detects the languages present in the document using the `langdetect` package. Adds the document languages as ISO 639-3 codes to the element metadata. Implemented only for the partition_text function to start.
* **PPTX partitioner refactored in preparation for enhancement.** Behavior should be unchanged except that shapes enclosed in a group-shape are now included, as many levels deep as required (a group-shape can itself contain a group-shape). +* **Embeddings support for the SharePoint SourceConnector via unstructured-ingest CLI** The SharePoint connector can now optionally create embeddings from the elements it pulls out during partition and upload those embeddings to Azure Cognitive Search index. ### Features diff --git a/docs/requirements.txt b/docs/requirements.txt index 02ca879ba6..99386eb5a7 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -24,6 +24,7 @@ charset-normalizer==3.2.0 # requests docutils==0.18.1 # via + # myst-parser # sphinx # sphinx-rtd-theme # sphinx-tabs @@ -38,10 +39,21 @@ imagesize==1.4.1 importlib-metadata==6.8.0 # via sphinx jinja2==3.1.2 - # via sphinx + # via + # myst-parser + # sphinx +markdown-it-py==3.0.0 + # via + # mdit-py-plugins + # myst-parser markupsafe==2.1.3 # via jinja2 +mdit-py-plugins==0.4.0 + # via myst-parser +mdurl==0.1.2 + # via markdown-it-py myst-parser==2.0.0 + # via -r requirements/build.in packaging==23.1 # via # -c requirements/base.txt @@ -53,6 +65,8 @@ pygments==2.16.1 # sphinx-tabs pytz==2023.3.post1 # via babel +pyyaml==6.0.1 + # via myst-parser requests==2.31.0 # via # -c requirements/base.txt @@ -67,6 +81,7 @@ sphinx==6.2.1 # via # -r requirements/build.in # furo + # myst-parser # sphinx-basic-ng # sphinx-rtd-theme # sphinx-tabs diff --git a/docs/source/destination_connectors/azure_cognitive_sample_index_schema.json b/docs/source/destination_connectors/azure_cognitive_sample_index_schema.json index 85bb077db6..3b6e55e568 100644 --- a/docs/source/destination_connectors/azure_cognitive_sample_index_schema.json +++ b/docs/source/destination_connectors/azure_cognitive_sample_index_schema.json @@ -16,6 +16,12 @@ "name": "text", "type": "Edm.String" }, + { + "name": "embeddings", + "type": "Collection(Edm.Single)", + "dimensions": 400, + "vectorSearchConfiguration": "embeddings-config" + }, { "name": "type", "type": "Edm.String" @@ -162,5 +168,19 @@ } ] } - ] + ], + "vectorSearch": { + "algorithmConfigurations": [ + { + "name": "embeddings-config", + "kind": "hnsw", + "hnswParameters": { + "metric": "cosine", + "m": 4, + "efConstruction": 400, + "efSearch": 500 + } + } + ] + } } diff --git a/requirements/base.txt b/requirements/base.txt index 332c8d04da..62be4fd36e 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -16,7 +16,7 @@ charset-normalizer==3.2.0 # via requests click==8.1.7 # via nltk -dataclasses-json==0.6.0 +dataclasses-json==0.6.1 # via -r requirements/base.in emoji==2.8.0 # via -r requirements/base.in diff --git a/requirements/build.txt b/requirements/build.txt index b2b9169245..99386eb5a7 100644 --- a/requirements/build.txt +++ b/requirements/build.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.8 # by the following command: # -# pip-compile --config=pyproject.toml requirements/build.in +# pip-compile requirements/build.in # alabaster==0.7.13 # via sphinx diff --git a/requirements/constraints.in b/requirements/constraints.in index 62c11fa7ef..59f1d35dc2 100644 --- a/requirements/constraints.in +++ b/requirements/constraints.in @@ -39,3 +39,5 @@ matplotlib==3.7.2 # NOTE(crag) - pin to available pandas for python 3.8 (at least in CI) fsspec==2023.9.1 pandas<2.0.4 +# langchain limits this to 3.1.7 +anyio==3.1.7 diff --git a/requirements/dev.txt b/requirements/dev.txt 
index dd3847e40e..f785ea00bd 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -360,7 +360,7 @@ tornado==6.3.3 # jupyterlab # notebook # terminado -traitlets==5.10.0 +traitlets==5.10.1 # via # comm # ipykernel diff --git a/requirements/extra-pdf-image.txt b/requirements/extra-pdf-image.txt index ab2e702d17..6c60f99dee 100644 --- a/requirements/extra-pdf-image.txt +++ b/requirements/extra-pdf-image.txt @@ -41,7 +41,7 @@ fsspec==2023.9.1 # via # -c requirements/constraints.in # huggingface-hub -huggingface-hub==0.17.2 +huggingface-hub==0.17.3 # via # timm # transformers diff --git a/requirements/huggingface.txt b/requirements/huggingface.txt index a0a1047643..3bbc556b4e 100644 --- a/requirements/huggingface.txt +++ b/requirements/huggingface.txt @@ -26,7 +26,7 @@ fsspec==2023.9.1 # via # -c requirements/constraints.in # huggingface-hub -huggingface-hub==0.17.2 +huggingface-hub==0.17.3 # via transformers idna==3.4 # via diff --git a/requirements/ingest-azure.txt b/requirements/ingest-azure.txt index d8e1d1978c..e9eadb8deb 100644 --- a/requirements/ingest-azure.txt +++ b/requirements/ingest-azure.txt @@ -23,7 +23,7 @@ azure-datalake-store==0.0.53 # via adlfs azure-identity==1.14.0 # via adlfs -azure-storage-blob==12.18.1 +azure-storage-blob==12.18.2 # via adlfs certifi==2023.7.22 # via diff --git a/requirements/ingest-delta-table.txt b/requirements/ingest-delta-table.txt index 26efef8659..2c83b64e9a 100644 --- a/requirements/ingest-delta-table.txt +++ b/requirements/ingest-delta-table.txt @@ -4,7 +4,7 @@ # # pip-compile requirements/ingest-delta-table.in # -deltalake==0.10.2 +deltalake==0.11.0 # via -r requirements/ingest-delta-table.in fsspec==2023.9.1 # via diff --git a/requirements/ingest-notion.txt b/requirements/ingest-notion.txt index 850b1d0256..fadccceea2 100644 --- a/requirements/ingest-notion.txt +++ b/requirements/ingest-notion.txt @@ -4,33 +4,33 @@ # # pip-compile requirements/ingest-notion.in # -anyio==4.0.0 - # via httpcore certifi==2023.7.22 # via # -c requirements/base.txt # -c requirements/constraints.in - # httpcore # httpx -exceptiongroup==1.1.3 - # via anyio -h11==0.14.0 +charset-normalizer==3.2.0 + # via + # -c requirements/base.txt + # httpx +h11==0.12.0 # via httpcore htmlbuilder==1.0.0 # via -r requirements/ingest-notion.in -httpcore==0.18.0 +httpcore==0.13.3 # via httpx -httpx==0.25.0 +httpx==0.20.0 # via notion-client idna==3.4 # via # -c requirements/base.txt - # anyio # httpx + # rfc3986 notion-client==2.0.0 # via -r requirements/ingest-notion.in +rfc3986[idna2008]==1.5.0 + # via httpx sniffio==1.3.0 # via - # anyio # httpcore # httpx diff --git a/requirements/ingest-openai.in b/requirements/ingest-openai.in new file mode 100644 index 0000000000..e7b0369f0d --- /dev/null +++ b/requirements/ingest-openai.in @@ -0,0 +1,5 @@ +-c constraints.in +-c base.txt +langchain +tiktoken +openai diff --git a/requirements/ingest-openai.txt b/requirements/ingest-openai.txt new file mode 100644 index 0000000000..c6620e6579 --- /dev/null +++ b/requirements/ingest-openai.txt @@ -0,0 +1,115 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile requirements/ingest-openai.in +# +aiohttp==3.8.5 + # via + # langchain + # openai +aiosignal==1.3.1 + # via aiohttp +async-timeout==4.0.3 + # via + # aiohttp + # langchain +attrs==23.1.0 + # via aiohttp +certifi==2023.7.22 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # requests +charset-normalizer==3.2.0 + # via + # -c requirements/base.txt + # 
aiohttp + # requests +dataclasses-json==0.6.1 + # via + # -c requirements/base.txt + # langchain +frozenlist==1.4.0 + # via + # aiohttp + # aiosignal +idna==3.4 + # via + # -c requirements/base.txt + # requests + # yarl +langchain==0.0.298 + # via -r requirements/ingest-openai.in +langsmith==0.0.40 + # via langchain +marshmallow==3.20.1 + # via + # -c requirements/base.txt + # dataclasses-json +multidict==6.0.4 + # via + # aiohttp + # yarl +mypy-extensions==1.0.0 + # via + # -c requirements/base.txt + # typing-inspect +numexpr==2.8.6 + # via langchain +numpy==1.24.4 + # via + # -c requirements/constraints.in + # langchain + # numexpr +openai==0.28.1 + # via -r requirements/ingest-openai.in +packaging==23.1 + # via + # -c requirements/base.txt + # marshmallow +pydantic==1.10.12 + # via + # -c requirements/constraints.in + # langchain + # langsmith +pyyaml==6.0.1 + # via langchain +regex==2023.8.8 + # via + # -c requirements/base.txt + # tiktoken +requests==2.31.0 + # via + # -c requirements/base.txt + # langchain + # langsmith + # openai + # tiktoken +sqlalchemy==2.0.21 + # via langchain +tenacity==8.2.3 + # via langchain +tiktoken==0.5.1 + # via -r requirements/ingest-openai.in +tqdm==4.66.1 + # via + # -c requirements/base.txt + # openai +typing-extensions==4.8.0 + # via + # -c requirements/base.txt + # pydantic + # sqlalchemy + # typing-inspect +typing-inspect==0.9.0 + # via + # -c requirements/base.txt + # dataclasses-json +urllib3==1.26.16 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # requests +yarl==1.9.2 + # via aiohttp diff --git a/setup.py b/setup.py index 4b08e3c074..cbde874f62 100644 --- a/setup.py +++ b/setup.py @@ -157,6 +157,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List "huggingface": load_requirements("requirements/huggingface.in"), "local-inference": all_doc_reqs, "paddleocr": load_requirements("requirements/extra-paddleocr.in"), + "openai": load_requirements("requirements/ingest-openai.in"), }, package_dir={"unstructured": "unstructured"}, package_data={"unstructured": ["nlp/*.txt"]}, diff --git a/test_unstructured_ingest/files/azure_cognitive_index_schema.json b/test_unstructured_ingest/files/azure_cognitive_index_schema.json index c15cbcc73f..2abdc7b1d4 100644 --- a/test_unstructured_ingest/files/azure_cognitive_index_schema.json +++ b/test_unstructured_ingest/files/azure_cognitive_index_schema.json @@ -15,6 +15,12 @@ "name": "text", "type": "Edm.String" }, + { + "name": "embeddings", + "type": "Collection(Edm.Single)", + "dimensions": 1536, + "vectorSearchConfiguration": "embeddings-config" + }, { "name": "type", "type": "Edm.String" @@ -107,6 +113,10 @@ "name": "page_number", "type": "Edm.String" }, + { + "name": "page_name", + "type": "Edm.String" + }, { "name": "url", "type": "Edm.String" @@ -161,5 +171,19 @@ } ] } - ] + ], + "vectorSearch": { + "algorithmConfigurations": [ + { + "name": "embeddings-config", + "kind": "hnsw", + "hnswParameters": { + "metric": "cosine", + "m": 4, + "efConstruction": 400, + "efSearch": 500 + } + } + ] + } } diff --git a/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh b/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh index c40bccc561..18f638e375 100755 --- a/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh +++ b/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh @@ -4,11 +4,13 @@ set -e SCRIPT_DIR=$(dirname "$(realpath "$0")") cd "$SCRIPT_DIR"/.. 
|| exit 1
-OUTPUT_FOLDER_NAME=s3
+OUTPUT_FOLDER_NAME=s3-azure-cog-search-dest
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
DESTINATION_INDEX="utic-test-ingest-fixtures-output-$(date +%s)"
-API_VERSION=2020-06-30
+# The vector configs on the schema currently only exist on versions:
+# 2023-07-01-Preview, 2021-04-30-Preview, 2020-06-30-Preview
+API_VERSION=2023-07-01-Preview

if [ -z "$AZURE_SEARCH_ENDPOINT" ] && [ -z "$AZURE_SEARCH_API_KEY" ]; then
  echo "Skipping Azure Cognitive Search ingest test because neither AZURE_SEARCH_ENDPOINT nor AZURE_SEARCH_API_KEY env vars are set."
@@ -18,13 +20,13 @@ fi
function cleanup {
  response_code=$(curl -s -o /dev/null -w "%{http_code}" \
  "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
-  --header "api-key: JV1LDVRivKEY9J9rHBQqQeTvaGoYbD670RWRaANxaTAzSeDy8Eon" \
+  --header "api-key: $AZURE_SEARCH_API_KEY" \
  --header 'content-type: application/json')
  if [ "$response_code" == "200" ]; then
  echo "deleting index $DESTINATION_INDEX"
  curl -X DELETE \
  "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
-  --header "api-key: JV1LDVRivKEY9J9rHBQqQeTvaGoYbD670RWRaANxaTAzSeDy8Eon" \
+  --header "api-key: $AZURE_SEARCH_API_KEY" \
  --header 'content-type: application/json'
  else
  echo "Index $DESTINATION_INDEX does not exist, nothing to delete"
@@ -38,7 +40,7 @@ trap cleanup EXIT
echo "Creating index $DESTINATION_INDEX"
response_code=$(curl -s -o /dev/null -w "%{http_code}" -X PUT \
"https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
---header "api-key: JV1LDVRivKEY9J9rHBQqQeTvaGoYbD670RWRaANxaTAzSeDy8Eon" \
+--header "api-key: $AZURE_SEARCH_API_KEY" \
--header 'content-type: application/json' \
--data "@$SCRIPT_DIR/files/azure_cognitive_index_schema.json")

@@ -65,21 +67,33 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
  --endpoint "$AZURE_SEARCH_ENDPOINT" \
  --index "$DESTINATION_INDEX"

-echo "sleeping 5 seconds to let index finish catching up after writes"
-sleep 5
-# Check the contents of the index
-docs_count=$(curl "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX/docs/\$count?api-version=$API_VERSION" \
-  --header "api-key: $AZURE_SEARCH_API_KEY" \
-  --header 'content-type: application/json' | jq)
+# It can take some time for the index to catch up with the content that was written. This check sleeps 10 seconds
+# between attempts to give it time to process the writes, and times out after checking for about a minute.
+docs_count_remote=0 +attempt=1 +while [ "$docs_count_remote" -eq 0 ] && [ "$attempt" -lt 6 ]; do + echo "attempt $attempt: sleeping 10 seconds to let index finish catching up after writes" + sleep 10 + + # Check the contents of the index + docs_count_remote=$(curl "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX/docs/\$count?api-version=$API_VERSION" \ + --header "api-key: $AZURE_SEARCH_API_KEY" \ + --header 'content-type: application/json' | jq) + + echo "docs count pulled from Azure: $docs_count_remote" + + attempt=$((attempt+1)) +done + -expected_docs_count=0 -for i in $(jq length "$OUTPUT_DIR"/*); do - expected_docs_count=$((expected_docs_count+i)); +docs_count_local=0 +for i in $(jq length "$OUTPUT_DIR"/**/*.json); do + docs_count_local=$((docs_count_local+i)); done -if [ "$docs_count" -ne "$expected_docs_count" ];then - echo "Number of docs $docs_count doesn't match the expected docs: $expected_docs_count" +if [ "$docs_count_remote" -ne "$docs_count_local" ];then + echo "Number of docs in Azure $docs_count_remote doesn't match the expected docs: $docs_count_local" exit 1 fi diff --git a/test_unstructured_ingest/test-ingest-sharepoint-embed-cog-index.sh b/test_unstructured_ingest/test-ingest-sharepoint-embed-cog-index.sh new file mode 100755 index 0000000000..af9d8f33ae --- /dev/null +++ b/test_unstructured_ingest/test-ingest-sharepoint-embed-cog-index.sh @@ -0,0 +1,113 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=sharepoint-azure-dest +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME +DESTINATION_INDEX="utic-test-ingest-fixtures-output-$(date +%s)" +# The vector configs on the schema currently only exist on versions: +# 2023-07-01-Preview, 2021-04-30-Preview, 2020-06-30-Preview +API_VERSION=2023-07-01-Preview + +if [ -z "$SHAREPOINT_CLIENT_ID" ] || [ -z "$SHAREPOINT_CRED" ] ; then + echo "Skipping Sharepoint ingest test because the SHAREPOINT_CLIENT_ID or SHAREPOINT_CRED env var is not set." + exit 0 +fi + +if [ -z "$OPENAI_API_KEY" ]; then + echo "Skipping Sharepoint embedding ingest test because the OPENAI_API_KEY env var is not set." + exit 0 +fi + +if [ -z "$AZURE_SEARCH_ENDPOINT" ] && [ -z "$AZURE_SEARCH_API_KEY" ]; then + echo "Skipping Sharepoint Azure Cognitive Search ingest test because neither AZURE_SEARCH_ENDPOINT nor AZURE_SEARCH_API_KEY env vars are set." 
+  exit 0
+fi
+
+function cleanup {
+  response_code=$(curl -s -o /dev/null -w "%{http_code}" \
+  "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
+  --header "api-key: $AZURE_SEARCH_API_KEY" \
+  --header 'content-type: application/json')
+  if [ "$response_code" == "200" ]; then
+  echo "deleting index $DESTINATION_INDEX"
+  curl -X DELETE \
+  "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
+  --header "api-key: $AZURE_SEARCH_API_KEY" \
+  --header 'content-type: application/json'
+  else
+  echo "Index $DESTINATION_INDEX does not exist, nothing to delete"
+  fi
+}
+
+trap cleanup EXIT
+
+
+# Create index
+echo "Creating index $DESTINATION_INDEX"
+response_code=$(curl -s -o /dev/null -w "%{http_code}" -X PUT \
+"https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
+--header "api-key: $AZURE_SEARCH_API_KEY" \
+--header 'content-type: application/json' \
+--data "@$SCRIPT_DIR/files/azure_cognitive_index_schema.json")
+
+if [ "$response_code" -lt 400 ]; then
+  echo "Index creation success: $response_code"
+else
+  echo "Index creation failure: $response_code"
+  exit 1
+fi
+
+PYTHONPATH=. ./unstructured/ingest/main.py \
+  sharepoint \
+  --download-dir "$DOWNLOAD_DIR" \
+  --metadata-exclude file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \
+  --num-processes 2 \
+  --strategy hi_res \
+  --preserve-downloads \
+  --reprocess \
+  --output-dir "$OUTPUT_DIR" \
+  --verbose \
+  --client-cred "$SHAREPOINT_CRED" \
+  --client-id "$SHAREPOINT_CLIENT_ID" \
+  --site "$SHAREPOINT_SITE" \
+  --path "Shared Documents" \
+  --recursive \
+  --embedding-api-key "$OPENAI_API_KEY" \
+  azure-cognitive-search \
+  --key "$AZURE_SEARCH_API_KEY" \
+  --endpoint "$AZURE_SEARCH_ENDPOINT" \
+  --index "$DESTINATION_INDEX"
+
+# It can take some time for the index to catch up with the content that was written. This check sleeps 10 seconds
+# between attempts to give it time to process the writes, and times out after checking for about a minute.
+docs_count_remote=0
+attempt=1
+while [ "$docs_count_remote" -eq 0 ] && [ "$attempt" -lt 6 ]; do
+  echo "attempt $attempt: sleeping 10 seconds to let index finish catching up after writes"
+  sleep 10
+
+  # Check the contents of the index
+  docs_count_remote=$(curl "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX/docs/\$count?api-version=$API_VERSION" \
+  --header "api-key: $AZURE_SEARCH_API_KEY" \
+  --header 'content-type: application/json' | jq)
+
+  echo "docs count pulled from Azure: $docs_count_remote"
+
+  attempt=$((attempt+1))
+done
+
+
+docs_count_local=0
+for i in $(jq length "$OUTPUT_DIR"/**/*.json); do
+  docs_count_local=$((docs_count_local+i));
+done
+
+
+if [ "$docs_count_remote" -ne "$docs_count_local" ];then
+  echo "Number of docs $docs_count_remote doesn't match the expected docs: $docs_count_local"
+  exit 1
+fi
diff --git a/unstructured/embed/openai.py b/unstructured/embed/openai.py
index f67d10e427..dd5a360970 100644
--- a/unstructured/embed/openai.py
+++ b/unstructured/embed/openai.py
@@ -1,3 +1,4 @@
+import types
from typing import List, Optional

import numpy as np
@@ -26,7 +27,7 @@ def is_unit_vector(self):
        return np.isclose(np.linalg.norm(self.examplary_embedding), 1.0)

    def embed_query(self, query):
-        return self.openai_client.embed_documents(str(query))
+        return self.openai_client.embed_documents([str(query)])

    def embed_documents(self, elements: List[Element]) -> List[Element]:
        embeddings = self.openai_client.embed_documents([str(e) for e in elements])
@@ -35,14 +36,26 @@ def embed_documents(self, elements: List[Element]) -> List[Element]:

    def _add_embeddings_to_elements(self, elements, embeddings) -> List[Element]:
        assert len(elements) == len(embeddings)
-        for i in range(len(elements)):
-            elements[i].embeddings = embeddings[i]
+        elements_w_embedding = []
+
+        for i, element in enumerate(elements):
+            original_method = element.to_dict
+
+            def new_to_dict(self, original_method=original_method):  # bind this element's original to_dict
+                d = original_method()
+                d["embeddings"] = self.embeddings
+                return d
+
+            element.embeddings = embeddings[i]
+            elements_w_embedding.append(element)
+            element.to_dict = types.MethodType(new_to_dict, element)
        return elements

    @EmbeddingEncoderConnectionError.wrap
    @requires_dependencies(
-        ["langchain", "openai"],
-    )  # add extras="langchain" when it's added to the makefile
+        ["langchain", "openai", "tiktoken"],
+        extras="openai",
+    )
    def get_openai_client(self):
        if not hasattr(self, "openai_client"):
            """Creates a langchain OpenAI python client to embed elements."""
diff --git a/unstructured/ingest/cli/cmds/azure_cognitive_search.py b/unstructured/ingest/cli/cmds/azure_cognitive_search.py
index a426a68c7b..22eded4373 100644
--- a/unstructured/ingest/cli/cmds/azure_cognitive_search.py
+++ b/unstructured/ingest/cli/cmds/azure_cognitive_search.py
@@ -1,4 +1,5 @@
import logging
+import types
from dataclasses import dataclass

import click
@@ -8,6 +9,7 @@
    log_options,
)
from unstructured.ingest.cli.interfaces import (
+    CliEmbeddingsConfig,
    CliMixin,
    CliPartitionConfig,
    CliReadConfig,
@@ -61,7 +63,6 @@ def azure_cognitive_search_dest(ctx: click.Context, **options):
    if not ctx.parent.info_name:
        raise click.ClickException("parent command missing info name")
    source_cmd = ctx.parent.info_name.replace("-", "_")
-    runner_fn = runner_map[source_cmd]
    parent_options: dict = ctx.parent.params if ctx.parent else {}
    conform_click_options(options)
    conform_click_options(parent_options)
@@ -72,15 +73,31 @@ def azure_cognitive_search_dest(ctx: click.Context, **options):
    try:
        read_config =
CliReadConfig.from_dict(parent_options) partition_config = CliPartitionConfig.from_dict(parent_options) + embedding_config = CliEmbeddingsConfig.from_dict(parent_options) # Run for schema validation AzureCognitiveSearchCliWriteConfig.from_dict(options) - runner_fn( - read_config=read_config, - partition_config=partition_config, - writer_type="azure_cognitive_search", - writer_kwargs=options, - **parent_options, - ) + runner = runner_map[source_cmd] + # TODO update all other runners to implement base runner class + if isinstance(runner, types.FunctionType): + runner( + read_config=read_config, + partition_config=partition_config, + writer_type="s3", + writer_kwargs=options, + **parent_options, + ) + else: + runner_instance = runner( + read_config=read_config, + partition_config=partition_config, + writer_type="azure_cognitive_search", + writer_kwargs=options, + embedding_config=embedding_config, + ) + runner_instance.run( + **parent_options, + ) + except Exception as e: logger.error(e, exc_info=True) raise click.ClickException(str(e)) from e diff --git a/unstructured/ingest/cli/cmds/sharepoint.py b/unstructured/ingest/cli/cmds/sharepoint.py index 01d7091f97..2457f474c8 100644 --- a/unstructured/ingest/cli/cmds/sharepoint.py +++ b/unstructured/ingest/cli/cmds/sharepoint.py @@ -9,6 +9,7 @@ log_options, ) from unstructured.ingest.cli.interfaces import ( + CliEmbeddingsConfig, CliMixin, CliPartitionConfig, CliReadConfig, @@ -16,7 +17,7 @@ ) from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger -from unstructured.ingest.runner import sharepoint as sharepoint_fn +from unstructured.ingest.runner import SharePoint @dataclass @@ -82,12 +83,18 @@ def sharepoint_source(ctx: click.Context, **options): ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(options, verbose=verbose) try: - # run_init_checks(**options) read_config = CliReadConfig.from_dict(options) partition_config = CliPartitionConfig.from_dict(options) + embedding_config = CliEmbeddingsConfig.from_dict(options) # Run for schema validation SharepointCliConfig.from_dict(options) - sharepoint_fn(read_config=read_config, partition_config=partition_config, **options) + sharepoint_runner = SharePoint( + read_config=read_config, + partition_config=partition_config, + verbose=verbose, + embedding_config=embedding_config, + ) + sharepoint_runner.run(**options) except Exception as e: logger.error(e, exc_info=True) raise click.ClickException(str(e)) from e @@ -101,5 +108,6 @@ def get_source_cmd() -> click.Group: # Common CLI configs CliReadConfig.add_cli_options(cmd) CliPartitionConfig.add_cli_options(cmd) + CliEmbeddingsConfig.add_cli_options(cmd) cmd.params.append(click.Option(["-v", "--verbose"], is_flag=True, default=False)) return cmd diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py index 5bb224e389..2190744b5b 100644 --- a/unstructured/ingest/cli/interfaces.py +++ b/unstructured/ingest/cli/interfaces.py @@ -1,9 +1,10 @@ from abc import abstractmethod import click +from dataclasses_json.core import Json, _decode_dataclass from unstructured.ingest.cli.cmds.utils import DelimitedString -from unstructured.ingest.interfaces import BaseConfig, PartitionConfig, ReadConfig +from unstructured.ingest.interfaces import BaseConfig, EmbeddingConfig, PartitionConfig, ReadConfig class CliMixin: @@ -106,7 +107,7 @@ def add_cli_options(cmd: click.Command) -> None: click.Option( ["--fields-include"], 
type=DelimitedString(), - default=["element_id", "text", "type", "metadata"], + default=["element_id", "text", "type", "metadata", "embeddings"], help="Comma-delimited list. If set, include the specified top-level " "fields in an element.", ), @@ -184,3 +185,43 @@ def add_cli_options(cmd: click.Command) -> None: ), ] cmd.params.extend(options) + + +class CliEmbeddingsConfig(EmbeddingConfig, CliMixin): + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--embedding-api-key"], + help="openai api key", + ), + click.Option( + ["--embedding-model-name"], + type=str, + default=None, + ), + ] + cmd.params.extend(options) + + @classmethod + def from_dict( + cls, + kvs: Json, + *, + infer_missing=False, + ): + """ + Extension of the dataclass from_dict() to avoid a naming conflict with other CLI params. + This allows CLI arguments to be prepended with embedding_ during CLI invocation but + doesn't require that as part of the field names in this class + """ + if isinstance(kvs, dict): + new_kvs = { + k[len("embedding-") :]: v # noqa: E203 + for k, v in kvs.items() + if k.startswith("embedding_") + } + if len(new_kvs.keys()) == 0: + return None + return _decode_dataclass(cls, new_kvs, infer_missing) + return _decode_dataclass(cls, kvs, infer_missing) diff --git a/unstructured/ingest/connector/azure_cognitive_search.py b/unstructured/ingest/connector/azure_cognitive_search.py index 7b3e7f0714..f1050a8dd9 100644 --- a/unstructured/ingest/connector/azure_cognitive_search.py +++ b/unstructured/ingest/connector/azure_cognitive_search.py @@ -87,7 +87,6 @@ def write(self, docs: t.List[BaseIngestDoc]) -> None: for doc in docs: local_path = doc._output_filename with open(local_path) as json_file: - # TODO element id not a sufficient unique id to use json_content = json.load(json_file) for content in json_content: self.conform_dict(data=content) diff --git a/unstructured/ingest/connector/registry.py b/unstructured/ingest/connector/registry.py index 7c01cb72b5..1a415f13fe 100644 --- a/unstructured/ingest/connector/registry.py +++ b/unstructured/ingest/connector/registry.py @@ -74,12 +74,16 @@ def create_ingest_doc_from_json(ingest_doc_json: str) -> BaseIngestDoc: raise TypeError( f"failed to load json string when deserializing IngestDoc: {ingest_doc_json}", ) from te + return create_ingest_doc_from_dict(ingest_doc_dict) + + +def create_ingest_doc_from_dict(ingest_doc_dict: dict) -> BaseIngestDoc: if "registry_name" not in ingest_doc_dict: raise ValueError(f"registry_name not present in ingest doc: {ingest_doc_dict}") registry_name = ingest_doc_dict.pop("registry_name") try: ingest_doc_cls = INGEST_DOC_NAME_TO_CLASS[registry_name] - return cast(BaseIngestDoc, ingest_doc_cls.from_json(ingest_doc_json)) + return cast(BaseIngestDoc, ingest_doc_cls.from_dict(ingest_doc_dict)) except KeyError: raise ValueError( f"Error: Received unknown IngestDoc name: {registry_name} while deserializing", diff --git a/unstructured/ingest/connector/sharepoint.py b/unstructured/ingest/connector/sharepoint.py index f42fda3c1a..0dacea83d4 100644 --- a/unstructured/ingest/connector/sharepoint.py +++ b/unstructured/ingest/connector/sharepoint.py @@ -5,12 +5,14 @@ from pathlib import Path from urllib.parse import urlparse +from unstructured.embed.interfaces import BaseEmbeddingEncoder from unstructured.file_utils.filetype import EXT_TO_FILETYPE from unstructured.ingest.error import SourceConnectionError from unstructured.ingest.interfaces import ( BaseConnectorConfig, BaseIngestDoc, 
BaseSourceConnector, + EmbeddingConfig, IngestDocCleanupMixin, SourceConnectorCleanupMixin, SourceMetadata, @@ -66,6 +68,13 @@ class SharepointIngestDoc(IngestDocCleanupMixin, BaseIngestDoc): is_page: bool file_path: str registry_name: str = "sharepoint" + embedding_config: t.Optional[EmbeddingConfig] = None + + @property + def embedder(self) -> t.Optional[BaseEmbeddingEncoder]: + if self.embedding_config and self.embedding_config.api_key: + return self.embedding_config.get_embedder() + return None def __post_init__(self): self.extension = Path(self.file_path).suffix if not self.is_page else ".html" @@ -234,6 +243,7 @@ def get_file(self): @dataclass class SharepointSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector): connector_config: SimpleSharepointConfig + embedding_config: t.Optional[EmbeddingConfig] = None @requires_dependencies(["office365"], extras="sharepoint") def _list_files(self, folder, recursive) -> t.List["File"]: @@ -265,13 +275,14 @@ def _prepare_ingest_doc(self, obj: t.Union["File", "SitePage"], base_url, is_pag file_path = obj.serverRelativeUrl[1:] return SharepointIngestDoc( - self.read_config, - self.partition_config, - self.connector_config, - base_url, - server_path, - is_page, - file_path, + read_config=self.read_config, + partition_config=self.partition_config, + connector_config=self.connector_config, + site_url=base_url, + server_path=server_path, + is_page=is_page, + file_path=file_path, + embedding_config=self.embedding_config, ) @requires_dependencies(["office365"], extras="sharepoint") diff --git a/unstructured/ingest/interfaces.py b/unstructured/ingest/interfaces.py index cc9a020997..c708938bfd 100644 --- a/unstructured/ingest/interfaces.py +++ b/unstructured/ingest/interfaces.py @@ -14,10 +14,12 @@ from dataclasses_json import DataClassJsonMixin from unstructured.documents.elements import DataSourceMetadata +from unstructured.embed.interfaces import BaseEmbeddingEncoder +from unstructured.embed.openai import OpenAIEmbeddingEncoder from unstructured.ingest.error import PartitionError, SourceConnectionError from unstructured.ingest.logger import logger from unstructured.partition.auto import partition -from unstructured.staging.base import convert_to_dict +from unstructured.staging.base import convert_to_dict, elements_from_json @dataclass @@ -42,7 +44,7 @@ class PartitionConfig(BaseConfig): ocr_languages: str = "eng" encoding: t.Optional[str] = None fields_include: t.List[str] = field( - default_factory=lambda: ["element_id", "text", "type", "metadata"], + default_factory=lambda: ["element_id", "text", "type", "metadata", "embeddings"], ) flatten_metadata: bool = False metadata_exclude: t.List[str] = field(default_factory=list) @@ -61,6 +63,21 @@ class ReadConfig(BaseConfig): download_only: bool = False +@dataclass +class EmbeddingConfig(BaseConfig): + api_key: str + model_name: t.Optional[str] = None + + def get_embedder(self) -> BaseEmbeddingEncoder: + # TODO update to incorporate other embedder types once they exist + kwargs = { + "api_key": self.api_key, + } + if self.model_name: + kwargs["model_name"] = self.model_name + return OpenAIEmbeddingEncoder(**kwargs) + + @dataclass class WriteConfig(BaseConfig): pass @@ -98,6 +115,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._date_processed = None + @property + def embedder(self) -> t.Optional[BaseEmbeddingEncoder]: + return None + @property def date_created(self) -> t.Optional[str]: """The date the document was created on the source system.""" @@ -222,8 +243,6 
@@ def partition_file(self, **partition_kwargs) -> t.List[t.Dict[str, t.Any]]: ), **partition_kwargs, ) - return convert_to_dict(elements) - else: endpoint = self.partition_config.partition_endpoint @@ -243,8 +262,11 @@ def partition_file(self, **partition_kwargs) -> t.List[t.Dict[str, t.Any]]: if response.status_code != 200: raise RuntimeError(f"Caught {response.status_code} from API: {response.text}") - - return response.json() + elements = elements_from_json(text=json.dumps(response.json())) + if self.embedder: + logger.info("Running embedder to add vector content to elements") + elements = self.embedder.embed_documents(elements) + return convert_to_dict(elements) def process_file(self, **partition_kwargs) -> t.Optional[t.List[t.Dict[str, t.Any]]]: self._date_processed = datetime.utcnow().isoformat() @@ -281,7 +303,6 @@ def process_file(self, **partition_kwargs) -> t.Optional[t.List[t.Dict[str, t.An for k in list(elem["metadata"].keys()): # type: ignore[attr-defined] if k not in in_list: elem["metadata"].pop(k, None) # type: ignore[attr-defined] - in_list = self.partition_config.fields_include elem = {k: v for k, v in elem.items() if k in in_list} diff --git a/unstructured/ingest/runner/__init__.py b/unstructured/ingest/runner/__init__.py index 4dbe2316a7..ef521f280c 100644 --- a/unstructured/ingest/runner/__init__.py +++ b/unstructured/ingest/runner/__init__.py @@ -22,7 +22,7 @@ from .reddit import reddit from .s3 import s3 from .salesforce import salesforce -from .sharepoint import sharepoint +from .sharepoint import SharePoint from .slack import slack from .wikipedia import wikipedia @@ -50,7 +50,7 @@ "reddit": reddit, "s3": s3, "salesforce": salesforce, - "sharepoint": sharepoint, + "sharepoint": SharePoint, "slack": slack, "wikipedia": wikipedia, } @@ -78,7 +78,7 @@ "reddit", "s3", "salesforce", - "sharepoint", + "SharePoint", "slack", "wikipedia", "runner_map", diff --git a/unstructured/ingest/runner/base_runner.py b/unstructured/ingest/runner/base_runner.py new file mode 100644 index 0000000000..772e282f0d --- /dev/null +++ b/unstructured/ingest/runner/base_runner.py @@ -0,0 +1,19 @@ +import typing as t +from abc import ABC, abstractmethod +from dataclasses import dataclass + +from unstructured.ingest.interfaces import EmbeddingConfig, PartitionConfig, ReadConfig + + +@dataclass +class Runner(ABC): + read_config: ReadConfig + partition_config: PartitionConfig + verbose: bool = False + writer_type: t.Optional[str] = None + writer_kwargs: t.Optional[dict] = None + embedding_config: t.Optional[EmbeddingConfig] = None + + @abstractmethod + def run(self, *args, **kwargs): + pass diff --git a/unstructured/ingest/runner/sharepoint.py b/unstructured/ingest/runner/sharepoint.py index 5e2ad6e793..a20e64bdf8 100644 --- a/unstructured/ingest/runner/sharepoint.py +++ b/unstructured/ingest/runner/sharepoint.py @@ -1,69 +1,66 @@ import hashlib import logging -import typing as t -from unstructured.ingest.interfaces import PartitionConfig, ReadConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.processor import process_documents +from unstructured.ingest.runner.base_runner import Runner from unstructured.ingest.runner.utils import update_download_dir_hash from unstructured.ingest.runner.writers import writer_map -def sharepoint( - verbose: bool, - read_config: ReadConfig, - partition_config: PartitionConfig, - site: str, - client_id: str, - client_cred: str, - files_only: bool, - path: str, - recursive: bool, - writer_type: t.Optional[str] = 
None, - writer_kwargs: t.Optional[dict] = None, - **kwargs, -): - writer_kwargs = writer_kwargs if writer_kwargs else {} +class SharePoint(Runner): + def run( + self, + site: str, + client_id: str, + client_cred: str, + files_only: bool, + path: str, + recursive: bool, + **kwargs, + ): + writer_kwargs = self.writer_kwargs if self.writer_kwargs else {} - ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + ingest_log_streaming_init(logging.DEBUG if self.verbose else logging.INFO) - hashed_dir_name = hashlib.sha256( - f"{site}_{path}".encode("utf-8"), - ) + hashed_dir_name = hashlib.sha256( + f"{site}_{path}".encode("utf-8"), + ) - read_config.download_dir = update_download_dir_hash( - connector_name="sharepoint", - read_config=read_config, - hashed_dir_name=hashed_dir_name, - logger=logger, - ) + self.read_config.download_dir = update_download_dir_hash( + connector_name="sharepoint", + read_config=self.read_config, + hashed_dir_name=hashed_dir_name, + logger=logger, + ) - from unstructured.ingest.connector.sharepoint import ( - SharepointSourceConnector, - SimpleSharepointConfig, - ) + from unstructured.ingest.connector.sharepoint import ( + SharepointSourceConnector, + SimpleSharepointConfig, + ) - source_doc_connector = SharepointSourceConnector( # type: ignore - connector_config=SimpleSharepointConfig( - client_id=client_id, - client_credential=client_cred, - site_url=site, - path=path, - process_pages=(not files_only), - recursive=recursive, - ), - read_config=read_config, - partition_config=partition_config, - ) + source_doc_connector = SharepointSourceConnector( # type: ignore + connector_config=SimpleSharepointConfig( + client_id=client_id, + client_credential=client_cred, + site_url=site, + path=path, + process_pages=(not files_only), + recursive=recursive, + ), + read_config=self.read_config, + partition_config=self.partition_config, + embedding_config=self.embedding_config, + ) - dest_doc_connector = None - if writer_type: - writer = writer_map[writer_type] - dest_doc_connector = writer(**writer_kwargs) + dest_doc_connector = None + if self.writer_type: + writer = writer_map[self.writer_type] + dest_doc_connector = writer(**writer_kwargs) - process_documents( - source_doc_connector=source_doc_connector, - partition_config=partition_config, - verbose=verbose, - dest_doc_connector=dest_doc_connector, - ) + process_documents( + source_doc_connector=source_doc_connector, + partition_config=self.partition_config, + verbose=self.verbose, + dest_doc_connector=dest_doc_connector, + ) diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py index 3201315bda..46a875035e 100644 --- a/unstructured/ingest/runner/writers.py +++ b/unstructured/ingest/runner/writers.py @@ -10,6 +10,7 @@ def s3_writer( remote_url: str, anonymous: bool, verbose: bool = False, + **kwargs, ): from unstructured.ingest.connector.s3 import ( S3DestinationConnector, @@ -30,6 +31,7 @@ def azure_cognitive_search_writer( endpoint: str, key: str, index: str, + **kwargs, ): from unstructured.ingest.connector.azure_cognitive_search import ( AzureCognitiveSearchDestinationConnector, diff --git a/unstructured/staging/base.py b/unstructured/staging/base.py index 0d2d012192..425b202d9e 100644 --- a/unstructured/staging/base.py +++ b/unstructured/staging/base.py @@ -41,7 +41,7 @@ def _get_metadata_table_fieldnames(): def convert_to_isd(elements: List[Element]) -> List[Dict[str, Any]]: """Represents the document elements as an Initial Structured Document (ISD).""" - isd: 
List[Dict[str, str]] = [] + isd: List[Dict[str, Any]] = [] for element in elements: section = element.to_dict() isd.append(section)
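
For reference, here is a minimal sketch (not part of the patch) of how the pieces introduced above are meant to fit together outside the ingest CLI. The file name and API key are placeholders; it assumes the new `openai` extra from this PR is installed, and it mirrors the flow used in `partition_file`: partition, embed, then serialize with the patched `to_dict`.

```python
from unstructured.ingest.interfaces import EmbeddingConfig
from unstructured.partition.auto import partition
from unstructured.staging.base import convert_to_dict

# Placeholder key; the ingest CLI passes this through via --embedding-api-key.
embedding_config = EmbeddingConfig(api_key="sk-...", model_name=None)
embedder = embedding_config.get_embedder()  # returns an OpenAIEmbeddingEncoder

# Partition a local file, attach embeddings to each element, then serialize.
elements = partition(filename="example.docx")  # hypothetical input file
elements = embedder.embed_documents(elements)
element_dicts = convert_to_dict(elements)  # each dict now carries an "embeddings" field
```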