Attempt to fix 'Attempt to free unreferenced scalar' on 5.8
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI / MSSQL.pm
1 package DBIx::Class::Schema::Loader::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5 use base 'DBIx::Class::Schema::Loader::DBI::Sybase::Common';
6 use mro 'c3';
7 use Try::Tiny;
8 use List::Util 'any';
9 use DBIx::Class::Schema::Loader::Table::Sybase ();
10 use namespace::clean;
11
12 our $VERSION = '0.07047';
13
14 =head1 NAME
15
16 DBIx::Class::Schema::Loader::DBI::MSSQL - DBIx::Class::Schema::Loader::DBI MSSQL Implementation.
17
18 =head1 DESCRIPTION
19
20 Base driver for Microsoft SQL Server, used by
21 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server> for support
22 via L<DBD::Sybase> and
23 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server> for support via
24 L<DBD::ODBC>.
25
26 See L<DBIx::Class::Schema::Loader> and L<DBIx::Class::Schema::Loader::Base> for
27 usage information.
28
29 =head1 CASE SENSITIVITY
30
31 Most MSSQL databases use C<CI> (case-insensitive) collation, for this reason
32 generated column names are lower-cased as this makes them easier to work with
33 in L<DBIx::Class>.
34
35 We attempt to detect the database collation at startup for any database
36 included in L<db_schema|DBIx::Class::Schema::Loader::Base/db_schema>, and set
37 the column lowercasing behavior accordingly, as lower-cased column names do not
38 work on case-sensitive databases.
39
40 To manually control case-sensitive mode, put:
41
42     preserve_case => 1|0
43
44 in your Loader options.
45
46 See L<preserve_case|DBIx::Class::Schema::Loader::Base/preserve_case>.
47
48 B<NOTE:> this option used to be called C<case_sensitive_collation>, but has
49 been renamed to a more generic option.
50
51 =cut
52
53 # SQL Server 2000: Ancient as time itself, but still out in the wild
54 sub _is_2k {
55     return shift->schema->storage->_server_info->{normalized_dbms_version} < 9;
56 }
57
58 sub _system_databases {
59     return (qw/
60         master model tempdb msdb
61     /);
62 }
63
64 sub _system_tables {
65     return (qw/
66         spt_fallback_db spt_fallback_dev spt_fallback_usg spt_monitor spt_values MSreplication_options
67     /);
68 }
69
70 sub _schemas {
71     my ($self, $db) = @_;
72
73     my $owners = $self->dbh->selectcol_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
74 SELECT name
75 FROM [$db].dbo.sysusers
76 WHERE uid <> gid
77 EOF2K
78 SELECT name
79 FROM [$db].sys.schemas
80 EOF
81
82     return grep !/^(?:#|guest|INFORMATION_SCHEMA|sys)/, @$owners;
83 }
84
85 sub _current_schema {
86     my $self = shift;
87
88     if ($self->_is_2k) {
89         return ($self->dbh->selectrow_array('SELECT user_name()'))[0];
90     }
91
92     return ($self->dbh->selectrow_array('SELECT schema_name()'))[0];
93 }
94
95 sub _current_db {
96     my $self = shift;
97     return ($self->dbh->selectrow_array('SELECT db_name()'))[0];
98 }
99
100 sub _switch_db {
101     my ($self, $db) = @_;
102     $self->dbh->do("use [$db]");
103 }
104
105 sub _setup {
106     my $self = shift;
107
108     $self->next::method(@_);
109
110     my $current_db = $self->_current_db;
111
112     if (ref $self->db_schema eq 'HASH') {
113         if (keys %{ $self->db_schema } < 2) {
114             my ($db) = keys %{ $self->db_schema };
115
116             $db ||= $current_db;
117
118             if ($db eq '%') {
119                 my $owners = $self->db_schema->{$db};
120
121                 my $db_names = $self->dbh->selectcol_arrayref(<<'EOF');
122 SELECT name
123 FROM master.dbo.sysdatabases
124 EOF
125
126                 my @dbs;
127
128                 foreach my $db_name (@$db_names) {
129                     push @dbs, $db_name
130                         unless any { $_ eq $db_name } $self->_system_databases;
131                 }
132
133                 $self->db_schema({});
134
135                 DB: foreach my $db (@dbs) {
136                     if (not ((ref $owners eq 'ARRAY' && $owners->[0] eq '%') || $owners eq '%')) {
137                         my @owners;
138
139                         foreach my $owner (@$owners) {
140                             push @owners, $owner
141                                 if $self->dbh->selectrow_array(<<"EOF");
142 SELECT name
143 FROM [$db].dbo.sysusers
144 WHERE name = @{[ $self->dbh->quote($owner) ]}
145 EOF
146                         }
147
148                         next DB unless @owners;
149
150                         $self->db_schema->{$db} = \@owners;
151                     }
152                     else {
153                         # for post-processing below
154                         $self->db_schema->{$db} = '%';
155                     }
156                 }
157
158                 $self->qualify_objects(1);
159             }
160             else {
161                 if ($db ne $current_db) {
162                     $self->_switch_db($db);
163
164                     $self->qualify_objects(1);
165                 }
166             }
167         }
168         else {
169             $self->qualify_objects(1);
170         }
171     }
172     elsif (ref $self->db_schema eq 'ARRAY' || (not defined $self->db_schema)) {
173         my $owners = $self->db_schema;
174         $owners ||= [ $self->_current_schema ];
175
176         $self->qualify_objects(1) if @$owners > 1;
177
178         $self->db_schema({ $current_db => $owners });
179     }
180
181     foreach my $db (keys %{ $self->db_schema }) {
182         if ($self->db_schema->{$db} eq '%') {
183             $self->db_schema->{$db} = [ $self->_schemas($db) ];
184
185             $self->qualify_objects(1);
186         }
187     }
188
189     if (not defined $self->preserve_case) {
190         foreach my $db (keys %{ $self->db_schema }) {
191             # We use the sys.databases query for the general case, and fallback to
192             # databasepropertyex() if for some reason sys.databases is not available,
193             # which does not work over DBD::ODBC with unixODBC+FreeTDS.
194             #
195             # XXX why does databasepropertyex() not work over DBD::ODBC ?
196             #
197             # more on collations here: http://msdn.microsoft.com/en-us/library/ms143515.aspx
198
199             my $current_db = $self->_current_db;
200
201             $self->_switch_db($db);
202
203             my $collation_name =
204                    (eval { $self->dbh->selectrow_array("SELECT collation_name FROM [$db].sys.databases WHERE name = @{[ $self->dbh->quote($db) ]}") })[0]
205                 || (eval { $self->dbh->selectrow_array("SELECT CAST(databasepropertyex(@{[ $self->dbh->quote($db) ]}, 'Collation') AS VARCHAR)") })[0];
206
207             $self->_switch_db($current_db);
208
209             if (not $collation_name) {
210                 warn <<"EOF";
211
212 WARNING: MSSQL Collation detection failed for database '$db'. Defaulting to
213 case-insensitive mode. Override the 'preserve_case' attribute in your Loader
214 options if needed.
215
216 See 'preserve_case' in
217 perldoc DBIx::Class::Schema::Loader::Base
218 EOF
219                 $self->preserve_case(0) unless $self->preserve_case;
220             }
221             else {
222                 my $case_sensitive = $collation_name =~ /_(?:CS|BIN2?)(?:_|\z)/;
223
224                 if ($case_sensitive && (not $self->preserve_case)) {
225                     $self->preserve_case(1);
226                 }
227                 else {
228                     $self->preserve_case(0);
229                 }
230             }
231         }
232     }
233 }
234
235 sub _tables_list {
236     my ($self) = @_;
237
238     my @tables;
239
240     while (my ($db, $owners) = each %{ $self->db_schema }) {
241         foreach my $owner (@$owners) {
242             my $table_names = $self->dbh->selectcol_arrayref(<<"EOF");
243 SELECT table_name
244 FROM [$db].INFORMATION_SCHEMA.TABLES
245 WHERE table_schema = @{[ $self->dbh->quote($owner) ]}
246 EOF
247
248             TABLE: foreach my $table_name (@$table_names) {
249                 next TABLE if any { $_ eq $table_name } $self->_system_tables;
250
251                 push @tables, DBIx::Class::Schema::Loader::Table::Sybase->new(
252                     loader   => $self,
253                     name     => $table_name,
254                     database => $db,
255                     schema   => $owner,
256                 );
257             }
258         }
259     }
260
261     return $self->_filter_tables(\@tables);
262 }
263
264 sub _table_pk_info {
265     my ($self, $table) = @_;
266
267     my $db = $table->database;
268
269     my $pk = $self->dbh->selectcol_arrayref(<<"EOF");
270 SELECT kcu.column_name
271 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
272 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
273     ON kcu.table_name = tc.table_name
274         AND kcu.table_schema = tc.table_schema
275         AND kcu.constraint_name = tc.constraint_name
276 WHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
277     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
278     AND tc.constraint_type = 'PRIMARY KEY'
279 ORDER BY kcu.ordinal_position
280 EOF
281
282     $pk = [ map $self->_lc($_), @$pk ];
283
284     return $pk;
285 }
286
287 sub _table_fk_info {
288     my ($self, $table) = @_;
289
290     my $db = $table->database;
291
292     my $sth = $self->dbh->prepare(<<"EOF");
293 SELECT rc.constraint_name, rc.unique_constraint_schema, uk_tc.table_name,
294        fk_kcu.column_name, uk_kcu.column_name, rc.delete_rule, rc.update_rule
295 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS fk_tc
296 JOIN [$db].INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
297     ON rc.constraint_name = fk_tc.constraint_name
298         AND rc.constraint_schema = fk_tc.table_schema
299 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE fk_kcu
300     ON fk_kcu.constraint_name = fk_tc.constraint_name
301         AND fk_kcu.table_name = fk_tc.table_name
302         AND fk_kcu.table_schema = fk_tc.table_schema
303 JOIN [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS uk_tc
304     ON uk_tc.constraint_name = rc.unique_constraint_name
305         AND uk_tc.table_schema = rc.unique_constraint_schema
306 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE uk_kcu
307     ON uk_kcu.constraint_name = rc.unique_constraint_name
308         AND uk_kcu.ordinal_position = fk_kcu.ordinal_position
309         AND uk_kcu.table_name = uk_tc.table_name
310         AND uk_kcu.table_schema = rc.unique_constraint_schema
311 WHERE fk_tc.table_name = @{[ $self->dbh->quote($table->name) ]}
312     AND fk_tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
313 ORDER BY fk_kcu.ordinal_position
314 EOF
315
316     $sth->execute;
317
318     my %rels;
319
320     while (my ($fk, $remote_schema, $remote_table, $col, $remote_col,
321                $delete_rule, $update_rule) = $sth->fetchrow_array) {
322         push @{ $rels{$fk}{local_columns}  }, $self->_lc($col);
323         push @{ $rels{$fk}{remote_columns} }, $self->_lc($remote_col);
324
325         $rels{$fk}{remote_table} = DBIx::Class::Schema::Loader::Table::Sybase->new(
326             loader   => $self,
327             name     => $remote_table,
328             database => $db,
329             schema   => $remote_schema,
330         ) unless exists $rels{$fk}{remote_table};
331
332         $rels{$fk}{attrs} ||= {
333             on_delete     => uc $delete_rule,
334             on_update     => uc $update_rule,
335             is_deferrable => 1 # constraints can be temporarily disabled, but DEFERRABLE is not supported
336         };
337     }
338
339     return [ values %rels ];
340 }
341
342 sub _table_uniq_info {
343     my ($self, $table) = @_;
344
345     my $db = $table->database;
346
347     my $sth = $self->dbh->prepare(<<"EOF");
348 SELECT tc.constraint_name, kcu.column_name
349 FROM [$db].INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
350 JOIN [$db].INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
351     ON kcu.constraint_name = tc.constraint_name
352         AND kcu.table_name = tc.table_name
353         AND kcu.table_schema = tc.table_schema
354 wHERE tc.table_name = @{[ $self->dbh->quote($table->name) ]}
355     AND tc.table_schema = @{[ $self->dbh->quote($table->schema) ]}
356     AND tc.constraint_type = 'UNIQUE'
357 ORDER BY kcu.ordinal_position
358 EOF
359
360     $sth->execute;
361
362     my %uniq;
363
364     while (my ($constr, $col) = $sth->fetchrow_array) {
365         push @{ $uniq{$constr} }, $self->_lc($col);
366     }
367
368     return [ map [ $_ => $uniq{$_} ], sort keys %uniq ];
369 }
370
371 sub _columns_info_for {
372     my $self    = shift;
373     my ($table) = @_;
374
375     my $db = $table->database;
376
377     my $result = $self->next::method(@_);
378
379     # get type info (and identity)
380     my $rows = $self->dbh->selectall_arrayref($self->_is_2k ? <<"EOF2K" : <<"EOF");
381 SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, (sc.status & 0x80) is_identity
382 FROM [$db].INFORMATION_SCHEMA.COLUMNS c
383 JOIN [$db].dbo.sysusers ss ON
384     c.table_schema = ss.name
385 JOIN [$db].dbo.sysobjects so ON
386     c.table_name = so.name
387     AND so.uid = ss.uid
388 JOIN [$db].dbo.syscolumns sc ON
389     c.column_name = sc.name
390     AND sc.id = so.Id
391 WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
392     AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
393 EOF2K
394 SELECT c.column_name, c.character_maximum_length, c.data_type, c.datetime_precision, c.column_default, sc.is_identity
395 FROM [$db].INFORMATION_SCHEMA.COLUMNS c
396 JOIN [$db].sys.schemas ss ON
397     c.table_schema = ss.name
398 JOIN [$db].sys.objects so ON
399       c.table_name   = so.name
400     AND so.schema_id = ss.schema_id
401 JOIN [$db].sys.columns sc ON
402     c.column_name = sc.name
403     AND sc.object_id = so.object_id
404 WHERE c.table_schema = @{[ $self->dbh->quote($table->schema) ]}
405     AND c.table_name = @{[ $self->dbh->quote($table->name) ]}
406 EOF
407
408     foreach my $row (@$rows) {
409         my ($col, $char_max_length, $data_type, $datetime_precision, $default, $is_identity) = @$row;
410         $col = lc $col unless $self->preserve_case;
411         my $info = $result->{$col} || next;
412
413         $info->{data_type} = $data_type;
414
415         if (defined $char_max_length) {
416             $info->{size} = $char_max_length;
417             $info->{size} = 0 if $char_max_length < 0;
418         }
419
420         if ($is_identity) {
421             $info->{is_auto_increment} = 1;
422             $info->{data_type} =~ s/\s*identity//i;
423             delete $info->{size};
424         }
425
426         # fix types
427         if ($data_type eq 'int') {
428             $info->{data_type} = 'integer';
429         }
430         elsif ($data_type eq 'timestamp') {
431             $info->{inflate_datetime} = 0;
432         }
433         elsif ($data_type =~ /^(?:numeric|decimal)\z/) {
434             if (ref($info->{size}) && $info->{size}[0] == 18 && $info->{size}[1] == 0) {
435                 delete $info->{size};
436             }
437         }
438         elsif ($data_type eq 'float') {
439             $info->{data_type} = 'double precision';
440             delete $info->{size};
441         }
442         elsif ($data_type =~ /^(?:small)?datetime\z/) {
443             # fixup for DBD::Sybase
444             if ($info->{default_value} && $info->{default_value} eq '3') {
445                 delete $info->{default_value};
446             }
447         }
448         elsif ($data_type =~ /^(?:datetime(?:2|offset)|time)\z/) {
449             $info->{size} = $datetime_precision;
450
451             delete $info->{size} if $info->{size} == 7;
452         }
453         elsif ($data_type eq 'varchar'   && $info->{size} == 0) {
454             $info->{data_type} = 'text';
455             delete $info->{size};
456         }
457         elsif ($data_type eq 'nvarchar'  && $info->{size} == 0) {
458             $info->{data_type} = 'ntext';
459             delete $info->{size};
460         }
461         elsif ($data_type eq 'varbinary' && $info->{size} == 0) {
462             $info->{data_type} = 'image';
463             delete $info->{size};
464         }
465
466         if ($data_type !~ /^(?:n?char|n?varchar|binary|varbinary|numeric|decimal|float|datetime(?:2|offset)|time)\z/) {
467             delete $info->{size};
468         }
469
470         if (defined $default) {
471             # strip parens
472             $default =~ s/^\( (.*) \)\z/$1/x;
473
474             # Literal strings are in ''s, numbers are in ()s (in some versions of
475             # MSSQL, in others they are unquoted) everything else is a function.
476             $info->{default_value} =
477                 $default =~ /^['(] (.*) [)']\z/x ? $1 :
478                     $default =~ /^\d/ ? $default : \$default;
479
480             if ((eval { lc ${ $info->{default_value} } }||'') eq 'getdate()') {
481                 ${ $info->{default_value} } = 'current_timestamp';
482
483                 my $getdate = 'getdate()';
484                 $info->{original}{default_value} = \$getdate;
485             }
486         }
487     }
488
489     return $result;
490 }
491
492 =head1 SEE ALSO
493
494 L<DBIx::Class::Schema::Loader::DBI::Sybase::Microsoft_SQL_Server>,
495 L<DBIx::Class::Schema::Loader::DBI::ODBC::Microsoft_SQL_Server>,
496 L<DBIx::Class::Schema::Loader>, L<DBIx::Class::Schema::Loader::Base>,
497 L<DBIx::Class::Schema::Loader::DBI>
498
499 =head1 AUTHORS
500
501 See L<DBIx::Class::Schema::Loader/AUTHORS>.
502
503 =head1 LICENSE
504
505 This library is free software; you can redistribute it and/or modify it under
506 the same terms as Perl itself.
507
508 =cut
509
510 1;
511 # vim:et sts=4 sw=4 tw=0: