print Redshift loading error #23

Open · wants to merge 4 commits into master

Changes from 1 commit
README.md (8 additions, 0 deletions)

@@ -34,6 +34,14 @@ export S3_DATABASE_EXPORT_BUCKET='some-bucket-to-use'
 postgres_to_redshift
 ```
 
+Optional flags:
+
+```bash
+# Optional debug flag if you'd like the copy job to ignore Redshift loading errors
+# and keep going.
+export IGNORE_LOADING_ERRORS_AND_CONTINUE='true'
+```
+
 ## Contributing
 
 1. Fork it ( https://github.com/kitchensurfing/postgres_to_redshift/fork )
lib/postgres_to_redshift.rb (34 additions, 12 deletions)

@@ -1,11 +1,11 @@
require "postgres_to_redshift/version"
require 'postgres_to_redshift/version'
require 'pg'
require 'uri'
require 'aws-sdk-v1'
require 'zlib'
require 'tempfile'
require "postgres_to_redshift/table"
require "postgres_to_redshift/column"
require 'postgres_to_redshift/table'
require 'postgres_to_redshift/column'

class PostgresToRedshift
class << self
@@ -41,7 +41,7 @@ def self.target_uri
   def self.source_connection
     unless instance_variable_defined?(:"@source_connection")
       @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1])
-      @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
+      @source_connection.exec('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;')
     end
 
     @source_connection
@@ -85,7 +85,7 @@ def bucket
   end
 
   def copy_table(table)
-    tmpfile = Tempfile.new("psql2rs")
+    tmpfile = Tempfile.new('psql2rs')
     zip = Zlib::GzipWriter.new(tmpfile)
     chunksize = 5 * GIGABYTE # uncompressed
     chunk = 1
@@ -97,14 +97,14 @@ def copy_table(table)
     source_connection.copy_data(copy_command) do
       while row = source_connection.get_copy_data
         zip.write(row)
-        if (zip.pos > chunksize)
+        if zip.pos > chunksize
           zip.finish
           tmpfile.rewind
           upload_table(table, tmpfile, chunk)
           chunk += 1
           zip.close unless zip.closed?
           tmpfile.unlink
-          tmpfile = Tempfile.new("psql2rs")
+          tmpfile = Tempfile.new('psql2rs')
           zip = Zlib::GzipWriter.new(tmpfile)
         end
       end
@@ -128,14 +128,36 @@ def import_table(table)
     puts "Importing #{table.target_table_name}"
     target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating")
 
-    target_connection.exec("BEGIN;")
+    begin
+      target_connection.exec('BEGIN;')
 
-    target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")
+      target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")
 
-    target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")
+      target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")
 
-    target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
+      target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
 
-    target_connection.exec("COMMIT;")
+      target_connection.exec('COMMIT;')
+    rescue PG::InternalError => e
+      target_connection.exec('ROLLBACK;')
+
+      print_last_redshift_loading_error if e.message.include?('stl_load_errors')

Owner: It seems like this will not print any errors that do not match. Will other kinds of errors fail silently?
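
To illustrate the concern, here is a minimal sketch (not part of this PR, and assuming the surrounding import_table context for target_connection and continue_after_error) that logs every exception message before the stl_load_errors check, so failures that never touch stl_load_errors are still reported:

```ruby
# Illustrative sketch only, not the PR's code: always surface the raw
# exception, then add the stl_load_errors detail when it applies.
begin
  target_connection.exec('COMMIT;')
rescue PG::InternalError => e
  target_connection.exec('ROLLBACK;')

  puts "ERROR: #{e.message}" # new line in this sketch: print every failure
  print_last_redshift_loading_error if e.message.include?('stl_load_errors')

  raise unless continue_after_error
end
```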


target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
continue_after_error =
!ENV['IGNORE_LOADING_ERRORS_AND_CONTINUE'].nil? &&
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this logic to a helper method, and change the name to something like WARN_ON_LOADING_ERROR

ENV['IGNORE_LOADING_ERRORS_AND_CONTINUE'].downcase == 'true'
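
A minimal sketch of that suggestion (not part of this PR; WARN_ON_LOADING_ERROR is the reviewer's proposed name rather than an existing variable):

```ruby
# Sketch of the reviewer's suggestion: extract the env check into a helper.
# WARN_ON_LOADING_ERROR is a proposed name and does not exist in the codebase.
def warn_on_loading_error?
  ENV['WARN_ON_LOADING_ERROR'].to_s.downcase == 'true'
end
```

With a helper like this, the rescue branch could end with `raise unless warn_on_loading_error?` and the multi-line continue_after_error assignment would go away.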

target_connection.exec("COMMIT;")
raise unless continue_after_error
end
end

+  def print_last_redshift_loading_error
+    puts 'ERROR: Last Redshift loading error:'
+    error_row = target_connection.exec('SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1').first
+    error_row.each do |k, v|
+      puts "\t#{k}: #{v}"
+    end
+    puts
+  end
end