Release 0.07047
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
1 package DBIx::Class::Schema::Loader::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
6 use mro 'c3';
7 use Try::Tiny;
8 use List::Util 'any';
9 use namespace::clean;
10
11 use DBIx::Class::Schema::Loader::Table::Sybase ();
12
13 our $VERSION = '0.07047';
14
15 =head1 NAME
16
17 DBIx::Class::Schema::Loader::DBI::MSSQL - DBIx::Class::Schema::Loader::DBI MSSQL Implementation.
18
19 =head1 DESCRIPTION
20
21 Base driver for Microsoft SQL Server, used by
22 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server> for support
23 via L<DBD::Sybase> and
24 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server> for support via
25 L<DBD::ODBC>.
26
27 See L<DBIx::Class::Schema::Loader> and L<DBIx::Class::Schema::Loader::Base> for
28 usage information.
29
30 =head1 CASE SENSITIVITY
31
32 Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
33 generated column names are lower-cased as this makes them easier to work with
34 in L<DBIx::Class>.
35
36 We attempt to detect the database collation at startup for any database
37 included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
38 the column lowercasing behavior accordingly, as lower-cased column names do not
39 work on case-sensitive databases.
40
41 To manually control case-sensitive mode, put:
42
43     preserve_case => 1|0
44
45 in your Loader options.
46
47 See L<preserve_case|DBIx::Class::Schema::Loader::Base/preserve_case>.
48
49 B<NOTE:> this option used to be called C<case_sensitive_collation>, but has
50 been renamed to a more generic option.
51
52 =cut
53
54 # SQL Server 2000: Ancient as time itself, but still out in the wild
55 sub _is_2k {
56     return shift->schema->storage->_server_info->{normalized_dbms_version} < 9;
57 }
58
59 sub _system_databases {
60     return (qw/
61         master model tempdb msdb
62     /);
63 }
64
65 sub _system_tables {
66     return (qw/
67         spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
68     /);
69 }
70
71 sub _schemas {
72     my ($self, $db) = @_;
73
74     my $owners = $self->dbh->selectcol_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
75 SELECT name
76 FROM [$db].dbo.sysusers
77 WHERE uid <> gid
78 EOF2K
79 SELECT name
80 FROM [$db].sys.schemas
81 EOF
82
83     return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
84 }
85
86 sub _current_schema {
87     my $self = shift;
88
89     if ($self->_is_2k) {
90         return ($self->dbh->selectrow_array('SELECT user_name()'))[0];
91     }
92
93     return ($self->dbh->selectrow_array('SELECT schema_name()'))[0];
94 }
95
96 sub _current_db {
97     my $self = shift;
98     return ($self->dbh->selectrow_array('SELECT db_name()'))[0];
99 }
100
101 sub _switch_db {
102     my ($self, $db) = @_;
103     $self->dbh->do("use [$db]");
104 }
105
106 sub _setup {
107     my $self = shift;
108
109     $self->next::method(@_);
110
111     my $current_db = $self->_current_db;
112
113     if (ref $self->db_schema eq 'HASH') {
114         if (keys %{ $self->db_schema } < 2) {
115             my ($db) = keys %{ $self->db_schema };
116
117             $db ||= $current_db;
118
119             if ($db eq '%') {
120                 my $owners = $self->db_schema->{$db};
121
122                 my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
123 SELECT name
124 FROM master.dbo.sysdatabases
125 EOF
126
127                 my @dbs;
128
129                 foreach my $db_name (@$db_names) {
130                     push @dbs, $db_name
131                         unless any { $_ eq $db_name } $self->_system_databases;
132                 }
133
134                 $self->db_schema({});
135
136                 DB: foreach my $db (@dbs) {
137                     if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
138                         my @owners;
139
140                         foreach my $owner (@$owners) {
141                             push @owners, $owner
142                                 if $self->dbh->selectrow_array(<<"EOF");
143 SELECT name
144 FROM [$db].dbo.sysusers
145 WHERE name = @{[ $self->dbh->quote($owner) ]}
146 EOF
147                         }
148
149                         next DB unless @owners;
150
151                         $self->db_schema->{$db} = \@owners;
152                     }
153                     else {
154                         # for post-processing below
155                         $self->db_schema->{$db} = '%';
156                     }
157                 }
158
159                 $self->qualify_objects(1);
160             }
161             else {
162                 if ($db ne $current_db) {
163                     $self->_switch_db($db);
164
165                     $self->qualify_objects(1);
166                 }
167             }
168         }
169         else {
170             $self->qualify_objects(1);
171         }
172     }
173     elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
174         my $owners = $self->db_schema;
175         $owners ||= [ $self->_current_schema ];
176
177         $self->qualify_objects(1) if @$owners > 1;
178
179         $self->db_schema({ $current_db => $owners });
180     }
181
182     foreach my $db (keys %{ $self->db_schema }) {
183         if ($self->db_schema->{$db} eq '%') {
184             $self->db_schema->{$db} = [ $self->_schemas($db) ];
185
186             $self->qualify_objects(1);
187         }
188     }
189
190     if (not defined $self->preserve_case) {
191         foreach my $db (keys %{ $self->db_schema }) {
192             # We use the sys.databases query for the general case, and fallback to
193             # databasepropertyex() if for some reason sys.databases is not available,
194             # which does not work over DBD::ODBC with unixODBC+FreeTDS.
195             #
196             # XXX why does databasepropertyex() not work over DBD::ODBC ?
197             #
198             # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
199
200             my $current_db = $self->_current_db;
201
202             $self->_switch_db($db);
203
204             my $collation_name =
205                    (eval { $self->dbh->selectrow_array("SELECT collation_name FROM [$db].sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") })[0]
206                 || (eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") })[0];
207
208             $self->_switch_db($current_db);
209
210             if (not $collation_name) {
211                 warn <<"EOF";
212
213 WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
214 case-insensitive mode. Override the 'preserve_case' attribute in your Loader
215 options if needed.
216
217 See 'preserve_case' in
218 perldoc DBIx::Class::Schema::Loader::Base
219 EOF
220                 $self->preserve_case(0) unless $self->preserve_case;
221             }
222             else {
223                 my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
224
225                 if ($case_sensitive && (not $self->preserve_case)) {
226                     $self->preserve_case(1);
227                 }
228                 else {
229                     $self->preserve_case(0);
230                 }
231             }
232         }
233     }
234 }
235
236 sub _tables_list {
237     my ($self) = @_;
238
239     my @tables;
240
241     while (my ($db, $owners) = each %{ $self->db_schema }) {
242         foreach my $owner (@$owners) {
243             my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
244 SELECT table_name
245 FROM [$db].INFORMATION_SCHEMA.TABLES
246 WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
247 EOF
248
249             TABLE: foreach my $table_name (@$table_names) {
250                 next TABLE if any { $_ eq $table_name } $self->_system_tables;
251
252                 push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
253                     loader   => $self,
254                     name     => $table_name,
255                     database => $db,
256                     schema   => $owner,
257                 );
258             }
259         }
260     }
261
262     return $self->_filter_tables(\@tables);
263 }
264
265 sub _table_pk_info {
266     my ($self, $table) = @_;
267
268     my $db = $table->database;
269
270     my $pk = $self->dbh->selectcol_arrayref(<<"EOF");
271 SELECT kcu.column_name
272 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
273 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
274     ON kcu.table_name = tc.table_name
275         AND kcu.table_schema = tc.table_schema
276         AND kcu.constraint_name = tc.constraint_name
277 WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
278     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
279     AND tc.constraint_type = 'PRIMARY KEY'
280 ORDER BY kcu.ordinal_position
281 EOF
282
283     $pk = [ map $self->_lc($_), @$pk ];
284
285     return $pk;
286 }
287
288 sub _table_fk_info {
289     my ($self, $table) = @_;
290
291     my $db = $table->database;
292
293     my $sth = $self->dbh->prepare(<<"EOF");
294 SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name,
295        fk_kcu.column_name, uk_kcu.column_name, rc.delete_rule, rc.update_rule
296 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
297 JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
298     ON rc.constraint_name = fk_tc.constraint_name
299         AND rc.constraint_schema = fk_tc.table_schema
300 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
301     ON fk_kcu.constraint_name = fk_tc.constraint_name
302         AND fk_kcu.table_name = fk_tc.table_name
303         AND fk_kcu.table_schema = fk_tc.table_schema
304 JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
305     ON uk_tc.constraint_name = rc.unique_constraint_name
306         AND uk_tc.table_schema = rc.unique_constraint_schema
307 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
308     ON uk_kcu.constraint_name = rc.unique_constraint_name
309         AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
310         AND uk_kcu.table_name = uk_tc.table_name
311         AND uk_kcu.table_schema = rc.unique_constraint_schema
312 WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
313     AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
314 ORDER BY fk_kcu.ordinal_position
315 EOF
316
317     $sth->execute;
318
319     my %rels;
320
321     while (my ($fk, $remote_schema, $remote_table, $col, $remote_col,
322                $delete_rule, $update_rule) = $sth->fetchrow_array) {
323         push @{ $rels{$fk}{local_columns}  }, $self->_lc($col);
324         push @{ $rels{$fk}{remote_columns} }, $self->_lc($remote_col);
325
326         $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
327             loader   => $self,
328             name     => $remote_table,
329             database => $db,
330             schema   => $remote_schema,
331         ) unless exists $rels{$fk}{remote_table};
332
333         $rels{$fk}{attrs} ||= {
334             on_delete     => uc $delete_rule,
335             on_update     => uc $update_rule,
336             is_deferrable => 1 # constraints can be temporarily disabled, but DEFERRABLE is not supported
337         };
338     }
339
340     return [ values %rels ];
341 }
342
343 sub _table_uniq_info {
344     my ($self, $table) = @_;
345
346     my $db = $table->database;
347
348     my $sth = $self->dbh->prepare(<<"EOF");
349 SELECT tc.constraint_name, kcu.column_name
350 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
351 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
352     ON kcu.constraint_name = tc.constraint_name
353         AND kcu.table_name = tc.table_name
354         AND kcu.table_schema = tc.table_schema
355 wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
356     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
357     AND tc.constraint_type = 'UNIQUE'
358 ORDER BY kcu.ordinal_position
359 EOF
360
361     $sth->execute;
362
363     my %uniq;
364
365     while (my ($constr, $col) = $sth->fetchrow_array) {
366         push @{ $uniq{$constr} }, $self->_lc($col);
367     }
368
369     return [ map [ $_ => $uniq{$_} ], sort keys %uniq ];
370 }
371
372 sub _columns_info_for {
373     my $self    = shift;
374     my ($table) = @_;
375
376     my $db = $table->database;
377
378     my $result = $self->next::method(@_);
379
380     # get type info (and identity)
381     my $rows = $self->dbh->selectall_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
382 SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, (sc.status & 0x80) is_identity
383 FROM [$db].INFORMATION_SCHEMA.COLUMNS c
384 JOIN [$db].dbo.sysusers ss ON
385     c.table_schema = ss.name
386 JOIN [$db].dbo.sysobjects so ON
387     c.table_name = so.name
388     AND so.uid = ss.uid
389 JOIN [$db].dbo.syscolumns sc ON
390     c.column_name = sc.name
391     AND sc.id = so.Id
392 WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
393     AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
394 EOF2K
395 SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, sc.is_identity
396 FROM [$db].INFORMATION_SCHEMA.COLUMNS c
397 JOIN [$db].sys.schemas ss ON
398     c.table_schema = ss.name
399 JOIN [$db].sys.objects so ON
400       c.table_name   = so.name
401     AND so.schema_id = ss.schema_id
402 JOIN [$db].sys.columns sc ON
403     c.column_name = sc.name
404     AND sc.object_id = so.object_id
405 WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
406     AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
407 EOF
408
409     foreach my $row (@$rows) {
410         my ($col, $char_max_length, $data_type, $datetime_precision, $default, $is_identity) = @$row;
411         $col = lc $col unless $self->preserve_case;
412         my $info = $result->{$col} || next;
413
414         $info->{data_type} = $data_type;
415
416         if (defined $char_max_length) {
417             $info->{size} = $char_max_length;
418             $info->{size} = 0 if $char_max_length < 0;
419         }
420
421         if ($is_identity) {
422             $info->{is_auto_increment} = 1;
423             $info->{data_type} =~ s/\s*identity//i;
424             delete $info->{size};
425         }
426
427         # fix types
428         if ($data_type eq 'int') {
429             $info->{data_type} = 'integer';
430         }
431         elsif ($data_type eq 'timestamp') {
432             $info->{inflate_datetime} = 0;
433         }
434         elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
435             if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
436                 delete $info->{size};
437             }
438         }
439         elsif ($data_type eq 'float') {
440             $info->{data_type} = 'double precision';
441             delete $info->{size};
442         }
443         elsif ($data_type =~ /^(?:small)?datetime\z/) {
444             # fixup for DBD::Sybase
445             if ($info->{default_value} && $info->{default_value} eq '3') {
446                 delete $info->{default_value};
447             }
448         }
449         elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
450             $info->{size} = $datetime_precision;
451
452             delete $info->{size} if $info->{size} == 7;
453         }
454         elsif ($data_type eq 'varchar'   && $info->{size} == 0) {
455             $info->{data_type} = 'text';
456             delete $info->{size};
457         }
458         elsif ($data_type eq 'nvarchar'  && $info->{size} == 0) {
459             $info->{data_type} = 'ntext';
460             delete $info->{size};
461         }
462         elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
463             $info->{data_type} = 'image';
464             delete $info->{size};
465         }
466
467         if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
468             delete $info->{size};
469         }
470
471         if (defined $default) {
472             # strip parens
473             $default =~ s/^\( (.*) \)\z/$1/x;
474
475             # Literal strings are in ''s, numbers are in ()s (in some versions of
476             # MSSQL, in others they are unquoted) everything else is a function.
477             $info->{default_value} =
478                 $default =~ /^['(] (.*) [)']\z/x ? $1 :
479                     $default =~ /^\d/ ? $default : \$default;
480
481             if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
482                 ${ $info->{default_value} } = 'current_timestamp';
483
484                 my $getdate = 'getdate()';
485                 $info->{original}{default_value} = \$getdate;
486             }
487         }
488     }
489
490     return $result;
491 }
492
493 =head1 SEE ALSO
494
495 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server>,
496 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
497 L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
498 L<DBIx::Class::Schema::Loader::DBI>
499
500 =head1 AUTHORS
501
502 See L<DBIx::Class::Schema::Loader/AUTHORS>.
503
504 =head1 LICENSE
505
506 This library is free software; you can redistribute it and/or modify it under
507 the same terms as Perl itself.
508
509 =cut
510
511 1;
512 # vim:et sts=4 sw=4 tw=0: