Cache column and primary/foreign/unique key info
[dbsrgits/DBIx-Class-Schema-Loader.git] / lib / DBIx / Class / Schema / Loader / DBI.pm
1 package DBIx::Class::Schema::Loader::DBI;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class::Schema::Loader::Base/;
6 use mro 'c3';
7 use Try::Tiny;
8 use Scalar::Util 'blessed';
9 use List::Util 'any';
10 use Carp::Clan qw/^DBIx::Class/;
11 use Class::Method::Modifiers 'install_modifier';
12
13 use namespace::clean;
14 use DBIx::Class::Schema::Loader::Table ();
15
16 our $VERSION = '0.07043';
17
18 __PACKAGE__->mk_group_accessors('simple', qw/
19     _disable_pk_detection
20     _disable_uniq_detection
21     _disable_fk_detection
22     _passwords
23     quote_char
24     name_sep
25 /);
26
27 =head1 NAME
28
29 DBIx::Class::Schema::Loader::DBI - DBIx::Class::Schema::Loader DBI Implementation.
30
31 =head1 SYNOPSIS
32
33 See L<DBIx::Class::Schema::Loader::Base>
34
35 =head1 DESCRIPTION
36
37 This is the base class for L<DBIx::Class::Schema::Loader::Base> classes for
38 DBI-based storage backends, and implements the common functionality between them.
39
40 See L<DBIx::Class::Schema::Loader::Base> for the available options.
41
42 =head1 METHODS
43
44 =head2 new
45
46 Overlays L<DBIx::Class::Schema::Loader::Base/new> to do some DBI-specific
47 things.
48
49 =cut
50
51 sub new {
52     my $self = shift->next::method(@_);
53
54     # rebless to vendor-specific class if it exists and loads and we're not in a
55     # custom class.
56     if (not $self->loader_class) {
57         my $driver = $self->dbh->{Driver}->{Name};
58
59         my $subclass = 'DBIx::Class::Schema::Loader::DBI::' . $driver;
60         if ((not $self->isa($subclass)) && $self->load_optional_class($subclass)) {
61             bless $self, $subclass;
62             $self->_rebless;
63             Class::C3::reinitialize() if $] < 5.009005;
64         }
65     }
66
67     # Set up the default quoting character and name separators
68     $self->quote_char($self->_build_quote_char);
69     $self->name_sep($self->_build_name_sep);
70
71     $self->_setup;
72
73     return $self;
74 }
75
76 sub _build_quote_char {
77     my $self = shift;
78
79     my $quote_char = $self->dbh->get_info(29)
80         || $self->schema->storage->sql_maker->quote_char
81         || q{"};
82
83     # For our usage as regex matches, concatenating multiple quote_char
84     # values works fine (e.g. s/[\Q<>\E]// if quote_char was [ '<', '>' ])
85     if (ref $quote_char eq 'ARRAY') {
86         $quote_char = join '', @$quote_char;
87     }
88
89     return $quote_char;
90 }
91
92 sub _build_name_sep {
93     my $self = shift;
94     return $self->dbh->get_info(41)
95         || $self->schema->storage->sql_maker->name_sep
96         || '.';
97 }
98
99 # Override this in vendor modules to do things at the end of ->new()
100 sub _setup {
101     my ($self) = @_;
102
103     for my $method (qw(
104         _columns_info_for _table_columns
105         _table_pk_info _table_fk_info _table_uniq_info
106     )) {
107         $self->_setup_per_table_cache($method);
108     }
109 }
110
111 {
112     my %has_cache_setup;
113     sub _setup_per_table_cache {
114         my ($self, $method) = @_;
115         my $class = blessed($self);
116
117         return if $has_cache_setup{$class}{$method}++;
118
119         install_modifier($class, around => $method => sub {
120             my ($orig, $self, $table) = @_;
121             $self->{_cache}{$method}{$table->sql_name} ||= $self->$orig($table);
122         });
123     }
124 }
125
126 # Override this in vendor module to load a subclass if necessary
127 sub _rebless { }
128
129 sub _system_schemas {
130     return ('information_schema');
131 }
132
133 sub _system_tables {
134     return ();
135 }
136
137 sub _dbh_tables {
138     my $self = shift;
139
140     return $self->dbh->tables(undef, @_);
141 }
142
143 # default to be overridden in subclasses if necessary
144 sub _supports_db_schema { 1 }
145
146 # Returns an array of table objects
147 sub _tables_list {
148     my ($self) = @_;
149
150     my @tables;
151
152     my $qt  = qr/[\Q$self->{quote_char}\E"'`\[\]]/;
153     my $nqt = qr/[^\Q$self->{quote_char}\E"'`\[\]]/;
154     my $ns  = qr/[\Q$self->{name_sep}\E]/;
155     my $nns = qr/[^\Q$self->{name_sep}\E]/;
156
157     foreach my $schema (@{ $self->db_schema || [undef] }) {
158         my @raw_table_names = $self->_dbh_tables($schema);
159
160         TABLE: foreach my $raw_table_name (@raw_table_names) {
161             my $quoted = $raw_table_name =~ /^$qt/;
162
163             # These regexes are not entirely correct, but hopefully they will work
164             # in most cases. RT reports welcome.
165             my ($schema_name, $table_name1, $table_name2) = $quoted ?
166                 $raw_table_name =~ /^(?:${qt}(${nqt}+?)${qt}${ns})?(?:${qt}(.+?)${qt}|(${nns}+))\z/
167                 :
168                 $raw_table_name =~ /^(?:(${nns}+?)${ns})?(?:${qt}(.+?)${qt}|(${nns}+))\z/;
169
170             my $table_name = $table_name1 || $table_name2;
171
172             foreach my $system_schema ($self->_system_schemas) {
173                 if ($schema_name) {
174                     my $matches = 0;
175
176                     if (ref $system_schema) {
177                         $matches = 1
178                             if $schema_name =~ $system_schema
179                                 && $schema  !~ $system_schema;
180                     }
181                     else {
182                         $matches = 1
183                             if $schema_name eq $system_schema
184                                 && $schema  ne $system_schema;
185                     }
186
187                     next TABLE if $matches;
188                 }
189             }
190
191             foreach my $system_table ($self->_system_tables) {
192                 my $matches = 0;
193
194                 if (ref $system_table) {
195                     $matches = 1 if $table_name =~ $system_table;
196                 }
197                 else {
198                     $matches = 1 if $table_name eq $system_table
199                 }
200
201                 next TABLE if $matches;
202             }
203
204             $schema_name ||= $schema;
205
206             my $table = DBIx::Class::Schema::Loader::Table->new(
207                 loader => $self,
208                 name   => $table_name,
209                 schema => $schema_name,
210                 ($self->_supports_db_schema ? () : (
211                     ignore_schema => 1
212                 )),
213             );
214
215             push @tables, $table;
216         }
217     }
218
219     return $self->_filter_tables(\@tables);
220 }
221
222 sub _recurse_constraint {
223     my ($constraint, @parts) = @_;
224
225     my $name = shift @parts;
226
227     # If there are any parts left, the constraint must be an arrayref
228     croak "depth of constraint/exclude array does not match length of moniker_parts"
229         unless !!@parts == !!(ref $constraint eq 'ARRAY');
230
231     # if ths is the last part, use the constraint directly
232     return $name =~ $constraint unless @parts;
233
234     # recurse into the first matching subconstraint
235     foreach (@{$constraint}) {
236         my ($re, $sub) = @{$_};
237         return _recurse_constraint($sub, @parts)
238             if $name =~ $re;
239     }
240     return 0;
241 }
242
243 sub _check_constraint {
244     my ($include, $constraint, @tables) = @_;
245
246     return @tables unless defined $constraint;
247
248     return grep { !$include xor _recurse_constraint($constraint, @{$_}) } @tables
249         if ref $constraint eq 'ARRAY';
250
251     return grep { !$include xor /$constraint/ } @tables;
252 }
253
254
255
256 # apply constraint/exclude and ignore bad tables and views
257 sub _filter_tables {
258     my ($self, $tables) = @_;
259
260     my @tables = @$tables;
261     my @filtered_tables;
262
263     @tables = _check_constraint(1, $self->constraint, @tables);
264     @tables = _check_constraint(0, $self->exclude, @tables);
265
266     TABLE: for my $table (@tables) {
267         try {
268             local $^W = 0; # for ADO
269             my $sth = $self->_sth_for($table, undef, \'1 = 0');
270             $sth->execute;
271             1;
272         }
273         catch {
274             warn "Bad table or view '$table', ignoring: $_\n";
275             0;
276         } or next TABLE;
277
278         push @filtered_tables, $table;
279     }
280
281     return @filtered_tables;
282 }
283
284 =head2 load
285
286 We override L<DBIx::Class::Schema::Loader::Base/load> here to hook in our localized settings for C<$dbh> error handling.
287
288 =cut
289
290 sub load {
291     my $self = shift;
292
293     local $self->dbh->{RaiseError} = 1;
294     local $self->dbh->{PrintError} = 0;
295
296     $self->next::method(@_);
297 }
298
299 sub _sth_for {
300     my ($self, $table, $fields, $where) = @_;
301
302     my $sth = $self->dbh->prepare($self->schema->storage->sql_maker
303         ->select(\$table->sql_name, $fields, $where));
304
305     return $sth;
306 }
307
308 # Returns an arrayref of column names
309 sub _table_columns {
310     my ($self, $table) = @_;
311
312     my $sth = $self->_sth_for($table, undef, \'1 = 0');
313     $sth->execute;
314
315     my $retval = [ map $self->_lc($_), @{$sth->{NAME}} ];
316
317     $sth->finish;
318
319     return $retval;
320 }
321
322 # Returns arrayref of pk col names
323 sub _table_pk_info {
324     my ($self, $table) = @_;
325
326     return [] if $self->_disable_pk_detection;
327
328     my @primary = try {
329         $self->dbh->primary_key('', $table->schema, $table->name);
330     }
331     catch {
332         warn "Cannot find primary keys for this driver: $_";
333         $self->_disable_pk_detection(1);
334         return ();
335     };
336
337     return [] if not @primary;
338
339     @primary = map { $self->_lc($_) } @primary;
340     s/[\Q$self->{quote_char}\E]//g for @primary;
341
342     return \@primary;
343 }
344
345 # Override this for vendor-specific uniq info
346 sub _table_uniq_info {
347     my ($self, $table) = @_;
348
349     return [] if $self->_disable_uniq_detection;
350
351     if (not $self->dbh->can('statistics_info')) {
352         warn "No UNIQUE constraint information can be gathered for this driver";
353         $self->_disable_uniq_detection(1);
354         return [];
355     }
356
357     my %indices;
358     my $sth = $self->dbh->statistics_info(undef, $table->schema, $table->name, 1, 1);
359     while(my $row = $sth->fetchrow_hashref) {
360         # skip table-level stats, conditional indexes, and any index missing
361         #  critical fields
362         next if $row->{TYPE} eq 'table'
363             || defined $row->{FILTER_CONDITION}
364             || !$row->{INDEX_NAME}
365             || !defined $row->{ORDINAL_POSITION};
366
367         $indices{$row->{INDEX_NAME}}[$row->{ORDINAL_POSITION}] = $self->_lc($row->{COLUMN_NAME} || '');
368     }
369     $sth->finish;
370
371     my @retval;
372     foreach my $index_name (sort keys %indices) {
373         my (undef, @cols) = @{$indices{$index_name}};
374         # skip indexes with missing column names (e.g. expression indexes)
375         next unless @cols == grep $_, @cols;
376         push(@retval, [ $index_name => \@cols ]);
377     }
378
379     return \@retval;
380 }
381
382 sub _table_comment {
383     my ($self, $table) = @_;
384     my $dbh = $self->dbh;
385
386     my $comments_table = $table->clone;
387     $comments_table->name($self->table_comments_table);
388
389     my ($comment) =
390         (exists $self->_tables->{$comments_table->sql_name} || undef)
391         && try { $dbh->selectrow_array(<<"EOF") };
392 SELECT comment_text
393 FROM @{[ $comments_table->sql_name ]}
394 WHERE table_name = @{[ $dbh->quote($table->name) ]}
395 EOF
396
397     # Failback: try the REMARKS column on table_info
398     if (!$comment) {
399         my $info = $self->_dbh_table_info( $dbh, $table );
400         $comment = $info->{REMARKS} if $info;
401     }
402
403     return $comment;
404 }
405
406 sub _column_comment {
407     my ($self, $table, $column_number, $column_name) = @_;
408     my $dbh = $self->dbh;
409
410     my $comments_table = $table->clone;
411     $comments_table->name($self->column_comments_table);
412
413     my ($comment) =
414         (exists $self->_tables->{$comments_table->sql_name} || undef)
415         && try { $dbh->selectrow_array(<<"EOF") };
416 SELECT comment_text
417 FROM @{[ $comments_table->sql_name ]}
418 WHERE table_name = @{[ $dbh->quote($table->name) ]}
419 AND column_name = @{[ $dbh->quote($column_name) ]}
420 EOF
421
422     # Failback: try the REMARKS column on column_info
423     if (!$comment && $dbh->can('column_info')) {
424         if (my $sth = try { $self->_dbh_column_info( $dbh, undef, $table->schema, $table->name, $column_name ) }) {
425             my $info = $sth->fetchrow_hashref();
426             $comment = $info->{REMARKS};
427         }
428     }
429
430     return $comment;
431 }
432
433 # Find relationships
434 sub _table_fk_info {
435     my ($self, $table) = @_;
436
437     return [] if $self->_disable_fk_detection;
438
439     my $sth = try {
440         $self->dbh->foreign_key_info( '', '', '',
441                                 '', ($table->schema || ''), $table->name );
442     }
443     catch {
444         warn "Cannot introspect relationships for this driver: $_";
445         $self->_disable_fk_detection(1);
446         return undef;
447     };
448
449     return [] if !$sth;
450
451     my %rels;
452
453     my @rules = (
454         'CASCADE',
455         'RESTRICT',
456         'SET NULL',
457         'NO ACTION',
458         'SET DEFAULT',
459     );
460
461     my $i = 1; # for unnamed rels, which hopefully have only 1 column ...
462     REL: while(my $raw_rel = $sth->fetchrow_arrayref) {
463         my $uk_scm  = $raw_rel->[1];
464         my $uk_tbl  = $raw_rel->[2];
465         my $uk_col  = $self->_lc($raw_rel->[3]);
466         my $fk_scm  = $raw_rel->[5];
467         my $fk_col  = $self->_lc($raw_rel->[7]);
468         my $key_seq = $raw_rel->[8] - 1;
469         my $relid   = ($raw_rel->[11] || ( "__dcsld__" . $i++ ));
470
471         my $update_rule = $raw_rel->[9];
472         my $delete_rule = $raw_rel->[10];
473
474         $update_rule = $rules[$update_rule] if defined $update_rule;
475         $delete_rule = $rules[$delete_rule] if defined $delete_rule;
476
477         my $is_deferrable = $raw_rel->[13];
478
479         ($is_deferrable = $is_deferrable == 7 ? 0 : 1)
480             if defined $is_deferrable;
481
482         foreach my $var ($uk_scm, $uk_tbl, $uk_col, $fk_scm, $fk_col, $relid) {
483             $var =~ s/[\Q$self->{quote_char}\E]//g if defined $var;
484         }
485
486         if ($self->db_schema && $self->db_schema->[0] ne '%'
487             && (not any { $_ eq $uk_scm } @{ $self->db_schema })) {
488
489             next REL;
490         }
491
492         $rels{$relid}{tbl} ||= DBIx::Class::Schema::Loader::Table->new(
493             loader => $self,
494             name   => $uk_tbl,
495             schema => $uk_scm,
496             ($self->_supports_db_schema ? () : (
497                 ignore_schema => 1
498             )),
499         );
500
501         $rels{$relid}{attrs}{on_delete}     = $delete_rule if $delete_rule;
502         $rels{$relid}{attrs}{on_update}     = $update_rule if $update_rule;
503         $rels{$relid}{attrs}{is_deferrable} = $is_deferrable if defined $is_deferrable;
504
505         # Add this data IN ORDER
506         $rels{$relid}{rcols}[$key_seq] = $uk_col;
507         $rels{$relid}{lcols}[$key_seq] = $fk_col;
508     }
509     $sth->finish;
510
511     my @rels;
512     foreach my $relid (keys %rels) {
513         push(@rels, {
514             remote_columns => [ grep defined, @{ $rels{$relid}{rcols} } ],
515             local_columns  => [ grep defined, @{ $rels{$relid}{lcols} } ],
516             remote_table   => $rels{$relid}->{tbl},
517             (exists $rels{$relid}{attrs} ?
518                 (attrs => $rels{$relid}{attrs})
519                 :
520                 ()
521             ),
522             _constraint_name => $relid,
523         });
524     }
525
526     return \@rels;
527 }
528
529 # ported in from DBIx::Class::Storage::DBI:
530 sub _columns_info_for {
531     my ($self, $table) = @_;
532
533     my $dbh = $self->schema->storage->dbh;
534
535     my %result;
536
537     if (my $sth = try { $self->_dbh_column_info($dbh, undef, $table->schema, $table->name, '%' ) }) {
538         COL_INFO: while (my $info = try { $sth->fetchrow_hashref } catch { +{} }) {
539             next COL_INFO unless %$info;
540
541             my $column_info = {};
542             $column_info->{data_type}     = lc $info->{TYPE_NAME};
543
544             my $size = $info->{COLUMN_SIZE};
545
546             if (defined $size && defined $info->{DECIMAL_DIGITS}) {
547                 $column_info->{size} = [$size, $info->{DECIMAL_DIGITS}];
548             }
549             elsif (defined $size) {
550                 $column_info->{size} = $size;
551             }
552
553             $column_info->{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
554             $column_info->{default_value} = $info->{COLUMN_DEF} if defined $info->{COLUMN_DEF};
555             my $col_name = $info->{COLUMN_NAME};
556             $col_name =~ s/^\"(.*)\"$/$1/;
557
558             my $extra_info = $self->_extra_column_info(
559                 $table, $col_name, $column_info, $info
560             ) || {};
561             $column_info = { %$column_info, %$extra_info };
562
563             $result{$col_name} = $column_info;
564         }
565         $sth->finish;
566     }
567
568     my $sth = $self->_sth_for($table, undef, \'1 = 0');
569     $sth->execute;
570
571     my @columns = @{ $sth->{NAME} };
572
573     COL: for my $i (0 .. $#columns) {
574         next COL if %{ $result{ $columns[$i] }||{} };
575
576         my $column_info = {};
577         $column_info->{data_type} = lc $sth->{TYPE}[$i];
578
579         my $size = $sth->{PRECISION}[$i];
580
581         if (defined $size && defined $sth->{SCALE}[$i]) {
582             $column_info->{size} = [$size, $sth->{SCALE}[$i]];
583         }
584         elsif (defined $size) {
585             $column_info->{size} = $size;
586         }
587
588         $column_info->{is_nullable} = $sth->{NULLABLE}[$i] ? 1 : 0;
589
590         if ($column_info->{data_type} =~ m/^(.*?)\((.*?)\)$/) {
591             $column_info->{data_type} = $1;
592             $column_info->{size}    = $2;
593         }
594
595         my $extra_info = $self->_extra_column_info($table, $columns[$i], $column_info, $sth) || {};
596         $column_info = { %$column_info, %$extra_info };
597
598         $result{ $columns[$i] } = $column_info;
599     }
600     $sth->finish;
601
602     foreach my $col (keys %result) {
603         my $colinfo = $result{$col};
604         my $type_num = $colinfo->{data_type};
605         my $type_name;
606         if (defined $type_num && $type_num =~ /^-?\d+\z/ && $dbh->can('type_info')) {
607             my $type_name = $self->_dbh_type_info_type_name($type_num);
608             $colinfo->{data_type} = lc $type_name if $type_name;
609         }
610     }
611
612     # check for instances of the same column name with different case in preserve_case=0 mode
613     if (not $self->preserve_case) {
614         my %lc_colnames;
615
616         foreach my $col (keys %result) {
617             push @{ $lc_colnames{lc $col} }, $col;
618         }
619
620         if (keys %lc_colnames != keys %result) {
621             my @offending_colnames = map @$_, grep @$_ > 1, values %lc_colnames;
622
623             my $offending_colnames = join ", ", map "'$_'", @offending_colnames;
624
625             croak "columns $offending_colnames in table @{[ $table->sql_name ]} collide in preserve_case=0 mode. preserve_case=1 mode required";
626         }
627
628         # apply lowercasing
629         my %lc_result;
630
631         while (my ($col, $info) = each %result) {
632             $lc_result{ $self->_lc($col) } = $info;
633         }
634
635         %result = %lc_result;
636     }
637
638     return \%result;
639 }
640
641 # Need to override this for the buggy Firebird ODBC driver.
642 sub _dbh_type_info_type_name {
643     my ($self, $type_num) = @_;
644
645     # We wrap it in a try block for MSSQL+DBD::Sybase, which can have issues.
646     # TODO investigate further
647     my $type_info = try { $self->dbh->type_info($type_num) };
648
649     return $type_info ? $type_info->{TYPE_NAME} : undef;
650 }
651
652 # do not use this, override _columns_info_for instead
653 sub _extra_column_info {}
654
655 # override to mask warnings if needed
656 sub _dbh_table_info {
657     my ($self, $dbh, $table) = (shift, shift, shift);
658
659     return undef if !$dbh->can('table_info');
660     my $sth = $dbh->table_info(undef, $table->schema, $table->name);
661     while (my $info = $sth->fetchrow_hashref) {
662         next if !$self->_table_info_matches($table, $info);
663         return $info;
664     }
665     return undef;
666 }
667
668 sub _table_info_matches {
669     my ($self, $table, $info) = @_;
670
671     no warnings 'uninitialized';
672     return $info->{TABLE_SCHEM} eq $table->schema
673         && $info->{TABLE_NAME}  eq $table->name;
674 }
675
676 # override to mask warnings if needed (see mysql)
677 sub _dbh_column_info {
678     my ($self, $dbh) = (shift, shift);
679
680     return $dbh->column_info(@_);
681 }
682
683 # If a coderef uses DBI->connect, this should get its connect info.
684 sub _try_infer_connect_info_from_coderef {
685     my ($self, $code) = @_;
686
687     my ($dsn, $user, $pass, $params);
688
689     no warnings 'redefine';
690
691     local *DBI::connect = sub {
692         (undef, $dsn, $user, $pass, $params) = @_;
693     };
694
695     $code->();
696
697     return ($dsn, $user, $pass, $params);
698 }
699
700 sub dbh {
701     my $self = shift;
702
703     return $self->schema->storage->dbh;
704 }
705
706 sub _table_is_view {
707     my ($self, $table) = @_;
708
709     my $info = $self->_dbh_table_info($self->dbh, $table)
710         or return 0;
711     return $info->{TABLE_TYPE} eq 'VIEW';
712 }
713
714 =head1 SEE ALSO
715
716 L<DBIx::Class::Schema::Loader>
717
718 =head1 AUTHORS
719
720 See L<DBIx::Class::Schema::Loader/AUTHORS>.
721
722 =head1 LICENSE
723
724 This library is free software; you can redistribute it and/or modify it under
725 the same terms as Perl itself.
726
727 =cut
728
729 1;
730 # vim:et sts=4 sw=4 tw=0: