release 0.07014
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
1 package DBIx::Class::Schema::Loader::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
6 use mro 'c3';
7 use Try::Tiny;
8 use List::MoreUtils 'any';
9 use namespace::clean;
10
11 use DBIx::Class::Schema::Loader::Table::Sybase ();
12
13 our $VERSION = '0.07014';
14
15 =head1 NAME
16
17 DBIx::Class::Schema::Loader::DBI::MSSQL - DBIx::Class::Schema::Loader::DBI MSSQL Implementation.
18
19 =head1 DESCRIPTION
20
21 Base driver for Microsoft SQL Server, used by
22 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server> for support
23 via L<DBD::Sybase> and
24 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server> for support via
25 L<DBD::ODBC>.
26
27 See L<DBIx::Class::Schema::Loader> and L<DBIx::Class::Schema::Loader::Base> for
28 usage information.
29
30 =head1 CASE SENSITIVITY
31
32 Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
33 generated column names are lower-cased as this makes them easier to work with
34 in L<DBIx::Class>.
35
36 We attempt to detect the database collation at startup for any database
37 included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
38 the column lowercasing behavior accordingly, as lower-cased column names do not
39 work on case-sensitive databases.
40
41 To manually control case-sensitive mode, put:
42
43     preserve_case => 1|0
44
45 in your Loader options.
46
47 See L<preserve_case|DBIx::Class::Schema::Loader::Base/preserve_case>.
48
49 B<NOTE:> this option used to be called C<case_sensitive_collation>, but has
50 been renamed to a more generic option.
51
52 =cut
53
54 sub _system_databases {
55     return (qw/
56         master model tempdb msdb
57     /);
58 }
59
60 sub _system_tables {
61     return (qw/
62         spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
63     /);
64 }
65
66 sub _owners {
67     my ($self, $db) = @_;
68
69     my $owners = $self->dbh->selectcol_arrayref(<<"EOF");
70 SELECT name
71 FROM [$db].dbo.sysusers
72 WHERE uid <> gid
73 EOF
74
75     return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
76 }
77
78 sub _setup {
79     my $self = shift;
80
81     $self->next::method(@_);
82
83     my ($current_db) = $self->dbh->selectrow_array('SELECT db_name()');
84
85     if (ref $self->db_schema eq 'HASH') {
86         if (keys %{ $self->db_schema } < 2) {
87             my ($db) = keys %{ $self->db_schema };
88
89             $db ||= $current_db;
90
91             if ($db eq '%') {
92                 my $owners = $self->db_schema->{$db};
93
94                 my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
95 SELECT name
96 FROM master.dbo.sysdatabases
97 EOF
98
99                 my @dbs;
100
101                 foreach my $db_name (@$db_names) {
102                     push @dbs, $db_name
103                         unless any { $_ eq $db_name } $self->_system_databases;
104                 }
105
106                 $self->db_schema({});
107
108                 DB: foreach my $db (@dbs) {
109                     if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
110                         my @owners;
111
112                         foreach my $owner (@$owners) {
113                             push @owners, $owner
114                                 if $self->dbh->selectrow_array(<<"EOF");
115 SELECT name
116 FROM [$db].dbo.sysusers
117 WHERE name = @{[ $self->dbh->quote($owner) ]}
118 EOF
119                         }
120
121                         next DB unless @owners;
122
123                         $self->db_schema->{$db} = \@owners;
124                     }
125                     else {
126                         # for post-processing below
127                         $self->db_schema->{$db} = '%';
128                     }
129                 }
130
131                 $self->qualify_objects(1);
132             }
133             else {
134                 if ($db ne $current_db) {
135                     $self->dbh->do("USE [$db]");
136
137                     $self->qualify_objects(1);
138                 }
139             }
140         }
141         else {
142             $self->qualify_objects(1);
143         }
144     }
145     elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
146         my $owners = $self->db_schema;
147         $owners ||= [ $self->dbh->selectrow_array('SELECT user_name()') ];
148
149         $self->qualify_objects(1) if @$owners > 1;
150
151         $self->db_schema({ $current_db => $owners });
152     }
153
154     foreach my $db (keys %{ $self->db_schema }) {
155         if ($self->db_schema->{$db} eq '%') {
156             $self->db_schema->{$db} = [ $self->_owners($db) ];
157
158             $self->qualify_objects(1);
159         }
160     }
161
162     if (not defined $self->preserve_case) {
163         foreach my $db (keys %{ $self->db_schema }) {
164             # We use the sys.databases query for the general case, and fallback to
165             # databasepropertyex() if for some reason sys.databases is not available,
166             # which does not work over DBD::ODBC with unixODBC+FreeTDS.
167             #
168             # XXX why does databasepropertyex() not work over DBD::ODBC ?
169             #
170             # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
171             my ($collation_name) =
172                    eval { $self->dbh->selectrow_array("SELECT collation_name FROM sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") }
173                 || eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") };
174
175             if (not $collation_name) {
176                 warn <<"EOF";
177
178 WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
179 case-insensitive mode. Override the 'preserve_case' attribute in your Loader
180 options if needed.
181
182 See 'preserve_case' in
183 perldoc DBIx::Class::Schema::Loader::Base
184 EOF
185                 $self->preserve_case(0) unless $self->preserve_case;
186             }
187             else {
188                 my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
189
190                 if ($case_sensitive && (not $self->preserve_case)) {
191                     $self->preserve_case(1);
192                 }
193                 else {
194                     $self->preserve_case(0);
195                 }
196             }
197         }
198     }
199 }
200
201 sub _tables_list {
202     my ($self, $opts) = @_;
203
204     my @tables;
205
206     while (my ($db, $owners) = each %{ $self->db_schema }) {
207         foreach my $owner (@$owners) {
208             my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
209 SELECT table_name
210 FROM [$db].INFORMATION_SCHEMA.TABLES
211 WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
212 EOF
213
214             TABLE: foreach my $table_name (@$table_names) {
215                 next TABLE if any { $_ eq $table_name } $self->_system_tables;
216
217                 push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
218                     loader   => $self,
219                     name     => $table_name,
220                     database => $db,
221                     schema   => $owner,
222                 );
223             }
224         }
225     }
226
227     return $self->_filter_tables(\@tables, $opts);
228 }
229
230 sub _table_pk_info {
231     my ($self, $table) = @_;
232
233     my $db = $table->database;
234
235     return $self->dbh->selectcol_arrayref(<<"EOF")
236 SELECT kcu.column_name
237 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
238 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
239     ON kcu.table_name = tc.table_name
240         AND kcu.table_schema = tc.table_schema
241         AND kcu.constraint_name = tc.constraint_name
242 WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
243     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
244     AND tc.constraint_type = 'PRIMARY KEY'
245 ORDER BY kcu.ordinal_position
246 EOF
247 }
248
249 sub _table_fk_info {
250     my ($self, $table) = @_;
251
252     my $db = $table->database;
253
254     my $sth = $self->dbh->prepare(<<"EOF");
255 SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name, fk_kcu.column_name, uk_kcu.column_name
256 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
257 JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
258     ON rc.constraint_name = fk_tc.constraint_name
259         AND rc.constraint_schema = fk_tc.table_schema
260 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
261     ON fk_kcu.constraint_name = fk_tc.constraint_name
262         AND fk_kcu.table_name = fk_tc.table_name
263         AND fk_kcu.table_schema = fk_tc.table_schema 
264 JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
265     ON uk_tc.constraint_name = rc.unique_constraint_name
266         AND uk_tc.table_schema = rc.unique_constraint_schema
267 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
268     ON uk_kcu.constraint_name = rc.unique_constraint_name
269         AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
270         AND uk_kcu.table_name = uk_tc.table_name
271         AND uk_kcu.table_schema = rc.unique_constraint_schema
272 WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
273     AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
274 ORDER BY fk_kcu.ordinal_position
275 EOF
276
277     $sth->execute;
278
279     my %rels;
280
281     while (my ($fk, $remote_schema, $remote_table, $col, $remote_col) = $sth->fetchrow_array) {
282         push @{ $rels{$fk}{local_columns}  }, $col;
283         push @{ $rels{$fk}{remote_columns} }, $remote_col;
284         
285         $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
286             loader   => $self,
287             name     => $remote_table,
288             database => $db,
289             schema   => $remote_schema,
290         ) unless exists $rels{$fk}{remote_table};
291     }
292
293     return [ values %rels ];
294 }
295
296 sub _table_uniq_info {
297     my ($self, $table) = @_;
298
299     my $db = $table->database;
300
301     my $sth = $self->dbh->prepare(<<"EOF");
302 SELECT tc.constraint_name, kcu.column_name
303 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
304 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
305     ON kcu.constraint_name = tc.constraint_name
306         AND kcu.table_name = tc.table_name
307         AND kcu.table_schema = tc.table_schema
308 wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
309     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
310     AND tc.constraint_type = 'UNIQUE'
311 ORDER BY kcu.ordinal_position
312 EOF
313
314     $sth->execute;
315
316     my %uniq;
317
318     while (my ($constr, $col) = $sth->fetchrow_array) {
319         push @{ $uniq{$constr} }, $self->_lc($col);
320     }
321
322     return [ map [ $_ => $uniq{$_} ], keys %uniq ];
323 }
324
325 sub _columns_info_for {
326     my $self    = shift;
327     my ($table) = @_;
328
329     my $db = $table->database;
330
331     my $result = $self->next::method(@_);
332
333     while (my ($col, $info) = each %$result) {
334 # get type info
335         my ($char_max_length, $data_type, $datetime_precision, $default) =
336             $self->dbh->selectrow_array(<<"EOF");
337 SELECT character_maximum_length, data_type, datetime_precision, column_default
338 FROM [$db].INFORMATION_SCHEMA.COLUMNS
339 WHERE table_name = @{[ $self->dbh->quote($table->name) ]}
340     AND table_schema = @{[ $self->dbh->quote($table->schema) ]}
341     AND column_name = @{[ $self->dbh->quote($col) ]}
342 EOF
343
344         $info->{data_type} = $data_type;
345
346         if (defined $char_max_length) {
347             $info->{size} = $char_max_length;
348             $info->{size} = 0 if $char_max_length < 0;
349         }
350
351 # find identities
352         my ($is_identity) = $self->dbh->selectrow_array(<<"EOF");
353 SELECT is_identity
354 FROM [$db].sys.columns
355 WHERE object_id = (
356     SELECT object_id
357     FROM [$db].sys.objects
358     WHERE name = @{[ $self->dbh->quote($table->name) ]}
359         AND schema_id = (
360             SELECT schema_id
361             FROM [$db].sys.schemas
362             WHERE name = @{[ $self->dbh->quote($table->schema) ]}
363         )
364 ) AND name = @{[ $self->dbh->quote($col) ]}
365 EOF
366         if ($is_identity) {
367             $info->{is_auto_increment} = 1;
368             $info->{data_type} =~ s/\s*identity//i;
369             delete $info->{size};
370         }
371
372 # fix types
373         if ($data_type eq 'int') {
374             $info->{data_type} = 'integer';
375         }
376         elsif ($data_type eq 'timestamp') {
377             $info->{inflate_datetime} = 0;
378         }
379         elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
380             if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
381                 delete $info->{size};
382             }
383         }
384         elsif ($data_type eq 'float') {
385             $info->{data_type} = 'double precision';
386             delete $info->{size};
387         }
388         elsif ($data_type =~ /^(?:small)?datetime\z/) {
389             # fixup for DBD::Sybase
390             if ($info->{default_value} && $info->{default_value} eq '3') {
391                 delete $info->{default_value};
392             }
393         }
394         elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
395             $info->{size} = $datetime_precision;
396
397             delete $info->{size} if $info->{size} == 7;
398         }
399         elsif ($data_type eq 'varchar'   && $info->{size} == 0) {
400             $info->{data_type} = 'text';
401             delete $info->{size};
402         }
403         elsif ($data_type eq 'nvarchar'  && $info->{size} == 0) {
404             $info->{data_type} = 'ntext';
405             delete $info->{size};
406         }
407         elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
408             $info->{data_type} = 'image';
409             delete $info->{size};
410         }
411
412         if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
413             delete $info->{size};
414         }
415
416         if (defined $default) {
417             # strip parens
418             $default =~ s/^\( (.*) \)\z/$1/x;
419
420             # Literal strings are in ''s, numbers are in ()s (in some versions of
421             # MSSQL, in others they are unquoted) everything else is a function.
422             $info->{default_value} =
423                 $default =~ /^['(] (.*) [)']\z/x ? $1 :
424                     $default =~ /^\d/ ? $default : \$default;
425
426             if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
427                 ${ $info->{default_value} } = 'current_timestamp';
428
429                 my $getdate = 'getdate()';
430                 $info->{original}{default_value} = \$getdate;
431             }
432         }
433     }
434
435     return $result;
436 }
437
438 =head1 SEE ALSO
439
440 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server>,
441 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
442 L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
443 L<DBIx::Class::Schema::Loader::DBI>
444
445 =head1 AUTHOR
446
447 See L<DBIx::Class::Schema::Loader/AUTHOR> and L<DBIx::Class::Schema::Loader/CONTRIBUTORS>.
448
449 =head1 LICENSE
450
451 This library is free software; you can redistribute it and/or modify it under
452 the same terms as Perl itself.
453
454 =cut
455
456 1;
457 # vim:et sts=4 sw=4 tw=0: