🐛 Bug: PostgreSQL Cache can't rename intermediate Table after replication #148

Closed
tinomerl opened this issue Mar 28, 2024 · 5 comments · Fixed by #149
@tinomerl (Contributor)

PyAirbyte Version: 0.8.1
Airbyte Source LinkedIn Pages Version: 1.0.2

I just tested replicating the LinkedIn Pages source to a local PostgreSQL cache, and the connector failed. I tested with two different streams, organization_lookup and follower_statistics. Both times the connector failed with the following error message:

ProgrammingError(psycopg2.errors.UndefinedTable) relation "<stream_name>" does not exist

Both tables exist in the database after running the script, but they are empty.

[screenshot: both tables present in the database, with zero rows]

The number of runs doesn't matter: it happens both when the table is created for the first time and when the table already exists.

Sample Code

import airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    username="postgres",
    password="pass123",
    database="postgres",
    port=5432
)

source: ab.Source = ab.get_source("source-linkedin-pages")

config = {
    "org_id": "redacted",
    "credentials": {
        "auth_method": "oAuth2.0",
        "client_id": "redacted",
        "client_secret": "redacted",
        "refresh_token": "redacted"
    }
}

source.set_config(config)
source.check()

source.select_streams("follower_statistics")
read_results = source.read(cache)
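
For reference, a quick way to confirm that the final tables exist but contain no rows after the failed run (a hypothetical psycopg2 check, not part of the original script; connection details match the cache config above, and airbyte_raw is assumed here as PyAirbyte's default cache schema):

import psycopg2

# Hypothetical verification snippet: confirm the final table exists but holds
# no rows after the failed read. Adjust the schema if your cache uses another.
conn = psycopg2.connect(
    host="localhost",
    user="postgres",
    password="pass123",
    dbname="postgres",
    port=5432,
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM airbyte_raw.follower_statistics;")
    print(cur.fetchone()[0])  # 0 -- the table exists but is empty
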
Error Log organization_lookup

Started `source-linkedin-pages` read operation at 11:11:38...
Failed `source-linkedin-pages` read operation at 11:11:48.
Traceback (most recent call last):
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/sources/base.py", line 740, in read
    cache.processor.process_airbyte_messages(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/base.py", line 203, in process_airbyte_messages
    self.write_all_stream_data(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/base.py", line 214, in write_all_stream_data
    self.write_stream_data(stream_name, write_strategy=write_strategy)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 542, in write_stream_data
    self._write_temp_table_to_final_table(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 695, in _write_temp_table_to_final_table
    self._swap_temp_table_with_final_table(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 800, in _swap_temp_table_with_final_table
    self._execute_sql(commands)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 608, in _execute_sql
    raise SQLRuntimeError(msg) from None  # from ex
airbyte._processors.sql.base.SQLRuntimeError: Error when executing SQL:
ALTER TABLE organization_lookup RENAME TO organization_lookup_deleteme;
ALTER TABLE organization_lookup_01ht28pnfnyp9hy9xzcj3f557x RENAME TO organization_lookup;
DROP TABLE organization_lookup_deleteme;
ProgrammingError(psycopg2.errors.UndefinedTable) relation "organization_lookup" does not exist

[SQL: ALTER TABLE organization_lookup RENAME TO organization_lookup_deleteme;
ALTER TABLE organization_lookup_01ht28pnfnyp9hy9xzcj3f557x RENAME TO organization_lookup;
DROP TABLE organization_lookup_deleteme;]
(Background on this error at: https://sqlalche.me/e/14/f405)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/sources/base.py", line 749, in read
    raise exc.AirbyteConnectorFailedError(
airbyte.exceptions.AirbyteConnectorFailedError: AirbyteConnectorFailedError: Connector failed.

Log output: 
        Starting syncing SourceLinkedinPages
    Marking stream organization_lookup as STARTED
    Syncing stream: organization_lookup 
    Marking stream organization_lookup as RUNNING
    Read 1 records from organization_lookup stream
    Marking stream organization_lookup as STOPPED
    Finished syncing organization_lookup
    SourceLinkedinPages runtimes:
    Syncing stream organization_lookup 0:00:07.137647
    Finished syncing SourceLinkedinPages

Error Log follower_statistics

Started `source-linkedin-pages` read operation at 11:17:02...
Failed `source-linkedin-pages` read operation at 11:17:11.
Traceback (most recent call last):
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/sources/base.py", line 740, in read
    cache.processor.process_airbyte_messages(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/base.py", line 203, in process_airbyte_messages
    self.write_all_stream_data(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/base.py", line 214, in write_all_stream_data
    self.write_stream_data(stream_name, write_strategy=write_strategy)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 542, in write_stream_data
    self._write_temp_table_to_final_table(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 695, in _write_temp_table_to_final_table
    self._swap_temp_table_with_final_table(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 800, in _swap_temp_table_with_final_table
    self._execute_sql(commands)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 608, in _execute_sql
    raise SQLRuntimeError(msg) from None  # from ex
airbyte._processors.sql.base.SQLRuntimeError: Error when executing SQL:
ALTER TABLE follower_statistics RENAME TO follower_statistics_deleteme;
ALTER TABLE follower_statistics_01ht290hb63tkzmhjws1e2xyc7 RENAME TO follower_statistics;
DROP TABLE follower_statistics_deleteme;
ProgrammingError(psycopg2.errors.UndefinedTable) relation "follower_statistics" does not exist

[SQL: ALTER TABLE follower_statistics RENAME TO follower_statistics_deleteme;
ALTER TABLE follower_statistics_01ht290hb63tkzmhjws1e2xyc7 RENAME TO follower_statistics;
DROP TABLE follower_statistics_deleteme;]
(Background on this error at: https://sqlalche.me/e/14/f405)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/sources/base.py", line 749, in read
    raise exc.AirbyteConnectorFailedError(
airbyte.exceptions.AirbyteConnectorFailedError: AirbyteConnectorFailedError: Connector failed.

Log output: 
        Starting syncing SourceLinkedinPages
    Marking stream follower_statistics as STARTED
    Syncing stream: follower_statistics 
    Marking stream follower_statistics as RUNNING
    Read 1 records from follower_statistics stream
    Marking stream follower_statistics as STOPPED
    Finished syncing follower_statistics
    SourceLinkedinPages runtimes:
    Syncing stream follower_statistics 0:00:07.102127
    Finished syncing SourceLinkedinPages

@aaronsteers (Contributor) commented Mar 28, 2024

@tinomerl - Thanks for logging this.

Can you try again, this time specifying an explicit schema name in your Postgres cache config?

There may be an edge case here where omitting the schema name prevents us from properly referencing the tables as we create them. (If so, we may need to make schema name a required field.)
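
For example, something like this (a sketch only; the schema name passed explicitly instead of relying on the default):

from airbyte.caches import PostgresCache

# Same cache config as in the report, but with the schema name spelled out.
cache = PostgresCache(
    host="localhost",
    username="postgres",
    password="pass123",
    database="postgres",
    port=5432,
    schema_name="airbyte_raw",  # explicit schema instead of the default
)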

@tinomerl (Contributor, Author)

Hey @aaronsteers,

Thanks for the quick follow-up. I also tried it with the schema stated explicitly; the results were the same, with the same error. Sorry for not including that in the issue! I then dug around a bit and implemented a fix in #149. Let me know if I should change or test anything else.
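
For anyone following along, a minimal sketch of what schema-qualifying the swap statements could look like (a hypothetical helper, not necessarily the exact approach taken in #149):

def build_swap_sql(schema_name: str, final_table: str, temp_table: str) -> str:
    """Build the swap statements with schema-qualified source names so that
    PostgreSQL resolves the tables regardless of the session's search_path."""
    deleteme = f"{final_table}_deleteme"
    return "\n".join([
        f"ALTER TABLE {schema_name}.{final_table} RENAME TO {deleteme};",
        f"ALTER TABLE {schema_name}.{temp_table} RENAME TO {final_table};",
        f"DROP TABLE {schema_name}.{deleteme};",
    ])

# Reproduces the failing statement block from the log, but schema-qualified.
print(build_swap_sql(
    "airbyte_raw",
    "organization_lookup",
    "organization_lookup_01ht28pnfnyp9hy9xzcj3f557x",
))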

@bindipankhudi (Contributor)

Hi @tinomerl! Hope you don't mind me reaching out on this ticket. I work with @aaronsteers, and I was wondering how you are using PyAirbyte in your data integration workflow. Would you be open to filling out a quick survey (https://forms.gle/TrGVga3uu2z63ASp7), or just email me your use case at [email protected]? Looking forward to hearing from you!

@tinomerl (Contributor, Author) commented Apr 2, 2024

Hey @bindipankhudi, I don't mind. I just filled out the Google Form. If you have any questions, feel free to reach out. :)

@bindipankhudi (Contributor)

Thank you @tinomerl! Really appreciate it! :)
