multi db_schema support
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
index c657349..ae12ded 100644 (file)
@@ -4,10 +4,12 @@ use strict;
 use warnings;
 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
 use mro 'c3';
-use Carp::Clan qw/^DBIx::Class/;
 use Try::Tiny;
+use List::MoreUtils 'any';
 use namespace::clean;
 
+use DBIx::Class::Schema::Loader::Table::Sybase ();
+
 our $VERSION = '0.07010';
 
 =head1 NAME
@@ -31,9 +33,10 @@ Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
 generated column names are lower-cased as this makes them easier to work with
 in L<DBIx::Class>.
 
-We attempt to detect the database collation at startup, and set the column
-lowercasing behavior accordingly, as lower-cased column names do not work on
-case-sensitive databases.
+We attempt to detect the database collation at startup for any database
+included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
+the column lowercasing behavior accordingly, as lower-cased column names do not
+work on case-sensitive databases.
 
 To manually control case-sensitive mode, put:
 
@@ -48,145 +51,295 @@ been renamed to a more generic option.
 
 =cut
 
+sub _system_databases {
+    return (qw/
+        master model tempdb msdb
+    /);
+}
+
+sub _system_tables {
+    return (qw/
+        spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
+    /);
+}
+
+sub _owners {
+    my ($self, $db) = @_;
+
+    my $owners = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE uid <> gid
+EOF
+
+    return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
+}
+
 sub _setup {
     my $self = shift;
 
     $self->next::method(@_);
 
-    return if defined $self->preserve_case;
+    my ($current_db) = $self->dbh->selectrow_array('SELECT db_name()');
 
-    my $dbh = $self->schema->storage->dbh;
+    if (ref $self->db_schema eq 'HASH') {
+        if (keys %{ $self->db_schema } < 2) {
+            my ($db) = keys %{ $self->db_schema };
 
-    # We use the sys.databases query for the general case, and fallback to
-    # databasepropertyex() if for some reason sys.databases is not available,
-    # which does not work over DBD::ODBC with unixODBC+FreeTDS.
-    #
-    # XXX why does databasepropertyex() not work over DBD::ODBC ?
-    #
-    # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
-    my ($collation_name) =
-           eval { $dbh->selectrow_array('SELECT collation_name FROM sys.databases WHERE name = DB_NAME()') }
-        || eval { $dbh->selectrow_array("SELECT CAST(databasepropertyex(DB_NAME(), 'Collation') AS VARCHAR)") };
+            $db ||= $current_db;
 
-    if (not $collation_name) {
-        warn <<'EOF';
+            if ($db eq '%') {
+                my $owners = $self->db_schema->{$db};
 
-WARNING: MSSQL Collation detection failed. Defaulting to case-insensitive mode.
-Override the 'preserve_case' attribute in your Loader options if needed.
+                my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
+SELECT name
+FROM master.dbo.sysdatabases
+EOF
 
-See 'preserve_case' in
-perldoc DBIx::Class::Schema::Loader::Base
+                my @dbs;
+
+                foreach my $db_name (@$db_names) {
+                    push @dbs, $db_name
+                        unless any { $_ eq $db_name } $self->_system_databases;
+                }
+
+                $self->db_schema({});
+
+                DB: foreach my $db (@dbs) {
+                    if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
+                        my @owners;
+
+                        foreach my $owner (@$owners) {
+                            push @owners, $owner
+                                if $self->dbh->selectrow_array(<<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE name = @{[ $self->dbh->quote($owner) ]}
 EOF
-        $self->preserve_case(0);
-        return;
+                        }
+
+                        next DB unless @owners;
+
+                        $self->db_schema->{$db} = \@owners;
+                    }
+                    else {
+                        # for post-processing below
+                        $self->db_schema->{$db} = '%';
+                    }
+                }
+
+                $self->qualify_objects(1);
+            }
+            else {
+                if ($db ne $current_db) {
+                    $self->dbh->do("USE [$db]");
+
+                    $self->qualify_objects(1);
+                }
+            }
+        }
+        else {
+            $self->qualify_objects(1);
+        }
+    }
+    elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
+        my $owners = $self->db_schema;
+        $owners ||= [ $self->dbh->selectrow_array('SELECT user_name()') ];
+
+        $self->qualify_objects(1) if @$owners > 1;
+
+        $self->db_schema({ $current_db => $owners });
     }
 
-    my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
+    foreach my $db (keys %{ $self->db_schema }) {
+        if ($self->db_schema->{$db} eq '%') {
+            $self->db_schema->{$db} = [ $self->_owners($db) ];
+
+            $self->qualify_objects(1);
+        }
+    }
 
-    $self->preserve_case($case_sensitive ? 1 : 0);
+    if (not defined $self->preserve_case) {
+        foreach my $db (keys %{ $self->db_schema }) {
+            # We use the sys.databases query for the general case, and fallback to
+            # databasepropertyex() if for some reason sys.databases is not available,
+            # which does not work over DBD::ODBC with unixODBC+FreeTDS.
+            #
+            # XXX why does databasepropertyex() not work over DBD::ODBC ?
+            #
+            # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
+            my ($collation_name) =
+                   eval { $self->dbh->selectrow_array("SELECT collation_name FROM sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") }
+                || eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") };
+
+            if (not $collation_name) {
+                warn <<"EOF";
+
+WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
+case-insensitive mode. Override the 'preserve_case' attribute in your Loader
+options if needed.
+
+See 'preserve_case' in
+perldoc DBIx::Class::Schema::Loader::Base
+EOF
+                $self->preserve_case(0) unless $self->preserve_case;
+            }
+            else {
+                my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
+
+                if ($case_sensitive && (not $self->preserve_case)) {
+                    $self->preserve_case(1);
+                }
+                else {
+                    $self->preserve_case(0);
+                }
+            }
+        }
+    }
 }
 
 sub _tables_list {
     my ($self, $opts) = @_;
 
-    my $dbh = $self->schema->storage->dbh;
-    my $sth = $dbh->prepare(<<'EOF');
-SELECT t.table_name
-FROM INFORMATION_SCHEMA.TABLES t
-WHERE t.table_schema = ?
+    my @tables;
+
+    while (my ($db, $owners) = each %{ $self->db_schema }) {
+        foreach my $owner (@$owners) {
+            my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT table_name
+FROM [$db].INFORMATION_SCHEMA.TABLES
+WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
 EOF
-    $sth->execute($self->db_schema);
 
-    my @tables = map @$_, @{ $sth->fetchall_arrayref };
+            TABLE: foreach my $table_name (@$table_names) {
+                next TABLE if any { $_ eq $table_name } $self->_system_tables;
+
+                push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
+                    loader   => $self,
+                    name     => $table_name,
+                    database => $db,
+                    schema   => $owner,
+                );
+            }
+        }
+    }
 
     return $self->_filter_tables(\@tables, $opts);
 }
 
 sub _table_pk_info {
     my ($self, $table) = @_;
-    my $dbh = $self->schema->storage->dbh;
-    my $sth = $dbh->prepare(qq{sp_pkeys '$table'});
-    $sth->execute;
-
-    my @keydata;
 
-    while (my $row = $sth->fetchrow_hashref) {
-        push @keydata, $self->_lc($row->{COLUMN_NAME});
-    }
-
-    return \@keydata;
+    my $db = $table->database;
+
+    return $self->dbh->selectcol_arrayref(<<"EOF")
+SELECT kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+    ON kcu.table_name = tc.table_name
+        AND kcu.table_schema = tc.table_schema
+        AND kcu.constraint_name = tc.constraint_name
+WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND tc.constraint_type = 'PRIMARY KEY'
+ORDER BY kcu.ordinal_position
+EOF
 }
 
 sub _table_fk_info {
     my ($self, $table) = @_;
 
-    my ($local_cols, $remote_cols, $remote_table, @rels, $sth);
-    my $dbh = $self->schema->storage->dbh;
-    eval {
-        $sth = $dbh->prepare(qq{sp_fkeys \@fktable_name = '$table'});
-        $sth->execute;
-    };
-
-    while (my $row = eval { $sth->fetchrow_hashref }) {
-        my $fk = $row->{FK_NAME};
-        push @{$local_cols->{$fk}}, $self->_lc($row->{FKCOLUMN_NAME});
-        push @{$remote_cols->{$fk}}, $self->_lc($row->{PKCOLUMN_NAME});
-        $remote_table->{$fk} = $row->{PKTABLE_NAME};
-    }
+    my $db = $table->database;
+
+    my $sth = $self->dbh->prepare(<<"EOF");
+SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name, fk_kcu.column_name, uk_kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
+JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
+    ON rc.constraint_name = fk_tc.constraint_name
+        AND rc.constraint_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
+    ON fk_kcu.constraint_name = fk_tc.constraint_name
+        AND fk_kcu.table_name = fk_tc.table_name
+        AND fk_kcu.table_schema = fk_tc.table_schema 
+JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
+    ON uk_tc.constraint_name = rc.unique_constraint_name
+        AND uk_tc.table_schema = rc.unique_constraint_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
+    ON uk_kcu.constraint_name = rc.unique_constraint_name
+        AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
+        AND uk_kcu.table_name = uk_tc.table_name
+        AND uk_kcu.table_schema = rc.unique_constraint_schema
+WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ORDER BY fk_kcu.ordinal_position
+EOF
 
-    foreach my $fk (keys %$remote_table) {
-        push @rels, {
-                      local_columns => \@{$local_cols->{$fk}},
-                      remote_columns => \@{$remote_cols->{$fk}},
-                      remote_table => $remote_table->{$fk},
-                    };
+    $sth->execute;
 
+    my %rels;
+
+    while (my ($fk, $remote_schema, $remote_table, $col, $remote_col) = $sth->fetchrow_array) {
+        push @{ $rels{$fk}{local_columns}  }, $col;
+        push @{ $rels{$fk}{remote_columns} }, $remote_col;
+        
+        $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
+            loader   => $self,
+            name     => $remote_table,
+            database => $db,
+            schema   => $remote_schema,
+        ) unless exists $rels{$fk}{remote_table};
     }
-    return \@rels;
+
+    return [ values %rels ];
 }
 
 sub _table_uniq_info {
     my ($self, $table) = @_;
 
-    my $dbh = $self->schema->storage->dbh;
-    local $dbh->{FetchHashKeyName} = 'NAME_lc';
+    my $db = $table->database;
+
+    my $sth = $self->dbh->prepare(<<"EOF");
+SELECT tc.constraint_name, kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+    ON kcu.constraint_name = tc.constraint_name
+        AND kcu.table_name = tc.table_name
+        AND kcu.table_schema = tc.table_schema
+wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND tc.constraint_type = 'UNIQUE'
+ORDER BY kcu.ordinal_position
+EOF
 
-    my $sth = $dbh->prepare(qq{
-SELECT ccu.constraint_name, ccu.column_name
-FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
-JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc on (ccu.constraint_name = tc.constraint_name)
-JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu on (ccu.constraint_name = kcu.constraint_name and ccu.column_name = kcu.column_name)
-wHERE ccu.table_name = @{[ $dbh->quote($table) ]} AND constraint_type = 'UNIQUE' ORDER BY kcu.ordinal_position
-    });
     $sth->execute;
-    my $constraints;
-    while (my $row = $sth->fetchrow_hashref) {
-        my $name = $row->{constraint_name};
-        my $col  = $self->_lc($row->{column_name});
-        push @{$constraints->{$name}}, $col;
+
+    my %uniq;
+
+    while (my ($constr, $col) = $sth->fetchrow_array) {
+        push @{ $uniq{$constr} }, $self->_lc($col);
     }
 
-    my @uniqs = map { [ $_ => $constraints->{$_} ] } keys %$constraints;
-    return \@uniqs;
+    return [ map [ $_ => $uniq{$_} ], keys %uniq ];
 }
 
 sub _columns_info_for {
     my $self    = shift;
     my ($table) = @_;
 
-    my $result = $self->next::method(@_);
+    my $db = $table->database;
 
-    my $dbh = $self->schema->storage->dbh;
+    my $result = $self->next::method(@_);
 
     while (my ($col, $info) = each %$result) {
 # get type info
-        my $sth = $dbh->prepare(qq{
-SELECT character_maximum_length, data_type, datetime_precision
-FROM INFORMATION_SCHEMA.COLUMNS
-WHERE table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote($col) ]}
-        });
-        $sth->execute;
-        my ($char_max_length, $data_type, $datetime_precision) = $sth->fetchrow_array;
+        my ($char_max_length, $data_type, $datetime_precision, $default) =
+            $self->dbh->selectrow_array(<<"EOF");
+SELECT character_maximum_length, data_type, datetime_precision, column_default
+FROM [$db].INFORMATION_SCHEMA.COLUMNS
+WHERE table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND column_name = @{[ $self->dbh->quote($col) ]}
+EOF
 
         $info->{data_type} = $data_type;
 
@@ -196,13 +349,21 @@ WHERE table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote(
         }
 
 # find identities
-        $sth = $dbh->prepare(qq{
-SELECT column_name 
-FROM INFORMATION_SCHEMA.COLUMNS
-WHERE columnproperty(object_id(@{[ $dbh->quote($table) ]}, 'U'), @{[ $dbh->quote($col) ]}, 'IsIdentity') = 1
-AND table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote($col) ]}
-        });
-        if (try { $sth->execute; $sth->fetchrow_array }) {
+        my ($is_identity) = $self->dbh->selectrow_array(<<"EOF");
+SELECT is_identity
+FROM [$db].sys.columns
+WHERE object_id = (
+    SELECT object_id
+    FROM [$db].sys.objects
+    WHERE name = @{[ $self->dbh->quote($table->name) ]}
+        AND schema_id = (
+            SELECT schema_id
+            FROM [$db].sys.schemas
+            WHERE name = @{[ $self->dbh->quote($table->schema) ]}
+        )
+) AND name = @{[ $self->dbh->quote($col) ]}
+EOF
+        if ($is_identity) {
             $info->{is_auto_increment} = 1;
             $info->{data_type} =~ s/\s*identity//i;
             delete $info->{size};
@@ -252,14 +413,6 @@ AND table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote($c
             delete $info->{size};
         }
 
-# get default
-        $sth = $dbh->prepare(qq{
-SELECT column_default
-FROM INFORMATION_SCHEMA.COLUMNS
-wHERE table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote($col) ]}
-        });
-        my ($default) = eval { $sth->execute; $sth->fetchrow_array };
-
         if (defined $default) {
             # strip parens
             $default =~ s/^\( (.*) \)\z/$1/x;