From ed7f991ab95459467ae100e452c4570417457d2c Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Thu, 31 Aug 2023 18:19:53 -0400 Subject: [PATCH] Add s3 writer (#1223) ### Description Convert s3 cli code to also support writing to s3. Writers are added as optional subcommands to the parent command with their own arguments. Custom `click.Group` introduced to add some custom formatting and text in help messages. To limit the scope of this PR, most existing files were not touched but instead new files were added for the new flow. This allowed _only_ the s3 connector to be updated without breaking any other ones. --- CHANGELOG.md | 2 + Makefile | 50 +-- examples/ingest/s3-small-batch/ingest.sh | 8 +- requirements/dev.txt | 9 +- requirements/extra-pdf-image.txt | 1 + requirements/huggingface.txt | 1 + requirements/ingest-elasticsearch.txt | 2 +- requirements/ingest-notion.txt | 2 +- test_unstructured_ingest/test-ingest-s3.sh | 10 +- unstructured/ingest/cli/cli.py | 7 +- unstructured/ingest/cli/cmds/__init__.py | 13 +- unstructured/ingest/cli/cmds/s3_2.py | 114 +++++ unstructured/ingest/cli/cmds/utils.py | 54 +++ unstructured/ingest/cli/common.py | 4 +- unstructured/ingest/cli/interfaces.py | 175 ++++++++ unstructured/ingest/connector/registry.py | 11 +- unstructured/ingest/connector/s3_2.py | 264 ++++++++++++ .../ingest/doc_processor/generalized.py | 4 +- unstructured/ingest/interfaces2.py | 395 ++++++++++++++++++ unstructured/ingest/processor2.py | 125 ++++++ unstructured/ingest/runner/__init__.py | 1 + unstructured/ingest/runner/s3_2.py | 55 +++ unstructured/ingest/runner/utils2.py | 47 +++ unstructured/ingest/runner/writers.py | 22 + 24 files changed, 1313 insertions(+), 63 deletions(-) create mode 100644 unstructured/ingest/cli/cmds/s3_2.py create mode 100644 unstructured/ingest/cli/cmds/utils.py create mode 100644 unstructured/ingest/cli/interfaces.py create mode 100644 unstructured/ingest/connector/s3_2.py create mode 100644 unstructured/ingest/interfaces2.py create mode 100644 unstructured/ingest/processor2.py create mode 100644 unstructured/ingest/runner/s3_2.py create mode 100644 unstructured/ingest/runner/utils2.py create mode 100644 unstructured/ingest/runner/writers.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f26834366..7594afc96b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features +* Add in ingest cli s3 writer + ### Fixes ## 0.10.10 diff --git a/Makefile b/Makefile index 9dc2ba765e..1aa286ed85 100644 --- a/Makefile +++ b/Makefile @@ -204,49 +204,13 @@ install-pandoc: ## pip-compile: compiles all base/dev/test requirements .PHONY: pip-compile pip-compile: - pip-compile --upgrade requirements/base.in - - # Extra requirements that are specific to document types - pip-compile --upgrade requirements/extra-csv.in - pip-compile --upgrade requirements/extra-docx.in - pip-compile --upgrade requirements/extra-epub.in - pip-compile --upgrade requirements/extra-pandoc.in - pip-compile --upgrade requirements/extra-markdown.in - pip-compile --upgrade requirements/extra-msg.in - pip-compile --upgrade requirements/extra-odt.in - pip-compile --upgrade requirements/extra-pdf-image.in - pip-compile --upgrade requirements/extra-pptx.in - pip-compile --upgrade requirements/extra-xlsx.in - - # Extra requirements for huggingface staging functions - pip-compile --upgrade requirements/huggingface.in - pip-compile --upgrade requirements/test.in - pip-compile --upgrade requirements/dev.in - pip-compile --upgrade requirements/build.in 
- # NOTE(robinson) - docs/requirements.txt is where the GitHub action for building - # sphinx docs looks for additional requirements + @for file in $(shell ls requirements/*.in); \ + do echo "running: pip-compile --upgrade $${file}" && \ + pip-compile --upgrade $${file}; \ + done cp requirements/build.txt docs/requirements.txt - pip-compile --upgrade requirements/ingest-s3.in - pip-compile --upgrade requirements/ingest-biomed.in - pip-compile --upgrade requirements/ingest-box.in - pip-compile --upgrade requirements/ingest-gcs.in - pip-compile --upgrade requirements/ingest-dropbox.in - pip-compile --upgrade requirements/ingest-azure.in - pip-compile --upgrade requirements/ingest-delta-table.in - pip-compile --upgrade requirements/ingest-discord.in - pip-compile --upgrade requirements/ingest-reddit.in - pip-compile --upgrade requirements/ingest-github.in - pip-compile --upgrade requirements/ingest-gitlab.in - pip-compile --upgrade requirements/ingest-slack.in - pip-compile --upgrade requirements/ingest-wikipedia.in - pip-compile --upgrade requirements/ingest-google-drive.in - pip-compile --upgrade requirements/ingest-elasticsearch.in - pip-compile --upgrade requirements/ingest-onedrive.in - pip-compile --upgrade requirements/ingest-outlook.in - pip-compile --upgrade requirements/ingest-confluence.in - pip-compile --upgrade requirements/ingest-airtable.in - pip-compile --upgrade requirements/ingest-sharepoint.in - pip-compile --upgrade requirements/ingest-notion.in + + ## install-project-local: install unstructured into your local python environment .PHONY: install-project-local @@ -281,7 +245,7 @@ test-no-extras: test_${PACKAGE_NAME}/partition/test_text.py \ test_${PACKAGE_NAME}/partition/test_email.py \ test_${PACKAGE_NAME}/partition/test_html_partition.py \ - test_${PACKAGE_NAME}/partition/test_xml_partition.py + test_${PACKAGE_NAME}/partition/test_xml_partition.py .PHONY: test-extra-csv test-extra-csv: diff --git a/examples/ingest/s3-small-batch/ingest.sh b/examples/ingest/s3-small-batch/ingest.sh index 4d69d8bcb5..4c2406d038 100755 --- a/examples/ingest/s3-small-batch/ingest.sh +++ b/examples/ingest/s3-small-batch/ingest.sh @@ -12,5 +12,9 @@ PYTHONPATH=. 
./unstructured/ingest/main.py \ s3 \ --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ --anonymous \ - --structured-output-dir s3-small-batch-output \ - --num-processes 2 + --output-dir s3-small-batch-output \ + --num-processes 2 \ + --verbose \ + s3 \ + --anonymous \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set-output diff --git a/requirements/dev.txt b/requirements/dev.txt index 0d7c376774..185009ac0e 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -4,7 +4,7 @@ # # pip-compile requirements/dev.in # -anyio==3.7.1 +anyio==4.0.0 # via jupyter-server appnope==0.1.3 # via @@ -101,7 +101,6 @@ importlib-resources==6.0.1 # jsonschema # jsonschema-specifications # jupyterlab - # notebook ipykernel==6.25.1 # via # jupyter @@ -167,7 +166,7 @@ jupyter-events==0.7.0 # via jupyter-server jupyter-lsp==2.2.0 # via jupyterlab -jupyter-server==2.7.2 +jupyter-server==2.7.3 # via # jupyter-lsp # jupyterlab @@ -211,7 +210,7 @@ nest-asyncio==1.5.7 # via ipykernel nodeenv==1.8.0 # via pre-commit -notebook==7.0.2 +notebook==7.0.3 # via jupyter notebook-shim==0.2.3 # via @@ -391,7 +390,7 @@ urllib3==1.26.16 # -c requirements/constraints.in # -c requirements/test.txt # requests -virtualenv==20.24.3 +virtualenv==20.24.4 # via pre-commit wcwidth==0.2.6 # via prompt-toolkit diff --git a/requirements/extra-pdf-image.txt b/requirements/extra-pdf-image.txt index 827e9be991..a9e6b4a44c 100644 --- a/requirements/extra-pdf-image.txt +++ b/requirements/extra-pdf-image.txt @@ -202,6 +202,7 @@ transformers==4.32.1 # via unstructured-inference typing-extensions==4.7.1 # via + # -c requirements/base.txt # filelock # huggingface-hub # iopath diff --git a/requirements/huggingface.txt b/requirements/huggingface.txt index 66e1ab42b9..25ea178663 100644 --- a/requirements/huggingface.txt +++ b/requirements/huggingface.txt @@ -95,6 +95,7 @@ transformers==4.32.1 # via -r requirements/huggingface.in typing-extensions==4.7.1 # via + # -c requirements/base.txt # filelock # huggingface-hub # torch diff --git a/requirements/ingest-elasticsearch.txt b/requirements/ingest-elasticsearch.txt index ae8e5b4d98..89f91ad54b 100644 --- a/requirements/ingest-elasticsearch.txt +++ b/requirements/ingest-elasticsearch.txt @@ -13,7 +13,7 @@ elastic-transport==8.4.0 # via elasticsearch elasticsearch==8.9.0 # via -r requirements/ingest-elasticsearch.in -jq==1.4.1 +jq==1.5.0 # via -r requirements/ingest-elasticsearch.in urllib3==1.26.16 # via diff --git a/requirements/ingest-notion.txt b/requirements/ingest-notion.txt index ca9e500d4b..a5cb1c36a7 100644 --- a/requirements/ingest-notion.txt +++ b/requirements/ingest-notion.txt @@ -4,7 +4,7 @@ # # pip-compile requirements/ingest-notion.in # -anyio==3.7.1 +anyio==4.0.0 # via httpcore certifi==2023.7.22 # via diff --git a/test_unstructured_ingest/test-ingest-s3.sh b/test_unstructured_ingest/test-ingest-s3.sh index 195d537115..14f683d16f 100755 --- a/test_unstructured_ingest/test-ingest-s3.sh +++ b/test_unstructured_ingest/test-ingest-s3.sh @@ -13,11 +13,15 @@ sh "$SCRIPT_DIR"/check-num-files-expected-output.sh 3 $OUTPUT_FOLDER_NAME 20k PYTHONPATH=. 
./unstructured/ingest/main.py \ s3 \ --download-dir "$DOWNLOAD_DIR" \ - --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \ - --partition-strategy hi_res \ + --metadata-exclude coordinates \ + --metadata-exclude filename \ + --metadata-exclude file_directory \ + --metadata-exclude metadata.data_source.date_processed \ + --metadata-exclude metadata.last_modified \ + --strategy hi_res \ --preserve-downloads \ --reprocess \ - --structured-output-dir "$OUTPUT_DIR" \ + --output-dir "$OUTPUT_DIR" \ --verbose \ --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ --anonymous diff --git a/unstructured/ingest/cli/cli.py b/unstructured/ingest/cli/cli.py index 2744c6d645..eaa7e57df8 100644 --- a/unstructured/ingest/cli/cli.py +++ b/unstructured/ingest/cli/cli.py @@ -11,7 +11,6 @@ def ingest(): # Dynamically update shared options for supported subcommands subcommands = [ cli_cmds.box, - cli_cmds.s3, cli_cmds.gcs, cli_cmds.delta_table, cli_cmds.dropbox, @@ -42,7 +41,11 @@ def ingest(): def get_cmd() -> click.Command: cmd = ingest # Add all subcommands + for src_subcommand in cli_cmds.src: + # add destination subcommands + for dest_subcommand in cli_cmds.dest: + src_subcommand.add_command(dest_subcommand) + cmd.add_command(src_subcommand) for subcommand in subcommands: - # add_shared_options(cmd) cmd.add_command(subcommand()) return cmd diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index 592b71dce3..e2937b783d 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -1,3 +1,7 @@ +import typing as t + +import click + from .airtable import get_cmd as airtable from .azure import get_cmd as azure from .biomed import get_cmd as biomed @@ -17,11 +21,16 @@ from .onedrive import get_cmd as onedrive from .outlook import get_cmd as outlook from .reddit import get_cmd as reddit -from .s3 import get_cmd as s3 +from .s3_2 import get_dest_cmd as s3_dest +from .s3_2 import get_source_cmd as s3 from .sharepoint import get_cmd as sharepoint from .slack import get_cmd as slack from .wikipedia import get_cmd as wikipedia +src: t.List[click.Group] = [s3()] + +dest: t.List[click.Command] = [s3_dest()] + __all__ = [ "airtable", "azure", @@ -46,4 +55,6 @@ "sharepoint", "slack", "wikipedia", + "src", + "dest", ] diff --git a/unstructured/ingest/cli/cmds/s3_2.py b/unstructured/ingest/cli/cmds/s3_2.py new file mode 100644 index 0000000000..e7f63ce87a --- /dev/null +++ b/unstructured/ingest/cli/cmds/s3_2.py @@ -0,0 +1,114 @@ +import logging +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.cmds.utils import Group +from unstructured.ingest.cli.common import ( + log_options, +) +from unstructured.ingest.cli.interfaces import ( + CliMixin, + CliPartitionConfig, + CliReadConfig, + CliRecursiveConfig, + CliRemoteUrlConfig, +) +from unstructured.ingest.interfaces2 import BaseConfig +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.runner.s3_2 import s3 as s3_fn + + +@dataclass +class S3CliConfigs(BaseConfig, CliMixin): + anonymous: bool = False + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--anonymous"], + is_flag=True, + default=False, + help="Connect to s3 without local AWS credentials.", + ), + ] + cmd.params.extend(options) + + +@click.group(name="s3", invoke_without_command=True, cls=Group) +@click.pass_context +def 
s3_source(ctx: click.Context, **options): + if ctx.invoked_subcommand: + return + + # Click sets all multiple fields as tuple, this needs to be updated to list + for k, v in options.items(): + if isinstance(v, tuple): + options[k] = list(v) + verbose = options.get("verbose", False) + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + log_options(options, verbose=verbose) + try: + # run_init_checks(**options) + read_configs = CliReadConfig.from_dict(options) + partition_configs = CliPartitionConfig.from_dict(options) + # Run for schema validation + S3CliConfigs.from_dict(options) + s3_fn(read_config=read_configs, partition_configs=partition_configs, **options) + except Exception as e: + logger.error(e, exc_info=True) + raise click.ClickException(str(e)) from e + + +@click.command(name="s3") +@click.pass_context +def s3_dest(ctx: click.Context, **options): + parent_options: dict = ctx.parent.params if ctx.parent else {} + # Click sets all multiple fields as tuple, this needs to be updated to list + for k, v in options.items(): + if isinstance(v, tuple): + options[k] = list(v) + for k, v in parent_options.items(): + if isinstance(v, tuple): + parent_options[k] = list(v) + verbose = parent_options.get("verbose", False) + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + log_options(parent_options, verbose=verbose) + log_options(options, verbose=verbose) + try: + # run_init_checks(**options) + read_configs = CliReadConfig.from_dict(parent_options) + partition_configs = CliPartitionConfig.from_dict(parent_options) + # Run for schema validation + S3CliConfigs.from_dict(options) + s3_fn( + read_config=read_configs, + partition_configs=partition_configs, + writer_type="s3", + writer_kwargs=options, + **parent_options, + ) + except Exception as e: + logger.error(e, exc_info=True) + raise click.ClickException(str(e)) from e + + +def get_dest_cmd() -> click.Command: + cmd = s3_dest + S3CliConfigs.add_cli_options(cmd) + CliRemoteUrlConfig.add_cli_options(cmd) + return cmd + + +def get_source_cmd() -> click.Group: + cmd = s3_source + S3CliConfigs.add_cli_options(cmd) + CliRemoteUrlConfig.add_cli_options(cmd) + CliRecursiveConfig.add_cli_options(cmd) + + # Common CLI configs + CliReadConfig.add_cli_options(cmd) + CliPartitionConfig.add_cli_options(cmd) + cmd.params.append(click.Option(["-v", "--verbose"], is_flag=True, default=False)) + return cmd diff --git a/unstructured/ingest/cli/cmds/utils.py b/unstructured/ingest/cli/cmds/utils.py new file mode 100644 index 0000000000..88fc50f821 --- /dev/null +++ b/unstructured/ingest/cli/cmds/utils.py @@ -0,0 +1,54 @@ +from gettext import gettext as _ + +import click + + +class Group(click.Group): + def parse_args(self, ctx, args): + """ + This allows for subcommands to be called with the --help flag without breaking + if parent command is missing any of its required parameters + """ + + try: + return super().parse_args(ctx, args) + except click.MissingParameter: + if "--help" not in args: + raise + + # remove the required params so that help can display + for param in self.params: + param.required = False + return super().parse_args(ctx, args) + + def format_commands(self, ctx: click.Context, formatter: click.HelpFormatter) -> None: + """ + Copy of the original click.Group format_commands() method but replacing + 'Commands' -> 'Destinations' + """ + commands = [] + for subcommand in self.list_commands(ctx): + cmd = self.get_command(ctx, subcommand) + # What is this, the tool lied about a command. 
Ignore it + if cmd is None: + continue + if cmd.hidden: + continue + + commands.append((subcommand, cmd)) + + # allow for 3 times the default spacing + if len(commands): + if formatter.width: + limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands) + else: + limit = -6 - max(len(cmd[0]) for cmd in commands) + + rows = [] + for subcommand, cmd in commands: + help = cmd.get_short_help_str(limit) + rows.append((subcommand, help)) + + if rows: + with formatter.section(_("Destinations")): + formatter.write_dl(rows) diff --git a/unstructured/ingest/cli/common.py b/unstructured/ingest/cli/common.py index 046ad4419b..4e26045cf2 100644 --- a/unstructured/ingest/cli/common.py +++ b/unstructured/ingest/cli/common.py @@ -62,8 +62,8 @@ def run_init_checks( ) -def log_options(options: dict): - ingest_log_streaming_init(logging.DEBUG if options["verbose"] else logging.INFO) +def log_options(options: dict, verbose=False): + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) sensitive_fields = [ "account_name", "account_key", diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py new file mode 100644 index 0000000000..cc8a58ce2e --- /dev/null +++ b/unstructured/ingest/cli/interfaces.py @@ -0,0 +1,175 @@ +from abc import abstractmethod + +import click + +from unstructured.ingest.interfaces2 import BaseConfig, PartitionConfig, ReadConfig + + +class CliMixin: + @staticmethod + @abstractmethod + def add_cli_options(cmd: click.Command) -> None: + pass + + +class CliReadConfig(ReadConfig, CliMixin): + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--download-dir"], + help="Where files are downloaded to, defaults to a location at" + "`$HOME/.cache/unstructured/ingest//`.", + ), + click.Option( + ["--re-download"], + is_flag=True, + default=False, + help="Re-download files even if they are already present in download dir.", + ), + click.Option( + ["--preserve-downloads"], + is_flag=True, + default=False, + help="Preserve downloaded files. Otherwise each file is removed " + "after being processed successfully.", + ), + click.Option( + ["--download-only"], + is_flag=True, + default=False, + help="Download any files that are not already present in either --download-dir or " + "the default download ~/.cache/... location in case --download-dir " + "is not specified and " + "skip processing them through unstructured.", + ), + ] + cmd.params.extend(options) + + +class CliPartitionConfig(PartitionConfig, CliMixin): + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--output-dir"], + default="structured-output", + help="Where to place structured output .json files.", + ), + click.Option( + ["--num-processes"], + default=2, + show_default=True, + help="Number of parallel processes to process docs in.", + ), + click.Option( + ["--max-docs"], + default=None, + type=int, + help="If specified, process at most specified number of documents.", + ), + click.Option( + ["--pdf-infer-table-structure"], + default=False, + help="If set to True, partition will include the table's text " + "content in the response.", + ), + click.Option( + ["--strategy"], + default="auto", + help="The method that will be used to process the documents. " + "Default: auto. 
Other strategies include `fast` and `hi_res`.", + ), + click.Option( + ["--reprocess"], + is_flag=True, + default=False, + help="Reprocess a downloaded file even if the relevant structured " + "output .json file in output directory already exists.", + ), + click.Option( + ["--ocr-languages"], + default="eng", + help="A list of language packs to specify which languages to use for OCR, " + "separated by '+' e.g. 'eng+deu' to use the English and German language packs. " + "The appropriate Tesseract " + "language pack needs to be installed." + "Default: eng", + ), + click.Option( + ["--encoding"], + default=None, + help="Text encoding to use when reading documents. By default the encoding is " + "detected automatically.", + ), + click.Option( + ["--fields-include"], + multiple=True, + default=["element_id", "text", "type", "metadata"], + help="If set, include the specified top-level fields in an element. ", + ), + click.Option( + ["--flatten-metadata"], + is_flag=True, + default=False, + help="Results in flattened json elements. " + "Specifically, the metadata key values are brought to " + "the top-level of the element, and the `metadata` key itself is removed.", + ), + click.Option( + ["--metadata-include"], + default=[], + multiple=True, + help="If set, include the specified metadata fields if they exist " + "and drop all other fields. ", + ), + click.Option( + ["--metadata-exclude"], + default=[], + multiple=True, + help="If set, drop the specified metadata fields if they exist. ", + ), + click.Option( + ["--partition-endpoint"], + default=None, + help="If provided, will use api to run partition", + ), + click.Option( + ["--api-key"], + default=None, + help="API Key for partition endpoint.", + ), + ] + cmd.params.extend(options) + + +class CliRecursiveConfig(BaseConfig, CliMixin): + recursive: bool + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--recursive"], + is_flag=True, + default=False, + help="Recursively download files in their respective folders " + "otherwise stop at the files in provided folder level.", + ), + ] + cmd.params.extend(options) + + +class CliRemoteUrlConfig(BaseConfig, CliMixin): + remote_url: str + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--remote-url"], + required=True, + help="Remote fsspec URL formatted as `protocol://dir/path`", + ), + ] + cmd.params.extend(options) diff --git a/unstructured/ingest/connector/registry.py b/unstructured/ingest/connector/registry.py index bea0dd2faa..47fc95501b 100644 --- a/unstructured/ingest/connector/registry.py +++ b/unstructured/ingest/connector/registry.py @@ -24,7 +24,7 @@ from unstructured.ingest.connector.onedrive import OneDriveIngestDoc from unstructured.ingest.connector.outlook import OutlookIngestDoc from unstructured.ingest.connector.reddit import RedditIngestDoc -from unstructured.ingest.connector.s3 import S3IngestDoc +from unstructured.ingest.connector.s3_2 import S3IngestDoc from unstructured.ingest.connector.sharepoint import SharepointIngestDoc from unstructured.ingest.connector.slack import SlackIngestDoc from unstructured.ingest.connector.wikipedia import ( @@ -64,7 +64,14 @@ def create_ingest_doc_from_json(ingest_doc_json: str) -> BaseIngestDoc: - ingest_doc_dict = json.loads(ingest_doc_json) + try: + ingest_doc_dict: dict = json.loads(ingest_doc_json) + except TypeError as te: + raise TypeError( + f"failed to load json string when deserializing IngestDoc: {ingest_doc_json}", + ) from te + if 
"registry_name" not in ingest_doc_dict: + raise ValueError(f"registry_name not present in ingest doc: {ingest_doc_dict}") registry_name = ingest_doc_dict.pop("registry_name") try: ingest_doc_cls = INGEST_DOC_NAME_TO_CLASS[registry_name] diff --git a/unstructured/ingest/connector/s3_2.py b/unstructured/ingest/connector/s3_2.py new file mode 100644 index 0000000000..51219ee7f6 --- /dev/null +++ b/unstructured/ingest/connector/s3_2.py @@ -0,0 +1,264 @@ +import os +import re +import typing as t +from dataclasses import dataclass, field +from pathlib import Path +from typing import Type + +from unstructured.ingest.interfaces2 import ( + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + BaseSourceConnector, + IngestDocCleanupMixin, + PartitionConfig, + ReadConfig, + SourceConnectorCleanupMixin, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + +SUPPORTED_REMOTE_FSSPEC_PROTOCOLS = [ + "s3", + "s3a", + "abfs", + "az", + "gs", + "gcs", + "box", + "dropbox", +] + + +@dataclass +class SimpleFsspecConfig(BaseConnectorConfig): + # fsspec specific options + path: str + recursive: bool = False + access_kwargs: dict = field(default_factory=dict) + protocol: str = field(init=False) + path_without_protocol: str = field(init=False) + dir_path: str = field(init=False) + file_path: str = field(init=False) + + def __post_init__(self): + self.protocol, self.path_without_protocol = self.path.split("://") + if self.protocol not in SUPPORTED_REMOTE_FSSPEC_PROTOCOLS: + raise ValueError( + f"Protocol {self.protocol} not supported yet, only " + f"{SUPPORTED_REMOTE_FSSPEC_PROTOCOLS} are supported.", + ) + + # dropbox root is an empty string + match = re.match(rf"{self.protocol}://([\s])/", self.path) + if match and self.protocol == "dropbox": + self.dir_path = " " + self.file_path = "" + return + + # just a path with no trailing prefix + match = re.match(rf"{self.protocol}://([^/\s]+?)(/*)$", self.path) + if match: + self.dir_path = match.group(1) + self.file_path = "" + return + + # valid path with a dir and/or file + match = re.match(rf"{self.protocol}://([^/\s]+?)/([^\s]*)", self.path) + if not match: + raise ValueError( + f"Invalid path {self.path}. Expected :///.", + ) + self.dir_path = match.group(1) + self.file_path = match.group(2) or "" + + +@dataclass +class FsspecIngestDoc(IngestDocCleanupMixin, BaseIngestDoc): + """Class encapsulating fetching a doc and writing processed results (but not + doing the processing!). + + Also includes a cleanup method. When things go wrong and the cleanup + method is not called, the file is left behind on the filesystem to assist debugging. 
+ """ + + connector_config: SimpleFsspecConfig + remote_file_path: str + + def _tmp_download_file(self): + download_dir = self.read_config.download_dir if self.read_config.download_dir else "" + return Path(download_dir) / self.remote_file_path.replace( + f"{self.connector_config.dir_path}/", + "", + ) + + @property + def _output_filename(self): + return ( + Path(self.partition_config.output_dir) + / f"{self.remote_file_path.replace(f'{self.connector_config.dir_path}/', '')}.json" + ) + + def _create_full_tmp_dir_path(self): + """Includes "directories" in the object path""" + self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True) + + @BaseIngestDoc.skip_if_file_exists + def get_file(self): + """Fetches the file from the current filesystem and stores it locally.""" + from fsspec import AbstractFileSystem, get_filesystem_class + + self._create_full_tmp_dir_path() + fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + **self.connector_config.access_kwargs, + ) + logger.debug(f"Fetching {self} - PID: {os.getpid()}") + fs.get(rpath=self.remote_file_path, lpath=self._tmp_download_file().as_posix()) + + @property + def filename(self): + """The filename of the file after downloading from cloud""" + return self._tmp_download_file() + + +class FsspecSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector): + """Objects of this class support fetching document(s) from""" + + connector_config: SimpleFsspecConfig + ingest_doc_cls: Type[FsspecIngestDoc] = FsspecIngestDoc + + def __init__( + self, + read_config: ReadConfig, + connector_config: BaseConnectorConfig, + partition_config: PartitionConfig, + ): + from fsspec import AbstractFileSystem, get_filesystem_class + + super().__init__( + read_config=read_config, + connector_config=connector_config, + partition_config=partition_config, + ) + self.fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + **self.connector_config.access_kwargs, + ) + + def initialize(self): + """Verify that can get metadata for an object, validates connections info.""" + ls_output = self.fs.ls(self.connector_config.path_without_protocol) + if len(ls_output) < 1: + raise ValueError( + f"No objects found in {self.connector_config.path}.", + ) + + def _list_files(self): + if not self.connector_config.recursive: + # fs.ls does not walk directories + # directories that are listed in cloud storage can cause problems + # because they are seen as 0 byte files + return [ + x.get("name") + for x in self.fs.ls(self.connector_config.path_without_protocol, detail=True) + if x.get("size") > 0 + ] + else: + # fs.find will recursively walk directories + # "size" is a common key for all the cloud protocols with fs + return [ + k + for k, v in self.fs.find( + self.connector_config.path_without_protocol, + detail=True, + ).items() + if v.get("size") > 0 + ] + + def get_ingest_docs(self): + return [ + self.ingest_doc_cls( + read_config=self.read_config, + connector_config=self.connector_config, + partition_config=self.partition_config, + remote_file_path=file, + ) + for file in self._list_files() + ] + + +class FsspecDestinationConnector(BaseDestinationConnector): + connector_config: SimpleFsspecConfig + + def __init__(self, write_config: WriteConfig, connector_config: BaseConnectorConfig): + from fsspec import AbstractFileSystem, get_filesystem_class + + super().__init__(write_config=write_config, connector_config=connector_config) + self.fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + 
**self.connector_config.access_kwargs, + ) + + def initialize(self): + pass + + def write(self, docs: t.List[BaseIngestDoc]) -> None: + from fsspec import AbstractFileSystem, get_filesystem_class + + fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + **self.connector_config.access_kwargs, + ) + + logger.info(f"Writing content using filesystem: {type(fs).__name__}") + + for doc in docs: + s3_file_path = str(doc._output_filename).replace( + doc.partition_config.output_dir, + self.connector_config.path, + ) + s3_folder = self.connector_config.path + if s3_folder[-1] != "/": + s3_folder = f"{s3_file_path}/" + if s3_file_path[0] == "/": + s3_file_path = s3_file_path[1:] + + s3_output_path = s3_folder + s3_file_path + logger.debug(f"Uploading {doc._output_filename} -> {s3_output_path}") + fs.put_file(lpath=doc._output_filename, rpath=s3_output_path) + + +@dataclass +class SimpleS3Config(SimpleFsspecConfig): + pass + + +@dataclass +class S3IngestDoc(FsspecIngestDoc): + remote_file_path: str + registry_name: str = "s3" + + @requires_dependencies(["s3fs", "fsspec"], extras="s3") + def get_file(self): + super().get_file() + + +class S3SourceConnector(FsspecSourceConnector): + ingest_doc_cls: Type[S3IngestDoc] = S3IngestDoc + + @requires_dependencies(["s3fs", "fsspec"], extras="s3") + def __init__( + self, + read_config: ReadConfig, + connector_config: BaseConnectorConfig, + partition_config: PartitionConfig, + ): + super().__init__( + read_config=read_config, + connector_config=connector_config, + partition_config=partition_config, + ) + + +class S3DestinationConnector(FsspecDestinationConnector): + @requires_dependencies(["s3fs", "fsspec"], extras="s3") + def __init__(self, write_config: WriteConfig, connector_config: BaseConnectorConfig): + super().__init__(write_config=write_config, connector_config=connector_config) diff --git a/unstructured/ingest/doc_processor/generalized.py b/unstructured/ingest/doc_processor/generalized.py index 8123b620b1..849b53853c 100644 --- a/unstructured/ingest/doc_processor/generalized.py +++ b/unstructured/ingest/doc_processor/generalized.py @@ -40,6 +40,7 @@ def process_document(ingest_doc_json: str, **partition_kwargs) -> Optional[List[ """ global session_handle isd_elems_no_filename = None + doc = None try: doc = create_ingest_doc_from_json(ingest_doc_json) if isinstance(doc, IngestDocSessionHandleMixin): @@ -63,5 +64,6 @@ def process_document(ingest_doc_json: str, **partition_kwargs) -> Optional[List[ # TODO(crag) save the exception instead of print? 
logger.error(f"Failed to process {doc}", exc_info=True) finally: - doc.cleanup_file() + if doc: + doc.cleanup_file() return isd_elems_no_filename diff --git a/unstructured/ingest/interfaces2.py b/unstructured/ingest/interfaces2.py new file mode 100644 index 0000000000..610a46ddda --- /dev/null +++ b/unstructured/ingest/interfaces2.py @@ -0,0 +1,395 @@ +"""Defines Abstract Base Classes (ABC's) core to batch processing documents +through Unstructured.""" + +import functools +import json +import os +import typing as t +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path + +import requests +from dataclasses_json import DataClassJsonMixin + +from unstructured.documents.elements import DataSourceMetadata +from unstructured.ingest.error import PartitionError, SourceConnectionError +from unstructured.ingest.logger import logger +from unstructured.partition.auto import partition +from unstructured.staging.base import convert_to_dict + + +@dataclass +class BaseSessionHandle(ABC): + """Abstract Base Class for sharing resources that are local to an individual process. + e.g., a connection for making a request for fetching documents.""" + + +class BaseConfig(DataClassJsonMixin, ABC): + pass + + +@dataclass +class PartitionConfig(BaseConfig): + # where to write structured data outputs + output_dir: str = "structured-output" + num_processes: int = 2 + max_docs: t.Optional[int] = None + pdf_infer_table_structure: bool = False + strategy: str = "auto" + reprocess: bool = False + ocr_languages: str = "eng" + encoding: t.Optional[str] = None + fields_include: t.List[str] = field( + default_factory=lambda: ["element_id", "text", "type", "metadata"], + ) + flatten_metadata: bool = False + metadata_exclude: t.List[str] = field(default_factory=list) + metadata_include: t.List[str] = field(default_factory=list) + partition_endpoint: t.Optional[str] = None + api_key: t.Optional[str] = None + + +@dataclass +class ReadConfig(BaseConfig): + # where raw documents are stored for processing, and then removed if not preserve_downloads + download_dir: t.Optional[str] = None + re_download: bool = False + preserve_downloads: bool = False + download_only: bool = False + + +@dataclass +class WriteConfig(BaseConfig): + pass + + +class BaseConnectorConfig(ABC): + """Abstract definition on which to define connector-specific attributes.""" + + +@dataclass +class BaseIngestDoc(DataClassJsonMixin, ABC): + """An "ingest document" is specific to a connector, and provides + methods to fetch a single raw document, store it locally for processing, any cleanup + needed after successful processing of the doc, and the ability to write the doc's + structured outputs once processed. + + Crucially, it is not responsible for the actual processing of the raw document. + """ + + read_config: ReadConfig + partition_config: PartitionConfig + connector_config: BaseConnectorConfig + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._date_processed = None + + @property + def date_created(self) -> t.Optional[str]: + """The date the document was created on the source system.""" + return None + + @property + def date_modified(self) -> t.Optional[str]: + """The date the document was last modified on the source system.""" + return None + + @property + def date_processed(self) -> t.Optional[str]: + """The date the document was last processed by Unstructured. 
+ self._date_processed is assigned internally in self.partition_file()""" + return self._date_processed + + @property + def exists(self) -> t.Optional[bool]: + """Whether the document exists on the remote source.""" + return None + + @property + @abstractmethod + def filename(self): + """The local filename of the document after fetching from remote source.""" + + @property + @abstractmethod + def _output_filename(self): + """Filename of the structured output for this doc.""" + + @property + def record_locator(self) -> t.Optional[t.Dict[str, t.Any]]: # Values must be JSON-serializable + """A dictionary with any data necessary to uniquely identify the document on + the source system.""" + return None + + @property + def source_url(self) -> t.Optional[str]: + """The url of the source document.""" + return None + + @property + def version(self) -> t.Optional[str]: + """The version of the source document, this could be the last modified date, an + explicit version number, or anything else that can be used to uniquely identify + the version of the document.""" + return None + + @abstractmethod + def cleanup_file(self): + """Removes the local copy the file (or anything else) after successful processing.""" + pass + + @staticmethod + def skip_if_file_exists(func): + """Decorator that checks if a file exists, is not empty, and should not re-download, + if so log a message indicating as much and skip the decorated function.""" + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + if ( + not self.read_config.re_download + and self.filename.is_file() + and self.filename.stat().st_size + ): + logger.debug(f"File exists: {self.filename}, skipping {func.__name__}") + return None + return func(self, *args, **kwargs) + + return wrapper + + # NOTE(crag): Future BaseIngestDoc classes could define get_file_object() methods + # in addition to or instead of get_file() + @abstractmethod + @SourceConnectionError.wrap + def get_file(self): + """Fetches the "remote" doc and stores it locally on the filesystem.""" + pass + + def has_output(self) -> bool: + """Determine if structured output for this doc already exists.""" + return self._output_filename.is_file() and self._output_filename.stat().st_size + + def write_result(self): + """Write the structured json result for this doc. 
result must be json serializable.""" + if self.read_config.download_only: + return + self._output_filename.parent.mkdir(parents=True, exist_ok=True) + with open(self._output_filename, "w", encoding="utf8") as output_f: + json.dump(self.isd_elems_no_filename, output_f, ensure_ascii=False, indent=2) + logger.info(f"Wrote {self._output_filename}") + + @PartitionError.wrap + def partition_file(self, **partition_kwargs) -> t.List[t.Dict[str, t.Any]]: + if not self.partition_config.partition_endpoint: + logger.debug("Using local partition") + elements = partition( + filename=str(self.filename), + data_source_metadata=DataSourceMetadata( + url=self.source_url, + version=self.version, + record_locator=self.record_locator, + date_created=self.date_created, + date_modified=self.date_modified, + date_processed=self.date_processed, + ), + **partition_kwargs, + ) + return convert_to_dict(elements) + + else: + endpoint = self.partition_config.partition_endpoint + + logger.debug(f"Using remote partition ({endpoint})") + + with open(self.filename, "rb") as f: + headers_dict = {} + if self.partition_config.api_key: + headers_dict["UNSTRUCTURED-API-KEY"] = self.partition_config.api_key + response = requests.post( + f"{endpoint}", + files={"files": (str(self.filename), f)}, + headers=headers_dict, + # TODO: add m_data_source_metadata to unstructured-api pipeline_api and then + # pass the stringified json here + ) + + if response.status_code != 200: + raise RuntimeError(f"Caught {response.status_code} from API: {response.text}") + + return response.json() + + def process_file(self, **partition_kwargs) -> t.Optional[t.List[t.Dict[str, t.Any]]]: + self._date_processed = datetime.utcnow().isoformat() + if self.read_config.download_only: + return None + logger.info(f"Processing {self.filename}") + + isd_elems = self.partition_file(**partition_kwargs) + + self.isd_elems_no_filename: t.List[t.Dict[str, t.Any]] = [] + for elem in isd_elems: + # type: ignore + if self.partition_config.metadata_exclude and self.partition_config.metadata_include: + raise ValueError( + "Arguments `--metadata-include` and `--metadata-exclude` are " + "mutually exclusive with each other.", + ) + elif self.partition_config.metadata_exclude: + ex_list = self.partition_config.metadata_exclude + for ex in ex_list: + if "." in ex: # handle nested fields + nested_fields = ex.split(".") + current_elem = elem + for f in nested_fields[:-1]: + if f in current_elem: + current_elem = current_elem[f] + field_to_exclude = nested_fields[-1] + if field_to_exclude in current_elem: + current_elem.pop(field_to_exclude, None) + else: # handle top-level fields + elem["metadata"].pop(ex, None) # type: ignore[attr-defined] + elif self.partition_config.metadata_include: + in_list = self.partition_config.metadata_include + for k in list(elem["metadata"].keys()): # type: ignore[attr-defined] + if k not in in_list: + elem["metadata"].pop(k, None) # type: ignore[attr-defined] + + in_list = self.partition_config.fields_include + elem = {k: v for k, v in elem.items() if k in in_list} + + if self.partition_config.flatten_metadata: + for k, v in elem["metadata"].items(): # type: ignore[attr-defined] + elem[k] = v + elem.pop("metadata") # type: ignore[attr-defined] + + self.isd_elems_no_filename.append(elem) + + return self.isd_elems_no_filename + + +@dataclass +class BaseSourceConnector(DataClassJsonMixin, ABC): + """Abstract Base Class for a connector to a remote source, e.g. 
S3 or Google Drive.""" + + read_config: ReadConfig + connector_config: BaseConnectorConfig + partition_config: PartitionConfig + + def __init__( + self, + read_config: ReadConfig, + connector_config: BaseConnectorConfig, + partition_config: PartitionConfig, + ): + """Expects a standard_config object that implements StandardConnectorConfig + and config object that implements BaseConnectorConfig.""" + self.read_config = read_config + self.connector_config = connector_config + self.partition_config = partition_config + + @abstractmethod + def cleanup(self, cur_dir=None): + """Any additional cleanup up need after processing is complete. E.g., removing + temporary download dirs that are empty. + + By convention, documents that failed to process are typically not cleaned up.""" + pass + + @abstractmethod + def initialize(self): + """Initializes the connector. Should also validate the connector is properly + configured: e.g., list a single a document from the source.""" + pass + + @abstractmethod + def get_ingest_docs(self): + """Returns all ingest docs (derived from BaseIngestDoc). + This does not imply downloading all the raw documents themselves, + rather each IngestDoc is capable of fetching its content (in another process) + with IngestDoc.get_file().""" + pass + + +class BaseDestinationConnector(DataClassJsonMixin, ABC): + write_config: WriteConfig + connector_config: BaseConnectorConfig + + def __init__(self, write_config: WriteConfig, connector_config: BaseConnectorConfig): + self.write_config = write_config + self.connector_config = connector_config + + @abstractmethod + def initialize(self): + """Initializes the connector. Should also validate the connector is properly + configured.""" + pass + + @abstractmethod + def write(self, docs: t.List[BaseIngestDoc]) -> None: + pass + + +class SourceConnectorCleanupMixin: + read_config: ReadConfig + + def cleanup(self, cur_dir=None): + """Recursively clean up downloaded files and directories.""" + if self.read_config.preserve_downloads or self.read_config.download_only: + return + if cur_dir is None: + cur_dir = self.read_config.download_dir + if cur_dir is None or not Path(cur_dir).is_dir(): + return + sub_dirs = os.listdir(cur_dir) + os.chdir(cur_dir) + for sub_dir in sub_dirs: + # don't traverse symlinks, not that there every should be any + if os.path.isdir(sub_dir) and not os.path.islink(sub_dir): + self.cleanup(sub_dir) + os.chdir("..") + if len(os.listdir(cur_dir)) == 0: + os.rmdir(cur_dir) + + +class IngestDocCleanupMixin: + read_config: ReadConfig + + @property + @abstractmethod + def filename(self): + """The local filename of the document after fetching from remote source.""" + + def cleanup_file(self): + """Removes the local copy of the file after successful processing.""" + if ( + not self.read_config.preserve_downloads + and self.filename.is_file() + and not self.read_config.download_only + ): + logger.debug(f"Cleaning up {self}") + os.unlink(self.filename) + + +class ConfigSessionHandleMixin: + @abstractmethod + def create_session_handle(self) -> BaseSessionHandle: + """Creates a session handle that will be assigned on each IngestDoc to share + session related resources across all document handling for a given subprocess.""" + + +class IngestDocSessionHandleMixin: + config: ConfigSessionHandleMixin + _session_handle: t.Optional[BaseSessionHandle] = None + + @property + def session_handle(self): + """If a session handle is not assigned, creates a new one and assigns it.""" + if self._session_handle is None: + self._session_handle = 
self.config.create_session_handle() + return self._session_handle + + @session_handle.setter + def session_handle(self, session_handle: BaseSessionHandle): + self._session_handle = session_handle diff --git a/unstructured/ingest/processor2.py b/unstructured/ingest/processor2.py new file mode 100644 index 0000000000..fd9a78fd70 --- /dev/null +++ b/unstructured/ingest/processor2.py @@ -0,0 +1,125 @@ +import logging +import multiprocessing as mp +import typing as t +from contextlib import suppress +from functools import partial + +from unstructured.ingest.doc_processor.generalized import initialize, process_document +from unstructured.ingest.interfaces2 import ( + BaseDestinationConnector, + BaseSourceConnector, + PartitionConfig, +) +from unstructured.ingest.logger import ingest_log_streaming_init, logger + +with suppress(RuntimeError): + mp.set_start_method("spawn") + + +class Processor: + def __init__( + self, + source_doc_connector: BaseSourceConnector, + doc_processor_fn, + num_processes: int, + reprocess: bool, + verbose: bool, + max_docs: t.Optional[int], + dest_doc_connector: t.Optional[BaseDestinationConnector] = None, + ): + # initialize the reader and writer + self.source_doc_connector = source_doc_connector + self.doc_processor_fn = doc_processor_fn + self.num_processes = num_processes + self.reprocess = reprocess + self.verbose = verbose + self.max_docs = max_docs + self.dest_doc_connector = dest_doc_connector + + def initialize(self): + """Slower initialization things: check connections, load things into memory, etc.""" + ingest_log_streaming_init(logging.DEBUG if self.verbose else logging.INFO) + self.source_doc_connector.initialize() + if self.dest_doc_connector: + self.dest_doc_connector.initialize() + initialize() + + def cleanup(self): + self.source_doc_connector.cleanup() + + def _filter_docs_with_outputs(self, docs): + num_docs_all = len(docs) + docs = [doc for doc in docs if not doc.has_output()] + if self.max_docs is not None: + if num_docs_all > self.max_docs: + num_docs_all = self.max_docs + docs = docs[: self.max_docs] + num_docs_to_process = len(docs) + if num_docs_to_process == 0: + logger.info( + "All docs have structured outputs, nothing to do. 
Use --reprocess to process all.", + ) + return None + elif num_docs_to_process != num_docs_all: + logger.info( + f"Skipping processing for {num_docs_all - num_docs_to_process} docs out of " + f"{num_docs_all} since their structured outputs already exist, use --reprocess to " + "reprocess those in addition to the unprocessed ones.", + ) + return docs + + def run_partition(self, docs): + if not self.reprocess: + docs = self._filter_docs_with_outputs(docs) + if not docs: + return + + # Debugging tip: use the below line and comment out the mp.Pool loop + # block to remain in single process + # self.doc_processor_fn(docs[0]) + logger.info(f"Processing {len(docs)} docs") + json_docs = [doc.to_json() for doc in docs] + with mp.Pool( + processes=self.num_processes, + initializer=ingest_log_streaming_init, + initargs=(logging.DEBUG if self.verbose else logging.INFO,), + ) as pool: + pool.map(self.doc_processor_fn, json_docs) + + def run(self): + self.initialize() + + # fetch the list of lazy downloading IngestDoc obj's + docs = self.source_doc_connector.get_ingest_docs() + + try: + self.run_partition(docs=docs) + if self.dest_doc_connector: + self.dest_doc_connector.write(docs=docs) + finally: + self.cleanup() + + +def process_documents( + source_doc_connector: BaseSourceConnector, + partition_config: PartitionConfig, + verbose: bool, + dest_doc_connector: t.Optional[BaseDestinationConnector] = None, +) -> None: + process_document_with_partition_args = partial( + process_document, + strategy=partition_config.strategy, + ocr_languages=partition_config.ocr_languages, + encoding=partition_config.encoding, + pdf_infer_table_structure=partition_config.pdf_infer_table_structure, + ) + + Processor( + source_doc_connector=source_doc_connector, + doc_processor_fn=process_document_with_partition_args, + num_processes=partition_config.num_processes, + reprocess=partition_config.reprocess, + verbose=verbose, + max_docs=partition_config.max_docs, + dest_doc_connector=dest_doc_connector, + ).run() diff --git a/unstructured/ingest/runner/__init__.py b/unstructured/ingest/runner/__init__.py index e60613e7f7..726dc242fb 100644 --- a/unstructured/ingest/runner/__init__.py +++ b/unstructured/ingest/runner/__init__.py @@ -46,4 +46,5 @@ "sharepoint", "slack", "wikipedia", + "writer_map", ] diff --git a/unstructured/ingest/runner/s3_2.py b/unstructured/ingest/runner/s3_2.py new file mode 100644 index 0000000000..7396a81cf2 --- /dev/null +++ b/unstructured/ingest/runner/s3_2.py @@ -0,0 +1,55 @@ +import logging +import typing as t + +from unstructured.ingest.interfaces2 import PartitionConfig, ReadConfig +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.processor2 import process_documents +from unstructured.ingest.runner.utils2 import update_download_dir_remote_url +from unstructured.ingest.runner.writers import writer_map + + +def s3( + verbose: bool, + read_config: ReadConfig, + partition_configs: PartitionConfig, + remote_url: str, + recursive: bool, + anonymous: bool, + writer_type: t.Optional[str] = None, + writer_kwargs: t.Optional[dict] = None, + **kwargs, +): + writer_kwargs = writer_kwargs if writer_kwargs else {} + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + + read_config.download_dir = update_download_dir_remote_url( + connector_name="s3", + read_configs=read_config, + remote_url=remote_url, + logger=logger, + ) + + from unstructured.ingest.connector.s3_2 import S3SourceConnector, SimpleS3Config + + source_doc_connector = 
S3SourceConnector( # type: ignore + read_config=read_config, + connector_config=SimpleS3Config( + path=remote_url, + recursive=recursive, + access_kwargs={"anon": anonymous}, + ), + partition_config=partition_configs, + ) + + dest_doc_connector = None + if writer_type: + writer = writer_map[writer_type] + print(f"WRITER CONFIGS: {writer_kwargs}") + dest_doc_connector = writer(**writer_kwargs) + + process_documents( + source_doc_connector=source_doc_connector, + partition_config=partition_configs, + verbose=verbose, + dest_doc_connector=dest_doc_connector, + ) diff --git a/unstructured/ingest/runner/utils2.py b/unstructured/ingest/runner/utils2.py new file mode 100644 index 0000000000..7356077744 --- /dev/null +++ b/unstructured/ingest/runner/utils2.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import hashlib +import logging +from pathlib import Path + +from unstructured.ingest.interfaces2 import ( + ReadConfig, +) + + +def update_download_dir_remote_url( + connector_name: str, + read_configs: ReadConfig, + remote_url: str, + logger: logging.Logger, +) -> str: + hashed_dir_name = hashlib.sha256(remote_url.encode("utf-8")) + return update_download_dir_hash( + connector_name=connector_name, + read_configs=read_configs, + hashed_dir_name=hashed_dir_name, + logger=logger, + ) + + +def update_download_dir_hash( + connector_name: str, + read_configs: ReadConfig, + hashed_dir_name: hashlib._Hash, + logger: logging.Logger, +) -> str: + if not read_configs.download_dir: + cache_path = Path.home() / ".cache" / "unstructured" / "ingest" + if not cache_path.exists(): + cache_path.mkdir(parents=True, exist_ok=True) + download_dir = cache_path / connector_name / hashed_dir_name.hexdigest()[:10] + if read_configs.preserve_downloads: + logger.warning( + f"Preserving downloaded files but download_dir is not specified," + f" using {download_dir}", + ) + new_download_dir = str(download_dir) + logger.debug(f"updating download directory to: {new_download_dir}") + else: + new_download_dir = read_configs.download_dir + return new_download_dir diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py new file mode 100644 index 0000000000..8960ed7704 --- /dev/null +++ b/unstructured/ingest/runner/writers.py @@ -0,0 +1,22 @@ +def s3_writer( + remote_url: str, + anonymous: bool, + verbose: bool = False, +): + from unstructured.ingest.connector.s3_2 import ( + S3DestinationConnector, + SimpleS3Config, + ) + + return S3DestinationConnector( + write_config=None, + connector_config=SimpleS3Config( + path=remote_url, + access_kwargs={"anon": anonymous}, + ), + ) + + +writer_map = { + "s3": s3_writer, +}
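

To make the new flow concrete, here is a minimal sketch of driving the updated s3 runner programmatically rather than through the CLI, mirroring the shell example in `examples/ingest/s3-small-batch/ingest.sh`. It only uses names introduced by this patch (`ReadConfig`, `PartitionConfig`, the `s3` runner, and the `"s3"` entry in `writer_map`); the bucket URLs and config values are illustrative, not prescribed.

```python
# Sketch only: exercises the source + destination path added in this PR.
# Assumes the modules land at the paths shown in the diff (interfaces2, runner/s3_2).
from unstructured.ingest.interfaces2 import PartitionConfig, ReadConfig
from unstructured.ingest.runner.s3_2 import s3

s3(
    verbose=True,
    read_config=ReadConfig(preserve_downloads=True),
    partition_configs=PartitionConfig(
        output_dir="s3-small-batch-output",
        num_processes=2,
    ),
    remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/",
    recursive=False,
    anonymous=True,
    # writer_type selects the destination connector from writer_map;
    # writer_kwargs are forwarded to s3_writer(remote_url=..., anonymous=...).
    writer_type="s3",
    writer_kwargs={
        "remote_url": "s3://utic-dev-tech-fixtures/small-pdf-set-output",
        "anonymous": True,
    },
)
```

Omitting `writer_type` (the default) reproduces the previous read-and-partition-only behavior, which is how the source command behaves when no destination subcommand is given.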