Release 0.07047
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
index ee6d18a..e2810f2 100644 (file)
@@ -3,10 +3,14 @@ package DBIx::Class::Schema::Loader::DBI::MSSQL;
 use strict;
 use warnings;
 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
-use Carp::Clan qw/^DBIx::Class/;
-use Class::C3;
+use mro 'c3';
+use Try::Tiny;
+use List::Util 'any';
+use namespace::clean;
 
-our $VERSION = '0.07000';
+use DBIx::Class::Schema::Loader::Table::Sybase ();
+
+our $VERSION = '0.07047';
 
 =head1 NAME
 
@@ -29,9 +33,10 @@ Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
 generated column names are lower-cased as this makes them easier to work with
 in L<DBIx::Class>.
 
-We attempt to detect the database collation at startup, and set the column
-lowercasing behavior accordingly, as lower-cased column names do not work on
-case-sensitive databases.
+We attempt to detect the database collation at startup for any database
+included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
+the column lowercasing behavior accordingly, as lower-cased column names do not
+work on case-sensitive databases.
 
 To manually control case-sensitive mode, put:
 
@@ -46,225 +51,422 @@ been renamed to a more generic option.
 
 =cut
 
+# SQL Server 2000: Ancient as time itself, but still out in the wild
+sub _is_2k {
+    return shift->schema->storage->_server_info->{normalized_dbms_version} < 9;
+}
+
+sub _system_databases {
+    return (qw/
+        master model tempdb msdb
+    /);
+}
+
+sub _system_tables {
+    return (qw/
+        spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
+    /);
+}
+
+sub _schemas {
+    my ($self, $db) = @_;
+
+    my $owners = $self->dbh->selectcol_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE uid <> gid
+EOF2K
+SELECT name
+FROM [$db].sys.schemas
+EOF
+
+    return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
+}
+
+sub _current_schema {
+    my $self = shift;
+
+    if ($self->_is_2k) {
+        return ($self->dbh->selectrow_array('SELECT user_name()'))[0];
+    }
+
+    return ($self->dbh->selectrow_array('SELECT schema_name()'))[0];
+}
+
+sub _current_db {
+    my $self = shift;
+    return ($self->dbh->selectrow_array('SELECT db_name()'))[0];
+}
+
+sub _switch_db {
+    my ($self, $db) = @_;
+    $self->dbh->do("use [$db]");
+}
+
 sub _setup {
     my $self = shift;
 
     $self->next::method(@_);
 
-    return if defined $self->preserve_case;
+    my $current_db = $self->_current_db;
 
-    my $dbh = $self->schema->storage->dbh;
+    if (ref $self->db_schema eq 'HASH') {
+        if (keys %{ $self->db_schema } < 2) {
+            my ($db) = keys %{ $self->db_schema };
 
-    # We use the sys.databases query for the general case, and fallback to
-    # databasepropertyex() if for some reason sys.databases is not available,
-    # which does not work over DBD::ODBC with unixODBC+FreeTDS.
-    #
-    # XXX why does databasepropertyex() not work over DBD::ODBC ?
-    #
-    # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
-    my ($collation_name) =
-           eval { $dbh->selectrow_array('SELECT collation_name FROM sys.databases WHERE name = DB_NAME()') }
-        || eval { $dbh->selectrow_array("SELECT CAST(databasepropertyex(DB_NAME(), 'Collation') AS VARCHAR)") };
+            $db ||= $current_db;
 
-    if (not $collation_name) {
-        warn <<'EOF';
+            if ($db eq '%') {
+                my $owners = $self->db_schema->{$db};
 
-WARNING: MSSQL Collation detection failed. Defaulting to case-insensitive mode.
-Override the 'preserve_case' attribute in your Loader options if needed.
+                my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
+SELECT name
+FROM master.dbo.sysdatabases
+EOF
 
-See 'preserve_case' in
-perldoc DBIx::Class::Schema::Loader::Base
+                my @dbs;
+
+                foreach my $db_name (@$db_names) {
+                    push @dbs, $db_name
+                        unless any { $_ eq $db_name } $self->_system_databases;
+                }
+
+                $self->db_schema({});
+
+                DB: foreach my $db (@dbs) {
+                    if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
+                        my @owners;
+
+                        foreach my $owner (@$owners) {
+                            push @owners, $owner
+                                if $self->dbh->selectrow_array(<<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE name = @{[ $self->dbh->quote($owner) ]}
 EOF
-        $self->preserve_case(0);
-        return;
+                        }
+
+                        next DB unless @owners;
+
+                        $self->db_schema->{$db} = \@owners;
+                    }
+                    else {
+                        # for post-processing below
+                        $self->db_schema->{$db} = '%';
+                    }
+                }
+
+                $self->qualify_objects(1);
+            }
+            else {
+                if ($db ne $current_db) {
+                    $self->_switch_db($db);
+
+                    $self->qualify_objects(1);
+                }
+            }
+        }
+        else {
+            $self->qualify_objects(1);
+        }
+    }
+    elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
+        my $owners = $self->db_schema;
+        $owners ||= [ $self->_current_schema ];
+
+        $self->qualify_objects(1) if @$owners > 1;
+
+        $self->db_schema({ $current_db => $owners });
     }
 
-    my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
+    foreach my $db (keys %{ $self->db_schema }) {
+        if ($self->db_schema->{$db} eq '%') {
+            $self->db_schema->{$db} = [ $self->_schemas($db) ];
+
+            $self->qualify_objects(1);
+        }
+    }
+
+    if (not defined $self->preserve_case) {
+        foreach my $db (keys %{ $self->db_schema }) {
+            # We use the sys.databases query for the general case, and fallback to
+            # databasepropertyex() if for some reason sys.databases is not available,
+            # which does not work over DBD::ODBC with unixODBC+FreeTDS.
+            #
+            # XXX why does databasepropertyex() not work over DBD::ODBC ?
+            #
+            # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
+
+            my $current_db = $self->_current_db;
 
-    $self->preserve_case($case_sensitive ? 1 : 0);
+            $self->_switch_db($db);
+
+            my $collation_name =
+                   (eval { $self->dbh->selectrow_array("SELECT collation_name FROM [$db].sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") })[0]
+                || (eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") })[0];
+
+            $self->_switch_db($current_db);
+
+            if (not $collation_name) {
+                warn <<"EOF";
+
+WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
+case-insensitive mode. Override the 'preserve_case' attribute in your Loader
+options if needed.
+
+See 'preserve_case' in
+perldoc DBIx::Class::Schema::Loader::Base
+EOF
+                $self->preserve_case(0) unless $self->preserve_case;
+            }
+            else {
+                my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
+
+                if ($case_sensitive && (not $self->preserve_case)) {
+                    $self->preserve_case(1);
+                }
+                else {
+                    $self->preserve_case(0);
+                }
+            }
+        }
+    }
 }
 
 sub _tables_list {
-    my ($self, $opts) = @_;
+    my ($self) = @_;
 
-    my $dbh = $self->schema->storage->dbh;
-    my $sth = $dbh->prepare(<<'EOF');
-SELECT t.table_name
-FROM INFORMATION_SCHEMA.TABLES t
-WHERE t.table_schema = ?
+    my @tables;
+
+    while (my ($db, $owners) = each %{ $self->db_schema }) {
+        foreach my $owner (@$owners) {
+            my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT table_name
+FROM [$db].INFORMATION_SCHEMA.TABLES
+WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
 EOF
-    $sth->execute($self->db_schema);
 
-    my @tables = map @$_, @{ $sth->fetchall_arrayref };
+            TABLE: foreach my $table_name (@$table_names) {
+                next TABLE if any { $_ eq $table_name } $self->_system_tables;
+
+                push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
+                    loader   => $self,
+                    name     => $table_name,
+                    database => $db,
+                    schema   => $owner,
+                );
+            }
+        }
+    }
 
-    return $self->_filter_tables(\@tables, $opts);
+    return $self->_filter_tables(\@tables);
 }
 
 sub _table_pk_info {
     my ($self, $table) = @_;
-    my $dbh = $self->schema->storage->dbh;
-    my $sth = $dbh->prepare(qq{sp_pkeys '$table'});
-    $sth->execute;
 
-    my @keydata;
+    my $db = $table->database;
+
+    my $pk = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+    ON kcu.table_name = tc.table_name
+        AND kcu.table_schema = tc.table_schema
+        AND kcu.constraint_name = tc.constraint_name
+WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND tc.constraint_type = 'PRIMARY KEY'
+ORDER BY kcu.ordinal_position
+EOF
 
-    while (my $row = $sth->fetchrow_hashref) {
-        push @keydata, $self->_lc($row->{COLUMN_NAME});
-    }
+    $pk = [ map $self->_lc($_), @$pk ];
 
-    return \@keydata;
+    return $pk;
 }
 
 sub _table_fk_info {
     my ($self, $table) = @_;
 
-    my ($local_cols, $remote_cols, $remote_table, @rels, $sth);
-    my $dbh = $self->schema->storage->dbh;
-    eval {
-        $sth = $dbh->prepare(qq{sp_fkeys \@fktable_name = '$table'});
-        $sth->execute;
-    };
-
-    while (my $row = eval { $sth->fetchrow_hashref }) {
-        my $fk = $row->{FK_NAME};
-        push @{$local_cols->{$fk}}, $self->_lc($row->{FKCOLUMN_NAME});
-        push @{$remote_cols->{$fk}}, $self->_lc($row->{PKCOLUMN_NAME});
-        $remote_table->{$fk} = $row->{PKTABLE_NAME};
-    }
+    my $db = $table->database;
+
+    my $sth = $self->dbh->prepare(<<"EOF");
+SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name,
+       fk_kcu.column_name, uk_kcu.column_name, rc.delete_rule, rc.update_rule
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
+JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
+    ON rc.constraint_name = fk_tc.constraint_name
+        AND rc.constraint_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
+    ON fk_kcu.constraint_name = fk_tc.constraint_name
+        AND fk_kcu.table_name = fk_tc.table_name
+        AND fk_kcu.table_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
+    ON uk_tc.constraint_name = rc.unique_constraint_name
+        AND uk_tc.table_schema = rc.unique_constraint_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
+    ON uk_kcu.constraint_name = rc.unique_constraint_name
+        AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
+        AND uk_kcu.table_name = uk_tc.table_name
+        AND uk_kcu.table_schema = rc.unique_constraint_schema
+WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ORDER BY fk_kcu.ordinal_position
+EOF
 
-    foreach my $fk (keys %$remote_table) {
-        push @rels, {
-                      local_columns => \@{$local_cols->{$fk}},
-                      remote_columns => \@{$remote_cols->{$fk}},
-                      remote_table => $remote_table->{$fk},
-                    };
+    $sth->execute;
 
+    my %rels;
+
+    while (my ($fk, $remote_schema, $remote_table, $col, $remote_col,
+               $delete_rule, $update_rule) = $sth->fetchrow_array) {
+        push @{ $rels{$fk}{local_columns}  }, $self->_lc($col);
+        push @{ $rels{$fk}{remote_columns} }, $self->_lc($remote_col);
+
+        $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
+            loader   => $self,
+            name     => $remote_table,
+            database => $db,
+            schema   => $remote_schema,
+        ) unless exists $rels{$fk}{remote_table};
+
+        $rels{$fk}{attrs} ||= {
+            on_delete     => uc $delete_rule,
+            on_update     => uc $update_rule,
+            is_deferrable => 1 # constraints can be temporarily disabled, but DEFERRABLE is not supported
+        };
     }
-    return \@rels;
+
+    return [ values %rels ];
 }
 
 sub _table_uniq_info {
     my ($self, $table) = @_;
 
-    my $dbh = $self->schema->storage->dbh;
-    local $dbh->{FetchHashKeyName} = 'NAME_lc';
+    my $db = $table->database;
+
+    my $sth = $self->dbh->prepare(<<"EOF");
+SELECT tc.constraint_name, kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+    ON kcu.constraint_name = tc.constraint_name
+        AND kcu.table_name = tc.table_name
+        AND kcu.table_schema = tc.table_schema
+wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND tc.constraint_type = 'UNIQUE'
+ORDER BY kcu.ordinal_position
+EOF
 
-    my $sth = $dbh->prepare(qq{
-SELECT ccu.constraint_name, ccu.column_name
-FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu
-JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc on (ccu.constraint_name = tc.constraint_name)
-JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu on (ccu.constraint_name = kcu.constraint_name and ccu.column_name = kcu.column_name)
-wHERE ccu.table_name = @{[ $dbh->quote($table) ]} AND constraint_type = 'UNIQUE' ORDER BY kcu.ordinal_position
-    });
     $sth->execute;
-    my $constraints;
-    while (my $row = $sth->fetchrow_hashref) {
-        my $name = $row->{constraint_name};
-        my $col  = $self->_lc($row->{column_name});
-        push @{$constraints->{$name}}, $col;
+
+    my %uniq;
+
+    while (my ($constr, $col) = $sth->fetchrow_array) {
+        push @{ $uniq{$constr} }, $self->_lc($col);
     }
 
-    my @uniqs = map { [ $_ => $constraints->{$_} ] } keys %$constraints;
-    return \@uniqs;
+    return [ map [ $_ => $uniq{$_} ], sort keys %uniq ];
 }
 
 sub _columns_info_for {
     my $self    = shift;
     my ($table) = @_;
 
+    my $db = $table->database;
+
     my $result = $self->next::method(@_);
 
-    while (my ($col, $info) = each %$result) {
-        my $dbh = $self->schema->storage->dbh;
-
-# find identities
-        my $sth = $dbh->prepare(qq{
-SELECT column_name 
-FROM INFORMATION_SCHEMA.COLUMNS
-WHERE columnproperty(object_id(@{[ $dbh->quote($table) ]}, 'U'), @{[ $dbh->quote($col) ]}, 'IsIdentity') = 1
-AND table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote($col) ]}
-        });
-        if (eval { $sth->execute; $sth->fetchrow_array }) {
+    # get type info (and identity)
+    my $rows = $self->dbh->selectall_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
+SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, (sc.status & 0x80) is_identity
+FROM [$db].INFORMATION_SCHEMA.COLUMNS c
+JOIN [$db].dbo.sysusers ss ON
+    c.table_schema = ss.name
+JOIN [$db].dbo.sysobjects so ON
+    c.table_name = so.name
+    AND so.uid = ss.uid
+JOIN [$db].dbo.syscolumns sc ON
+    c.column_name = sc.name
+    AND sc.id = so.Id
+WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
+EOF2K
+SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, sc.is_identity
+FROM [$db].INFORMATION_SCHEMA.COLUMNS c
+JOIN [$db].sys.schemas ss ON
+    c.table_schema = ss.name
+JOIN [$db].sys.objects so ON
+      c.table_name   = so.name
+    AND so.schema_id = ss.schema_id
+JOIN [$db].sys.columns sc ON
+    c.column_name = sc.name
+    AND sc.object_id = so.object_id
+WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
+EOF
+
+    foreach my $row (@$rows) {
+        my ($col, $char_max_length, $data_type, $datetime_precision, $default, $is_identity) = @$row;
+        $col = lc $col unless $self->preserve_case;
+        my $info = $result->{$col} || next;
+
+        $info->{data_type} = $data_type;
+
+        if (defined $char_max_length) {
+            $info->{size} = $char_max_length;
+            $info->{size} = 0 if $char_max_length < 0;
+        }
+
+        if ($is_identity) {
             $info->{is_auto_increment} = 1;
             $info->{data_type} =~ s/\s*identity//i;
             delete $info->{size};
         }
 
-# fix types
-        if ($info->{data_type} eq 'int') {
+        # fix types
+        if ($data_type eq 'int') {
             $info->{data_type} = 'integer';
         }
-        elsif ($info->{data_type} eq 'timestamp') {
+        elsif ($data_type eq 'timestamp') {
             $info->{inflate_datetime} = 0;
         }
-        elsif ($info->{data_type} =~ /^(?:numeric|decimal)\z/) {
+        elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
             if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
                 delete $info->{size};
             }
         }
-        elsif ($info->{data_type} eq 'float') {
+        elsif ($data_type eq 'float') {
             $info->{data_type} = 'double precision';
+            delete $info->{size};
         }
-        elsif ($info->{data_type} =~ /^(?:small)?datetime\z/) {
+        elsif ($data_type =~ /^(?:small)?datetime\z/) {
             # fixup for DBD::Sybase
             if ($info->{default_value} && $info->{default_value} eq '3') {
                 delete $info->{default_value};
             }
         }
-        elsif ($info->{data_type} eq 'datetimeoffset') {
-            $info->{size} = {
-                26 => 0,
-                28 => 1,
-                29 => 2,
-                30 => 3,
-                31 => 4,
-                32 => 5,
-                33 => 6,
-                34 => 7,
-            }->{$info->{size}};
+        elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
+            $info->{size} = $datetime_precision;
 
             delete $info->{size} if $info->{size} == 7;
         }
-        elsif ($info->{data_type} eq 'datetime2') {
-            $info->{size} = {
-                19 => 0,
-                21 => 1,
-                22 => 2,
-                23 => 3,
-                24 => 4,
-                25 => 5,
-                26 => 6,
-                27 => 7,
-            }->{$info->{size}};
-
-            delete $info->{size} if $info->{size} == 7;
+        elsif ($data_type eq 'varchar'   && $info->{size} == 0) {
+            $info->{data_type} = 'text';
+            delete $info->{size};
         }
-        elsif ($info->{data_type} eq 'time') {
-            $info->{size} = {
-                 8 => 0,
-                10 => 1,
-                11 => 2,
-                12 => 3,
-                13 => 4,
-                14 => 5,
-                15 => 6,
-                16 => 7,
-            }->{$info->{size}};
-
-            delete $info->{size} if $info->{size} == 7;
+        elsif ($data_type eq 'nvarchar'  && $info->{size} == 0) {
+            $info->{data_type} = 'ntext';
+            delete $info->{size};
         }
-
-        if ($info->{data_type} !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
+        elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
+            $info->{data_type} = 'image';
             delete $info->{size};
         }
 
-# get default
-        $sth = $dbh->prepare(qq{
-SELECT column_default
-FROM INFORMATION_SCHEMA.COLUMNS
-wHERE table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote($col) ]}
-        });
-        my ($default) = eval { $sth->execute; $sth->fetchrow_array };
+        if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
+            delete $info->{size};
+        }
 
         if (defined $default) {
             # strip parens
@@ -278,6 +480,9 @@ wHERE table_name = @{[ $dbh->quote($table) ]} AND column_name = @{[ $dbh->quote(
 
             if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
                 ${ $info->{default_value} } = 'current_timestamp';
+
+                my $getdate = 'getdate()';
+                $info->{original}{default_value} = \$getdate;
             }
         }
     }
@@ -292,9 +497,9 @@ L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
 L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
 L<DBIx::Class::Schema::Loader::DBI>
 
-=head1 AUTHOR
+=head1 AUTHORS
 
-See L<DBIx::Class::Schema::Loader/AUTHOR> and L<DBIx::Class::Schema::Loader/CONTRIBUTORS>.
+See L<DBIx::Class::Schema::Loader/AUTHORS>.
 
 =head1 LICENSE