Skip to content

Commit

Permalink
Add s3 writer (#1223)
Browse files Browse the repository at this point in the history
### Description
Extend the s3 cli code to also support writing to s3. Writers are added as
optional subcommands of the parent command, each with its own arguments.
A custom `click.Group` is introduced to customize the formatting and text
of help messages.

To limit the scope of this PR, most existing files were not touched but
instead new files were added for the new flow. This allowed _only_ the
s3 connector to be updated without breaking any other ones.
  • Loading branch information
rbiseck3 authored Aug 31, 2023
1 parent 810cfc2 commit ed7f991
Show file tree
Hide file tree
Showing 24 changed files with 1,313 additions and 63 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features

* Add an s3 writer to the ingest cli

### Fixes

## 0.10.10
Expand Down
50 changes: 7 additions & 43 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -204,49 +204,13 @@ install-pandoc:
## pip-compile: compiles all base/dev/test requirements
.PHONY: pip-compile
pip-compile:
pip-compile --upgrade requirements/base.in

# Extra requirements that are specific to document types
pip-compile --upgrade requirements/extra-csv.in
pip-compile --upgrade requirements/extra-docx.in
pip-compile --upgrade requirements/extra-epub.in
pip-compile --upgrade requirements/extra-pandoc.in
pip-compile --upgrade requirements/extra-markdown.in
pip-compile --upgrade requirements/extra-msg.in
pip-compile --upgrade requirements/extra-odt.in
pip-compile --upgrade requirements/extra-pdf-image.in
pip-compile --upgrade requirements/extra-pptx.in
pip-compile --upgrade requirements/extra-xlsx.in

# Extra requirements for huggingface staging functions
pip-compile --upgrade requirements/huggingface.in
pip-compile --upgrade requirements/test.in
pip-compile --upgrade requirements/dev.in
pip-compile --upgrade requirements/build.in
# NOTE(robinson) - docs/requirements.txt is where the GitHub action for building
# sphinx docs looks for additional requirements
@for file in $(shell ls requirements/*.in); \
do echo "running: pip-compile --upgrade $${file}" && \
pip-compile --upgrade $${file}; \
done
cp requirements/build.txt docs/requirements.txt
pip-compile --upgrade requirements/ingest-s3.in
pip-compile --upgrade requirements/ingest-biomed.in
pip-compile --upgrade requirements/ingest-box.in
pip-compile --upgrade requirements/ingest-gcs.in
pip-compile --upgrade requirements/ingest-dropbox.in
pip-compile --upgrade requirements/ingest-azure.in
pip-compile --upgrade requirements/ingest-delta-table.in
pip-compile --upgrade requirements/ingest-discord.in
pip-compile --upgrade requirements/ingest-reddit.in
pip-compile --upgrade requirements/ingest-github.in
pip-compile --upgrade requirements/ingest-gitlab.in
pip-compile --upgrade requirements/ingest-slack.in
pip-compile --upgrade requirements/ingest-wikipedia.in
pip-compile --upgrade requirements/ingest-google-drive.in
pip-compile --upgrade requirements/ingest-elasticsearch.in
pip-compile --upgrade requirements/ingest-onedrive.in
pip-compile --upgrade requirements/ingest-outlook.in
pip-compile --upgrade requirements/ingest-confluence.in
pip-compile --upgrade requirements/ingest-airtable.in
pip-compile --upgrade requirements/ingest-sharepoint.in
pip-compile --upgrade requirements/ingest-notion.in



## install-project-local: install unstructured into your local python environment
.PHONY: install-project-local
Expand Down Expand Up @@ -281,7 +245,7 @@ test-no-extras:
test_${PACKAGE_NAME}/partition/test_text.py \
test_${PACKAGE_NAME}/partition/test_email.py \
test_${PACKAGE_NAME}/partition/test_html_partition.py \
test_${PACKAGE_NAME}/partition/test_xml_partition.py
test_${PACKAGE_NAME}/partition/test_xml_partition.py

.PHONY: test-extra-csv
test-extra-csv:
Expand Down
8 changes: 6 additions & 2 deletions examples/ingest/s3-small-batch/ingest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
s3 \
--remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \
--anonymous \
--structured-output-dir s3-small-batch-output \
--num-processes 2
--output-dir s3-small-batch-output \
--num-processes 2 \
--verbose \
s3 \
--anonymous \
--remote-url s3://utic-dev-tech-fixtures/small-pdf-set-output
9 changes: 4 additions & 5 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
# pip-compile requirements/dev.in
#
anyio==3.7.1
anyio==4.0.0
# via jupyter-server
appnope==0.1.3
# via
Expand Down Expand Up @@ -101,7 +101,6 @@ importlib-resources==6.0.1
# jsonschema
# jsonschema-specifications
# jupyterlab
# notebook
ipykernel==6.25.1
# via
# jupyter
Expand Down Expand Up @@ -167,7 +166,7 @@ jupyter-events==0.7.0
# via jupyter-server
jupyter-lsp==2.2.0
# via jupyterlab
jupyter-server==2.7.2
jupyter-server==2.7.3
# via
# jupyter-lsp
# jupyterlab
Expand Down Expand Up @@ -211,7 +210,7 @@ nest-asyncio==1.5.7
# via ipykernel
nodeenv==1.8.0
# via pre-commit
notebook==7.0.2
notebook==7.0.3
# via jupyter
notebook-shim==0.2.3
# via
Expand Down Expand Up @@ -391,7 +390,7 @@ urllib3==1.26.16
# -c requirements/constraints.in
# -c requirements/test.txt
# requests
virtualenv==20.24.3
virtualenv==20.24.4
# via pre-commit
wcwidth==0.2.6
# via prompt-toolkit
Expand Down
1 change: 1 addition & 0 deletions requirements/extra-pdf-image.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ transformers==4.32.1
# via unstructured-inference
typing-extensions==4.7.1
# via
# -c requirements/base.txt
# filelock
# huggingface-hub
# iopath
Expand Down
1 change: 1 addition & 0 deletions requirements/huggingface.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ transformers==4.32.1
# via -r requirements/huggingface.in
typing-extensions==4.7.1
# via
# -c requirements/base.txt
# filelock
# huggingface-hub
# torch
Expand Down
2 changes: 1 addition & 1 deletion requirements/ingest-elasticsearch.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ elastic-transport==8.4.0
# via elasticsearch
elasticsearch==8.9.0
# via -r requirements/ingest-elasticsearch.in
jq==1.4.1
jq==1.5.0
# via -r requirements/ingest-elasticsearch.in
urllib3==1.26.16
# via
Expand Down
2 changes: 1 addition & 1 deletion requirements/ingest-notion.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
# pip-compile requirements/ingest-notion.in
#
anyio==3.7.1
anyio==4.0.0
# via httpcore
certifi==2023.7.22
# via
Expand Down
10 changes: 7 additions & 3 deletions test_unstructured_ingest/test-ingest-s3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ sh "$SCRIPT_DIR"/check-num-files-expected-output.sh 3 $OUTPUT_FOLDER_NAME 20k
PYTHONPATH=. ./unstructured/ingest/main.py \
s3 \
--download-dir "$DOWNLOAD_DIR" \
--metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \
--partition-strategy hi_res \
--metadata-exclude coordinates \
--metadata-exclude filename \
--metadata-exclude file_directory \
--metadata-exclude metadata.data_source.date_processed \
--metadata-exclude metadata.last_modified \
--strategy hi_res \
--preserve-downloads \
--reprocess \
--structured-output-dir "$OUTPUT_DIR" \
--output-dir "$OUTPUT_DIR" \
--verbose \
--remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \
--anonymous
Expand Down
7 changes: 5 additions & 2 deletions unstructured/ingest/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def ingest():
# Dynamically update shared options for supported subcommands
subcommands = [
cli_cmds.box,
cli_cmds.s3,
cli_cmds.gcs,
cli_cmds.delta_table,
cli_cmds.dropbox,
Expand Down Expand Up @@ -42,7 +41,11 @@ def ingest():
def get_cmd() -> click.Command:
    """Assemble and return the top-level ``ingest`` command.

    Every command in ``cli_cmds.src`` first receives each command in
    ``cli_cmds.dest`` as a subcommand and is then attached to ``ingest``.
    The remaining entries in ``subcommands`` are factories: each one is
    called and its result is attached to ``ingest`` directly.
    """
    root = ingest
    # Source commands that accept a destination (writer) subcommand.
    for source in cli_cmds.src:
        for destination in cli_cmds.dest:
            source.add_command(destination)
        root.add_command(source)
    # Commands that are still exposed through a factory function.
    for make_subcommand in subcommands:
        root.add_command(make_subcommand())
    return root
13 changes: 12 additions & 1 deletion unstructured/ingest/cli/cmds/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import typing as t

import click

from .airtable import get_cmd as airtable
from .azure import get_cmd as azure
from .biomed import get_cmd as biomed
Expand All @@ -17,11 +21,16 @@
from .onedrive import get_cmd as onedrive
from .outlook import get_cmd as outlook
from .reddit import get_cmd as reddit
from .s3 import get_cmd as s3
from .s3_2 import get_dest_cmd as s3_dest
from .s3_2 import get_source_cmd as s3
from .sharepoint import get_cmd as sharepoint
from .slack import get_cmd as slack
from .wikipedia import get_cmd as wikipedia

src: t.List[click.Group] = [s3()]

dest: t.List[click.Command] = [s3_dest()]

__all__ = [
"airtable",
"azure",
Expand All @@ -46,4 +55,6 @@
"sharepoint",
"slack",
"wikipedia",
"src",
"dest",
]
114 changes: 114 additions & 0 deletions unstructured/ingest/cli/cmds/s3_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import logging
from dataclasses import dataclass

import click

from unstructured.ingest.cli.cmds.utils import Group
from unstructured.ingest.cli.common import (
log_options,
)
from unstructured.ingest.cli.interfaces import (
CliMixin,
CliPartitionConfig,
CliReadConfig,
CliRecursiveConfig,
CliRemoteUrlConfig,
)
from unstructured.ingest.interfaces2 import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner.s3_2 import s3 as s3_fn


@dataclass
class S3CliConfigs(BaseConfig, CliMixin):
    """CLI configuration values that are specific to the s3 connector."""

    # Mirrors the ``--anonymous`` flag registered in ``add_cli_options``.
    anonymous: bool = False

    @staticmethod
    def add_cli_options(cmd: click.Command) -> None:
        """Attach the s3-specific options to ``cmd``.

        The option is appended to ``cmd.params`` in place; nothing is returned.
        """
        anonymous_flag = click.Option(
            ["--anonymous"],
            is_flag=True,
            default=False,
            help="Connect to s3 without local AWS credentials.",
        )
        cmd.params.append(anonymous_flag)


@click.group(name="s3", invoke_without_command=True, cls=Group)
@click.pass_context
def s3_source(ctx: click.Context, **options):
    # Callback for the ``s3`` source group. Documented with comments rather
    # than a docstring on purpose: click would surface a docstring as the
    # command's help text, which would change the CLI output.
    #
    # ``options`` holds the parsed values of every option registered on this
    # group. Any exception raised while building the configs or running the
    # ingest is logged and re-raised as a ``click.ClickException``.

    # When a subcommand was invoked, leave the work to that subcommand's callback.
    if ctx.invoked_subcommand:
        return

    # Click sets all multiple fields as tuple, this needs to be updated to list
    options = {
        key: list(value) if isinstance(value, tuple) else value
        for key, value in options.items()
    }

    verbose = options.get("verbose", False)
    if verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    ingest_log_streaming_init(log_level)
    log_options(options, verbose=verbose)

    try:
        # NOTE: run_init_checks(**options) is currently disabled here.
        read_configs = CliReadConfig.from_dict(options)
        partition_configs = CliPartitionConfig.from_dict(options)
        # Run for schema validation only; the resulting object is not used.
        S3CliConfigs.from_dict(options)
        s3_fn(read_config=read_configs, partition_configs=partition_configs, **options)
    except Exception as err:
        logger.error(err, exc_info=True)
        raise click.ClickException(str(err)) from err


@click.command(name="s3")
@click.pass_context
def s3_dest(ctx: click.Context, **options):
    # Callback for the ``s3`` destination (writer) subcommand. Documented with
    # comments rather than a docstring on purpose: click would surface a
    # docstring as the command's help text, which would change the CLI output.
    #
    # ``options`` holds the values parsed for this subcommand and is passed on
    # as ``writer_kwargs``. The read/partition configuration comes from the
    # parent command's parsed params. Any exception raised while building the
    # configs or running the ingest is logged and re-raised as a
    # ``click.ClickException``.
    parent_params = ctx.parent.params if ctx.parent else {}

    # Click sets all multiple fields as tuple, this needs to be updated to list.
    # Both dicts are rebuilt rather than edited in place so that the parent
    # context's ``params`` mapping is left exactly as click populated it.
    options = {k: list(v) if isinstance(v, tuple) else v for k, v in options.items()}
    parent_options: dict = {
        k: list(v) if isinstance(v, tuple) else v for k, v in parent_params.items()
    }

    # Verbosity is taken from the parent command's params, not from this subcommand's.
    verbose = parent_options.get("verbose", False)
    ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
    log_options(parent_options, verbose=verbose)
    log_options(options, verbose=verbose)
    try:
        # NOTE: run_init_checks(**options) is currently disabled here.
        read_configs = CliReadConfig.from_dict(parent_options)
        partition_configs = CliPartitionConfig.from_dict(parent_options)
        # Run for schema validation only; the resulting object is not used.
        S3CliConfigs.from_dict(options)
        s3_fn(
            read_config=read_configs,
            partition_configs=partition_configs,
            writer_type="s3",
            writer_kwargs=options,
            **parent_options,
        )
    except Exception as e:
        logger.error(e, exc_info=True)
        raise click.ClickException(str(e)) from e


def get_dest_cmd() -> click.Command:
    """Return the s3 destination command with its CLI options registered.

    The options are added to the module-level ``s3_dest`` command object
    itself, so the returned command is that same object.
    """
    cmd = s3_dest
    # Registration order is kept: s3-specific options first, then the remote url.
    for option_source in (S3CliConfigs, CliRemoteUrlConfig):
        option_source.add_cli_options(cmd)
    return cmd


def get_source_cmd() -> click.Group:
    """Return the s3 source group with its CLI options registered.

    The options are added to the module-level ``s3_source`` group object
    itself, so the returned group is that same object.
    """
    cmd = s3_source
    option_sources = (
        # Connector-specific configs
        S3CliConfigs,
        CliRemoteUrlConfig,
        CliRecursiveConfig,
        # Common CLI configs
        CliReadConfig,
        CliPartitionConfig,
    )
    for option_source in option_sources:
        option_source.add_cli_options(cmd)

    # The verbose flag is registered last, after every config-provided option.
    verbose_flag = click.Option(["-v", "--verbose"], is_flag=True, default=False)
    cmd.params.append(verbose_flag)
    return cmd
Loading

0 comments on commit ed7f991

Please sign in to comment.