Release 0.07047
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
index 71d49d4..e2810f2 100644 (file)
@@ -3,14 +3,14 @@ package DBIx::Class::Schema::Loader::DBI::MSSQL;
 use strict;
 use warnings;
 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
-use Carp::Clan qw/^DBIx::Class/;
-use Class::C3;
+use mro 'c3';
+use Try::Tiny;
+use List::Util 'any';
+use namespace::clean;
 
-__PACKAGE__->mk_group_accessors('simple', qw/
-    case_sensitive_collation
-/);
+use DBIx::Class::Schema::Loader::Table::Sybase ();
 
-our $VERSION = '0.06000';
+our $VERSION = '0.07047';
 
 =head1 NAME
 
@@ -27,150 +27,446 @@ L<DBD::ODBC>.
 See L<DBIx::Class::Schema::Loader> and L<DBIx::Class::Schema::Loader::Base> for
 usage information.
 
+=head1 CASE SENSITIVITY
+
+Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
+generated column names are lower-cased as this makes them easier to work with
+in L<DBIx::Class>.
+
+We attempt to detect the database collation at startup for any database
+included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
+the column lowercasing behavior accordingly, as lower-cased column names do not
+work on case-sensitive databases.
+
+To manually control case-sensitive mode, put:
+
+    preserve_case => 1|0
+
+in your Loader options.
+
+See L<preserve_case|DBIx::Class::Schema::Loader::Base/preserve_case>.
+
+B<NOTE:> this option used to be called C<case_sensitive_collation>, but has
+been renamed to a more generic option.
+
 =cut
 
