fix: occasional SIGABRT with deltalake writer on Linux (#1567)
- Resolves an issue where the deltalake writer occasionally triggers a
SIGABRT on Linux even though it has finished writing the table correctly.
- This was first observed in the ingest test.
- Putting the writer into a separate process mitigates the problem by
forcing Python to wait for the deltalake Rust backend to finish its tasks.
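
The pattern described above can be sketched in isolation as follows. This is a minimal illustration, not the connector's code: `run_in_child` and `fake_write` are hypothetical names, `fake_write` stands in for `write_deltalake`, and the `fork` start method is pinned only so the snippet runs without a `__main__` guard on POSIX systems (the commit uses the default `Process`). The exit-code check is an addition for illustration; the commit itself only calls `start()` and `join()`.

```python
import multiprocessing as mp
import os
import tempfile
import typing as t


def run_in_child(target: t.Callable[..., None], kwargs: t.Dict[str, t.Any]) -> int:
    """Run target(**kwargs) in a child process and return the child's exit code."""
    # Assumption: "fork" is pinned so the sketch runs without a __main__ guard (POSIX only).
    ctx = mp.get_context("fork")
    child = ctx.Process(target=target, kwargs=kwargs)
    child.start()
    child.join()  # block until the child process has fully exited
    return child.exitcode


def fake_write(path: str, rows: t.List[str]) -> None:
    """Hypothetical stand-in for write_deltalake: writes the rows to a text file."""
    with open(path, "w") as f:
        f.write("\n".join(rows))


out_path = os.path.join(tempfile.mkdtemp(), "rows.txt")
exit_code = run_in_child(fake_write, {"path": out_path, "rows": ["a", "b"]})
if exit_code != 0:
    # A negative exit code -N means the child was terminated by signal N.
    raise RuntimeError(f"writer process exited with code {exit_code}")
```

Because the parent only inspects the child's exit code, an abort inside the child does not take the parent down with it; whether to raise on a nonzero code is a design choice left to the caller.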

## test

To test this, it is best to set up an instance on a Linux system, since the
problem has only been observed on Linux so far. Run:

```bash
PYTHONPATH=. ./unstructured/ingest/main.py delta-table --num-processes 2 --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.date_created,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth --table-uri ../tables/delta/ --preserve-downloads --verbose delta-table --write-column json_data --mode overwrite --table-uri file:///tmp/delta
```

Without this fix, the run would occasionally end with a `SIGABRT`.
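Since the failure is intermittent, a single run may not reproduce it. One way to check is to repeat a command several times and tally its return codes, as in the sketch below. The helper `tally_return_codes` and the stand-in `demo_cmd` are hypothetical and not part of this commit; on POSIX, `subprocess` reports a negative return code `-N` when the command was terminated by signal `N`.

```python
import subprocess
import sys
import typing as t


def tally_return_codes(cmd: t.List[str], runs: int) -> t.Dict[int, int]:
    """Run cmd `runs` times and count how often each return code occurs."""
    tally: t.Dict[int, int] = {}
    for _ in range(runs):
        result = subprocess.run(
            cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        tally[result.returncode] = tally.get(result.returncode, 0) + 1
    return tally


# Hypothetical stand-in command; substitute the ingest invocation to test the fix.
demo_cmd = [sys.executable, "-c", "print('ok')"]
tally = tally_return_codes(demo_cmd, runs=3)
print(tally)
```

Any negative key in the resulting dictionary indicates a run that was killed by a signal rather than exiting normally.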

---------

Co-authored-by: ryannikolaidis <[email protected]>
badGarnet and ryannikolaidis authored Sep 29, 2023
1 parent 4e84e32 commit cd8c6a2
Showing 3 changed files with 16 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ allowing the document to be loaded. Fix: Change parent class for Formula to Text
* **Fixes Sphinx errors.** Fixes errors when running Sphinx `make html` and installs library to suppress warnings.
* **Fixes a metadata backwards compatibility error** Problem: When calling `partition_via_api`, the hosted api may return an element schema that's newer than the current `unstructured`. In this case, metadata fields were added which did not exist in the local `ElementMetadata` dataclass, and `__init__()` threw an error. Fix: remove nonexistent fields before instantiating in `ElementMetadata.from_json()`. Importance: Crucial to avoid breaking changes when adding fields.
* **Fixes issue with Discord connector when a channel returns `None`** Problem: Getting the `jump_url` from a nonexistent Discord `channel` fails. Fix: property `jump_url` is now retrieved within the same context as the messages from the channel. Importance: Avoids cascading issues when the connector fails to fetch information about a Discord channel.
+* **Fixes occasional SIGABRT when writing a table with `deltalake` on Linux** Problem: on Linux, ingest can occasionally throw a `SIGABRT` when writing a `deltalake` table even though the table was written correctly. Fix: put the writing function into a `Process` so that it runs to completion before control returns to the main process. Importance: Improves stability of connectors using `deltalake`.


## 0.10.16
2 changes: 1 addition & 1 deletion test_unstructured_ingest/test-ingest.sh
@@ -56,7 +56,7 @@ trap print_last_run EXIT

 for script in "${scripts[@]}"; do
   CURRENT_SCRIPT=$script
-  if [[ "$CURRENT_SCRIPT" == "test-ingest-notion.sh" ]] || [[ "$CURRENT_SCRIPT" == "test-ingest-delta-table.sh" ]]; then
+  if [[ "$CURRENT_SCRIPT" == "test-ingest-notion.sh" ]]; then
     echo "--------- RUNNING SCRIPT $script --- IGNORING FAILURES"
     set +e
     echo "Running ./test_unstructured_ingest/$script"
18 changes: 14 additions & 4 deletions unstructured/ingest/connector/delta_table.py
@@ -3,6 +3,7 @@
 import typing as t
 from dataclasses import dataclass
 from datetime import datetime as dt
+from multiprocessing import Process
 from pathlib import Path

 import pandas as pd
@@ -182,8 +183,17 @@ def write(self, docs: t.List[BaseIngestDoc]) -> None:
             f"writing {len(json_list)} rows to destination "
             f"table at {self.connector_config.table_uri}",
         )
-        write_deltalake(
-            table_or_uri=self.connector_config.table_uri,
-            data=pd.DataFrame(data={self.write_config.write_column: json_list}),
-            mode=self.write_config.mode,
+        # NOTE: deltalake writer on Linux sometimes can finish but still trigger a SIGABRT and cause
+        # ingest to fail, even though all tasks are completed normally. Putting the writer into a
+        # process mitigates this issue by ensuring python interpreter waits properly for deltalake's
+        # rust backend to finish
+        writer = Process(
+            target=write_deltalake,
+            kwargs={
+                "table_or_uri": self.connector_config.table_uri,
+                "data": pd.DataFrame(data={self.write_config.write_column: json_list}),
+                "mode": self.write_config.mode,
+            },
         )
+        writer.start()
+        writer.join()
