diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f26834366..7594afc96b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features +* Add in ingest cli s3 writer + ### Fixes ## 0.10.10 diff --git a/Makefile b/Makefile index 9dc2ba765e..1aa286ed85 100644 --- a/Makefile +++ b/Makefile @@ -204,49 +204,13 @@ install-pandoc: ## pip-compile: compiles all base/dev/test requirements .PHONY: pip-compile pip-compile: - pip-compile --upgrade requirements/base.in - - # Extra requirements that are specific to document types - pip-compile --upgrade requirements/extra-csv.in - pip-compile --upgrade requirements/extra-docx.in - pip-compile --upgrade requirements/extra-epub.in - pip-compile --upgrade requirements/extra-pandoc.in - pip-compile --upgrade requirements/extra-markdown.in - pip-compile --upgrade requirements/extra-msg.in - pip-compile --upgrade requirements/extra-odt.in - pip-compile --upgrade requirements/extra-pdf-image.in - pip-compile --upgrade requirements/extra-pptx.in - pip-compile --upgrade requirements/extra-xlsx.in - - # Extra requirements for huggingface staging functions - pip-compile --upgrade requirements/huggingface.in - pip-compile --upgrade requirements/test.in - pip-compile --upgrade requirements/dev.in - pip-compile --upgrade requirements/build.in - # NOTE(robinson) - docs/requirements.txt is where the GitHub action for building - # sphinx docs looks for additional requirements + @for file in $(shell ls requirements/*.in); \ + do echo "running: pip-compile --upgrade $${file}" && \ + pip-compile --upgrade $${file}; \ + done cp requirements/build.txt docs/requirements.txt - pip-compile --upgrade requirements/ingest-s3.in - pip-compile --upgrade requirements/ingest-biomed.in - pip-compile --upgrade requirements/ingest-box.in - pip-compile --upgrade requirements/ingest-gcs.in - pip-compile --upgrade requirements/ingest-dropbox.in - pip-compile --upgrade requirements/ingest-azure.in - pip-compile --upgrade requirements/ingest-delta-table.in - pip-compile --upgrade requirements/ingest-discord.in - pip-compile --upgrade requirements/ingest-reddit.in - pip-compile --upgrade requirements/ingest-github.in - pip-compile --upgrade requirements/ingest-gitlab.in - pip-compile --upgrade requirements/ingest-slack.in - pip-compile --upgrade requirements/ingest-wikipedia.in - pip-compile --upgrade requirements/ingest-google-drive.in - pip-compile --upgrade requirements/ingest-elasticsearch.in - pip-compile --upgrade requirements/ingest-onedrive.in - pip-compile --upgrade requirements/ingest-outlook.in - pip-compile --upgrade requirements/ingest-confluence.in - pip-compile --upgrade requirements/ingest-airtable.in - pip-compile --upgrade requirements/ingest-sharepoint.in - pip-compile --upgrade requirements/ingest-notion.in + + ## install-project-local: install unstructured into your local python environment .PHONY: install-project-local @@ -281,7 +245,7 @@ test-no-extras: test_${PACKAGE_NAME}/partition/test_text.py \ test_${PACKAGE_NAME}/partition/test_email.py \ test_${PACKAGE_NAME}/partition/test_html_partition.py \ - test_${PACKAGE_NAME}/partition/test_xml_partition.py + test_${PACKAGE_NAME}/partition/test_xml_partition.py .PHONY: test-extra-csv test-extra-csv: diff --git a/examples/ingest/s3-small-batch/ingest.sh b/examples/ingest/s3-small-batch/ingest.sh index 4d69d8bcb5..4c2406d038 100755 --- a/examples/ingest/s3-small-batch/ingest.sh +++ b/examples/ingest/s3-small-batch/ingest.sh @@ -12,5 +12,9 @@ PYTHONPATH=. 
./unstructured/ingest/main.py \ s3 \ --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ --anonymous \ - --structured-output-dir s3-small-batch-output \ - --num-processes 2 + --output-dir s3-small-batch-output \ + --num-processes 2 \ + --verbose \ + s3 \ + --anonymous \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set-output diff --git a/requirements/dev.txt b/requirements/dev.txt index 0d7c376774..185009ac0e 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -4,7 +4,7 @@ # # pip-compile requirements/dev.in # -anyio==3.7.1 +anyio==4.0.0 # via jupyter-server appnope==0.1.3 # via @@ -101,7 +101,6 @@ importlib-resources==6.0.1 # jsonschema # jsonschema-specifications # jupyterlab - # notebook ipykernel==6.25.1 # via # jupyter @@ -167,7 +166,7 @@ jupyter-events==0.7.0 # via jupyter-server jupyter-lsp==2.2.0 # via jupyterlab -jupyter-server==2.7.2 +jupyter-server==2.7.3 # via # jupyter-lsp # jupyterlab @@ -211,7 +210,7 @@ nest-asyncio==1.5.7 # via ipykernel nodeenv==1.8.0 # via pre-commit -notebook==7.0.2 +notebook==7.0.3 # via jupyter notebook-shim==0.2.3 # via @@ -391,7 +390,7 @@ urllib3==1.26.16 # -c requirements/constraints.in # -c requirements/test.txt # requests -virtualenv==20.24.3 +virtualenv==20.24.4 # via pre-commit wcwidth==0.2.6 # via prompt-toolkit diff --git a/requirements/extra-pdf-image.txt b/requirements/extra-pdf-image.txt index 827e9be991..a9e6b4a44c 100644 --- a/requirements/extra-pdf-image.txt +++ b/requirements/extra-pdf-image.txt @@ -202,6 +202,7 @@ transformers==4.32.1 # via unstructured-inference typing-extensions==4.7.1 # via + # -c requirements/base.txt # filelock # huggingface-hub # iopath diff --git a/requirements/huggingface.txt b/requirements/huggingface.txt index 66e1ab42b9..25ea178663 100644 --- a/requirements/huggingface.txt +++ b/requirements/huggingface.txt @@ -95,6 +95,7 @@ transformers==4.32.1 # via -r requirements/huggingface.in typing-extensions==4.7.1 # via + # -c requirements/base.txt # filelock # huggingface-hub # torch diff --git a/requirements/ingest-elasticsearch.txt b/requirements/ingest-elasticsearch.txt index ae8e5b4d98..89f91ad54b 100644 --- a/requirements/ingest-elasticsearch.txt +++ b/requirements/ingest-elasticsearch.txt @@ -13,7 +13,7 @@ elastic-transport==8.4.0 # via elasticsearch elasticsearch==8.9.0 # via -r requirements/ingest-elasticsearch.in -jq==1.4.1 +jq==1.5.0 # via -r requirements/ingest-elasticsearch.in urllib3==1.26.16 # via diff --git a/requirements/ingest-notion.txt b/requirements/ingest-notion.txt index ca9e500d4b..a5cb1c36a7 100644 --- a/requirements/ingest-notion.txt +++ b/requirements/ingest-notion.txt @@ -4,7 +4,7 @@ # # pip-compile requirements/ingest-notion.in # -anyio==3.7.1 +anyio==4.0.0 # via httpcore certifi==2023.7.22 # via diff --git a/test_unstructured_ingest/test-ingest-s3.sh b/test_unstructured_ingest/test-ingest-s3.sh index 195d537115..14f683d16f 100755 --- a/test_unstructured_ingest/test-ingest-s3.sh +++ b/test_unstructured_ingest/test-ingest-s3.sh @@ -13,11 +13,15 @@ sh "$SCRIPT_DIR"/check-num-files-expected-output.sh 3 $OUTPUT_FOLDER_NAME 20k PYTHONPATH=. 
./unstructured/ingest/main.py \ s3 \ --download-dir "$DOWNLOAD_DIR" \ - --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \ - --partition-strategy hi_res \ + --metadata-exclude coordinates \ + --metadata-exclude filename \ + --metadata-exclude file_directory \ + --metadata-exclude metadata.data_source.date_processed \ + --metadata-exclude metadata.last_modified \ + --strategy hi_res \ --preserve-downloads \ --reprocess \ - --structured-output-dir "$OUTPUT_DIR" \ + --output-dir "$OUTPUT_DIR" \ --verbose \ --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ --anonymous diff --git a/unstructured/ingest/cli/cli.py b/unstructured/ingest/cli/cli.py index 2744c6d645..eaa7e57df8 100644 --- a/unstructured/ingest/cli/cli.py +++ b/unstructured/ingest/cli/cli.py @@ -11,7 +11,6 @@ def ingest(): # Dynamically update shared options for supported subcommands subcommands = [ cli_cmds.box, - cli_cmds.s3, cli_cmds.gcs, cli_cmds.delta_table, cli_cmds.dropbox, @@ -42,7 +41,11 @@ def ingest(): def get_cmd() -> click.Command: cmd = ingest # Add all subcommands + for src_subcommand in cli_cmds.src: + # add destination subcommands + for dest_subcommand in cli_cmds.dest: + src_subcommand.add_command(dest_subcommand) + cmd.add_command(src_subcommand) for subcommand in subcommands: - # add_shared_options(cmd) cmd.add_command(subcommand()) return cmd diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index 592b71dce3..e2937b783d 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -1,3 +1,7 @@ +import typing as t + +import click + from .airtable import get_cmd as airtable from .azure import get_cmd as azure from .biomed import get_cmd as biomed @@ -17,11 +21,16 @@ from .onedrive import get_cmd as onedrive from .outlook import get_cmd as outlook from .reddit import get_cmd as reddit -from .s3 import get_cmd as s3 +from .s3_2 import get_dest_cmd as s3_dest +from .s3_2 import get_source_cmd as s3 from .sharepoint import get_cmd as sharepoint from .slack import get_cmd as slack from .wikipedia import get_cmd as wikipedia +src: t.List[click.Group] = [s3()] + +dest: t.List[click.Command] = [s3_dest()] + __all__ = [ "airtable", "azure", @@ -46,4 +55,6 @@ "sharepoint", "slack", "wikipedia", + "src", + "dest", ] diff --git a/unstructured/ingest/cli/cmds/s3_2.py b/unstructured/ingest/cli/cmds/s3_2.py new file mode 100644 index 0000000000..e7f63ce87a --- /dev/null +++ b/unstructured/ingest/cli/cmds/s3_2.py @@ -0,0 +1,114 @@ +import logging +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.cmds.utils import Group +from unstructured.ingest.cli.common import ( + log_options, +) +from unstructured.ingest.cli.interfaces import ( + CliMixin, + CliPartitionConfig, + CliReadConfig, + CliRecursiveConfig, + CliRemoteUrlConfig, +) +from unstructured.ingest.interfaces2 import BaseConfig +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.runner.s3_2 import s3 as s3_fn + + +@dataclass +class S3CliConfigs(BaseConfig, CliMixin): + anonymous: bool = False + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--anonymous"], + is_flag=True, + default=False, + help="Connect to s3 without local AWS credentials.", + ), + ] + cmd.params.extend(options) + + +@click.group(name="s3", invoke_without_command=True, cls=Group) +@click.pass_context +def 
s3_source(ctx: click.Context, **options): + if ctx.invoked_subcommand: + return + + # Click sets all multiple fields as tuple, this needs to be updated to list + for k, v in options.items(): + if isinstance(v, tuple): + options[k] = list(v) + verbose = options.get("verbose", False) + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + log_options(options, verbose=verbose) + try: + # run_init_checks(**options) + read_configs = CliReadConfig.from_dict(options) + partition_configs = CliPartitionConfig.from_dict(options) + # Run for schema validation + S3CliConfigs.from_dict(options) + s3_fn(read_config=read_configs, partition_configs=partition_configs, **options) + except Exception as e: + logger.error(e, exc_info=True) + raise click.ClickException(str(e)) from e + + +@click.command(name="s3") +@click.pass_context +def s3_dest(ctx: click.Context, **options): + parent_options: dict = ctx.parent.params if ctx.parent else {} + # Click sets all multiple fields as tuple, this needs to be updated to list + for k, v in options.items(): + if isinstance(v, tuple): + options[k] = list(v) + for k, v in parent_options.items(): + if isinstance(v, tuple): + parent_options[k] = list(v) + verbose = parent_options.get("verbose", False) + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + log_options(parent_options, verbose=verbose) + log_options(options, verbose=verbose) + try: + # run_init_checks(**options) + read_configs = CliReadConfig.from_dict(parent_options) + partition_configs = CliPartitionConfig.from_dict(parent_options) + # Run for schema validation + S3CliConfigs.from_dict(options) + s3_fn( + read_config=read_configs, + partition_configs=partition_configs, + writer_type="s3", + writer_kwargs=options, + **parent_options, + ) + except Exception as e: + logger.error(e, exc_info=True) + raise click.ClickException(str(e)) from e + + +def get_dest_cmd() -> click.Command: + cmd = s3_dest + S3CliConfigs.add_cli_options(cmd) + CliRemoteUrlConfig.add_cli_options(cmd) + return cmd + + +def get_source_cmd() -> click.Group: + cmd = s3_source + S3CliConfigs.add_cli_options(cmd) + CliRemoteUrlConfig.add_cli_options(cmd) + CliRecursiveConfig.add_cli_options(cmd) + + # Common CLI configs + CliReadConfig.add_cli_options(cmd) + CliPartitionConfig.add_cli_options(cmd) + cmd.params.append(click.Option(["-v", "--verbose"], is_flag=True, default=False)) + return cmd diff --git a/unstructured/ingest/cli/cmds/utils.py b/unstructured/ingest/cli/cmds/utils.py new file mode 100644 index 0000000000..88fc50f821 --- /dev/null +++ b/unstructured/ingest/cli/cmds/utils.py @@ -0,0 +1,54 @@ +from gettext import gettext as _ + +import click + + +class Group(click.Group): + def parse_args(self, ctx, args): + """ + This allows for subcommands to be called with the --help flag without breaking + if parent command is missing any of its required parameters + """ + + try: + return super().parse_args(ctx, args) + except click.MissingParameter: + if "--help" not in args: + raise + + # remove the required params so that help can display + for param in self.params: + param.required = False + return super().parse_args(ctx, args) + + def format_commands(self, ctx: click.Context, formatter: click.HelpFormatter) -> None: + """ + Copy of the original click.Group format_commands() method but replacing + 'Commands' -> 'Destinations' + """ + commands = [] + for subcommand in self.list_commands(ctx): + cmd = self.get_command(ctx, subcommand) + # What is this, the tool lied about a command. 
Ignore it + if cmd is None: + continue + if cmd.hidden: + continue + + commands.append((subcommand, cmd)) + + # allow for 3 times the default spacing + if len(commands): + if formatter.width: + limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands) + else: + limit = -6 - max(len(cmd[0]) for cmd in commands) + + rows = [] + for subcommand, cmd in commands: + help = cmd.get_short_help_str(limit) + rows.append((subcommand, help)) + + if rows: + with formatter.section(_("Destinations")): + formatter.write_dl(rows) diff --git a/unstructured/ingest/cli/common.py b/unstructured/ingest/cli/common.py index 046ad4419b..4e26045cf2 100644 --- a/unstructured/ingest/cli/common.py +++ b/unstructured/ingest/cli/common.py @@ -62,8 +62,8 @@ def run_init_checks( ) -def log_options(options: dict): - ingest_log_streaming_init(logging.DEBUG if options["verbose"] else logging.INFO) +def log_options(options: dict, verbose=False): + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) sensitive_fields = [ "account_name", "account_key", diff --git a/unstructured/ingest/cli/interfaces.py b/unstructured/ingest/cli/interfaces.py new file mode 100644 index 0000000000..cc8a58ce2e --- /dev/null +++ b/unstructured/ingest/cli/interfaces.py @@ -0,0 +1,175 @@ +from abc import abstractmethod + +import click + +from unstructured.ingest.interfaces2 import BaseConfig, PartitionConfig, ReadConfig + + +class CliMixin: + @staticmethod + @abstractmethod + def add_cli_options(cmd: click.Command) -> None: + pass + + +class CliReadConfig(ReadConfig, CliMixin): + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--download-dir"], + help="Where files are downloaded to, defaults to a location at" + "`$HOME/.cache/unstructured/ingest//`.", + ), + click.Option( + ["--re-download"], + is_flag=True, + default=False, + help="Re-download files even if they are already present in download dir.", + ), + click.Option( + ["--preserve-downloads"], + is_flag=True, + default=False, + help="Preserve downloaded files. Otherwise each file is removed " + "after being processed successfully.", + ), + click.Option( + ["--download-only"], + is_flag=True, + default=False, + help="Download any files that are not already present in either --download-dir or " + "the default download ~/.cache/... location in case --download-dir " + "is not specified and " + "skip processing them through unstructured.", + ), + ] + cmd.params.extend(options) + + +class CliPartitionConfig(PartitionConfig, CliMixin): + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--output-dir"], + default="structured-output", + help="Where to place structured output .json files.", + ), + click.Option( + ["--num-processes"], + default=2, + show_default=True, + help="Number of parallel processes to process docs in.", + ), + click.Option( + ["--max-docs"], + default=None, + type=int, + help="If specified, process at most specified number of documents.", + ), + click.Option( + ["--pdf-infer-table-structure"], + default=False, + help="If set to True, partition will include the table's text " + "content in the response.", + ), + click.Option( + ["--strategy"], + default="auto", + help="The method that will be used to process the documents. " + "Default: auto. 
Other strategies include `fast` and `hi_res`.", + ), + click.Option( + ["--reprocess"], + is_flag=True, + default=False, + help="Reprocess a downloaded file even if the relevant structured " + "output .json file in output directory already exists.", + ), + click.Option( + ["--ocr-languages"], + default="eng", + help="A list of language packs to specify which languages to use for OCR, " + "separated by '+' e.g. 'eng+deu' to use the English and German language packs. " + "The appropriate Tesseract " + "language pack needs to be installed." + "Default: eng", + ), + click.Option( + ["--encoding"], + default=None, + help="Text encoding to use when reading documents. By default the encoding is " + "detected automatically.", + ), + click.Option( + ["--fields-include"], + multiple=True, + default=["element_id", "text", "type", "metadata"], + help="If set, include the specified top-level fields in an element. ", + ), + click.Option( + ["--flatten-metadata"], + is_flag=True, + default=False, + help="Results in flattened json elements. " + "Specifically, the metadata key values are brought to " + "the top-level of the element, and the `metadata` key itself is removed.", + ), + click.Option( + ["--metadata-include"], + default=[], + multiple=True, + help="If set, include the specified metadata fields if they exist " + "and drop all other fields. ", + ), + click.Option( + ["--metadata-exclude"], + default=[], + multiple=True, + help="If set, drop the specified metadata fields if they exist. ", + ), + click.Option( + ["--partition-endpoint"], + default=None, + help="If provided, will use api to run partition", + ), + click.Option( + ["--api-key"], + default=None, + help="API Key for partition endpoint.", + ), + ] + cmd.params.extend(options) + + +class CliRecursiveConfig(BaseConfig, CliMixin): + recursive: bool + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--recursive"], + is_flag=True, + default=False, + help="Recursively download files in their respective folders " + "otherwise stop at the files in provided folder level.", + ), + ] + cmd.params.extend(options) + + +class CliRemoteUrlConfig(BaseConfig, CliMixin): + remote_url: str + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--remote-url"], + required=True, + help="Remote fsspec URL formatted as `protocol://dir/path`", + ), + ] + cmd.params.extend(options) diff --git a/unstructured/ingest/connector/registry.py b/unstructured/ingest/connector/registry.py index bea0dd2faa..47fc95501b 100644 --- a/unstructured/ingest/connector/registry.py +++ b/unstructured/ingest/connector/registry.py @@ -24,7 +24,7 @@ from unstructured.ingest.connector.onedrive import OneDriveIngestDoc from unstructured.ingest.connector.outlook import OutlookIngestDoc from unstructured.ingest.connector.reddit import RedditIngestDoc -from unstructured.ingest.connector.s3 import S3IngestDoc +from unstructured.ingest.connector.s3_2 import S3IngestDoc from unstructured.ingest.connector.sharepoint import SharepointIngestDoc from unstructured.ingest.connector.slack import SlackIngestDoc from unstructured.ingest.connector.wikipedia import ( @@ -64,7 +64,14 @@ def create_ingest_doc_from_json(ingest_doc_json: str) -> BaseIngestDoc: - ingest_doc_dict = json.loads(ingest_doc_json) + try: + ingest_doc_dict: dict = json.loads(ingest_doc_json) + except TypeError as te: + raise TypeError( + f"failed to load json string when deserializing IngestDoc: {ingest_doc_json}", + ) from te + if 
"registry_name" not in ingest_doc_dict: + raise ValueError(f"registry_name not present in ingest doc: {ingest_doc_dict}") registry_name = ingest_doc_dict.pop("registry_name") try: ingest_doc_cls = INGEST_DOC_NAME_TO_CLASS[registry_name] diff --git a/unstructured/ingest/connector/s3_2.py b/unstructured/ingest/connector/s3_2.py new file mode 100644 index 0000000000..51219ee7f6 --- /dev/null +++ b/unstructured/ingest/connector/s3_2.py @@ -0,0 +1,264 @@ +import os +import re +import typing as t +from dataclasses import dataclass, field +from pathlib import Path +from typing import Type + +from unstructured.ingest.interfaces2 import ( + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + BaseSourceConnector, + IngestDocCleanupMixin, + PartitionConfig, + ReadConfig, + SourceConnectorCleanupMixin, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + +SUPPORTED_REMOTE_FSSPEC_PROTOCOLS = [ + "s3", + "s3a", + "abfs", + "az", + "gs", + "gcs", + "box", + "dropbox", +] + + +@dataclass +class SimpleFsspecConfig(BaseConnectorConfig): + # fsspec specific options + path: str + recursive: bool = False + access_kwargs: dict = field(default_factory=dict) + protocol: str = field(init=False) + path_without_protocol: str = field(init=False) + dir_path: str = field(init=False) + file_path: str = field(init=False) + + def __post_init__(self): + self.protocol, self.path_without_protocol = self.path.split("://") + if self.protocol not in SUPPORTED_REMOTE_FSSPEC_PROTOCOLS: + raise ValueError( + f"Protocol {self.protocol} not supported yet, only " + f"{SUPPORTED_REMOTE_FSSPEC_PROTOCOLS} are supported.", + ) + + # dropbox root is an empty string + match = re.match(rf"{self.protocol}://([\s])/", self.path) + if match and self.protocol == "dropbox": + self.dir_path = " " + self.file_path = "" + return + + # just a path with no trailing prefix + match = re.match(rf"{self.protocol}://([^/\s]+?)(/*)$", self.path) + if match: + self.dir_path = match.group(1) + self.file_path = "" + return + + # valid path with a dir and/or file + match = re.match(rf"{self.protocol}://([^/\s]+?)/([^\s]*)", self.path) + if not match: + raise ValueError( + f"Invalid path {self.path}. Expected :///.", + ) + self.dir_path = match.group(1) + self.file_path = match.group(2) or "" + + +@dataclass +class FsspecIngestDoc(IngestDocCleanupMixin, BaseIngestDoc): + """Class encapsulating fetching a doc and writing processed results (but not + doing the processing!). + + Also includes a cleanup method. When things go wrong and the cleanup + method is not called, the file is left behind on the filesystem to assist debugging. 
+ """ + + connector_config: SimpleFsspecConfig + remote_file_path: str + + def _tmp_download_file(self): + download_dir = self.read_config.download_dir if self.read_config.download_dir else "" + return Path(download_dir) / self.remote_file_path.replace( + f"{self.connector_config.dir_path}/", + "", + ) + + @property + def _output_filename(self): + return ( + Path(self.partition_config.output_dir) + / f"{self.remote_file_path.replace(f'{self.connector_config.dir_path}/', '')}.json" + ) + + def _create_full_tmp_dir_path(self): + """Includes "directories" in the object path""" + self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True) + + @BaseIngestDoc.skip_if_file_exists + def get_file(self): + """Fetches the file from the current filesystem and stores it locally.""" + from fsspec import AbstractFileSystem, get_filesystem_class + + self._create_full_tmp_dir_path() + fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + **self.connector_config.access_kwargs, + ) + logger.debug(f"Fetching {self} - PID: {os.getpid()}") + fs.get(rpath=self.remote_file_path, lpath=self._tmp_download_file().as_posix()) + + @property + def filename(self): + """The filename of the file after downloading from cloud""" + return self._tmp_download_file() + + +class FsspecSourceConnector(SourceConnectorCleanupMixin, BaseSourceConnector): + """Objects of this class support fetching document(s) from""" + + connector_config: SimpleFsspecConfig + ingest_doc_cls: Type[FsspecIngestDoc] = FsspecIngestDoc + + def __init__( + self, + read_config: ReadConfig, + connector_config: BaseConnectorConfig, + partition_config: PartitionConfig, + ): + from fsspec import AbstractFileSystem, get_filesystem_class + + super().__init__( + read_config=read_config, + connector_config=connector_config, + partition_config=partition_config, + ) + self.fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + **self.connector_config.access_kwargs, + ) + + def initialize(self): + """Verify that can get metadata for an object, validates connections info.""" + ls_output = self.fs.ls(self.connector_config.path_without_protocol) + if len(ls_output) < 1: + raise ValueError( + f"No objects found in {self.connector_config.path}.", + ) + + def _list_files(self): + if not self.connector_config.recursive: + # fs.ls does not walk directories + # directories that are listed in cloud storage can cause problems + # because they are seen as 0 byte files + return [ + x.get("name") + for x in self.fs.ls(self.connector_config.path_without_protocol, detail=True) + if x.get("size") > 0 + ] + else: + # fs.find will recursively walk directories + # "size" is a common key for all the cloud protocols with fs + return [ + k + for k, v in self.fs.find( + self.connector_config.path_without_protocol, + detail=True, + ).items() + if v.get("size") > 0 + ] + + def get_ingest_docs(self): + return [ + self.ingest_doc_cls( + read_config=self.read_config, + connector_config=self.connector_config, + partition_config=self.partition_config, + remote_file_path=file, + ) + for file in self._list_files() + ] + + +class FsspecDestinationConnector(BaseDestinationConnector): + connector_config: SimpleFsspecConfig + + def __init__(self, write_config: WriteConfig, connector_config: BaseConnectorConfig): + from fsspec import AbstractFileSystem, get_filesystem_class + + super().__init__(write_config=write_config, connector_config=connector_config) + self.fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + 
**self.connector_config.access_kwargs, + ) + + def initialize(self): + pass + + def write(self, docs: t.List[BaseIngestDoc]) -> None: + from fsspec import AbstractFileSystem, get_filesystem_class + + fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)( + **self.connector_config.access_kwargs, + ) + + logger.info(f"Writing content using filesystem: {type(fs).__name__}") + + for doc in docs: + s3_file_path = str(doc._output_filename).replace( + doc.partition_config.output_dir, + "", + ) + s3_folder = self.connector_config.path + if s3_folder[-1] != "/": + s3_folder = f"{s3_folder}/" + if s3_file_path[0] == "/": + s3_file_path = s3_file_path[1:] + + s3_output_path = s3_folder + s3_file_path + logger.debug(f"Uploading {doc._output_filename} -> {s3_output_path}") + fs.put_file(lpath=doc._output_filename, rpath=s3_output_path) + + +@dataclass +class SimpleS3Config(SimpleFsspecConfig): + pass + + +@dataclass +class S3IngestDoc(FsspecIngestDoc): + remote_file_path: str + registry_name: str = "s3" + + @requires_dependencies(["s3fs", "fsspec"], extras="s3") + def get_file(self): + super().get_file() + + +class S3SourceConnector(FsspecSourceConnector): + ingest_doc_cls: Type[S3IngestDoc] = S3IngestDoc + + @requires_dependencies(["s3fs", "fsspec"], extras="s3") + def __init__( + self, + read_config: ReadConfig, + connector_config: BaseConnectorConfig, + partition_config: PartitionConfig, + ): + super().__init__( + read_config=read_config, + connector_config=connector_config, + partition_config=partition_config, + ) + + +class S3DestinationConnector(FsspecDestinationConnector): + @requires_dependencies(["s3fs", "fsspec"], extras="s3") + def __init__(self, write_config: WriteConfig, connector_config: BaseConnectorConfig): + super().__init__(write_config=write_config, connector_config=connector_config) diff --git a/unstructured/ingest/doc_processor/generalized.py b/unstructured/ingest/doc_processor/generalized.py index 8123b620b1..849b53853c 100644 --- a/unstructured/ingest/doc_processor/generalized.py +++ b/unstructured/ingest/doc_processor/generalized.py @@ -40,6 +40,7 @@ def process_document(ingest_doc_json: str, **partition_kwargs) -> Optional[List[ """ global session_handle isd_elems_no_filename = None + doc = None try: doc = create_ingest_doc_from_json(ingest_doc_json) if isinstance(doc, IngestDocSessionHandleMixin): @@ -63,5 +64,6 @@ def process_document(ingest_doc_json: str, **partition_kwargs) -> Optional[List[ # TODO(crag) save the exception instead of print? 
logger.error(f"Failed to process {doc}", exc_info=True) finally: - doc.cleanup_file() + if doc: + doc.cleanup_file() return isd_elems_no_filename diff --git a/unstructured/ingest/interfaces2.py b/unstructured/ingest/interfaces2.py new file mode 100644 index 0000000000..610a46ddda --- /dev/null +++ b/unstructured/ingest/interfaces2.py @@ -0,0 +1,395 @@ +"""Defines Abstract Base Classes (ABC's) core to batch processing documents +through Unstructured.""" + +import functools +import json +import os +import typing as t +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path + +import requests +from dataclasses_json import DataClassJsonMixin + +from unstructured.documents.elements import DataSourceMetadata +from unstructured.ingest.error import PartitionError, SourceConnectionError +from unstructured.ingest.logger import logger +from unstructured.partition.auto import partition +from unstructured.staging.base import convert_to_dict + + +@dataclass +class BaseSessionHandle(ABC): + """Abstract Base Class for sharing resources that are local to an individual process. + e.g., a connection for making a request for fetching documents.""" + + +class BaseConfig(DataClassJsonMixin, ABC): + pass + + +@dataclass +class PartitionConfig(BaseConfig): + # where to write structured data outputs + output_dir: str = "structured-output" + num_processes: int = 2 + max_docs: t.Optional[int] = None + pdf_infer_table_structure: bool = False + strategy: str = "auto" + reprocess: bool = False + ocr_languages: str = "eng" + encoding: t.Optional[str] = None + fields_include: t.List[str] = field( + default_factory=lambda: ["element_id", "text", "type", "metadata"], + ) + flatten_metadata: bool = False + metadata_exclude: t.List[str] = field(default_factory=list) + metadata_include: t.List[str] = field(default_factory=list) + partition_endpoint: t.Optional[str] = None + api_key: t.Optional[str] = None + + +@dataclass +class ReadConfig(BaseConfig): + # where raw documents are stored for processing, and then removed if not preserve_downloads + download_dir: t.Optional[str] = None + re_download: bool = False + preserve_downloads: bool = False + download_only: bool = False + + +@dataclass +class WriteConfig(BaseConfig): + pass + + +class BaseConnectorConfig(ABC): + """Abstract definition on which to define connector-specific attributes.""" + + +@dataclass +class BaseIngestDoc(DataClassJsonMixin, ABC): + """An "ingest document" is specific to a connector, and provides + methods to fetch a single raw document, store it locally for processing, any cleanup + needed after successful processing of the doc, and the ability to write the doc's + structured outputs once processed. + + Crucially, it is not responsible for the actual processing of the raw document. + """ + + read_config: ReadConfig + partition_config: PartitionConfig + connector_config: BaseConnectorConfig + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._date_processed = None + + @property + def date_created(self) -> t.Optional[str]: + """The date the document was created on the source system.""" + return None + + @property + def date_modified(self) -> t.Optional[str]: + """The date the document was last modified on the source system.""" + return None + + @property + def date_processed(self) -> t.Optional[str]: + """The date the document was last processed by Unstructured. 
+ self._date_processed is assigned internally in self.partition_file()""" + return self._date_processed + + @property + def exists(self) -> t.Optional[bool]: + """Whether the document exists on the remote source.""" + return None + + @property + @abstractmethod + def filename(self): + """The local filename of the document after fetching from remote source.""" + + @property + @abstractmethod + def _output_filename(self): + """Filename of the structured output for this doc.""" + + @property + def record_locator(self) -> t.Optional[t.Dict[str, t.Any]]: # Values must be JSON-serializable + """A dictionary with any data necessary to uniquely identify the document on + the source system.""" + return None + + @property + def source_url(self) -> t.Optional[str]: + """The url of the source document.""" + return None + + @property + def version(self) -> t.Optional[str]: + """The version of the source document, this could be the last modified date, an + explicit version number, or anything else that can be used to uniquely identify + the version of the document.""" + return None + + @abstractmethod + def cleanup_file(self): + """Removes the local copy the file (or anything else) after successful processing.""" + pass + + @staticmethod + def skip_if_file_exists(func): + """Decorator that checks if a file exists, is not empty, and should not re-download, + if so log a message indicating as much and skip the decorated function.""" + + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + if ( + not self.read_config.re_download + and self.filename.is_file() + and self.filename.stat().st_size + ): + logger.debug(f"File exists: {self.filename}, skipping {func.__name__}") + return None + return func(self, *args, **kwargs) + + return wrapper + + # NOTE(crag): Future BaseIngestDoc classes could define get_file_object() methods + # in addition to or instead of get_file() + @abstractmethod + @SourceConnectionError.wrap + def get_file(self): + """Fetches the "remote" doc and stores it locally on the filesystem.""" + pass + + def has_output(self) -> bool: + """Determine if structured output for this doc already exists.""" + return self._output_filename.is_file() and self._output_filename.stat().st_size + + def write_result(self): + """Write the structured json result for this doc. 
result must be json serializable.""" + if self.read_config.download_only: + return + self._output_filename.parent.mkdir(parents=True, exist_ok=True) + with open(self._output_filename, "w", encoding="utf8") as output_f: + json.dump(self.isd_elems_no_filename, output_f, ensure_ascii=False, indent=2) + logger.info(f"Wrote {self._output_filename}") + + @PartitionError.wrap + def partition_file(self, **partition_kwargs) -> t.List[t.Dict[str, t.Any]]: + if not self.partition_config.partition_endpoint: + logger.debug("Using local partition") + elements = partition( + filename=str(self.filename), + data_source_metadata=DataSourceMetadata( + url=self.source_url, + version=self.version, + record_locator=self.record_locator, + date_created=self.date_created, + date_modified=self.date_modified, + date_processed=self.date_processed, + ), + **partition_kwargs, + ) + return convert_to_dict(elements) + + else: + endpoint = self.partition_config.partition_endpoint + + logger.debug(f"Using remote partition ({endpoint})") + + with open(self.filename, "rb") as f: + headers_dict = {} + if self.partition_config.api_key: + headers_dict["UNSTRUCTURED-API-KEY"] = self.partition_config.api_key + response = requests.post( + f"{endpoint}", + files={"files": (str(self.filename), f)}, + headers=headers_dict, + # TODO: add m_data_source_metadata to unstructured-api pipeline_api and then + # pass the stringified json here + ) + + if response.status_code != 200: + raise RuntimeError(f"Caught {response.status_code} from API: {response.text}") + + return response.json() + + def process_file(self, **partition_kwargs) -> t.Optional[t.List[t.Dict[str, t.Any]]]: + self._date_processed = datetime.utcnow().isoformat() + if self.read_config.download_only: + return None + logger.info(f"Processing {self.filename}") + + isd_elems = self.partition_file(**partition_kwargs) + + self.isd_elems_no_filename: t.List[t.Dict[str, t.Any]] = [] + for elem in isd_elems: + # type: ignore + if self.partition_config.metadata_exclude and self.partition_config.metadata_include: + raise ValueError( + "Arguments `--metadata-include` and `--metadata-exclude` are " + "mutually exclusive with each other.", + ) + elif self.partition_config.metadata_exclude: + ex_list = self.partition_config.metadata_exclude + for ex in ex_list: + if "." in ex: # handle nested fields + nested_fields = ex.split(".") + current_elem = elem + for f in nested_fields[:-1]: + if f in current_elem: + current_elem = current_elem[f] + field_to_exclude = nested_fields[-1] + if field_to_exclude in current_elem: + current_elem.pop(field_to_exclude, None) + else: # handle top-level fields + elem["metadata"].pop(ex, None) # type: ignore[attr-defined] + elif self.partition_config.metadata_include: + in_list = self.partition_config.metadata_include + for k in list(elem["metadata"].keys()): # type: ignore[attr-defined] + if k not in in_list: + elem["metadata"].pop(k, None) # type: ignore[attr-defined] + + in_list = self.partition_config.fields_include + elem = {k: v for k, v in elem.items() if k in in_list} + + if self.partition_config.flatten_metadata: + for k, v in elem["metadata"].items(): # type: ignore[attr-defined] + elem[k] = v + elem.pop("metadata") # type: ignore[attr-defined] + + self.isd_elems_no_filename.append(elem) + + return self.isd_elems_no_filename + + +@dataclass +class BaseSourceConnector(DataClassJsonMixin, ABC): + """Abstract Base Class for a connector to a remote source, e.g. 
S3 or Google Drive.""" + + read_config: ReadConfig + connector_config: BaseConnectorConfig + partition_config: PartitionConfig + + def __init__( + self, + read_config: ReadConfig, + connector_config: BaseConnectorConfig, + partition_config: PartitionConfig, + ): + """Expects a standard_config object that implements StandardConnectorConfig + and config object that implements BaseConnectorConfig.""" + self.read_config = read_config + self.connector_config = connector_config + self.partition_config = partition_config + + @abstractmethod + def cleanup(self, cur_dir=None): + """Any additional cleanup up need after processing is complete. E.g., removing + temporary download dirs that are empty. + + By convention, documents that failed to process are typically not cleaned up.""" + pass + + @abstractmethod + def initialize(self): + """Initializes the connector. Should also validate the connector is properly + configured: e.g., list a single a document from the source.""" + pass + + @abstractmethod + def get_ingest_docs(self): + """Returns all ingest docs (derived from BaseIngestDoc). + This does not imply downloading all the raw documents themselves, + rather each IngestDoc is capable of fetching its content (in another process) + with IngestDoc.get_file().""" + pass + + +class BaseDestinationConnector(DataClassJsonMixin, ABC): + write_config: WriteConfig + connector_config: BaseConnectorConfig + + def __init__(self, write_config: WriteConfig, connector_config: BaseConnectorConfig): + self.write_config = write_config + self.connector_config = connector_config + + @abstractmethod + def initialize(self): + """Initializes the connector. Should also validate the connector is properly + configured.""" + pass + + @abstractmethod + def write(self, docs: t.List[BaseIngestDoc]) -> None: + pass + + +class SourceConnectorCleanupMixin: + read_config: ReadConfig + + def cleanup(self, cur_dir=None): + """Recursively clean up downloaded files and directories.""" + if self.read_config.preserve_downloads or self.read_config.download_only: + return + if cur_dir is None: + cur_dir = self.read_config.download_dir + if cur_dir is None or not Path(cur_dir).is_dir(): + return + sub_dirs = os.listdir(cur_dir) + os.chdir(cur_dir) + for sub_dir in sub_dirs: + # don't traverse symlinks, not that there every should be any + if os.path.isdir(sub_dir) and not os.path.islink(sub_dir): + self.cleanup(sub_dir) + os.chdir("..") + if len(os.listdir(cur_dir)) == 0: + os.rmdir(cur_dir) + + +class IngestDocCleanupMixin: + read_config: ReadConfig + + @property + @abstractmethod + def filename(self): + """The local filename of the document after fetching from remote source.""" + + def cleanup_file(self): + """Removes the local copy of the file after successful processing.""" + if ( + not self.read_config.preserve_downloads + and self.filename.is_file() + and not self.read_config.download_only + ): + logger.debug(f"Cleaning up {self}") + os.unlink(self.filename) + + +class ConfigSessionHandleMixin: + @abstractmethod + def create_session_handle(self) -> BaseSessionHandle: + """Creates a session handle that will be assigned on each IngestDoc to share + session related resources across all document handling for a given subprocess.""" + + +class IngestDocSessionHandleMixin: + config: ConfigSessionHandleMixin + _session_handle: t.Optional[BaseSessionHandle] = None + + @property + def session_handle(self): + """If a session handle is not assigned, creates a new one and assigns it.""" + if self._session_handle is None: + self._session_handle = 
self.config.create_session_handle() + return self._session_handle + + @session_handle.setter + def session_handle(self, session_handle: BaseSessionHandle): + self._session_handle = session_handle diff --git a/unstructured/ingest/processor2.py b/unstructured/ingest/processor2.py new file mode 100644 index 0000000000..fd9a78fd70 --- /dev/null +++ b/unstructured/ingest/processor2.py @@ -0,0 +1,125 @@ +import logging +import multiprocessing as mp +import typing as t +from contextlib import suppress +from functools import partial + +from unstructured.ingest.doc_processor.generalized import initialize, process_document +from unstructured.ingest.interfaces2 import ( + BaseDestinationConnector, + BaseSourceConnector, + PartitionConfig, +) +from unstructured.ingest.logger import ingest_log_streaming_init, logger + +with suppress(RuntimeError): + mp.set_start_method("spawn") + + +class Processor: + def __init__( + self, + source_doc_connector: BaseSourceConnector, + doc_processor_fn, + num_processes: int, + reprocess: bool, + verbose: bool, + max_docs: t.Optional[int], + dest_doc_connector: t.Optional[BaseDestinationConnector] = None, + ): + # initialize the reader and writer + self.source_doc_connector = source_doc_connector + self.doc_processor_fn = doc_processor_fn + self.num_processes = num_processes + self.reprocess = reprocess + self.verbose = verbose + self.max_docs = max_docs + self.dest_doc_connector = dest_doc_connector + + def initialize(self): + """Slower initialization things: check connections, load things into memory, etc.""" + ingest_log_streaming_init(logging.DEBUG if self.verbose else logging.INFO) + self.source_doc_connector.initialize() + if self.dest_doc_connector: + self.dest_doc_connector.initialize() + initialize() + + def cleanup(self): + self.source_doc_connector.cleanup() + + def _filter_docs_with_outputs(self, docs): + num_docs_all = len(docs) + docs = [doc for doc in docs if not doc.has_output()] + if self.max_docs is not None: + if num_docs_all > self.max_docs: + num_docs_all = self.max_docs + docs = docs[: self.max_docs] + num_docs_to_process = len(docs) + if num_docs_to_process == 0: + logger.info( + "All docs have structured outputs, nothing to do. 
Use --reprocess to process all.", + ) + return None + elif num_docs_to_process != num_docs_all: + logger.info( + f"Skipping processing for {num_docs_all - num_docs_to_process} docs out of " + f"{num_docs_all} since their structured outputs already exist, use --reprocess to " + "reprocess those in addition to the unprocessed ones.", + ) + return docs + + def run_partition(self, docs): + if not self.reprocess: + docs = self._filter_docs_with_outputs(docs) + if not docs: + return + + # Debugging tip: use the below line and comment out the mp.Pool loop + # block to remain in single process + # self.doc_processor_fn(docs[0]) + logger.info(f"Processing {len(docs)} docs") + json_docs = [doc.to_json() for doc in docs] + with mp.Pool( + processes=self.num_processes, + initializer=ingest_log_streaming_init, + initargs=(logging.DEBUG if self.verbose else logging.INFO,), + ) as pool: + pool.map(self.doc_processor_fn, json_docs) + + def run(self): + self.initialize() + + # fetch the list of lazy downloading IngestDoc obj's + docs = self.source_doc_connector.get_ingest_docs() + + try: + self.run_partition(docs=docs) + if self.dest_doc_connector: + self.dest_doc_connector.write(docs=docs) + finally: + self.cleanup() + + +def process_documents( + source_doc_connector: BaseSourceConnector, + partition_config: PartitionConfig, + verbose: bool, + dest_doc_connector: t.Optional[BaseDestinationConnector] = None, +) -> None: + process_document_with_partition_args = partial( + process_document, + strategy=partition_config.strategy, + ocr_languages=partition_config.ocr_languages, + encoding=partition_config.encoding, + pdf_infer_table_structure=partition_config.pdf_infer_table_structure, + ) + + Processor( + source_doc_connector=source_doc_connector, + doc_processor_fn=process_document_with_partition_args, + num_processes=partition_config.num_processes, + reprocess=partition_config.reprocess, + verbose=verbose, + max_docs=partition_config.max_docs, + dest_doc_connector=dest_doc_connector, + ).run() diff --git a/unstructured/ingest/runner/__init__.py b/unstructured/ingest/runner/__init__.py index e60613e7f7..726dc242fb 100644 --- a/unstructured/ingest/runner/__init__.py +++ b/unstructured/ingest/runner/__init__.py @@ -46,4 +46,5 @@ "sharepoint", "slack", "wikipedia", + "writer_map", ] diff --git a/unstructured/ingest/runner/s3_2.py b/unstructured/ingest/runner/s3_2.py new file mode 100644 index 0000000000..7396a81cf2 --- /dev/null +++ b/unstructured/ingest/runner/s3_2.py @@ -0,0 +1,55 @@ +import logging +import typing as t + +from unstructured.ingest.interfaces2 import PartitionConfig, ReadConfig +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.processor2 import process_documents +from unstructured.ingest.runner.utils2 import update_download_dir_remote_url +from unstructured.ingest.runner.writers import writer_map + + +def s3( + verbose: bool, + read_config: ReadConfig, + partition_configs: PartitionConfig, + remote_url: str, + recursive: bool, + anonymous: bool, + writer_type: t.Optional[str] = None, + writer_kwargs: t.Optional[dict] = None, + **kwargs, +): + writer_kwargs = writer_kwargs if writer_kwargs else {} + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + + read_config.download_dir = update_download_dir_remote_url( + connector_name="s3", + read_configs=read_config, + remote_url=remote_url, + logger=logger, + ) + + from unstructured.ingest.connector.s3_2 import S3SourceConnector, SimpleS3Config + + source_doc_connector = 
S3SourceConnector( # type: ignore + read_config=read_config, + connector_config=SimpleS3Config( + path=remote_url, + recursive=recursive, + access_kwargs={"anon": anonymous}, + ), + partition_config=partition_configs, + ) + + dest_doc_connector = None + if writer_type: + writer = writer_map[writer_type] + logger.debug(f"writer configs: {writer_kwargs}") + dest_doc_connector = writer(**writer_kwargs) + + process_documents( + source_doc_connector=source_doc_connector, + partition_config=partition_configs, + verbose=verbose, + dest_doc_connector=dest_doc_connector, + ) diff --git a/unstructured/ingest/runner/utils2.py b/unstructured/ingest/runner/utils2.py new file mode 100644 index 0000000000..7356077744 --- /dev/null +++ b/unstructured/ingest/runner/utils2.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import hashlib +import logging +from pathlib import Path + +from unstructured.ingest.interfaces2 import ( + ReadConfig, +) + + +def update_download_dir_remote_url( + connector_name: str, + read_configs: ReadConfig, + remote_url: str, + logger: logging.Logger, +) -> str: + hashed_dir_name = hashlib.sha256(remote_url.encode("utf-8")) + return update_download_dir_hash( + connector_name=connector_name, + read_configs=read_configs, + hashed_dir_name=hashed_dir_name, + logger=logger, + ) + + +def update_download_dir_hash( + connector_name: str, + read_configs: ReadConfig, + hashed_dir_name: hashlib._Hash, + logger: logging.Logger, +) -> str: + if not read_configs.download_dir: + cache_path = Path.home() / ".cache" / "unstructured" / "ingest" + if not cache_path.exists(): + cache_path.mkdir(parents=True, exist_ok=True) + download_dir = cache_path / connector_name / hashed_dir_name.hexdigest()[:10] + if read_configs.preserve_downloads: + logger.warning( + f"Preserving downloaded files but download_dir is not specified," + f" using {download_dir}", + ) + new_download_dir = str(download_dir) + logger.debug(f"updating download directory to: {new_download_dir}") + else: + new_download_dir = read_configs.download_dir + return new_download_dir diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py new file mode 100644 index 0000000000..8960ed7704 --- /dev/null +++ b/unstructured/ingest/runner/writers.py @@ -0,0 +1,22 @@ +def s3_writer( + remote_url: str, + anonymous: bool, + verbose: bool = False, +): + from unstructured.ingest.connector.s3_2 import ( + S3DestinationConnector, + SimpleS3Config, + ) + + return S3DestinationConnector( + write_config=None, + connector_config=SimpleS3Config( + path=remote_url, + access_kwargs={"anon": anonymous}, + ), + ) + + +writer_map = { + "s3": s3_writer, +}
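
For reference, below is a minimal, illustrative sketch (not part of the diff) of how the new source and destination pieces introduced here compose programmatically, mirroring what unstructured/ingest/runner/s3_2.py and runner/writers.py do when the CLI is invoked with an s3 destination subcommand. The bucket URLs and local directories are placeholders, and real buckets would normally need credentials in access_kwargs rather than anon=True.

# Illustrative sketch only: wire an S3 source to an S3 destination using the
# new interfaces2/processor2 APIs added in this change. Paths are placeholders.
from unstructured.ingest.connector.s3_2 import (
    S3DestinationConnector,
    S3SourceConnector,
    SimpleS3Config,
)
from unstructured.ingest.interfaces2 import PartitionConfig, ReadConfig
from unstructured.ingest.processor2 import process_documents

partition_config = PartitionConfig(output_dir="structured-output", strategy="hi_res")

# Source: fetch raw documents from one bucket and partition them locally.
source = S3SourceConnector(
    read_config=ReadConfig(download_dir="s3-downloads", preserve_downloads=True),
    connector_config=SimpleS3Config(
        path="s3://example-bucket/input/",  # placeholder
        recursive=True,
        access_kwargs={"anon": True},
    ),
    partition_config=partition_config,
)

# Destination: upload the structured .json outputs to a second bucket,
# the same object runner/writers.py builds for writer_type="s3".
destination = S3DestinationConnector(
    write_config=None,  # WriteConfig is currently an empty dataclass
    connector_config=SimpleS3Config(
        path="s3://example-bucket/output/",  # placeholder
        access_kwargs={"anon": True},
    ),
)

# Partition with the source connector, then write results via the destination.
process_documents(
    source_doc_connector=source,
    partition_config=partition_config,
    verbose=True,
    dest_doc_connector=destination,
)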