use strict;
use warnings;
-use base qw/
- DBIx::Class::Schema::Loader::DBI
- DBIx::Class::Schema::Loader::DBI::Sybase::Common
-/;
-use Carp::Clan qw/^DBIx::Class/;
-use Class::C3;
+use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
+use mro 'c3';
+use Try::Tiny;
+use List::MoreUtils 'any';
+use namespace::clean;
-our $VERSION = '0.04999_12';
+use DBIx::Class::Schema::Loader::Table::Sybase ();
+
+our $VERSION = '0.07035';
=head1 NAME
DBIx::Class::Schema::Loader::DBI::MSSQL - DBIx::Class::Schema::Loader::DBI MSSQL Implementation.
-=head1 SYNOPSIS
+=head1 DESCRIPTION
+
+Base driver for Microsoft SQL Server, used by
+L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server> for support
+via L<DBD::Sybase> and
+L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server> for support via
+L<DBD::ODBC>.
- package My::Schema;
- use base qw/DBIx::Class::Schema::Loader/;
+See L<DBIx::Class::Schema::Loader> and L<DBIx::Class::Schema::Loader::Base> for
+usage information.
- __PACKAGE__->loader_options( debug => 1 );
+=head1 CASE SENSITIVITY
- 1;
+Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
+generated column names are lower-cased as this makes them easier to work with
+in L<DBIx::Class>.
-=head1 DESCRIPTION
+We attempt to detect the database collation at startup for any database
+included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
+the column lowercasing behavior accordingly, as lower-cased column names do not
+work on case-sensitive databases.
+
+To manually control case-sensitive mode, put:
+
+ preserve_case => 1|0
+
+in your Loader options.
+
+See L<preserve_case|DBIx::Class::Schema::Loader::Base/preserve_case>.
-See L<DBIx::Class::Schema::Loader::Base>.
+B<NOTE:> this option used to be called C<case_sensitive_collation>, but has
+been renamed to a more generic option.
=cut
+# SQL Server 2000: Ancient as time itself, but still out in the wild
+sub _is_2k {
+ return shift->schema->storage->_server_info->{normalized_dbms_version} < 9;
+}
+
+sub _system_databases {
+ return (qw/
+ master model tempdb msdb
+ /);
+}
+
+sub _system_tables {
+ return (qw/
+ spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
+ /);
+}
+
+sub _schemas {
+ my ($self, $db) = @_;
+
+ my $owners = $self->dbh->selectcol_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE uid <> gid
+EOF2K
+SELECT name
+FROM [$db].sys.schemas
+EOF
+
+ return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
+}
+
+sub _current_schema {
+ my $self = shift;
+
+ if ($self->_is_2k) {
+ return ($self->dbh->selectrow_array('SELECT user_name()'))[0];
+ }
+
+ return ($self->dbh->selectrow_array('SELECT schema_name()'))[0];
+}
+
+sub _current_db {
+ my $self = shift;
+ return ($self->dbh->selectrow_array('SELECT db_name()'))[0];
+}
+
+sub _switch_db {
+ my ($self, $db) = @_;
+ $self->dbh->do("use [$db]");
+}
+
sub _setup {
my $self = shift;
$self->next::method(@_);
- $self->{db_schema} ||= $self->_build_db_schema;
- $self->_set_quote_char_and_name_sep;
-}
-# remove 'IDENTITY' from column data_type
-sub _columns_info_for {
- my $self = shift;
- my $result = $self->next::method(@_);
+ my $current_db = $self->_current_db;
+
+ if (ref $self->db_schema eq 'HASH') {
+ if (keys %{ $self->db_schema } < 2) {
+ my ($db) = keys %{ $self->db_schema };
+
+ $db ||= $current_db;
+
+ if ($db eq '%') {
+ my $owners = $self->db_schema->{$db};
+
+ my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
+SELECT name
+FROM master.dbo.sysdatabases
+EOF
+
+ my @dbs;
+
+ foreach my $db_name (@$db_names) {
+ push @dbs, $db_name
+ unless any { $_ eq $db_name } $self->_system_databases;
+ }
+
+ $self->db_schema({});
+
+ DB: foreach my $db (@dbs) {
+ if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
+ my @owners;
+
+ foreach my $owner (@$owners) {
+ push @owners, $owner
+ if $self->dbh->selectrow_array(<<"EOF");
+SELECT name
+FROM [$db].dbo.sysusers
+WHERE name = @{[ $self->dbh->quote($owner) ]}
+EOF
+ }
+
+ next DB unless @owners;
+
+ $self->db_schema->{$db} = \@owners;
+ }
+ else {
+ # for post-processing below
+ $self->db_schema->{$db} = '%';
+ }
+ }
+
+ $self->qualify_objects(1);
+ }
+ else {
+ if ($db ne $current_db) {
+ $self->_switch_db($db);
+
+ $self->qualify_objects(1);
+ }
+ }
+ }
+ else {
+ $self->qualify_objects(1);
+ }
+ }
+ elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
+ my $owners = $self->db_schema;
+ $owners ||= [ $self->_current_schema ];
+
+ $self->qualify_objects(1) if @$owners > 1;
- for my $col (keys %$result) {
- $result->{$col}->{data_type} =~ s/\s* identity \s*//ix;
+ $self->db_schema({ $current_db => $owners });
}
- return $result;
+ foreach my $db (keys %{ $self->db_schema }) {
+ if ($self->db_schema->{$db} eq '%') {
+ $self->db_schema->{$db} = [ $self->_schemas($db) ];
+
+ $self->qualify_objects(1);
+ }
+ }
+
+ if (not defined $self->preserve_case) {
+ foreach my $db (keys %{ $self->db_schema }) {
+ # We use the sys.databases query for the general case, and fallback to
+ # databasepropertyex() if for some reason sys.databases is not available,
+ # which does not work over DBD::ODBC with unixODBC+FreeTDS.
+ #
+ # XXX why does databasepropertyex() not work over DBD::ODBC ?
+ #
+ # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
+
+ my $current_db = $self->_current_db;
+
+ $self->_switch_db($db);
+
+ my $collation_name =
+ (eval { $self->dbh->selectrow_array("SELECT collation_name FROM [$db].sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") })[0]
+ || (eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") })[0];
+
+ $self->_switch_db($current_db);
+
+ if (not $collation_name) {
+ warn <<"EOF";
+
+WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
+case-insensitive mode. Override the 'preserve_case' attribute in your Loader
+options if needed.
+
+See 'preserve_case' in
+perldoc DBIx::Class::Schema::Loader::Base
+EOF
+ $self->preserve_case(0) unless $self->preserve_case;
+ }
+ else {
+ my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
+
+ if ($case_sensitive && (not $self->preserve_case)) {
+ $self->preserve_case(1);
+ }
+ else {
+ $self->preserve_case(0);
+ }
+ }
+ }
+ }
+}
+
+sub _tables_list {
+ my ($self, $opts) = @_;
+
+ my @tables;
+
+ while (my ($db, $owners) = each %{ $self->db_schema }) {
+ foreach my $owner (@$owners) {
+ my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT table_name
+FROM [$db].INFORMATION_SCHEMA.TABLES
+WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
+EOF
+
+ TABLE: foreach my $table_name (@$table_names) {
+ next TABLE if any { $_ eq $table_name } $self->_system_tables;
+
+ push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
+ loader => $self,
+ name => $table_name,
+ database => $db,
+ schema => $owner,
+ );
+ }
+ }
+ }
+
+ return $self->_filter_tables(\@tables, $opts);
}
sub _table_pk_info {
my ($self, $table) = @_;
- my $dbh = $self->schema->storage->dbh;
- my $sth = $dbh->prepare(qq{sp_pkeys '$table'});
- $sth->execute;
- my @keydata;
+ my $db = $table->database;
- while (my $row = $sth->fetchrow_hashref) {
- push @keydata, lc $row->{COLUMN_NAME};
- }
+ my $pk = $self->dbh->selectcol_arrayref(<<"EOF");
+SELECT kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+ ON kcu.table_name = tc.table_name
+ AND kcu.table_schema = tc.table_schema
+ AND kcu.constraint_name = tc.constraint_name
+WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+ AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ AND tc.constraint_type = 'PRIMARY KEY'
+ORDER BY kcu.ordinal_position
+EOF
- return \@keydata;
+ $pk = [ map $self->_lc($_), @$pk ];
+
+ return $pk;
}
sub _table_fk_info {
my ($self, $table) = @_;
- my ($local_cols, $remote_cols, $remote_table, @rels);
- my $dbh = $self->schema->storage->dbh;
- my $sth = $dbh->prepare(qq{sp_fkeys \@FKTABLE_NAME = '$table'});
+ my $db = $table->database;
+
+ my $sth = $self->dbh->prepare(<<"EOF");
+SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name,
+ fk_kcu.column_name, uk_kcu.column_name, rc.delete_rule, rc.update_rule
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
+JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
+ ON rc.constraint_name = fk_tc.constraint_name
+ AND rc.constraint_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
+ ON fk_kcu.constraint_name = fk_tc.constraint_name
+ AND fk_kcu.table_name = fk_tc.table_name
+ AND fk_kcu.table_schema = fk_tc.table_schema
+JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
+ ON uk_tc.constraint_name = rc.unique_constraint_name
+ AND uk_tc.table_schema = rc.unique_constraint_schema
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
+ ON uk_kcu.constraint_name = rc.unique_constraint_name
+ AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
+ AND uk_kcu.table_name = uk_tc.table_name
+ AND uk_kcu.table_schema = rc.unique_constraint_schema
+WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+ AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ORDER BY fk_kcu.ordinal_position
+EOF
+
$sth->execute;
- while (my $row = $sth->fetchrow_hashref) {
- my $fk = $row->{FK_NAME};
- push @{$local_cols->{$fk}}, lc $row->{FKCOLUMN_NAME};
- push @{$remote_cols->{$fk}}, lc $row->{PKCOLUMN_NAME};
- $remote_table->{$fk} = $row->{PKTABLE_NAME};
+ my %rels;
+
+ while (my ($fk, $remote_schema, $remote_table, $col, $remote_col,
+ $delete_rule, $update_rule) = $sth->fetchrow_array) {
+ push @{ $rels{$fk}{local_columns} }, $self->_lc($col);
+ push @{ $rels{$fk}{remote_columns} }, $self->_lc($remote_col);
+
+ $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
+ loader => $self,
+ name => $remote_table,
+ database => $db,
+ schema => $remote_schema,
+ ) unless exists $rels{$fk}{remote_table};
+
+ $rels{$fk}{attrs} ||= {
+ on_delete => uc $delete_rule,
+ on_update => uc $update_rule,
+ is_deferrable => 1 # constraints can be temporarily disabled, but DEFERRABLE is not supported
+ };
}
- foreach my $fk (keys %$remote_table) {
- push @rels, {
- local_columns => \@{$local_cols->{$fk}},
- remote_columns => \@{$remote_cols->{$fk}},
- remote_table => $remote_table->{$fk},
- };
-
- }
- return \@rels;
+ return [ values %rels ];
}
sub _table_uniq_info {
my ($self, $table) = @_;
- my $dbh = $self->schema->storage->dbh;
- my $sth = $dbh->prepare(qq{SELECT CCU.CONSTRAINT_NAME, CCU.COLUMN_NAME FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE CCU
- JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC ON (CCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME)
- JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU ON (CCU.CONSTRAINT_NAME = KCU.CONSTRAINT_NAME AND CCU.COLUMN_NAME = KCU.COLUMN_NAME)
- WHERE CCU.TABLE_NAME = @{[ $dbh->quote($table) ]} AND CONSTRAINT_TYPE = 'UNIQUE' ORDER BY KCU.ORDINAL_POSITION});
- $sth->execute;
- my $constraints;
- while (my $row = $sth->fetchrow_hashref) {
- my $name = lc $row->{CONSTRAINT_NAME};
- my $col = lc $row->{COLUMN_NAME};
- push @{$constraints->{$name}}, $col;
- }
+ my $db = $table->database;
+
+ my $sth = $self->dbh->prepare(<<"EOF");
+SELECT tc.constraint_name, kcu.column_name
+FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
+JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
+ ON kcu.constraint_name = tc.constraint_name
+ AND kcu.table_name = tc.table_name
+ AND kcu.table_schema = tc.table_schema
+wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
+ AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ AND tc.constraint_type = 'UNIQUE'
+ORDER BY kcu.ordinal_position
+EOF
- my @uniqs = map { [ $_ => $constraints->{$_} ] } keys %$constraints;
- return \@uniqs;
-}
+ $sth->execute;
-sub _extra_column_info {
- my ($self, $info) = @_;
- my %extra_info;
+ my %uniq;
- my ($table, $column) = @$info{qw/TABLE_NAME COLUMN_NAME/};
+ while (my ($constr, $col) = $sth->fetchrow_array) {
+ push @{ $uniq{$constr} }, $self->_lc($col);
+ }
- my $dbh = $self->schema->storage->dbh;
- my $sth = $dbh->prepare(qq{
- SELECT COLUMN_NAME
- FROM INFORMATION_SCHEMA.COLUMNS
- WHERE COLUMNPROPERTY(object_id(@{[ $dbh->quote($table) ]}, 'U'), '$column', 'IsIdentity') = 1
- AND TABLE_NAME = @{[ $dbh->quote($table) ]} AND COLUMN_NAME = @{[ $dbh->quote($column) ]}
- });
- $sth->execute();
+ return [ map [ $_ => $uniq{$_} ], keys %uniq ];
+}
- if ($sth->fetchrow_array) {
- $extra_info{is_auto_increment} = 1;
- }
+sub _columns_info_for {
+ my $self = shift;
+ my ($table) = @_;
-# get default
- $sth = $dbh->prepare(qq{
- SELECT COLUMN_DEFAULT
- FROM INFORMATION_SCHEMA.COLUMNS
- WHERE TABLE_NAME = @{[ $dbh->quote($table) ]} AND COLUMN_NAME = @{[ $dbh->quote($column) ]}
- });
- $sth->execute;
- my ($default) = $sth->fetchrow_array;
+ my $db = $table->database;
- if (defined $default) {
- # strip parens
- $default =~ s/^\( (.*) \)\z/$1/x;
+ my $result = $self->next::method(@_);
- # Literal strings are in ''s, numbers are in ()s (in some versions of
- # MSSQL, in others they are unquoted) everything else is a function.
- $extra_info{default_value} =
- $default =~ /^['(] (.*) [)']\z/x ? $1 :
- $default =~ /^\d/ ? $default : \$default;
+ # get type info (and identity)
+ my $rows = $self->dbh->selectall_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
+SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, (sc.status & 0x80) is_identity
+FROM [$db].INFORMATION_SCHEMA.COLUMNS c
+JOIN [$db].dbo.sysusers ss ON
+ c.table_schema = ss.name
+JOIN [$db].dbo.sysobjects so ON
+ c.table_name = so.name
+ AND so.uid = ss.uid
+JOIN [$db].dbo.syscolumns sc ON
+ c.column_name = sc.name
+ AND sc.id = so.Id
+WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
+EOF2K
+SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, sc.is_identity
+FROM [$db].INFORMATION_SCHEMA.COLUMNS c
+JOIN [$db].sys.schemas ss ON
+ c.table_schema = ss.name
+JOIN [$db].sys.objects so ON
+ c.table_name = so.name
+ AND so.schema_id = ss.schema_id
+JOIN [$db].sys.columns sc ON
+ c.column_name = sc.name
+ AND sc.object_id = so.object_id
+WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
+ AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
+EOF
+
+ foreach my $row (@$rows) {
+ my ($col, $char_max_length, $data_type, $datetime_precision, $default, $is_identity) = @$row;
+ $col = lc $col unless $self->preserve_case;
+ my $info = $result->{$col} || next;
+
+ $info->{data_type} = $data_type;
+
+ if (defined $char_max_length) {
+ $info->{size} = $char_max_length;
+ $info->{size} = 0 if $char_max_length < 0;
+ }
+
+ if ($is_identity) {
+ $info->{is_auto_increment} = 1;
+ $info->{data_type} =~ s/\s*identity//i;
+ delete $info->{size};
+ }
+
+ # fix types
+ if ($data_type eq 'int') {
+ $info->{data_type} = 'integer';
+ }
+ elsif ($data_type eq 'timestamp') {
+ $info->{inflate_datetime} = 0;
+ }
+ elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
+ if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
+ delete $info->{size};
+ }
+ }
+ elsif ($data_type eq 'float') {
+ $info->{data_type} = 'double precision';
+ delete $info->{size};
+ }
+ elsif ($data_type =~ /^(?:small)?datetime\z/) {
+ # fixup for DBD::Sybase
+ if ($info->{default_value} && $info->{default_value} eq '3') {
+ delete $info->{default_value};
+ }
+ }
+ elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
+ $info->{size} = $datetime_precision;
+
+ delete $info->{size} if $info->{size} == 7;
+ }
+ elsif ($data_type eq 'varchar' && $info->{size} == 0) {
+ $info->{data_type} = 'text';
+ delete $info->{size};
+ }
+ elsif ($data_type eq 'nvarchar' && $info->{size} == 0) {
+ $info->{data_type} = 'ntext';
+ delete $info->{size};
+ }
+ elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
+ $info->{data_type} = 'image';
+ delete $info->{size};
+ }
+
+ if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
+ delete $info->{size};
+ }
+
+ if (defined $default) {
+ # strip parens
+ $default =~ s/^\( (.*) \)\z/$1/x;
+
+ # Literal strings are in ''s, numbers are in ()s (in some versions of
+ # MSSQL, in others they are unquoted) everything else is a function.
+ $info->{default_value} =
+ $default =~ /^['(] (.*) [)']\z/x ? $1 :
+ $default =~ /^\d/ ? $default : \$default;
+
+ if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
+ ${ $info->{default_value} } = 'current_timestamp';
+
+ my $getdate = 'getdate()';
+ $info->{original}{default_value} = \$getdate;
+ }
+ }
}
- return \%extra_info;
+ return $result;
}
=head1 SEE ALSO
+L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server>,
+L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
L<DBIx::Class::Schema::Loader::DBI>
=cut
1;
+# vim:et sts=4 sw=4 tw=0: