diff --git a/README.md b/README.md
index badd72c..a72ab6a 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,14 @@ export S3_DATABASE_EXPORT_BUCKET='some-bucket-to-use'
 postgres_to_redshift
 ```
 
+Optional flags:
+
+```bash
+# Optional debug flag if you'd like the copy job to only warn on Redshift loading errors
+# (instead of exiting) and keep going.
+export WARN_ON_LOADING_ERROR='true'
+```
+
 ## Contributing
 
 1. Fork it ( https://github.com/kitchensurfing/postgres_to_redshift/fork )
diff --git a/lib/postgres_to_redshift.rb b/lib/postgres_to_redshift.rb
index c41e689..fa5ca4b 100644
--- a/lib/postgres_to_redshift.rb
+++ b/lib/postgres_to_redshift.rb
@@ -1,11 +1,11 @@
-require "postgres_to_redshift/version"
+require 'postgres_to_redshift/version'
 require 'pg'
 require 'uri'
 require 'aws-sdk-v1'
 require 'zlib'
 require 'tempfile'
-require "postgres_to_redshift/table"
-require "postgres_to_redshift/column"
+require 'postgres_to_redshift/table'
+require 'postgres_to_redshift/column'
 
 class PostgresToRedshift
   class << self
@@ -41,7 +41,7 @@ def self.target_uri
   def self.source_connection
     unless instance_variable_defined?(:"@source_connection")
       @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1])
-      @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
+      @source_connection.exec('SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;')
     end
 
     @source_connection
@@ -85,7 +85,7 @@ def bucket
   end
 
   def copy_table(table)
-    tmpfile = Tempfile.new("psql2rs")
+    tmpfile = Tempfile.new('psql2rs')
     zip = Zlib::GzipWriter.new(tmpfile)
     chunksize = 5 * GIGABYTE # uncompressed
     chunk = 1
@@ -97,14 +97,14 @@ def copy_table(table)
     source_connection.copy_data(copy_command) do
       while row = source_connection.get_copy_data
         zip.write(row)
-        if (zip.pos > chunksize)
+        if zip.pos > chunksize
           zip.finish
           tmpfile.rewind
           upload_table(table, tmpfile, chunk)
           chunk += 1
           zip.close unless zip.closed?
           tmpfile.unlink
-          tmpfile = Tempfile.new("psql2rs")
+          tmpfile = Tempfile.new('psql2rs')
           zip = Zlib::GzipWriter.new(tmpfile)
         end
       end
@@ -128,14 +128,45 @@ def import_table(table)
     puts "Importing #{table.target_table_name}"
 
     target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating")
 
-    target_connection.exec("BEGIN;")
+    begin
+      target_connection.exec('BEGIN;')
+
+      target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")
+
+      target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")
+
+      target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
+
+      target_connection.exec('COMMIT;')
+
+    rescue PG::InternalError => exception
+      handle_pg_exception(table, exception)
+    end
+  end
 
-    target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")
+  def handle_pg_exception(table, exception)
+    target_connection.exec('ROLLBACK;')
 
-    target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")
+    if exception.message.include?('stl_load_errors')
+      puts exception.message
+      puts "ERROR: Last entry in Redshift's 'stl_load_errors' table:"
+      print_last_redshift_loading_error
 
-    target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")
+      if !ENV['WARN_ON_LOADING_ERROR'].nil? && ENV['WARN_ON_LOADING_ERROR'].casecmp('true') == 0
+        puts "\nINFO: Skipping '#{table.name}' and continuing on."
+      else
+        exit
+      end
+    else
+      puts 'ERROR: Unhandled PG error:'
+      raise
+    end
+  end
 
-    target_connection.exec("COMMIT;")
+  def print_last_redshift_loading_error
+    error_row = target_connection.exec('SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1').first
+    error_row.each do |k, v|
+      puts "\t#{k}: #{v}"
+    end
   end
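
A note on the flag semantics introduced in `handle_pg_exception`: `String#casecmp` returns `0` on a case-insensitive match, so `'true'`, `'TRUE'`, and `'True'` all enable warn-and-continue, while an unset variable or any other value keeps the exit-on-error behavior. A minimal standalone sketch of the same check (the helper name `warn_on_loading_error?` is illustrative, not part of the patch):

```ruby
# Sketch of the WARN_ON_LOADING_ERROR check used in handle_pg_exception.
# The helper name below is hypothetical, for illustration only.
def warn_on_loading_error?
  !ENV['WARN_ON_LOADING_ERROR'].nil? && ENV['WARN_ON_LOADING_ERROR'].casecmp('true') == 0
end

ENV['WARN_ON_LOADING_ERROR'] = 'True'
puts warn_on_loading_error? # => true; casecmp ignores case

ENV.delete('WARN_ON_LOADING_ERROR')
puts warn_on_loading_error? # => false; unset means exit-on-error
```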
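The `stl_load_errors` lookup can also be run by hand when triaging a failed COPY. A sketch using the `pg` gem against Redshift, mirroring `print_last_redshift_loading_error`; the host, credentials, and database name below are placeholder assumptions:

```ruby
require 'pg'

# Fetch the most recent row from Redshift's stl_load_errors system table.
# All connection parameters below are placeholders.
conn = PG::Connection.new(host: 'example-cluster.redshift.amazonaws.com',
                          port: 5439, user: 'user', password: 'secret',
                          dbname: 'dbname')
row = conn.exec('SELECT * FROM pg_catalog.stl_load_errors ORDER BY starttime DESC LIMIT 1').first
if row
  row.each { |column, value| puts "\t#{column}: #{value}" }
else
  puts 'No load errors recorded.'
end
conn.close
```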