release 0.07033
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
1 package DBIx::Class::Schema::Loader::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
6 use mro 'c3';
7 use Try::Tiny;
8 use List::MoreUtils 'any';
9 use namespace::clean;
10
11 use DBIx::Class::Schema::Loader::Table::Sybase ();
12
13 our $VERSION = '0.07033';
14
15 =head1 NAME
16
17 DBIx::Class::Schema::Loader::DBI::MSSQL - DBIx::Class::Schema::Loader::DBI MSSQL Implementation.
18
19 =head1 DESCRIPTION
20
21 Base driver for Microsoft SQL Server, used by
22 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server> for support
23 via L<DBD::Sybase> and
24 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server> for support via
25 L<DBD::ODBC>.
26
27 See L<DBIx::Class::Schema::Loader> and L<DBIx::Class::Schema::Loader::Base> for
28 usage information.
29
30 =head1 CASE SENSITIVITY
31
32 Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
33 generated column names are lower-cased as this makes them easier to work with
34 in L<DBIx::Class>.
35
36 We attempt to detect the database collation at startup for any database
37 included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
38 the column lowercasing behavior accordingly, as lower-cased column names do not
39 work on case-sensitive databases.
40
41 To manually control case-sensitive mode, put:
42
43     preserve_case => 1|0
44
45 in your Loader options.
46
47 See L<preserve_case|DBIx::Class::Schema::Loader::Base/preserve_case>.
48
49 B<NOTE:> this option used to be called C<case_sensitive_collation>, but has
50 been renamed to a more generic option.
51
52 =cut
53
54 sub _system_databases {
55     return (qw/
56         master model tempdb msdb
57     /);
58 }
59
60 sub _system_tables {
61     return (qw/
62         spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
63     /);
64 }
65
66 sub _owners {
67     my ($self, $db) = @_;
68
69     my $owners = $self->dbh->selectcol_arrayref(<<"EOF");
70 SELECT name
71 FROM [$db].dbo.sysusers
72 WHERE uid <> gid
73 EOF
74
75     return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
76 }
77
78 sub _current_db {
79     my $self = shift;
80     return ($self->dbh->selectrow_array('SELECT db_name()'))[0];
81 }
82
83 sub _switch_db {
84     my ($self, $db) = @_;
85     $self->dbh->do("use [$db]");
86 }
87
88 sub _setup {
89     my $self = shift;
90
91     $self->next::method(@_);
92
93     my $current_db = $self->_current_db;
94
95     if (ref $self->db_schema eq 'HASH') {
96         if (keys %{ $self->db_schema } < 2) {
97             my ($db) = keys %{ $self->db_schema };
98
99             $db ||= $current_db;
100
101             if ($db eq '%') {
102                 my $owners = $self->db_schema->{$db};
103
104                 my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
105 SELECT name
106 FROM master.dbo.sysdatabases
107 EOF
108
109                 my @dbs;
110
111                 foreach my $db_name (@$db_names) {
112                     push @dbs, $db_name
113                         unless any { $_ eq $db_name } $self->_system_databases;
114                 }
115
116                 $self->db_schema({});
117
118                 DB: foreach my $db (@dbs) {
119                     if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
120                         my @owners;
121
122                         foreach my $owner (@$owners) {
123                             push @owners, $owner
124                                 if $self->dbh->selectrow_array(<<"EOF");
125 SELECT name
126 FROM [$db].dbo.sysusers
127 WHERE name = @{[ $self->dbh->quote($owner) ]}
128 EOF
129                         }
130
131                         next DB unless @owners;
132
133                         $self->db_schema->{$db} = \@owners;
134                     }
135                     else {
136                         # for post-processing below
137                         $self->db_schema->{$db} = '%';
138                     }
139                 }
140
141                 $self->qualify_objects(1);
142             }
143             else {
144                 if ($db ne $current_db) {
145                     $self->dbh->do("USE [$db]");
146
147                     $self->qualify_objects(1);
148                 }
149             }
150         }
151         else {
152             $self->qualify_objects(1);
153         }
154     }
155     elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
156         my $owners = $self->db_schema;
157         $owners ||= [ $self->dbh->selectrow_array('SELECT user_name()') ];
158
159         $self->qualify_objects(1) if @$owners > 1;
160
161         $self->db_schema({ $current_db => $owners });
162     }
163
164     foreach my $db (keys %{ $self->db_schema }) {
165         if ($self->db_schema->{$db} eq '%') {
166             $self->db_schema->{$db} = [ $self->_owners($db) ];
167
168             $self->qualify_objects(1);
169         }
170     }
171
172     if (not defined $self->preserve_case) {
173         foreach my $db (keys %{ $self->db_schema }) {
174             # We use the sys.databases query for the general case, and fallback to
175             # databasepropertyex() if for some reason sys.databases is not available,
176             # which does not work over DBD::ODBC with unixODBC+FreeTDS.
177             #
178             # XXX why does databasepropertyex() not work over DBD::ODBC ?
179             #
180             # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
181
182             my $current_db = $self->_current_db;
183
184             $self->_switch_db($db);
185
186             my $collation_name =
187                    (eval { $self->dbh->selectrow_array("SELECT collation_name FROM [$db].sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") })[0]
188                 || (eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") })[0];
189
190             $self->_switch_db($current_db);
191
192             if (not $collation_name) {
193                 warn <<"EOF";
194
195 WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
196 case-insensitive mode. Override the 'preserve_case' attribute in your Loader
197 options if needed.
198
199 See 'preserve_case' in
200 perldoc DBIx::Class::Schema::Loader::Base
201 EOF
202                 $self->preserve_case(0) unless $self->preserve_case;
203             }
204             else {
205                 my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
206
207                 if ($case_sensitive && (not $self->preserve_case)) {
208                     $self->preserve_case(1);
209                 }
210                 else {
211                     $self->preserve_case(0);
212                 }
213             }
214         }
215     }
216 }
217
218 sub _tables_list {
219     my ($self, $opts) = @_;
220
221     my @tables;
222
223     while (my ($db, $owners) = each %{ $self->db_schema }) {
224         foreach my $owner (@$owners) {
225             my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
226 SELECT table_name
227 FROM [$db].INFORMATION_SCHEMA.TABLES
228 WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
229 EOF
230
231             TABLE: foreach my $table_name (@$table_names) {
232                 next TABLE if any { $_ eq $table_name } $self->_system_tables;
233
234                 push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
235                     loader   => $self,
236                     name     => $table_name,
237                     database => $db,
238                     schema   => $owner,
239                 );
240             }
241         }
242     }
243
244     return $self->_filter_tables(\@tables, $opts);
245 }
246
247 sub _table_pk_info {
248     my ($self, $table) = @_;
249
250     my $db = $table->database;
251
252     my $pk = $self->dbh->selectcol_arrayref(<<"EOF");
253 SELECT kcu.column_name
254 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
255 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
256     ON kcu.table_name = tc.table_name
257         AND kcu.table_schema = tc.table_schema
258         AND kcu.constraint_name = tc.constraint_name
259 WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
260     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
261     AND tc.constraint_type = 'PRIMARY KEY'
262 ORDER BY kcu.ordinal_position
263 EOF
264
265     $pk = [ map $self->_lc($_), @$pk ];
266
267     return $pk;
268 }
269
270 sub _table_fk_info {
271     my ($self, $table) = @_;
272
273     my $db = $table->database;
274
275     my $sth = $self->dbh->prepare(<<"EOF");
276 SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name,
277        fk_kcu.column_name, uk_kcu.column_name, rc.delete_rule, rc.update_rule
278 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
279 JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
280     ON rc.constraint_name = fk_tc.constraint_name
281         AND rc.constraint_schema = fk_tc.table_schema
282 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
283     ON fk_kcu.constraint_name = fk_tc.constraint_name
284         AND fk_kcu.table_name = fk_tc.table_name
285         AND fk_kcu.table_schema = fk_tc.table_schema 
286 JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
287     ON uk_tc.constraint_name = rc.unique_constraint_name
288         AND uk_tc.table_schema = rc.unique_constraint_schema
289 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
290     ON uk_kcu.constraint_name = rc.unique_constraint_name
291         AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
292         AND uk_kcu.table_name = uk_tc.table_name
293         AND uk_kcu.table_schema = rc.unique_constraint_schema
294 WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
295     AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
296 ORDER BY fk_kcu.ordinal_position
297 EOF
298
299     $sth->execute;
300
301     my %rels;
302
303     while (my ($fk, $remote_schema, $remote_table, $col, $remote_col,
304                $delete_rule, $update_rule) = $sth->fetchrow_array) {
305         push @{ $rels{$fk}{local_columns}  }, $self->_lc($col);
306         push @{ $rels{$fk}{remote_columns} }, $self->_lc($remote_col);
307         
308         $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
309             loader   => $self,
310             name     => $remote_table,
311             database => $db,
312             schema   => $remote_schema,
313         ) unless exists $rels{$fk}{remote_table};
314
315         $rels{$fk}{attrs} ||= {
316             on_delete     => uc $delete_rule,
317             on_update     => uc $update_rule,
318             is_deferrable => 1 # constraints can be temporarily disabled, but DEFERRABLE is not supported
319         };
320     }
321
322     return [ values %rels ];
323 }
324
325 sub _table_uniq_info {
326     my ($self, $table) = @_;
327
328     my $db = $table->database;
329
330     my $sth = $self->dbh->prepare(<<"EOF");
331 SELECT tc.constraint_name, kcu.column_name
332 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
333 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
334     ON kcu.constraint_name = tc.constraint_name
335         AND kcu.table_name = tc.table_name
336         AND kcu.table_schema = tc.table_schema
337 wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
338     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
339     AND tc.constraint_type = 'UNIQUE'
340 ORDER BY kcu.ordinal_position
341 EOF
342
343     $sth->execute;
344
345     my %uniq;
346
347     while (my ($constr, $col) = $sth->fetchrow_array) {
348         push @{ $uniq{$constr} }, $self->_lc($col);
349     }
350
351     return [ map [ $_ => $uniq{$_} ], keys %uniq ];
352 }
353
354 sub _columns_info_for {
355     my $self    = shift;
356     my ($table) = @_;
357
358     my $db = $table->database;
359
360     my $result = $self->next::method(@_);
361
362     # SQL Server: Ancient as time itself, but still out in the wild
363     my $is_2k = $self->schema->storage->_server_info->{normalized_dbms_version} < 9;
364     
365     # get type info (and identity)
366     my $rows = $self->dbh->selectall_arrayref($is_2k ? <<"EOF2K" : <<"EOF");
367 SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, (sc.status & 0x80) is_identity
368 FROM [$db].INFORMATION_SCHEMA.COLUMNS c
369 JOIN [$db].dbo.sysusers ss ON
370     c.table_schema = ss.name
371 JOIN [$db].dbo.sysobjects so ON
372     c.table_name = so.name
373     AND so.uid = ss.uid
374 JOIN [$db].dbo.syscolumns sc ON
375     c.column_name = sc.name
376     AND sc.id = so.Id
377 WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
378     AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
379 EOF2K
380 SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, sc.is_identity
381 FROM [$db].INFORMATION_SCHEMA.COLUMNS c
382 JOIN [$db].sys.schemas ss ON
383     c.table_schema = ss.name
384 JOIN [$db].sys.objects so ON
385       c.table_name   = so.name
386     AND so.schema_id = ss.schema_id
387 JOIN [$db].sys.columns sc ON
388     c.column_name = sc.name
389     AND sc.object_id = so.object_id
390 WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
391     AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
392 EOF
393
394     foreach my $row (@$rows) {
395         my ($col, $char_max_length, $data_type, $datetime_precision, $default, $is_identity) = @$row;
396         $col = lc $col unless $self->preserve_case;
397         my $info = $result->{$col} || next;
398
399         $info->{data_type} = $data_type;
400
401         if (defined $char_max_length) {
402             $info->{size} = $char_max_length;
403             $info->{size} = 0 if $char_max_length < 0;
404         }
405
406         if ($is_identity) {
407             $info->{is_auto_increment} = 1;
408             $info->{data_type} =~ s/\s*identity//i;
409             delete $info->{size};
410         }
411
412         # fix types
413         if ($data_type eq 'int') {
414             $info->{data_type} = 'integer';
415         }
416         elsif ($data_type eq 'timestamp') {
417             $info->{inflate_datetime} = 0;
418         }
419         elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
420             if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
421                 delete $info->{size};
422             }
423         }
424         elsif ($data_type eq 'float') {
425             $info->{data_type} = 'double precision';
426             delete $info->{size};
427         }
428         elsif ($data_type =~ /^(?:small)?datetime\z/) {
429             # fixup for DBD::Sybase
430             if ($info->{default_value} && $info->{default_value} eq '3') {
431                 delete $info->{default_value};
432             }
433         }
434         elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
435             $info->{size} = $datetime_precision;
436
437             delete $info->{size} if $info->{size} == 7;
438         }
439         elsif ($data_type eq 'varchar'   && $info->{size} == 0) {
440             $info->{data_type} = 'text';
441             delete $info->{size};
442         }
443         elsif ($data_type eq 'nvarchar'  && $info->{size} == 0) {
444             $info->{data_type} = 'ntext';
445             delete $info->{size};
446         }
447         elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
448             $info->{data_type} = 'image';
449             delete $info->{size};
450         }
451
452         if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
453             delete $info->{size};
454         }
455
456         if (defined $default) {
457             # strip parens
458             $default =~ s/^\( (.*) \)\z/$1/x;
459
460             # Literal strings are in ''s, numbers are in ()s (in some versions of
461             # MSSQL, in others they are unquoted) everything else is a function.
462             $info->{default_value} =
463                 $default =~ /^['(] (.*) [)']\z/x ? $1 :
464                     $default =~ /^\d/ ? $default : \$default;
465
466             if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
467                 ${ $info->{default_value} } = 'current_timestamp';
468
469                 my $getdate = 'getdate()';
470                 $info->{original}{default_value} = \$getdate;
471             }
472         }
473     }
474
475     return $result;
476 }
477
478 =head1 SEE ALSO
479
480 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server>,
481 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
482 L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
483 L<DBIx::Class::Schema::Loader::DBI>
484
485 =head1 AUTHOR
486
487 See L<DBIx::Class::Schema::Loader/AUTHOR> and L<DBIx::Class::Schema::Loader/CONTRIBUTORS>.
488
489 =head1 LICENSE
490
491 This library is free software; you can redistribute it and/or modify it under
492 the same terms as Perl itself.
493
494 =cut
495
496 1;
497 # vim:et sts=4 sw=4 tw=0: