From b710bafa89619f6f5d5574b1b85a44204d948361 Mon Sep 17 00:00:00 2001 From: David Potter Date: Sat, 2 Sep 2023 08:50:31 -0700 Subject: [PATCH] feat: add salesforce connector (#1168) --- .github/workflows/ci.yml | 4 + .../ingest-test-fixtures-update-pr.yml | 4 + CHANGELOG.md | 10 + Makefile | 4 + docs/source/upstream_connectors.rst | 1 + .../source/upstream_connectors/salesforce.rst | 129 +++++++ examples/ingest/box/ingest.sh | 4 +- examples/ingest/salesforce/ingest.sh | 29 ++ requirements/ingest-salesforce.in | 3 + requirements/ingest-salesforce.txt | 63 ++++ setup.py | 1 + .../Campaign/701Hu000001eX9EIAU.json | 101 ++++++ .../Campaign/701Hu000001eX9FIAU.json | 101 ++++++ .../Campaign/701Hu000001eX9GIAU.json | 101 ++++++ .../Campaign/701Hu000001eX9HIAU.json | 101 ++++++ .../EmailMessage/02sHu00001efErPIAU.json | 18 + .../EmailMessage/02sHu00001efErQIAU.json | 18 + test_unstructured_ingest/test-ingest-box.sh | 3 +- .../test-ingest-salesforce.sh | 40 +++ test_unstructured_ingest/test-ingest.sh | 1 + unstructured/__version__.py | 2 +- unstructured/ingest/cli/cli.py | 1 + unstructured/ingest/cli/cmds/__init__.py | 2 + unstructured/ingest/cli/cmds/salesforce.py | 62 ++++ unstructured/ingest/connector/registry.py | 2 + unstructured/ingest/connector/salesforce.py | 340 ++++++++++++++++++ unstructured/ingest/processor.py | 3 +- unstructured/ingest/runner/__init__.py | 2 + unstructured/ingest/runner/salesforce.py | 47 +++ 29 files changed, 1191 insertions(+), 6 deletions(-) create mode 100644 docs/source/upstream_connectors/salesforce.rst create mode 100755 examples/ingest/salesforce/ingest.sh create mode 100644 requirements/ingest-salesforce.in create mode 100644 requirements/ingest-salesforce.txt create mode 100644 test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9EIAU.json create mode 100644 test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9FIAU.json create mode 100644 test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9GIAU.json create mode 100644 test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9HIAU.json create mode 100644 test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErPIAU.json create mode 100644 test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErQIAU.json create mode 100755 test_unstructured_ingest/test-ingest-salesforce.sh create mode 100644 unstructured/ingest/cli/cmds/salesforce.py create mode 100644 unstructured/ingest/connector/salesforce.py create mode 100644 unstructured/ingest/runner/salesforce.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f96b60f9dc..8f7cb5284c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -282,6 +282,9 @@ jobs: MS_TENANT_ID: ${{ secrets.MS_TENANT_ID }} MS_USER_EMAIL: ${{ secrets.MS_USER_EMAIL }} MS_USER_PNAME: ${{ secrets.MS_USER_PNAME }} + SALESFORCE_USERNAME: ${{secrets.SALESFORCE_USERNAME}} + SALESFORCE_CONSUMER_KEY: ${{secrets.SALESFORCE_CONSUMER_KEY}} + SALESFORCE_PRIVATE_KEY: ${{secrets.SALESFORCE_PRIVATE_KEY}} SHAREPOINT_CLIENT_ID: ${{secrets.SHAREPOINT_CLIENT_ID}} SHAREPOINT_CRED: ${{secrets.SHAREPOINT_CRED}} SHAREPOINT_SITE: ${{secrets.SHAREPOINT_SITE}} @@ -313,6 +316,7 @@ jobs: make install-ingest-gitlab make install-ingest-onedrive make install-ingest-outlook + make install-ingest-salesforce make install-ingest-slack make install-ingest-wikipedia make install-ingest-notion diff --git a/.github/workflows/ingest-test-fixtures-update-pr.yml b/.github/workflows/ingest-test-fixtures-update-pr.yml index dbd5a5ecca..08df81effd 100644 --- a/.github/workflows/ingest-test-fixtures-update-pr.yml +++ b/.github/workflows/ingest-test-fixtures-update-pr.yml @@ -73,6 +73,9 @@ jobs: MS_TENANT_ID: ${{ secrets.MS_TENANT_ID }} MS_USER_EMAIL: ${{ secrets.MS_USER_EMAIL }} MS_USER_PNAME: ${{ secrets.MS_USER_PNAME }} + SALESFORCE_USERNAME: ${{secrets.SALESFORCE_USERNAME}} + SALESFORCE_CONSUMER_KEY: ${{secrets.SALESFORCE_CONSUMER_KEY}} + SALESFORCE_PRIVATE_KEY: ${{secrets.SALESFORCE_PRIVATE_KEY}} SHAREPOINT_CLIENT_ID: ${{secrets.SHAREPOINT_CLIENT_ID}} SHAREPOINT_CRED: ${{secrets.SHAREPOINT_CRED}} SHAREPOINT_SITE: ${{secrets.SHAREPOINT_SITE}} @@ -104,6 +107,7 @@ jobs: make install-ingest-gitlab make install-ingest-onedrive make install-ingest-outlook + make install-ingest-salesforce make install-ingest-slack make install-ingest-wikipedia make install-ingest-notion diff --git a/CHANGELOG.md b/CHANGELOG.md index b60bc38964..79b3315c8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 0.10.12-dev3 + +### Enhancements + +### Features + +* Add Salesforce Connector to be able to pull Account, Case, Campaign, EmailMessage, Lead + +### Fixes + ## 0.10.12-dev2 ### Enhancements diff --git a/Makefile b/Makefile index afbb338d83..147fd77db8 100644 --- a/Makefile +++ b/Makefile @@ -188,6 +188,10 @@ install-ingest-local: install-ingest-notion: python3 -m pip install -r requirements/ingest-notion.txt +.PHONY: install-ingest-salesforce +install-ingest-salesforce: + python3 -m pip install -r requirements/ingest-salesforce.txt + .PHONY: install-unstructured-inference install-unstructured-inference: python3 -m pip install -r requirements/local-inference.txt diff --git a/docs/source/upstream_connectors.rst b/docs/source/upstream_connectors.rst index 1246247b75..b7af1ae20e 100644 --- a/docs/source/upstream_connectors.rst +++ b/docs/source/upstream_connectors.rst @@ -27,6 +27,7 @@ in our community `Slack. " + + .. tab:: Python + + .. code:: python + + import subprocess + + command = [ + "unstructured-ingest", + "salesforce", + "--salesforce-username" "$SALESFORCE_USERNAME" + "--salesforce-consumer-key" "$SALESFORCE_CONSUMER_KEY" + "--salesforce-private-key-path" "$SALESFORCE_PRIVATE_KEY_PATH" + "--salesforce-categories" "EmailMessage,Account,Lead,Case,Campaign" + "--structured-output-dir" "salesforce-output" + "--box_app_config", "$BOX_APP_CONFIG_PATH" + "--remote-url", "box://utic-test-ingest-fixtures" + "--structured-output-dir", "box-output" + "--num-processes", "2" + "--recursive", + "--verbose", + "--partition-by-api", + "--api-key", "", + ] + + # Run the command + process = subprocess.Popen(command, stdout=subprocess.PIPE) + output, error = process.communicate() + + # Print output + if process.returncode == 0: + print('Command executed successfully. Output:') + print(output.decode()) + else: + print('Command failed. Error:') + print(error.decode()) + +Additionaly, you will need to pass the ``--partition-endpoint`` if you're running the API locally. You can find more information about the ``unstructured`` API `here `_. + +For a full list of the options the CLI accepts check ``unstructured-ingest salesforce --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. \ No newline at end of file diff --git a/examples/ingest/box/ingest.sh b/examples/ingest/box/ingest.sh index 06b4ba08f2..ea12ffc889 100755 --- a/examples/ingest/box/ingest.sh +++ b/examples/ingest/box/ingest.sh @@ -12,7 +12,7 @@ # Maybe check 'Make api calls as the as-user header' # REAUTHORIZE app after making any of the above changes -# box_app_config is the path to a json file, available in the App Settings section of your Box App +# box-app-config is the path to a json file, available in the App Settings section of your Box App # More info to set up the app: # https://developer.box.com/guides/authentication/jwt/jwt-setup/ # and set up the app config.json file here: @@ -24,7 +24,7 @@ cd "$SCRIPT_DIR"/../../.. || exit 1 PYTHONPATH=. ./unstructured/ingest/main.py \ box \ - --box_app_config "$BOX_APP_CONFIG_PATH" \ + --box-app-config "$BOX_APP_CONFIG_PATH" \ --remote-url box://utic-test-ingest-fixtures \ --structured-output-dir box-output \ --num-processes 2 \ diff --git a/examples/ingest/salesforce/ingest.sh b/examples/ingest/salesforce/ingest.sh new file mode 100755 index 0000000000..9ab44f3d0b --- /dev/null +++ b/examples/ingest/salesforce/ingest.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +# Processes multiple files in a nested folder structure from Salesforce +# through Unstructured's library in 2 processes. + +# Available categories are: Account, Case, Campaign, EmailMessage, Lead + +# Structured outputs are stored in salesforce-output/ + +# Using JWT authorization +# https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_key_and_cert.htm +# https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_connected_app.htm + +# private-key-path is the path to the key file + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "$SCRIPT_DIR"/../../.. || exit 1 + + +PYTHONPATH=. ./unstructured/ingest/main.py \ + salesforce \ + --username "$SALESFORCE_USERNAME" \ + --consumer-key "$SALESFORCE_CONSUMER_KEY" \ + --private-key-path "$SALESFORCE_PRIVATE_KEY_PATH" \ + --categories "EmailMessage,Account,Lead,Case,Campaign" \ + --structured-output-dir salesforce-output \ + --preserve-downloads \ + --reprocess \ + --verbose \ No newline at end of file diff --git a/requirements/ingest-salesforce.in b/requirements/ingest-salesforce.in new file mode 100644 index 0000000000..ca827ba706 --- /dev/null +++ b/requirements/ingest-salesforce.in @@ -0,0 +1,3 @@ +-c constraints.in +-c base.txt +simple-salesforce diff --git a/requirements/ingest-salesforce.txt b/requirements/ingest-salesforce.txt new file mode 100644 index 0000000000..3072f4eb57 --- /dev/null +++ b/requirements/ingest-salesforce.txt @@ -0,0 +1,63 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile requirements/ingest-salesforce.in +# +attrs==23.1.0 + # via zeep +certifi==2023.7.22 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # requests +cffi==1.15.1 + # via cryptography +charset-normalizer==3.2.0 + # via + # -c requirements/base.txt + # requests +cryptography==41.0.3 + # via simple-salesforce +idna==3.4 + # via + # -c requirements/base.txt + # requests +isodate==0.6.1 + # via zeep +lxml==4.9.3 + # via + # -c requirements/base.txt + # zeep +platformdirs==3.10.0 + # via zeep +pycparser==2.21 + # via cffi +pyjwt==2.8.0 + # via simple-salesforce +pytz==2023.3 + # via zeep +requests==2.31.0 + # via + # -c requirements/base.txt + # requests-file + # requests-toolbelt + # simple-salesforce + # zeep +requests-file==1.5.1 + # via zeep +requests-toolbelt==1.0.0 + # via zeep +simple-salesforce==1.12.4 + # via -r requirements/ingest-salesforce.in +six==1.16.0 + # via + # isodate + # requests-file +urllib3==1.26.16 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # requests +zeep==4.2.1 + # via simple-salesforce diff --git a/setup.py b/setup.py index 8300902ad9..12980d3748 100644 --- a/setup.py +++ b/setup.py @@ -148,6 +148,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List "airtable": load_requirements("requirements/ingest-airtable.in"), "sharepoint": load_requirements("requirements/ingest-sharepoint.in"), "delta-table": load_requirements("requirements/ingest-delta-table.in"), + "salesforce": load_requirements("requirements/ingest-salesforce.in"), # Legacy extra requirements "huggingface": load_requirements("requirements/huggingface.in"), "local-inference": all_doc_reqs, diff --git a/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9EIAU.json b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9EIAU.json new file mode 100644 index 0000000000..b873afc4fb --- /dev/null +++ b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9EIAU.json @@ -0,0 +1,101 @@ +[ + { + "type": "UncategorizedText", + "element_id": "d6a8689a12ad0cd0314b04e1c2cee3c9", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Id: 701Hu000001eX9EIAU" + }, + { + "type": "Title", + "element_id": "69d9d94f0bc4b8d425fa99dce2b78311", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Name: GC Product Webinar - Jan 7, 2002" + }, + { + "type": "Title", + "element_id": "f80d0033c7e5a8d6ae66778815e33f35", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Type: Webinar" + }, + { + "type": "Title", + "element_id": "ad1b8a8ebbde05c57a773f60045de6f6", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Status: Completed" + }, + { + "type": "UncategorizedText", + "element_id": "08336889c7ebb4ba297a396eb072d83c", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "StartDate: 2023-01-29" + }, + { + "type": "UncategorizedText", + "element_id": "887de29c98087cc9d07b242432cff930", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "EndDate: 2023-01-29" + }, + { + "type": "Title", + "element_id": "ffe62693f34276315c62d28b06005bcf", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "BudgetedCost: 10000.0" + }, + { + "type": "Title", + "element_id": "5739187b01834fedcc2362b5d3841d07", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "ActualCost: 11400.0" + }, + { + "type": "Title", + "element_id": "8750ec1f6f59b282d104b919c7ffab0f", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Description: None" + }, + { + "type": "Title", + "element_id": "54e16b100bf2118d4c7c18c3f93e2223", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfLeads: 0" + }, + { + "type": "Title", + "element_id": "db1ed63f2ed83e51f75619047c417e49", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfConvertedLeads: 0" + } +] \ No newline at end of file diff --git a/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9FIAU.json b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9FIAU.json new file mode 100644 index 0000000000..89519a9e6e --- /dev/null +++ b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9FIAU.json @@ -0,0 +1,101 @@ +[ + { + "type": "UncategorizedText", + "element_id": "582957afcab0b1f0df8e414c601b679a", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Id: 701Hu000001eX9FIAU" + }, + { + "type": "Title", + "element_id": "b1097935922e3ea926a35ace5fe68a61", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Name: User Conference - Jun 17-19, 2002" + }, + { + "type": "Title", + "element_id": "0c29c88f4b31c6f5caf8b36885c4c1c6", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Type: Conference" + }, + { + "type": "Title", + "element_id": "b537e10ee8e78ec7a18792eaa76ce0e4", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Status: Planned" + }, + { + "type": "UncategorizedText", + "element_id": "6cbe5bf59dabd290307e79547b2a86f2", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "StartDate: 2023-07-09" + }, + { + "type": "UncategorizedText", + "element_id": "6cb3c2919b2c47be4b187d17c316a539", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "EndDate: 2023-07-11" + }, + { + "type": "Title", + "element_id": "3062c8ae2aae8afe4e38d8fa2a6ea248", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "BudgetedCost: 100000.0" + }, + { + "type": "Title", + "element_id": "754630afa9bba639e59a8a80785f2766", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "ActualCost: None" + }, + { + "type": "Title", + "element_id": "8750ec1f6f59b282d104b919c7ffab0f", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Description: None" + }, + { + "type": "Title", + "element_id": "54e16b100bf2118d4c7c18c3f93e2223", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfLeads: 0" + }, + { + "type": "Title", + "element_id": "db1ed63f2ed83e51f75619047c417e49", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfConvertedLeads: 0" + } +] \ No newline at end of file diff --git a/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9GIAU.json b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9GIAU.json new file mode 100644 index 0000000000..d87da83622 --- /dev/null +++ b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9GIAU.json @@ -0,0 +1,101 @@ +[ + { + "type": "UncategorizedText", + "element_id": "153e1fa63953e7e19f9004e0253eab68", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Id: 701Hu000001eX9GIAU" + }, + { + "type": "Title", + "element_id": "9b9c5e71eff6a483e85da52d1a1f1005", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Name: DM Campaign to Top Customers - Nov 12-23, 2001" + }, + { + "type": "Title", + "element_id": "28fa7658294d152358d23d8bde3c9e56", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Type: Direct Mail" + }, + { + "type": "Title", + "element_id": "ad1b8a8ebbde05c57a773f60045de6f6", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Status: Completed" + }, + { + "type": "UncategorizedText", + "element_id": "59b06bd535fc60e446ce4f6db6392a8d", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "StartDate: 2022-12-04" + }, + { + "type": "UncategorizedText", + "element_id": "a7caa29a82d158a422b901babed10321", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "EndDate: 2022-12-15" + }, + { + "type": "Title", + "element_id": "a830028696ccdc6c73f26e2f5f0b3e0d", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "BudgetedCost: 25000.0" + }, + { + "type": "Title", + "element_id": "f393ef1a67e6b638d6825e00ffa85b5e", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "ActualCost: 23500.0" + }, + { + "type": "Title", + "element_id": "8750ec1f6f59b282d104b919c7ffab0f", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Description: None" + }, + { + "type": "Title", + "element_id": "54e16b100bf2118d4c7c18c3f93e2223", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfLeads: 0" + }, + { + "type": "Title", + "element_id": "db1ed63f2ed83e51f75619047c417e49", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfConvertedLeads: 0" + } +] \ No newline at end of file diff --git a/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9HIAU.json b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9HIAU.json new file mode 100644 index 0000000000..36753980ba --- /dev/null +++ b/test_unstructured_ingest/expected-structured-output/salesforce/Campaign/701Hu000001eX9HIAU.json @@ -0,0 +1,101 @@ +[ + { + "type": "UncategorizedText", + "element_id": "697cb5681a4f17c6cb712dfce64ae2d1", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Id: 701Hu000001eX9HIAU" + }, + { + "type": "Title", + "element_id": "37c6c6fcf92fd42c703ad967a4691a32", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Name: International Electrical Engineers Association Trade Show - Mar 4-5, 2002" + }, + { + "type": "Title", + "element_id": "a4b5a79024228eb84bcefe4bfe8bce47", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Type: Trade Show" + }, + { + "type": "Title", + "element_id": "b537e10ee8e78ec7a18792eaa76ce0e4", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Status: Planned" + }, + { + "type": "UncategorizedText", + "element_id": "5d68899808565a0eb340e7ce9a42c981", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "StartDate: 2023-03-26" + }, + { + "type": "UncategorizedText", + "element_id": "eb23b79d3e286bef615fa4bf7bbe6c6d", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "EndDate: 2023-03-27" + }, + { + "type": "Title", + "element_id": "2bb64865cdfea5db0f000dde162fc372", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "BudgetedCost: 50000.0" + }, + { + "type": "Title", + "element_id": "754630afa9bba639e59a8a80785f2766", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "ActualCost: None" + }, + { + "type": "Title", + "element_id": "8750ec1f6f59b282d104b919c7ffab0f", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "Description: None" + }, + { + "type": "Title", + "element_id": "54e16b100bf2118d4c7c18c3f93e2223", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfLeads: 0" + }, + { + "type": "Title", + "element_id": "db1ed63f2ed83e51f75619047c417e49", + "metadata": { + "data_source": {}, + "filetype": "text/plain" + }, + "text": "NumberOfConvertedLeads: 0" + } +] \ No newline at end of file diff --git a/test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErPIAU.json b/test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErPIAU.json new file mode 100644 index 0000000000..3c3dd2f35f --- /dev/null +++ b/test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErPIAU.json @@ -0,0 +1,18 @@ +[ + { + "type": "NarrativeText", + "element_id": "d954fa8e82ded23ebde30b2d53d5f81d", + "metadata": { + "data_source": {}, + "filetype": "message/rfc822", + "sent_from": [ + "devops+salesforce-connector@unstructured.io" + ], + "sent_to": [ + "jane_gray@uoa.edu" + ], + "subject": "Test of email 1" + }, + "text": "Jane. This is a test of sending you an email from Salesforce!\n\n_____________________________________________________________________\nPowered by Salesforce\nhttp://www.salesforce.com/" + } +] \ No newline at end of file diff --git a/test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErQIAU.json b/test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErQIAU.json new file mode 100644 index 0000000000..903312d7d5 --- /dev/null +++ b/test_unstructured_ingest/expected-structured-output/salesforce/EmailMessage/02sHu00001efErQIAU.json @@ -0,0 +1,18 @@ +[ + { + "type": "NarrativeText", + "element_id": "f5ac98aa9002453f536877714c5eb88d", + "metadata": { + "data_source": {}, + "filetype": "message/rfc822", + "sent_from": [ + "devops+salesforce-connector@unstructured.io" + ], + "sent_to": [ + "sean@edge.com" + ], + "subject": "Test of Salesforce 2" + }, + "text": "Hey Sean.\n\nTesting email parsing here.\nType: email\n\nJust testing the email system\n\n_____________________________________________________________________\nPowered by Salesforce\nhttp://www.salesforce.com/" + } +] \ No newline at end of file diff --git a/test_unstructured_ingest/test-ingest-box.sh b/test_unstructured_ingest/test-ingest-box.sh index 48fb9a224b..27f8b87b42 100755 --- a/test_unstructured_ingest/test-ingest-box.sh +++ b/test_unstructured_ingest/test-ingest-box.sh @@ -27,13 +27,12 @@ PYTHONPATH=. ./unstructured/ingest/main.py \ --download-dir "$DOWNLOAD_DIR" \ --box-app-config "$BOX_APP_CONFIG_PATH" \ --remote-url box://utic-test-ingest-fixtures \ - --structured-output-dir box-output \ + --structured-output-dir "$OUTPUT_DIR" \ --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \ --num-processes 2 \ --preserve-downloads \ --recursive \ --reprocess \ - --structured-output-dir "$OUTPUT_DIR" \ --verbose sh "$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME \ No newline at end of file diff --git a/test_unstructured_ingest/test-ingest-salesforce.sh b/test_unstructured_ingest/test-ingest-salesforce.sh new file mode 100755 index 0000000000..303c65f847 --- /dev/null +++ b/test_unstructured_ingest/test-ingest-salesforce.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# Set either SALESFORCE_PRIVATE_KEY (app config json content as string) or +# SALESFORCE_PRIVATE_KEY_PATH (path to app config json file) env var + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=salesforce +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME + +if [ -z "$SALESFORCE_PRIVATE_KEY" ] && [ -z "$SALESFORCE_PRIVATE_KEY_PATH" ]; then + echo "Skipping Salesforce ingest test because neither SALESFORCE_PRIVATE_KEY nor SALESFORCE_PRIVATE_KEY_PATH env vars are set." + exit 0 +fi + +if [ -z "$SALESFORCE_PRIVATE_KEY_PATH" ]; then + # Create temporary service key file + SALESFORCE_PRIVATE_KEY_PATH=$(mktemp) + echo "$SALESFORCE_PRIVATE_KEY" >"$SALESFORCE_PRIVATE_KEY_PATH" +fi + +PYTHONPATH=. ./unstructured/ingest/main.py \ + salesforce \ + --categories "EmailMessage,Campaign" \ + --download-dir "$DOWNLOAD_DIR" \ + --username "$SALESFORCE_USERNAME" \ + --consumer-key "$SALESFORCE_CONSUMER_KEY" \ + --private-key-path "$SALESFORCE_PRIVATE_KEY_PATH" \ + --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \ + --num-processes 2 \ + --preserve-downloads \ + --recursive \ + --reprocess \ + --structured-output-dir "$OUTPUT_DIR" \ + --verbose + +sh "$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME \ No newline at end of file diff --git a/test_unstructured_ingest/test-ingest.sh b/test_unstructured_ingest/test-ingest.sh index d22767862b..75f696cd01 100755 --- a/test_unstructured_ingest/test-ingest.sh +++ b/test_unstructured_ingest/test-ingest.sh @@ -35,6 +35,7 @@ export OMP_THREAD_LIMIT=1 ./test_unstructured_ingest/test-ingest-local-single-file-with-pdf-infer-table-structure.sh ./test_unstructured_ingest/test-ingest-notion.sh ./test_unstructured_ingest/test-ingest-delta-table.sh +./test_unstructured_ingest/test-ingest-salesforce.sh # NOTE(yuming): The following test should be put after any tests with --preserve-downloads option ./test_unstructured_ingest/test-ingest-pdf-fast-reprocess.sh ./test_unstructured_ingest/test-ingest-sharepoint.sh diff --git a/unstructured/__version__.py b/unstructured/__version__.py index e6e3d9462b..d89caf2384 100644 --- a/unstructured/__version__.py +++ b/unstructured/__version__.py @@ -1 +1 @@ -__version__ = "0.10.12-dev2" # pragma: no cover +__version__ = "0.10.12-dev3" # pragma: no cover diff --git a/unstructured/ingest/cli/cli.py b/unstructured/ingest/cli/cli.py index eaa7e57df8..9a39d483bc 100644 --- a/unstructured/ingest/cli/cli.py +++ b/unstructured/ingest/cli/cli.py @@ -32,6 +32,7 @@ def ingest(): cli_cmds.confluence, cli_cmds.sharepoint, cli_cmds.airtable, + cli_cmds.salesforce, ] for subcommand in subcommands: diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index e2937b783d..2e488dbcd4 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -23,6 +23,7 @@ from .reddit import get_cmd as reddit from .s3_2 import get_dest_cmd as s3_dest from .s3_2 import get_source_cmd as s3 +from .salesforce import get_cmd as salesforce from .sharepoint import get_cmd as sharepoint from .slack import get_cmd as slack from .wikipedia import get_cmd as wikipedia @@ -52,6 +53,7 @@ "outlook", "reddit", "s3", + "salesforce", "sharepoint", "slack", "wikipedia", diff --git a/unstructured/ingest/cli/cmds/salesforce.py b/unstructured/ingest/cli/cmds/salesforce.py new file mode 100644 index 0000000000..82142ff08f --- /dev/null +++ b/unstructured/ingest/cli/cmds/salesforce.py @@ -0,0 +1,62 @@ +import logging + +import click + +from unstructured.ingest.cli.common import ( + add_recursive_option, + add_shared_options, + log_options, + map_to_processor_config, + map_to_standard_config, + run_init_checks, +) +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.runner import salesforce as salesforce_fn + + +@click.command() +@click.option( + "--categories", + default=None, + required=True, + help="Comma separated list of Salesforce categories to download. " + "Currently only Account, Case, Campaign, EmailMessage, Lead.", +) +@click.option( + "--username", + required=True, + help="Salesforce username usually looks like an email.", +) +@click.option( + "--consumer-key", + required=True, + help="For the Salesforce JWT auth. Found in Consumer Details.", +) +@click.option( + "--private-key-path", + required=True, + help="Path to the private key for the Salesforce JWT auth. Usually named server.key.", +) +def salesforce(**options): + verbose = options.get("verbose", False) + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + log_options(options) + try: + run_init_checks(**options) + connector_config = map_to_standard_config(options) + processor_config = map_to_processor_config(options) + salesforce_fn( + connector_config=connector_config, + processor_config=processor_config, + **options, + ) + except Exception as e: + logger.error(e, exc_info=True) + raise click.ClickException(str(e)) from e + + +def get_cmd() -> click.Command: + cmd = salesforce + add_recursive_option(cmd) + add_shared_options(cmd) + return cmd diff --git a/unstructured/ingest/connector/registry.py b/unstructured/ingest/connector/registry.py index 47fc95501b..b0b3a1ab6d 100644 --- a/unstructured/ingest/connector/registry.py +++ b/unstructured/ingest/connector/registry.py @@ -25,6 +25,7 @@ from unstructured.ingest.connector.outlook import OutlookIngestDoc from unstructured.ingest.connector.reddit import RedditIngestDoc from unstructured.ingest.connector.s3_2 import S3IngestDoc +from unstructured.ingest.connector.salesforce import SalesforceIngestDoc from unstructured.ingest.connector.sharepoint import SharepointIngestDoc from unstructured.ingest.connector.slack import SlackIngestDoc from unstructured.ingest.connector.wikipedia import ( @@ -55,6 +56,7 @@ "outlook": OutlookIngestDoc, "reddit": RedditIngestDoc, "s3": S3IngestDoc, + "salesforce": SalesforceIngestDoc, "sharepoint": SharepointIngestDoc, "slack": SlackIngestDoc, "wikipedia_html": WikipediaIngestHTMLDoc, diff --git a/unstructured/ingest/connector/salesforce.py b/unstructured/ingest/connector/salesforce.py new file mode 100644 index 0000000000..ed0d2b0ca4 --- /dev/null +++ b/unstructured/ingest/connector/salesforce.py @@ -0,0 +1,340 @@ +""" +Salesforce Connector +Able to download Account, Case, Campaign, EmailMessage, Lead +Salesforce returns everything as a list of json. +This saves each entry as a separate file to be partitioned. +Using JWT authorization +https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_key_and_cert.htm +https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_connected_app.htm +""" +import os +from dataclasses import dataclass +from email.utils import formatdate +from pathlib import Path +from string import Template +from textwrap import dedent +from typing import Any, Dict, List, Type + +from dateutil import parser # type: ignore + +from unstructured.ingest.interfaces import ( + BaseConnector, + BaseConnectorConfig, + BaseIngestDoc, + ConnectorCleanupMixin, + IngestDocCleanupMixin, + StandardConnectorConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + + +class MissingCategoryError(Exception): + """There are no categories with that name.""" + + +ACCEPTED_CATEGORIES = ["Account", "Case", "Campaign", "EmailMessage", "Lead"] + + +EMAIL_TEMPLATE = Template( + """MIME-Version: 1.0 +Date: $date +Message-ID: $message_identifier +Subject: $subject +From: $from_email +To: $to_email +Content-Type: multipart/alternative; boundary="00000000000095c9b205eff92630" +--00000000000095c9b205eff92630 +Content-Type: text/plain; charset="UTF-8" +$textbody +--00000000000095c9b205eff92630 +Content-Type: text/html; charset="UTF-8" +$textbody +--00000000000095c9b205eff92630-- +""", +) + +ACCOUNT_TEMPLATE = Template( + """Id: $id +Name: $name +Type: $account_type +Phone: $phone +AccountNumber: $account_number +Website: $website +Industry: $industry +AnnualRevenue: $annual_revenue +NumberOfEmployees: $number_employees +Ownership: $ownership +TickerSymbol: $ticker_symbol +Description: $description +Rating: $rating +DandbCompanyId: $dnb_id +""", +) + +LEAD_TEMPLATE = Template( + """Id: $id +Name: $name +Title: $title +Company: $company +Phone: $phone +Email: $email +Website: $website +Description: $description +LeadSource: $lead_source +Rating: $rating +Status: $status +Industry: $industry +""", +) + +CASE_TEMPLATE = Template( + """Id: $id +Type: $type +Status: $status +Reason: $reason +Origin: $origin +Subject: $subject +Priority: $priority +Description: $description +Comments: $comments +""", +) + +CAMPAIGN_TEMPLATE = Template( + """Id: $id +Name: $name +Type: $type +Status: $status +StartDate: $start_date +EndDate: $end_date +BudgetedCost: $budgeted_cost +ActualCost: $actual_cost +Description: $description +NumberOfLeads: $number_of_leads +NumberOfConvertedLeads: $number_of_converted_leads +""", +) + + +@dataclass +class SimpleSalesforceConfig(BaseConnectorConfig): + """Connector specific attributes""" + + categories: List[str] + username: str + consumer_key: str + private_key_path: str + recursive: bool = False + + @staticmethod + def parse_folders(folder_str: str) -> List[str]: + """Parses a comma separated string of Outlook folders into a list.""" + return [x.strip() for x in folder_str.split(",")] + + @requires_dependencies(["simple_salesforce"], extras="salesforce") + def _get_client(self): + from simple_salesforce import Salesforce + + return Salesforce( + username=self.username, + consumer_key=self.consumer_key, + privatekey_file=self.private_key_path, + ) + + +@dataclass +class SalesforceIngestDoc(IngestDocCleanupMixin, BaseIngestDoc): + record_type: str + record_id: str + config: SimpleSalesforceConfig + registry_name: str = "salesforce" + + def _tmp_download_file(self) -> Path: + if self.record_type == "EmailMessage": + record_file = self.record_id + ".eml" + elif self.record_type in ["Account", "Lead", "Case", "Campaign"]: + record_file = self.record_id + ".txt" + else: + raise MissingCategoryError( + f"There are no categories with the name: {self.record_type}", + ) + return Path(self.standard_config.download_dir) / self.record_type / record_file + + @property + def _output_filename(self) -> Path: + record_file = self.record_id + ".json" + return Path(self.standard_config.output_dir) / self.record_type / record_file + + def _create_full_tmp_dir_path(self): + self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True) + + def create_account(self, account_json: Dict[str, Any]) -> str: + """Creates partitionable account file""" + account = ACCOUNT_TEMPLATE.substitute( + id=account_json.get("Id"), + name=account_json.get("Name"), + account_type=account_json.get("Type"), + phone=account_json.get("Phone"), + account_number=account_json.get("AccountNumber"), + website=account_json.get("Website"), + industry=account_json.get("Industry"), + annual_revenue=account_json.get("AnnualRevenue"), + number_employees=account_json.get("NumberOfEmployees"), + ownership=account_json.get("Ownership"), + ticker_symbol=account_json.get("TickerSymbol"), + description=account_json.get("Description"), + rating=account_json.get("Rating"), + dnb_id=account_json.get("DandbCompanyId"), + ) + return dedent(account) + + def create_lead(self, lead_json: Dict[str, Any]) -> str: + """Creates partitionable lead file""" + lead = LEAD_TEMPLATE.substitute( + id=lead_json.get("Id"), + name=lead_json.get("Name"), + title=lead_json.get("Title"), + company=lead_json.get("Company"), + phone=lead_json.get("Phone"), + email=lead_json.get("Email"), + website=lead_json.get("Website"), + description=lead_json.get("Description"), + lead_source=lead_json.get("LeadSource"), + rating=lead_json.get("Rating"), + status=lead_json.get("Status"), + industry=lead_json.get("Industry"), + ) + return dedent(lead) + + def create_case(self, case_json: Dict[str, Any]) -> str: + """Creates partitionable case file""" + case = CASE_TEMPLATE.substitute( + id=case_json.get("Id"), + type=case_json.get("Type"), + status=case_json.get("Status"), + reason=case_json.get("Reason"), + origin=case_json.get("Origin"), + subject=case_json.get("Subject"), + priority=case_json.get("Priority"), + description=case_json.get("Description"), + comments=case_json.get("Comments"), + ) + return dedent(case) + + def create_campaign(self, campaign_json: Dict[str, Any]) -> str: + """Creates partitionable campaign file""" + campaign = CAMPAIGN_TEMPLATE.substitute( + id=campaign_json.get("Id"), + name=campaign_json.get("Name"), + type=campaign_json.get("Type"), + status=campaign_json.get("Status"), + start_date=campaign_json.get("StartDate"), + end_date=campaign_json.get("EndDate"), + budgeted_cost=campaign_json.get("BudgetedCost"), + actual_cost=campaign_json.get("ActualCost"), + description=campaign_json.get("Description"), + number_of_leads=campaign_json.get("NumberOfLeads"), + number_of_converted_leads=campaign_json.get("NumberOfConvertedLeads"), + ) + return dedent(campaign) + + def create_eml(self, email_json: Dict[str, Any]) -> str: + """Recreates standard expected .eml format using template.""" + eml = EMAIL_TEMPLATE.substitute( + date=formatdate(parser.parse(email_json.get("MessageDate")).timestamp()), + message_identifier=email_json.get("MessageIdentifier"), + subject=email_json.get("Subject"), + from_email=email_json.get("FromAddress"), + to_email=email_json.get("ToAddress"), + textbody=email_json.get("TextBody"), + ) + return dedent(eml) + + @BaseIngestDoc.skip_if_file_exists + def get_file(self): + """Saves individual json records locally.""" + self._create_full_tmp_dir_path() + logger.debug(f"Writing file {self.record_id} - PID: {os.getpid()}") + + client = self.config._get_client() + + # Get record from Salesforce based on id + record = client.query_all( + f"select FIELDS(STANDARD) from {self.record_type} where Id='{self.record_id}'", + )["records"][0] + + try: + if self.record_type == "EmailMessage": + formatted_record = self.create_eml(record) + elif self.record_type == "Account": + formatted_record = self.create_account(record) + elif self.record_type == "Lead": + formatted_record = self.create_lead(record) + elif self.record_type == "Case": + formatted_record = self.create_case(record) + elif self.record_type == "Campaign": + formatted_record = self.create_campaign(record) + + with open(self._tmp_download_file(), "w") as page_file: + page_file.write(formatted_record) + + except Exception as e: + logger.error( + f"Error while downloading and saving file: {self.record_id}.", + ) + logger.error(e) + + @property + def filename(self): + """The filename of the file created from a Salesforce record""" + return self._tmp_download_file() + + +class SalesforceConnector(ConnectorCleanupMixin, BaseConnector): + ingest_doc_cls: Type[SalesforceIngestDoc] = SalesforceIngestDoc + config: SimpleSalesforceConfig + + def __init__( + self, + config: SimpleSalesforceConfig, + standard_config: StandardConnectorConfig, + ) -> None: + super().__init__(standard_config, config) + + def initialize(self): + pass + + @requires_dependencies(["simple_salesforce"], extras="salesforce") + def get_ingest_docs(self) -> List[SalesforceIngestDoc]: + """Get Salesforce Ids for the records. + Send them to next phase where each doc gets downloaded into the + appropriate format for partitioning. + """ + from simple_salesforce.exceptions import SalesforceMalformedRequest + + client = self.config._get_client() + + ingest_docs = [] + for record_type in self.config.categories: + if record_type not in ACCEPTED_CATEGORIES: + raise ValueError(f"{record_type} not currently an accepted Salesforce category") + + try: + # Get ids from Salesforce + records = client.query_all( + f"select Id from {record_type}", + ) + for record in records["records"]: + ingest_docs.append( + SalesforceIngestDoc( + self.standard_config, + self.config, + record_type, + record["Id"], + ), + ) + except SalesforceMalformedRequest as e: + raise SalesforceMalformedRequest(f"Problem with Salesforce query: {e}") + + return ingest_docs diff --git a/unstructured/ingest/processor.py b/unstructured/ingest/processor.py index c3626c31de..98636037a5 100644 --- a/unstructured/ingest/processor.py +++ b/unstructured/ingest/processor.py @@ -76,7 +76,8 @@ def run(self): # Debugging tip: use the below line and comment out the mp.Pool loop # block to remain in single process - # self.doc_processor_fn(docs[0]) + # json_docs = [doc.to_json() for doc in docs] + # self.doc_processor_fn(json_docs[0]) logger.info(f"Processing {len(docs)} docs") json_docs = [doc.to_json() for doc in docs] try: diff --git a/unstructured/ingest/runner/__init__.py b/unstructured/ingest/runner/__init__.py index 726dc242fb..fe9819a2ac 100644 --- a/unstructured/ingest/runner/__init__.py +++ b/unstructured/ingest/runner/__init__.py @@ -18,6 +18,7 @@ from .outlook import outlook from .reddit import reddit from .s3 import s3 +from .salesforce import salesforce from .sharepoint import sharepoint from .slack import slack from .wikipedia import wikipedia @@ -43,6 +44,7 @@ "outlook", "reddit", "s3", + "salesforce", "sharepoint", "slack", "wikipedia", diff --git a/unstructured/ingest/runner/salesforce.py b/unstructured/ingest/runner/salesforce.py new file mode 100644 index 0000000000..cb0e3ffb10 --- /dev/null +++ b/unstructured/ingest/runner/salesforce.py @@ -0,0 +1,47 @@ +import hashlib +import logging + +from unstructured.ingest.interfaces import ProcessorConfigs, StandardConnectorConfig +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.processor import process_documents +from unstructured.ingest.runner.utils import update_download_dir_hash + + +def salesforce( + verbose: bool, + connector_config: StandardConnectorConfig, + processor_config: ProcessorConfigs, + recursive: bool, + categories: str, + username: str, + consumer_key: str, + private_key_path: str, + **kwargs, +): + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + + hashed_dir_name = hashlib.sha256(username.encode("utf-8")) + connector_config.download_dir = update_download_dir_hash( + connector_name="salesforce", + connector_config=connector_config, + hashed_dir_name=hashed_dir_name, + logger=logger, + ) + + from unstructured.ingest.connector.salesforce import ( + SalesforceConnector, + SimpleSalesforceConfig, + ) + + doc_connector = SalesforceConnector( # type: ignore + standard_config=connector_config, + config=SimpleSalesforceConfig( + categories=SimpleSalesforceConfig.parse_folders(categories), + username=username, + consumer_key=consumer_key, + private_key_path=private_key_path, + recursive=recursive, + ), + ) + + process_documents(doc_connector=doc_connector, processor_config=processor_config)