Skip to content

Commit

Permalink
Implement support for multiple schemas for PostgreSQL (closes #64)
Browse files Browse the repository at this point in the history
  • Loading branch information
willbryant committed Jun 28, 2020
1 parent 5bfc484 commit df0da8a
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 38 deletions.
10 changes: 9 additions & 1 deletion src/filters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,15 @@ void apply_filters(const TableFilters &table_filters, Tables &tables) {
map<string, Table*> tables_by_id;

for (Table &table : tables) {
tables_by_id[table.name] = &table;
// we use id_from_name here as we want to include the schema name in the key we look for,
// as tables with the same name may be present in multiple schemas. this is simply the
// table name itself when there is no schema name, unless the table name includes a period
// itself (which is very rare since that's a terrible idea!), and when there is a schema
// name it's the normal schemaname.tablename format. the only people who would be offended
// by this behavior are people who expect tables in other schemas that are in the schema
// search path to get filtered even without putting the schema name in the filter file, but
// we can't really have it both ways.
tables_by_id[table.id_from_name()] = &table;
}

for (auto const &it : table_filters) {
Expand Down
12 changes: 11 additions & 1 deletion src/ks_mysql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,17 @@ string &MySQLClient::append_quoted_column_value_to(string &result, const Column
}

void MySQLClient::convert_unsupported_database_schema(Database &database) {
set<string> table_names_seen;

for (Table &table : database.tables) {
// postgresql supports multiple schemas within a database; mysql doesn't. in some respects mysql's databases
// are implemented more like postgresql's schemas, but not in others; we can't assume anything in particular
// about how the user would want to break out multiple schemas, so until told otherwise we must put everything
// into the target database - it'd certainly be a big surprise if we wrote to any other database than that chosen.
if (table_names_seen.count(table.name)) throw runtime_error("Conflicting tables named " + table.name + " present in multiple schemas");
table_names_seen.insert(table.name);
table.schema_name.clear();

for (Column &column : table.columns) {
// postgresql allows numeric with no precision or scale specification and preserves the given input data
// up to an implementation-defined precision and scale limit; mysql doesn't, and silently converts
Expand Down Expand Up @@ -1139,7 +1149,7 @@ struct MySQLTableLister {
inline MySQLTableLister(MySQLClient &client, Database &database, const ColumnTypeList &accepted_types): client(client), database(database), accepted_types(accepted_types) {}

inline void operator()(MySQLRow &row) {
Table table(row.string_at(0));
Table table("" /* schema */, row.string_at(0));

MySQLColumnLister column_lister(client, table, accepted_types);
client.query("SELECT COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_DEFAULT, EXTRA, " + generation_expression_column() + ", " + srid_column() + ", COLUMN_COMMENT, " + json_check_constraint_expression() + " AS JSON_CHECK_CONSTRAINT FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = SCHEMA() AND TABLE_NAME = '" + client.escape_string_value(table.name) + "' ORDER BY ORDINAL_POSITION", column_lister);
Expand Down
34 changes: 29 additions & 5 deletions src/ks_postgresql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ class PostgreSQLClient: public GlobalKeys, public SequenceColumns, public DropKe
string key_definition(const Table &table, const Key &key);

inline string quote_identifier(const string &name) { return ::quote_identifier(name, '"'); };
inline string quote_table_name(const Table &table) { return ::quote_identifier(table.name, '"'); };
inline string quote_schema_name(const string &schema_name) { return quote_identifier(schema_name.empty() ? "public" : schema_name); } // whereas in postgresql itself not specifying a schema means use the first entry in the search_path, but in KS it always means the normal default namespace for the database server in question
inline string quote_table_name(const Table &table) { return quote_schema_name(table.schema_name) + '.' + quote_identifier(table.name); }
inline bool supports_jsonb_column_type() const { return (server_version >= POSTGRESQL_9_4); }
inline bool supports_generated_as_identity() const { return (server_version >= POSTGRESQL_10); }
inline bool supports_generated_columns() const { return (server_version >= POSTGRESQL_12); }
Expand Down Expand Up @@ -608,7 +609,7 @@ string PostgreSQLClient::column_default(const Table &table, const Column &column
return " GENERATED ALWAYS AS IDENTITY";

case DefaultType::generated_by_sequence:
return string(" DEFAULT nextval('" + escape_string_value(quote_identifier(column.default_value)) + "'::regclass)");
return string(" DEFAULT nextval('" + escape_string_value(quote_schema_name(table.schema_name) + '.' + quote_identifier(column.default_value)) + "'::regclass)");

case DefaultType::default_value: {
string result(" DEFAULT ");
Expand Down Expand Up @@ -711,7 +712,17 @@ struct PostgreSQLColumnLister {
column.default_value.substr(column.default_value.length() - 12, 12) == "'::regclass)") {
// this is what you got back when you used the historic SERIAL pseudo-type (subsequently replaced by the new standard IDENTITY GENERATED ... AS IDENTITY)
column.default_type = DefaultType::generated_by_sequence;
column.default_value = unquote_identifier(column.default_value.substr(9, column.default_value.length() - 21));
string quoted_sequence_name(column.default_value.substr(9, column.default_value.length() - 21));

// postgresql requires that sequences belong to the same schema as the table that uses them, so we should always find that the sequence name is prefixed
// by the same schema name, if it's not in the default schema. chop this off here; otherwise when it's applied at the other end, it'll all get quoted
// together with the sequence name as a single big identifier, which won't work. we look for both quoted and unquoted forms rather than trying to guess
// whether postgresql would've used the quoted form.
if (!table.schema_name.empty()) {
quoted_sequence_name = remove_specific_schema_from_identifier(quoted_sequence_name, table.schema_name);
}

column.default_value = unquote_identifier(quoted_sequence_name);

} else if (column.default_value.substr(0, 6) == "NULL::" && db_type.substr(0, column.default_value.length() - 6) == column.default_value.substr(6)) {
// postgresql treats a NULL default as distinct to no default, so we try to respect that by keeping the value as a function,
Expand Down Expand Up @@ -884,6 +895,19 @@ struct PostgreSQLColumnLister {
return result;
}

inline string remove_specific_schema_from_identifier(const string &escaped, const string &schema_name) {
if (escaped.substr(0, table.schema_name.length() + 1) == schema_name + '.') {
return escaped.substr(schema_name.length() + 1);
}

string quoted_table_schema_name(quote_identifier(schema_name, '"'));
if (escaped.substr(0, quoted_table_schema_name.length() + 1) == quoted_table_schema_name + '.') {
return escaped.substr(quoted_table_schema_name.length() + 1);
}

return escaped;
}

inline tuple<string, string> extract_spatial_subtype_and_reference_system(string subtype) {
transform(subtype.begin(), subtype.end(), subtype.begin(), [](unsigned char c){ return tolower(c); });

Expand Down Expand Up @@ -952,7 +976,7 @@ struct PostgreSQLTableLister {

void operator()(PostgreSQLRow &row) {
string schema_name(row.string_at(0));
Table table(row.string_at(1));
Table table(schema_name == "public" ? "" : schema_name, row.string_at(1));

PostgreSQLColumnLister column_lister(table, type_map, accepted_types);
client.query(
Expand Down Expand Up @@ -1080,7 +1104,7 @@ void PostgreSQLClient::populate_database_schema(Database &database, const Column
"WHERE pg_namespace.nspname = ANY (current_schemas(false)) AND "
"pg_class.relnamespace = pg_namespace.oid AND "
"relkind = 'r' "
"ORDER BY pg_relation_size(pg_class.oid) DESC, relname ASC",
"ORDER BY pg_relation_size(pg_class.oid) DESC, pg_namespace.nspname ASC, relname ASC",
table_lister);
}

Expand Down
17 changes: 13 additions & 4 deletions src/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum class PrimaryKeyType {
};

struct Table {
string schema_name; // only used on databases supporting multiple schema, and only set when not the default (eg. "public") for that database server
string name;
Columns columns;
ColumnIndices primary_key_columns;
Expand All @@ -117,13 +118,13 @@ struct Table {
// the following member isn't serialized currently (could be, but not required):
string where_conditions;

inline Table(const string &name): name(name) {}
inline Table(const string &schema_name, const string &name): schema_name(schema_name), name(name) {}
inline Table() {}

inline const string &id_from_name() const { return name; }
inline const string id_from_name() const { return (schema_name.empty() ? "" : escape_name_for_id(schema_name) + '.') + escape_name_for_id(name); }

inline bool operator <(const Table &other) const { return (name < other.name); }
inline bool operator ==(const Table &other) const { return (name == other.name && columns == other.columns && same_primary_key_as(other) && keys == other.keys); }
inline bool operator <(const Table &other) const { return (schema_name == other.schema_name ? name < other.name : schema_name < other.schema_name); }
inline bool operator ==(const Table &other) const { return (schema_name == other.schema_name && name == other.name && columns == other.columns && same_primary_key_as(other) && keys == other.keys); }
inline bool operator !=(const Table &other) const { return (!(*this == other)); }
size_t index_of_column(const string &name) const;

Expand All @@ -136,6 +137,14 @@ struct Table {
size_t that_explicit_columns = other.primary_key_type == PrimaryKeyType::explicit_primary_key ? other.primary_key_columns.size() : 0;
return (this_explicit_columns == that_explicit_columns && equal(primary_key_columns.begin(), primary_key_columns.begin() + this_explicit_columns, other.primary_key_columns.begin()));
}

inline string escape_name_for_id(const string &str) const {
string result(str);
for (size_t pos = 0; (pos = result.find(".\\", pos)) != string::npos; pos += 2) {
result.insert(pos, 1, '\\');
}
return result;
}
};

typedef vector<Table> Tables;
Expand Down
6 changes: 3 additions & 3 deletions src/schema_matcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ struct CreateTableSequencesStatements <DatabaseClient, true> {
for (const Column &column : table.columns) {
if (column.default_type == DefaultType::generated_by_sequence) {
string result("DROP SEQUENCE IF EXISTS ");
result += client.quote_identifier(column.default_value);
result += client.quote_schema_name(table.schema_name) + '.' + client.quote_identifier(column.default_value);
statements.push_back(result);

result = "CREATE SEQUENCE ";
result += client.quote_identifier(column.default_value);
result += client.quote_schema_name(table.schema_name) + '.' + client.quote_identifier(column.default_value);
statements.push_back(result);
}
}
Expand All @@ -78,7 +78,7 @@ struct OwnTableSequencesStatements <DatabaseClient, true> {
for (const Column &column : table.columns) {
if (column.default_type == DefaultType::generated_by_sequence) {
string result("ALTER SEQUENCE ");
result += client.quote_identifier(column.default_value);
result += client.quote_schema_name(table.schema_name) + '.' + client.quote_identifier(column.default_value);
result += " OWNED BY ";
result += client.quote_table_name(table);
result += '.';
Expand Down
12 changes: 10 additions & 2 deletions src/schema_serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ void operator << (Packer<OutputStream> &packer, const Table &table) {
legacy_serialize(packer, table);
return;
}
pack_map_length(packer, 5);
if (table.schema_name.empty()) {
pack_map_length(packer, 5);
} else {
pack_map_length(packer, 6);
packer << string("schema_name");
packer << table.schema_name;
}
packer << string("name");
packer << table.name;
packer << string("columns");
Expand Down Expand Up @@ -279,7 +285,9 @@ void operator >> (Unpacker<InputStream> &unpacker, Table &table) {
while (map_length--) {
string attr_key = unpacker.template next<string>();

if (attr_key == "name") {
if (attr_key == "schema_name") {
unpacker >> table.schema_name;
} else if (attr_key == "name") {
unpacker >> table.name;
} else if (attr_key == "columns") {
unpacker >> table.columns;
Expand Down
47 changes: 42 additions & 5 deletions test/schema_from_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ def from_or_to
end
end

test_each "returns schema in legacy format for protocol version 7 and earlier, without an explicit TYPES command" do
clear_schema
create_noprimarytbl

send_handshake_commands(protocol_version: 7, accepted_types: nil)

send_command Commands::SCHEMA
expect_command Commands::SCHEMA,
[{"tables" => [noprimarytbl_def_v7]}]
end

test_each "ignores views" do
clear_schema
create_footbl
Expand All @@ -157,7 +168,7 @@ def from_or_to
[footbl_def["name"], [], []]
end

test_each "ignores tables with the same name in other schemas" do
test_each "ignores tables with the same name in other schemas by default" do
omit "This database doesn't support multiple schemas" unless connection.supports_multiple_schemas?
clear_schema
create_adapterspecifictbl
Expand All @@ -169,15 +180,41 @@ def from_or_to
expect_command Commands::SCHEMA,
[{"tables" => [adapterspecifictbl_def]}]
end
end

test_each "returns schema in legacy format for protocol version 7 and earlier, without an explicit TYPES command" do
class SchemaFromMultipleSchemasTest < KitchenSync::EndpointTestCase
include TestTableSchemas

def from_or_to
:from
end

def program_env
super.merge("ENDPOINT_SET_VARIABLES" => "search_path=public,#{connection.private_schema_name}")
end

test_each "returns tables in other schemas if they're in the search path" do
omit "This database doesn't support multiple schemas" unless connection.supports_multiple_schemas?
clear_schema
create_noprimarytbl
connection.create_private_schema_adapterspecifictbl

send_handshake_commands(protocol_version: 7, accepted_types: nil)
send_handshake_commands

send_command Commands::SCHEMA
expect_command Commands::SCHEMA,
[{"tables" => [noprimarytbl_def_v7]}]
[{"tables" => [adapterspecifictbl_def.merge("schema_name" => connection.private_schema_name)]}]
end

test_each "returns tables in each schema if there are tables with the same name in multiple schemas and they're in the search path" do
omit "This database doesn't support multiple schemas" unless connection.supports_multiple_schemas?
clear_schema
create_adapterspecifictbl
connection.create_private_schema_adapterspecifictbl

send_handshake_commands

send_command Commands::SCHEMA
expect_command Commands::SCHEMA,
[{"tables" => [adapterspecifictbl_def(schema_name: connection.private_schema_name), adapterspecifictbl_def]}]
end
end
58 changes: 58 additions & 0 deletions test/schema_to_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -971,3 +971,61 @@ def assert_same_keys(table_def)
expect_quit_and_close
end
end

class SchemaToMultipleSchemasTest < KitchenSync::EndpointTestCase
include TestTableSchemas

def from_or_to
:to
end

def program_env
if connection.supports_multiple_schemas?
super.merge("ENDPOINT_SET_VARIABLES" => "search_path=public,#{connection.private_schema_name}")
else
super
end
end

test_each "creates tables in other schemas if they're in the search path" do
omit "This database doesn't support multiple schemas" unless connection.supports_multiple_schemas?
clear_schema
expect_handshake_commands(schema: {"tables" => [adapterspecifictbl_def(schema_name: connection.private_schema_name)]})

expect_command Commands::RANGE, ["#{connection.private_schema_name}.#{adapterspecifictbl_def["name"]}"]
send_command Commands::RANGE, ["#{connection.private_schema_name}.#{adapterspecifictbl_def["name"]}", [], []]
expect_quit_and_close

assert_equal({
connection.private_schema_name => [adapterspecifictbl_def["name"]],
}, connection.tables_by_schema)
end

test_each "creates tables in each schema if there are tables with the same name in multiple schemas and they're in the search path" do
omit "This database doesn't support multiple schemas" unless connection.supports_multiple_schemas?
clear_schema
expect_handshake_commands(schema: {"tables" => [adapterspecifictbl_def, adapterspecifictbl_def(schema_name: connection.private_schema_name)]})

expect_command Commands::RANGE, [adapterspecifictbl_def["name"]]
send_command Commands::RANGE, [adapterspecifictbl_def["name"], [], []]
expect_command Commands::RANGE, ["#{connection.private_schema_name}.#{adapterspecifictbl_def["name"]}"]
send_command Commands::RANGE, ["#{connection.private_schema_name}.#{adapterspecifictbl_def["name"]}", [], []]
expect_quit_and_close

assert_equal({
connection.private_schema_name => [adapterspecifictbl_def["name"]],
"public" => [adapterspecifictbl_def["name"]],
}, connection.tables_by_schema)
assert connection.table_column_defaults(adapterspecifictbl_def["name"], connection.private_schema_name)["second"].include?("#{connection.private_schema_name}.second_seq")
assert !connection.table_column_defaults(adapterspecifictbl_def["name"], "public")["second"].include?("#{connection.private_schema_name}.second_seq")
end

test_each "raises an error if there are tables with the same name in multiple schemas on databases that don't support that" do
omit "This database supports multiple schemas fully" if connection.supports_multiple_schemas?
clear_schema
expect_handshake_commands(schema: {"tables" => [adapterspecifictbl_def, adapterspecifictbl_def(schema_name: "other_schema")]})
expect_stderr("Conflicting tables named #{adapterspecifictbl_def["name"]} present in multiple schemas") do
read_command
end
end
end
4 changes: 1 addition & 3 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,7 @@ def query(sql)
end

def clear_schema
connection.views.each {|view_name| execute "DROP VIEW #{connection.quote_ident view_name}"}
connection.tables.each {|table_name| execute "DROP TABLE #{connection.quote_ident table_name}"}
connection.sequence_generators.each {|sequence_name| execute "DROP SEQUENCE #{connection.quote_ident sequence_name}"}
connection.clear_schema
end

def hash_of(rows, hash_algorithm = HashAlgorithm::BLAKE3)
Expand Down
14 changes: 8 additions & 6 deletions test/test_helper_mysql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ def tables
query("SHOW FULL TABLES").select {|row| row["Table_type"] == "BASE TABLE"}.collect {|row| row.values.first}
end

def views
query("SHOW FULL TABLES").select {|row| row["Table_type"] == "VIEW"}.collect {|row| row.values.first}
def clear_schema
views.each {|view_name| execute "DROP VIEW #{quote_ident view_name}"}
tables.each {|table_name| execute "DROP TABLE #{quote_ident table_name}"}
end

def sequence_generators
[]
def views
query("SHOW FULL TABLES").select {|row| row["Table_type"] == "VIEW"}.collect {|row| row.values.first}
end

def table_primary_key_name(table_name)
Expand Down Expand Up @@ -309,7 +310,7 @@ def create_adapterspecifictbl
SQL
end

def adapterspecifictbl_def(compatible_with: self)
def adapterspecifictbl_def(compatible_with: self, schema_name: nil)
{ "name" => "`mysql`tbl",
"columns" => [
{"name" => "pri", "column_type" => compatible_with.is_a?(MysqlAdapter) ? ColumnType::UINT_32BIT : ColumnType::SINT_32BIT, "nullable" => false, identity_default_type => ""},
Expand All @@ -335,7 +336,8 @@ def adapterspecifictbl_def(compatible_with: self)
].compact,
"primary_key_type" => PrimaryKeyType::EXPLICIT_PRIMARY_KEY,
"primary_key_columns" => [0],
"keys" => [{"name" => "parent_id", "columns" => [1]}] } # automatically created
"keys" => [{"name" => "parent_id", "columns" => [1]}] }.merge( # automatically created
schema_name ? { "schema_name" => schema_name } : {}) # not actually supported by mysql, but error handling is tested
end

def adapterspecifictbl_row
Expand Down
Loading

0 comments on commit df0da8a

Please sign in to comment.