print Redshift loading error #23

Open · wants to merge 4 commits into master
README.md (8 additions, 0 deletions)
@@ -34,6 +34,14 @@ export S3_DATABASE_EXPORT_BUCKET='some-bucket-to-use'
postgres_to_redshift
```

+Optional flags:
+
+```bash
+# Optional debug flag if you'd like the copy job to only warn on Redshift loading errors
+# (instead of exiting) and keep going.
+export WARN_ON_LOADING_ERROR='true'
+```

## Contributing

1. Fork it ( https://github.com/kitchensurfing/postgres_to_redshift/fork )
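A note on the WARN_ON_LOADING_ERROR flag added to the README above: since it is read straight from the environment, it can also be scoped to a single run rather than exported. A minimal sketch, assuming the connection settings and the S3 export variables referenced elsewhere in this diff (S3_DATABASE_EXPORT_BUCKET, S3_DATABASE_EXPORT_ID, S3_DATABASE_EXPORT_KEY) are already set:

```bash
# Warn on Redshift loading errors for this run only, instead of exiting.
# Assumes the source/target connection and S3 export variables are already exported.
WARN_ON_LOADING_ERROR='true' postgres_to_redshift
```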
lib/postgres_to_redshift.rb (43 additions, 12 deletions)
@@ -1,11 +1,11 @@
require "postgres_to_redshift/version"
require 'postgres_to_redshift/version'
require 'pg'
require 'uri'
require 'aws-sdk-v1'
require 'zlib'
require 'tempfile'
require "postgres_to_redshift/table"
require "postgres_to_redshift/column"
require 'postgres_to_redshift/table'
require 'postgres_to_redshift/column'

class PostgresToRedshift
class << self
@@ -41,7 +41,7 @@ def self.target_uri
def self.source_connection
unless instance_variable_defined?(:"@source_connection")
@source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1])
@source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
@source_connection.exec('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;')
end

@source_connection
@@ -85,7 +85,7 @@ def bucket
end

def copy_table(table)
tmpfile = Tempfile.new("psql2rs")
tmpfile = Tempfile.new('psql2rs')
zip = Zlib::GzipWriter.new(tmpfile)
chunksize = 5 * GIGABYTE # uncompressed
chunk = 1
@@ -97,14 +97,14 @@ def copy_table(table)
source_connection.copy_data(copy_command) do
while row = source_connection.get_copy_data
zip.write(row)
-if (zip.pos > chunksize)
+if zip.pos > chunksize
zip.finish
tmpfile.rewind
upload_table(table, tmpfile, chunk)
chunk += 1
zip.close unless zip.closed?
tmpfile.unlink
tmpfile = Tempfile.new("psql2rs")
tmpfile = Tempfile.new('psql2rs')
zip = Zlib::GzipWriter.new(tmpfile)
end
end
@@ -128,14 +128,45 @@ def import_table(table)
puts "Importing #{table.target_table_name}"
target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating")

target_connection.exec("BEGIN;")
begin
target_connection.exec('BEGIN;')

target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")

target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

target_connection.exec('COMMIT;')

rescue PG::InternalError => exception
handle_pg_exception(table, exception)
end
end

target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")
def handle_pg_exception(table, exception)
target_connection.exec('ROLLBACK;')

target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")
if exception.message.include?('stl_load_errors')
puts exception.message
puts "ERROR: Last entry in Redshift's 'stl_load_errors' table:"
print_last_redshift_loading_error

target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
if !ENV['WARN_ON_LOADING_ERROR'].nil? && ENV['WARN_ON_LOADING_ERROR'].casecmp('true') == 0
puts "\nINFO: Skipping '#{table.name}' and continuing on."
else
exit
end
else
puts 'ERROR: Unhandled PG error:'
raise
end
end

target_connection.exec("COMMIT;")
def print_last_redshift_loading_error
error_row = target_connection.exec('SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1').first
error_row.each do |k, v|
puts "\t#{k}: #{v}"
end
end
end
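For reference, the query behind the new print_last_redshift_loading_error helper can also be run by hand when a COPY fails. A minimal sketch, assuming psql can reach the cluster and using $REDSHIFT_URL as a placeholder (not a variable from this repo) for the target connection URL:

```bash
# Manually inspect the most recent Redshift load failure; this is the same query
# the new print_last_redshift_loading_error helper runs against the target cluster.
# $REDSHIFT_URL is a placeholder for the Redshift connection URL.
psql "$REDSHIFT_URL" -c "SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1;"
```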