-sub _is_case_sensitive {
+# SQL Server 2000: Ancient as time itself, but still out in the wild
+sub _is_2k {
+    return shift->schema->storage->_server_info->{normalized_dbms_version} < 9;
+}
+
+sub _system_databases {
+    return (qw/
+        master model tempdb msdb
+    /);
+}
+
+sub _system_tables {
+    return (qw/
+        spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
+    /);
+}
+
+sub _schemas {
+    my ($self, $db) = @_;
+
+    my $owners = $self->dbh->selectcol_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE uid <> gid
+EOF2K
+SELECT name
+FROM [$db].sys.schemas
+EOF
+
+    return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
+}
+
+sub _current_schema {
     my $self = shift;
 
-    return $self->case_sensitive_collation ? 1 : 0;
+    if ($self->_is_2k) {
+        return ($self->dbh->selectrow_array('SELECT user_name()'))[0];
+    }
+
+    return ($self->dbh->selectrow_array('SELECT schema_name()'))[0];
+}
+
+sub _current_db {
+    my $self = shift;
+    return ($self->dbh->selectrow_array('SELECT db_name()'))[0];
+}
+
+sub _switch_db {
+    my ($self, $db) = @_;
+    $self->dbh->do("use [$db]");
 }
 
 sub _setup {
     my $self = shift;
 
-    $self->next::method;
+    $self->next::method(@_);
 
-    my $dbh = $self->schema->storage->dbh;
+    my $current_db = $self->_current_db;
 
-    my ($collation_name) = $dbh->selectrow_array(<<'EOS');
-SELECT collation_name
-FROM sys.databases
-WHERE name = DB_NAME()
-EOS
+    if (ref $self->db_schema eq 'HASH') {
+        if (keys %{ $self->db_schema } < 2) {
+            my ($db) = keys %{ $self->db_schema };
 
-    my ($sensitivity) = $collation_name =~ /(C\w)_[A-z]+\z/;
+            $db ||= $current_db;
 
-    $self->case_sensitive_collation($sensitivity eq 'CS' ? 1 : 0);
-}
+            if ($db eq '%') {
+                my $owners = $self->db_schema->{$db};
+
+                my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
+SELECT name
+FROM master.dbo.sysdatabases
+EOF
+
+                my @dbs;
+
+                foreach my $db_name (@$db_names) {
+                    push @dbs, $db_name
+                        unless any { $_ eq $db_name } $self->_system_databases;
+                }
+
+                $self->db_schema({});
+
+                DB: foreach my $db (@dbs) {
+                    if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
+                        my @owners;
+
+                        foreach my $owner (@$owners) {
+                            push @owners, $owner
+                                if $self->dbh->selectrow_array(<<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE name = @{[ $self->dbh->quote($owner) ]}
+EOF
+                        }
+
+                        next DB unless @owners;
+
+                        $self->db_schema->{$db} = \@owners;
+                    }
+                    else {
+                        # for post-processing below
+                        $self->db_schema->{$db} = '%';
+                    }
+                }
+
+                $self->qualify_objects(1);
+            }
+            else {
+                if ($db ne $current_db) {
+                    $self->_switch_db($db);
+
+                    $self->qualify_objects(1);
+                }
+            }
+        }
+        else {
+            $self->qualify_objects(1);
+        }
+    }
+    elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
+        my $owners = $self->db_schema;
+        $owners ||= [ $self->_current_schema ];
+
+        $self->qualify_objects(1) if @$owners > 1;
+
+        $self->db_schema({ $current_db => $owners });
+    }
+
+    foreach my $db (keys %{ $self->db_schema }) {
+        if ($self->db_schema->{$db} eq '%') {
+            $self->db_schema->{$db} = [ $self->_schemas($db) ];
+
+            $self->qualify_objects(1);
+        }
+    }
+
+    if (not defined $self->preserve_case) {
+        foreach my $db (keys %{ $self->db_schema }) {
+            # We use the sys.databases query for the general case, and fallback to
+            # databasepropertyex() if for some reason sys.databases is not available,
+            # which does not work over DBD::ODBC with unixODBC+FreeTDS.
+            #
+            # XXX why does databasepropertyex() not work over DBD::ODBC ?
+            #
+            # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
+
+            my $current_db = $self->_current_db;
 
-sub _lc {
-    my ($self, $name) = @_;
+            $self->_switch_db($db);
 
-    return $self->case_sensitive_collation ? $name : lc($name);
+            my $collation_name =
+                   (eval { $self->dbh->selectrow_array("SELECT collation_name FROM [$db].sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") })[0]
+                || (eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") })[0];
+
+            $self->_switch_db($current_db);
+
+            if (not $collation_name) {
+                warn <<"EOF";
+
+WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
+case-insensitive mode. Override the 'preserve_case' attribute in your Loader
+options if needed.
+
+See 'preserve_case' in
+perldoc DBIx::Class::Schema::Loader::Base
+EOF
+                $self->preserve_case(0) unless $self->preserve_case;
+            }
+            else {
+                my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
+
+                if ($case_sensitive && (not $self->preserve_case)) {
+                    $self->preserve_case(1);
+                }
+                else {
+                    $self->preserve_case(0);
+                }
+            }
+        }
+    }
 }
 
 sub _tables_list {
-    my ($self, $opts) = @_;
+    my ($self) = @_;
+
+    my @tables;
 
-    my $dbh = $self->schema->storage->dbh;
-    my $sth = $dbh->prepare(<<'EOF');
-SELECT t.table_name
-FROM information_schema.tables t
-WHERE lower(t.table_schema) = ?
+    while (my ($db, $owners) = each %{ $self->db_schema }) {
+        foreach my $owner (@$owners) {
+            my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT table_name
+FROM [$db].INFORMATION_SCHEMA.TABLES
+WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
 EOF
-    $sth->execute(lc $self->db_schema);
 
-    my @tables = map @$_, @{ $sth->fetchall_arrayref };
+            TABLE: foreach my $table_name (@$table_names) {
+                next TABLE if any { $_ eq $table_name } $self->_system_tables;
+
+                push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
+                    loader   => $self,
+                    name     => $table_name,
+                    database => $db,
+                    schema   => $owner,
+                );
+            }
+        }
+    }
 
-    return $self->_filter_tables(\@tables, $opts);
+    return $self->_filter_tables(\@tables);
 }
 
 sub _table_pk_info {
     my ($self, $table) = @_;
-    my $dbh = $self->schema->storage->dbh;
-    my $sth = $dbh->prepare(qq{sp_pkeys '$table'});
-    $sth->execute;
 
-    my @keydata;
+    my $db = $table->database;
+
+    my $pk = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+    ON kcu.table_name = tc.table_name
+        AND kcu.table_schema = tc.table_schema
+        AND kcu.constraint_name = tc.constraint_name
+WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND tc.constraint_type = 'PRIMARY KEY'
+ORDER BY kcu.ordinal_position
+EOF
 
-    while (my $row = $sth->fetchrow_hashref) {
-        push @keydata, $self->_lc($row->{COLUMN_NAME});
-    }
+    $pk = [ map $self->_lc($_), @$pk ];
 
-    return \@keydata;
+    return $pk;
 }
 
 sub _table_fk_info {
     my ($self, $table) = @_;
 
-    my ($local_cols, $remote_cols, $remote_table, @rels, $sth);
-    my $dbh = $self->schema->storage->dbh;
-    eval {
-        $sth = $dbh->prepare(qq{sp_fkeys \@fktable_name = '$table'});
-        $sth->execute;
-    };
-
-    while (my $row = eval { $sth->fetchrow_hashref }) {
-        my $fk = $row->{FK_NAME};
-        push @{$local_cols->{$fk}}, $self->_lc($row->{FKCOLUMN_NAME});
-        push @{$remote_cols->{$fk}}, $self->_lc($row->{PKCOLUMN_NAME});
-        $remote_table->{$fk} = $row->{PKTABLE_NAME};
-    }
+    my $db = $table->database;
+
+    my $sth = $self->dbh->prepare(<<"EOF");
+SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name,
+       fk_kcu.column_name, uk_kcu.column_name, rc.delete_rule, rc.update_rule
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
+JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
+    ON rc.constraint_name = fk_tc.constraint_name
+        AND rc.constraint_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
+    ON fk_kcu.constraint_name = fk_tc.constraint_name
+        AND fk_kcu.table_name = fk_tc.table_name
+        AND fk_kcu.table_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
+    ON uk_tc.constraint_name = rc.unique_constraint_name
+        AND uk_tc.table_schema = rc.unique_constraint_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
+    ON uk_kcu.constraint_name = rc.unique_constraint_name
+        AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
+        AND uk_kcu.table_name = uk_tc.table_name
+        AND uk_kcu.table_schema = rc.unique_constraint_schema
+WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ORDER BY fk_kcu.ordinal_position
+EOF
 
-    foreach my $fk (keys %$remote_table) {
-        push @rels, {
-                      local_columns => \@{$local_cols->{$fk}},
-                      remote_columns => \@{$remote_cols->{$fk}},
-                      remote_table => $remote_table->{$fk},
-                    };
+    $sth->execute;
 
+    my %rels;
+
+    while (my ($fk, $remote_schema, $remote_table, $col, $remote_col,
+               $delete_rule, $update_rule) = $sth->fetchrow_array) {
+        push @{ $rels{$fk}{local_columns}  }, $self->_lc($col);
+        push @{ $rels{$fk}{remote_columns} }, $self->_lc($remote_col);
+
+        $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
+            loader   => $self,
+            name     => $remote_table,
+            database => $db,
+            schema   => $remote_schema,
+        ) unless exists $rels{$fk}{remote_table};
+
+        $rels{$fk}{attrs} ||= {
+            on_delete     => uc $delete_rule,
+            on_update     => uc $update_rule,
+            is_deferrable => 1 # constraints can be temporarily disabled, but DEFERRABLE is not supported
+        };
     }
-    return \@rels;
+
+    return [ values %rels ];
 }
 
 sub _table_uniq_info {
     my ($self, $table) = @_;
 
-    my $dbh = $self->schema->storage->dbh;
-    local $dbh->{FetchHashKeyName} = 'NAME_lc';
+    my $db = $table->database;
+
+    my $sth = $self->dbh->prepare(<<"EOF");
+SELECT tc.constraint_name, kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+    ON kcu.constraint_name = tc.constraint_name
+        AND kcu.table_name = tc.table_name
+        AND kcu.table_schema = tc.table_schema
+wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+    AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND tc.constraint_type = 'UNIQUE'
+ORDER BY kcu.ordinal_position
+EOF
 
-    my $sth = $dbh->prepare(qq{
-SELECT ccu.constraint_name, ccu.column_name
-FROM information_schema.constraint_column_usage ccu
-JOIN information_schema.table_constraints tc on (ccu.constraint_name = tc.constraint_name)
-JOIN information_schema.key_column_usage kcu on (ccu.constraint_name = kcu.constraint_name and ccu.column_name = kcu.column_name)
-wHERE lower(ccu.table_name) = @{[ $dbh->quote(lc $table) ]} AND constraint_type = 'UNIQUE' ORDER BY kcu.ordinal_position
-    });
     $sth->execute;
-    my $constraints;
-    while (my $row = $sth->fetchrow_hashref) {
-        my $name = $row->{constraint_name};
-        my $col  = $self->_lc($row->{column_name});
-        push @{$constraints->{$name}}, $col;
+
+    my %uniq;
+
+    while (my ($constr, $col) = $sth->fetchrow_array) {
+        push @{ $uniq{$constr} }, $self->_lc($col);
     }
 
-    my @uniqs = map { [ $_ => $constraints->{$_} ] } keys %$constraints;
-    return \@uniqs;
+    return [ map [ $_ => $uniq{$_} ], sort keys %uniq ];
 }
 
 sub _columns_info_for {
     my $self    = shift;
     my ($table) = @_;
 
+    my $db = $table->database;
+
     my $result = $self->next::method(@_);
 
-    while (my ($col, $info) = each %$result) {
-        my $dbh = $self->schema->storage->dbh;
+    # get type info (and identity)
+    my $rows = $self->dbh->selectall_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
+SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, (sc.status & 0x80) is_identity
+FROM [$db].INFORMATION_SCHEMA.COLUMNS c
+JOIN [$db].dbo.sysusers ss ON
+    c.table_schema = ss.name
+JOIN [$db].dbo.sysobjects so ON
+    c.table_name = so.name
+    AND so.uid = ss.uid
+JOIN [$db].dbo.syscolumns sc ON
+    c.column_name = sc.name
+    AND sc.id = so.Id
+WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
+EOF2K
+SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, sc.is_identity
+FROM [$db].INFORMATION_SCHEMA.COLUMNS c
+JOIN [$db].sys.schemas ss ON
+    c.table_schema = ss.name
+JOIN [$db].sys.objects so ON
+      c.table_name   = so.name
+    AND so.schema_id = ss.schema_id
+JOIN [$db].sys.columns sc ON
+    c.column_name = sc.name
+    AND sc.object_id = so.object_id
+WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+    AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
+EOF
+
+    foreach my $row (@$rows) {
+        my ($col, $char_max_length, $data_type, $datetime_precision, $default, $is_identity) = @$row;
+        $col = lc $col unless $self->preserve_case;
+        my $info = $result->{$col} || next;
+
+        $info->{data_type} = $data_type;
+
+        if (defined $char_max_length) {
+            $info->{size} = $char_max_length;
+            $info->{size} = 0 if $char_max_length < 0;
+        }
 
-        my $sth = $dbh->prepare(qq{
-SELECT column_name 
-FROM information_schema.columns
-WHERE columnproperty(object_id(@{[ $dbh->quote(lc $table) ]}, 'U'), @{[ $dbh->quote(lc $col) ]}, 'IsIdentity') = 1
-AND lower(table_name) = @{[ $dbh->quote(lc $table) ]} AND lower(column_name) = @{[ $dbh->quote(lc $col) ]}
-        });
-        if (eval { $sth->execute; $sth->fetchrow_array }) {
+        if ($is_identity) {
             $info->{is_auto_increment} = 1;
             $info->{data_type} =~ s/\s*identity//i;
             delete $info->{size};
         }
 
-# get default
-        $sth = $dbh->prepare(qq{
-SELECT column_default
-FROM information_schema.columns
-wHERE lower(table_name) = @{[ $dbh->quote(lc $table) ]} AND lower(column_name) = @{[ $dbh->quote(lc $col) ]}
-        });
-        my ($default) = eval { $sth->execute; $sth->fetchrow_array };
+        # fix types
+        if ($data_type eq 'int') {
+            $info->{data_type} = 'integer';
+        }
+        elsif ($data_type eq 'timestamp') {
+            $info->{inflate_datetime} = 0;
+        }
+        elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
+            if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
+                delete $info->{size};
+            }
+        }
+        elsif ($data_type eq 'float') {
+            $info->{data_type} = 'double precision';
+            delete $info->{size};
+        }
+        elsif ($data_type =~ /^(?:small)?datetime\z/) {
+            # fixup for DBD::Sybase
+            if ($info->{default_value} && $info->{default_value} eq '3') {
+                delete $info->{default_value};
+            }
+        }
+        elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
+            $info->{size} = $datetime_precision;
+
+            delete $info->{size} if $info->{size} == 7;
+        }
+        elsif ($data_type eq 'varchar'   && $info->{size} == 0) {
+            $info->{data_type} = 'text';
+            delete $info->{size};
+        }
+        elsif ($data_type eq 'nvarchar'  && $info->{size} == 0) {
+            $info->{data_type} = 'ntext';
+            delete $info->{size};
+        }
+        elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
+            $info->{data_type} = 'image';
+            delete $info->{size};
+        }
+
+        if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
+            delete $info->{size};
+        }
 
         if (defined $default) {
             # strip parens
@@ -181,6 +477,13 @@ wHERE lower(table_name) = @{[ $dbh->quote(lc $table) ]} AND lower(column_name) =
             $info->{default_value} =
                 $default =~ /^['(] (.*) [)']\z/x ? $1 :
                     $default =~ /^\d/ ? $default : \$default;
+
+            if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
+                ${ $info->{default_value} } = 'current_timestamp';
+
+                my $getdate = 'getdate()';
+                $info->{original}{default_value} = \$getdate;
+            }
         }
     }
 
@@ -194,9 +497,9 @@ L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
 L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
 L<DBIx::Class::Schema::Loader::DBI>
 
-=head1 AUTHOR
+=head1 AUTHORS
 
-See L<DBIx::Class::Schema::Loader/AUTHOR> and L<DBIx::Class::Schema::Loader/CONTRIBUTORS>.
+See L<DBIx::Class::Schema::Loader/AUTHORS>.
 
 =head1 LICENSE