use carp instead of warn
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40 use Carp::Clan qw/^DBIx::Class/;
41
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # This prevents the caching of $dbh in S::A::L, I believe
46   # If limit_dialect is a ref (like a $dbh), go ahead and replace
47   #   it with what it resolves to:
48   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
49     if ref $self->{limit_dialect};
50
51   $self;
52 }
53
54
55
56 # Some databases (sqlite) do not handle multiple parenthesis
57 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
58 # is interpreted as x IN 1 or something similar.
59 #
60 # Since we currently do not have access to the SQLA AST, resort
61 # to barbaric mutilation of any SQL supplied in literal form
62
63 sub _strip_outer_paren {
64   my ($self, $arg) = @_;
65
66 use Data::Dumper;
67
68   return $self->_SWITCH_refkind ($arg, {
69     ARRAYREFREF => sub {
70       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
71       return $arg;
72     },
73     SCALARREF => sub {
74       return \__strip_outer_paren( $$arg );
75     },
76     FALLBACK => sub {
77       return $arg
78     },
79   });
80 }
81
82 sub __strip_outer_paren {
83   my $sql = shift;
84
85   if ($sql and not ref $sql) {
86     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
87       $sql = $1;
88     }
89   }
90
91   return $sql;
92 }
93
94 sub _where_field_IN {
95   my ($self, $lhs, $op, $rhs) = @_;
96   $rhs = $self->_strip_outer_paren ($rhs);
97   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
98 }
99
100 sub _where_field_BETWEEN {
101   my ($self, $lhs, $op, $rhs) = @_;
102   $rhs = $self->_strip_outer_paren ($rhs);
103   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
104 }
105
106
107
108 # DB2 is the only remaining DB using this. Even though we are not sure if
109 # RowNumberOver is still needed here (should be part of SQLA) leave the 
110 # code in place
111 sub _RowNumberOver {
112   my ($self, $sql, $order, $rows, $offset ) = @_;
113
114   $offset += 1;
115   my $last = $rows + $offset;
116   my ( $order_by ) = $self->_order_by( $order );
117
118   $sql = <<"SQL";
119 SELECT * FROM
120 (
121    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
122       $sql
123       $order_by
124    ) Q1
125 ) Q2
126 WHERE ROW_NUM BETWEEN $offset AND $last
127
128 SQL
129
130   return $sql;
131 }
132
133
134 # While we're at it, this should make LIMIT queries more efficient,
135 #  without digging into things too deeply
136 use Scalar::Util 'blessed';
137 sub _find_syntax {
138   my ($self, $syntax) = @_;
139   
140   # DB2 is the only remaining DB using this. Even though we are not sure if
141   # RowNumberOver is still needed here (should be part of SQLA) leave the 
142   # code in place
143   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
144   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
145     return 'RowNumberOver';
146   }
147   
148   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
149 }
150
151 sub select {
152   my ($self, $table, $fields, $where, $order, @rest) = @_;
153   local $self->{having_bind} = [];
154   local $self->{from_bind} = [];
155
156   if (ref $table eq 'SCALAR') {
157     $table = $$table;
158   }
159   elsif (not ref $table) {
160     $table = $self->_quote($table);
161   }
162   local $self->{rownum_hack_count} = 1
163     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
164   @rest = (-1) unless defined $rest[0];
165   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
166     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
167   my ($sql, @where_bind) = $self->SUPER::select(
168     $table, $self->_recurse_fields($fields), $where, $order, @rest
169   );
170   $sql .= 
171     $self->{for} ?
172     (
173       $self->{for} eq 'update' ? ' FOR UPDATE' :
174       $self->{for} eq 'shared' ? ' FOR SHARE'  :
175       ''
176     ) :
177     ''
178   ;
179   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
180 }
181
182 sub insert {
183   my $self = shift;
184   my $table = shift;
185   $table = $self->_quote($table) unless ref($table);
186   $self->SUPER::insert($table, @_);
187 }
188
189 sub update {
190   my $self = shift;
191   my $table = shift;
192   $table = $self->_quote($table) unless ref($table);
193   $self->SUPER::update($table, @_);
194 }
195
196 sub delete {
197   my $self = shift;
198   my $table = shift;
199   $table = $self->_quote($table) unless ref($table);
200   $self->SUPER::delete($table, @_);
201 }
202
203 sub _emulate_limit {
204   my $self = shift;
205   if ($_[3] == -1) {
206     return $_[1].$self->_order_by($_[2]);
207   } else {
208     return $self->SUPER::_emulate_limit(@_);
209   }
210 }
211
212 sub _recurse_fields {
213   my ($self, $fields, $params) = @_;
214   my $ref = ref $fields;
215   return $self->_quote($fields) unless $ref;
216   return $$fields if $ref eq 'SCALAR';
217
218   if ($ref eq 'ARRAY') {
219     return join(', ', map {
220       $self->_recurse_fields($_)
221         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
222           ? ' AS col'.$self->{rownum_hack_count}++
223           : '')
224       } @$fields);
225   } elsif ($ref eq 'HASH') {
226     foreach my $func (keys %$fields) {
227       if ($func eq 'distinct') {
228         my $_fields = $fields->{$func};
229         if (ref $_fields eq 'ARRAY' && @{$_fields} > 1) {
230           die "Unsupported syntax, please use " . 
231               "{ group_by => [ qw/" . (join ' ', @$_fields) . "/ ] }" .
232               " or " .
233               "{ select => [ qw/" . (join ' ', @$_fields) . "/ ], distinct => 1 }";
234         }
235         else {
236           carp "This syntax will be deprecated in 09, please use " . 
237                "{ group_by => '${_fields}' }" . 
238                " or " .
239                "{ select => '${_fields}', distinct => 1 }";
240         }
241       }
242       
243       return $self->_sqlcase($func)
244         .'( '.$self->_recurse_fields($fields->{$func}).' )';
245     }
246   }
247   # Is the second check absolutely necessary?
248   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
249     return $self->_fold_sqlbind( $fields );
250   }
251   else {
252     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
253   }
254 }
255
256 sub _order_by {
257   my $self = shift;
258   my $ret = '';
259   my @extra;
260   if (ref $_[0] eq 'HASH') {
261     if (defined $_[0]->{group_by}) {
262       $ret = $self->_sqlcase(' group by ')
263         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
264     }
265     if (defined $_[0]->{having}) {
266       my $frag;
267       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
268       push(@{$self->{having_bind}}, @extra);
269       $ret .= $self->_sqlcase(' having ').$frag;
270     }
271     if (defined $_[0]->{order_by}) {
272       $ret .= $self->_order_by($_[0]->{order_by});
273     }
274     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
275       return $self->SUPER::_order_by($_[0]);
276     }
277   } elsif (ref $_[0] eq 'SCALAR') {
278     $ret = $self->_sqlcase(' order by ').${ $_[0] };
279   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
280     my @order = @{+shift};
281     $ret = $self->_sqlcase(' order by ')
282           .join(', ', map {
283                         my $r = $self->_order_by($_, @_);
284                         $r =~ s/^ ?ORDER BY //i;
285                         $r;
286                       } @order);
287   } else {
288     $ret = $self->SUPER::_order_by(@_);
289   }
290   return $ret;
291 }
292
293 sub _order_directions {
294   my ($self, $order) = @_;
295   $order = $order->{order_by} if ref $order eq 'HASH';
296   return $self->SUPER::_order_directions($order);
297 }
298
299 sub _table {
300   my ($self, $from) = @_;
301   if (ref $from eq 'ARRAY') {
302     return $self->_recurse_from(@$from);
303   } elsif (ref $from eq 'HASH') {
304     return $self->_make_as($from);
305   } else {
306     return $from; # would love to quote here but _table ends up getting called
307                   # twice during an ->select without a limit clause due to
308                   # the way S::A::Limit->select works. should maybe consider
309                   # bypassing this and doing S::A::select($self, ...) in
310                   # our select method above. meantime, quoting shims have
311                   # been added to select/insert/update/delete here
312   }
313 }
314
315 sub _recurse_from {
316   my ($self, $from, @join) = @_;
317   my @sqlf;
318   push(@sqlf, $self->_make_as($from));
319   foreach my $j (@join) {
320     my ($to, $on) = @$j;
321
322     # check whether a join type exists
323     my $join_clause = '';
324     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
325     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
326       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
327     } else {
328       $join_clause = ' JOIN ';
329     }
330     push(@sqlf, $join_clause);
331
332     if (ref $to eq 'ARRAY') {
333       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
334     } else {
335       push(@sqlf, $self->_make_as($to));
336     }
337     push(@sqlf, ' ON ', $self->_join_condition($on));
338   }
339   return join('', @sqlf);
340 }
341
342 sub _fold_sqlbind {
343   my ($self, $sqlbind) = @_;
344   my $sql = shift @$$sqlbind;
345   push @{$self->{from_bind}}, @$$sqlbind;
346   return $sql;
347 }
348
349 sub _make_as {
350   my ($self, $from) = @_;
351   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
352                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
353                         : $self->_quote($_))
354                        } reverse each %{$self->_skip_options($from)});
355 }
356
357 sub _skip_options {
358   my ($self, $hash) = @_;
359   my $clean_hash = {};
360   $clean_hash->{$_} = $hash->{$_}
361     for grep {!/^-/} keys %$hash;
362   return $clean_hash;
363 }
364
365 sub _join_condition {
366   my ($self, $cond) = @_;
367   if (ref $cond eq 'HASH') {
368     my %j;
369     for (keys %$cond) {
370       my $v = $cond->{$_};
371       if (ref $v) {
372         # XXX no throw_exception() in this package and croak() fails with strange results
373         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
374             if ref($v) ne 'SCALAR';
375         $j{$_} = $v;
376       }
377       else {
378         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
379       }
380     };
381     return scalar($self->_recurse_where(\%j));
382   } elsif (ref $cond eq 'ARRAY') {
383     return join(' OR ', map { $self->_join_condition($_) } @$cond);
384   } else {
385     die "Can't handle this yet!";
386   }
387 }
388
389 sub _quote {
390   my ($self, $label) = @_;
391   return '' unless defined $label;
392   return "*" if $label eq '*';
393   return $label unless $self->{quote_char};
394   if(ref $self->{quote_char} eq "ARRAY"){
395     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
396       if !defined $self->{name_sep};
397     my $sep = $self->{name_sep};
398     return join($self->{name_sep},
399         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
400        split(/\Q$sep\E/,$label));
401   }
402   return $self->SUPER::_quote($label);
403 }
404
405 sub limit_dialect {
406     my $self = shift;
407     $self->{limit_dialect} = shift if @_;
408     return $self->{limit_dialect};
409 }
410
411 sub quote_char {
412     my $self = shift;
413     $self->{quote_char} = shift if @_;
414     return $self->{quote_char};
415 }
416
417 sub name_sep {
418     my $self = shift;
419     $self->{name_sep} = shift if @_;
420     return $self->{name_sep};
421 }
422
423 } # End of BEGIN block
424
425 =head1 NAME
426
427 DBIx::Class::Storage::DBI - DBI storage handler
428
429 =head1 SYNOPSIS
430
431   my $schema = MySchema->connect('dbi:SQLite:my.db');
432
433   $schema->storage->debug(1);
434   $schema->dbh_do("DROP TABLE authors");
435
436   $schema->resultset('Book')->search({
437      written_on => $schema->storage->datetime_parser(DateTime->now)
438   });
439
440 =head1 DESCRIPTION
441
442 This class represents the connection to an RDBMS via L<DBI>.  See
443 L<DBIx::Class::Storage> for general information.  This pod only
444 documents DBI-specific methods and behaviors.
445
446 =head1 METHODS
447
448 =cut
449
450 sub new {
451   my $new = shift->next::method(@_);
452
453   $new->transaction_depth(0);
454   $new->_sql_maker_opts({});
455   $new->{savepoints} = [];
456   $new->{_in_dbh_do} = 0;
457   $new->{_dbh_gen} = 0;
458
459   $new;
460 }
461
462 =head2 connect_info
463
464 This method is normally called by L<DBIx::Class::Schema/connection>, which
465 encapsulates its argument list in an arrayref before passing them here.
466
467 The argument list may contain:
468
469 =over
470
471 =item *
472
473 The same 4-element argument set one would normally pass to
474 L<DBI/connect>, optionally followed by
475 L<extra attributes|/DBIx::Class specific connection attributes>
476 recognized by DBIx::Class:
477
478   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
479
480 =item *
481
482 A single code reference which returns a connected 
483 L<DBI database handle|DBI/connect> optionally followed by 
484 L<extra attributes|/DBIx::Class specific connection attributes> recognized
485 by DBIx::Class:
486
487   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
488
489 =item *
490
491 A single hashref with all the attributes and the dsn/user/password
492 mixed together:
493
494   $connect_info_args = [{
495     dsn => $dsn,
496     user => $user,
497     password => $pass,
498     %dbi_attributes,
499     %extra_attributes,
500   }];
501
502 This is particularly useful for L<Catalyst> based applications, allowing the 
503 following config (L<Config::General> style):
504
505   <Model::DB>
506     schema_class   App::DB
507     <connect_info>
508       dsn          dbi:mysql:database=test
509       user         testuser
510       password     TestPass
511       AutoCommit   1
512     </connect_info>
513   </Model::DB>
514
515 =back
516
517 Please note that the L<DBI> docs recommend that you always explicitly
518 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
519 recommends that it be set to I<1>, and that you perform transactions
520 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
521 to I<1> if you do not do explicitly set it to zero.  This is the default 
522 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
523
524 =head3 DBIx::Class specific connection attributes
525
526 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
527 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
528 the following connection options. These options can be mixed in with your other
529 L<DBI> connection attributes, or placed in a seperate hashref
530 (C<\%extra_attributes>) as shown above.
531
532 Every time C<connect_info> is invoked, any previous settings for
533 these options will be cleared before setting the new ones, regardless of
534 whether any options are specified in the new C<connect_info>.
535
536
537 =over
538
539 =item on_connect_do
540
541 Specifies things to do immediately after connecting or re-connecting to
542 the database.  Its value may contain:
543
544 =over
545
546 =item an array reference
547
548 This contains SQL statements to execute in order.  Each element contains
549 a string or a code reference that returns a string.
550
551 =item a code reference
552
553 This contains some code to execute.  Unlike code references within an
554 array reference, its return value is ignored.
555
556 =back
557
558 =item on_disconnect_do
559
560 Takes arguments in the same form as L</on_connect_do> and executes them
561 immediately before disconnecting from the database.
562
563 Note, this only runs if you explicitly call L</disconnect> on the
564 storage object.
565
566 =item disable_sth_caching
567
568 If set to a true value, this option will disable the caching of
569 statement handles via L<DBI/prepare_cached>.
570
571 =item limit_dialect 
572
573 Sets the limit dialect. This is useful for JDBC-bridge among others
574 where the remote SQL-dialect cannot be determined by the name of the
575 driver alone. See also L<SQL::Abstract::Limit>.
576
577 =item quote_char
578
579 Specifies what characters to use to quote table and column names. If 
580 you use this you will want to specify L</name_sep> as well.
581
582 C<quote_char> expects either a single character, in which case is it
583 is placed on either side of the table/column name, or an arrayref of length
584 2 in which case the table/column name is placed between the elements.
585
586 For example under MySQL you should use C<< quote_char => '`' >>, and for
587 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
588
589 =item name_sep
590
591 This only needs to be used in conjunction with C<quote_char>, and is used to 
592 specify the charecter that seperates elements (schemas, tables, columns) from 
593 each other. In most cases this is simply a C<.>.
594
595 The consequences of not supplying this value is that L<SQL::Abstract>
596 will assume DBIx::Class' uses of aliases to be complete column
597 names. The output will look like I<"me.name"> when it should actually
598 be I<"me"."name">.
599
600 =item unsafe
601
602 This Storage driver normally installs its own C<HandleError>, sets
603 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
604 all database handles, including those supplied by a coderef.  It does this
605 so that it can have consistent and useful error behavior.
606
607 If you set this option to a true value, Storage will not do its usual
608 modifications to the database handle's attributes, and instead relies on
609 the settings in your connect_info DBI options (or the values you set in
610 your connection coderef, in the case that you are connecting via coderef).
611
612 Note that your custom settings can cause Storage to malfunction,
613 especially if you set a C<HandleError> handler that suppresses exceptions
614 and/or disable C<RaiseError>.
615
616 =item auto_savepoint
617
618 If this option is true, L<DBIx::Class> will use savepoints when nesting
619 transactions, making it possible to recover from failure in the inner
620 transaction without having to abort all outer transactions.
621
622 =item cursor_class
623
624 Use this argument to supply a cursor class other than the default
625 L<DBIx::Class::Storage::DBI::Cursor>.
626
627 =back
628
629 Some real-life examples of arguments to L</connect_info> and
630 L<DBIx::Class::Schema/connect>
631
632   # Simple SQLite connection
633   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
634
635   # Connect via subref
636   ->connect_info([ sub { DBI->connect(...) } ]);
637
638   # A bit more complicated
639   ->connect_info(
640     [
641       'dbi:Pg:dbname=foo',
642       'postgres',
643       'my_pg_password',
644       { AutoCommit => 1 },
645       { quote_char => q{"}, name_sep => q{.} },
646     ]
647   );
648
649   # Equivalent to the previous example
650   ->connect_info(
651     [
652       'dbi:Pg:dbname=foo',
653       'postgres',
654       'my_pg_password',
655       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
656     ]
657   );
658
659   # Same, but with hashref as argument
660   # See parse_connect_info for explanation
661   ->connect_info(
662     [{
663       dsn         => 'dbi:Pg:dbname=foo',
664       user        => 'postgres',
665       password    => 'my_pg_password',
666       AutoCommit  => 1,
667       quote_char  => q{"},
668       name_sep    => q{.},
669     }]
670   );
671
672   # Subref + DBIx::Class-specific connection options
673   ->connect_info(
674     [
675       sub { DBI->connect(...) },
676       {
677           quote_char => q{`},
678           name_sep => q{@},
679           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
680           disable_sth_caching => 1,
681       },
682     ]
683   );
684
685
686
687 =cut
688
689 sub connect_info {
690   my ($self, $info_arg) = @_;
691
692   return $self->_connect_info if !$info_arg;
693
694   my @args = @$info_arg;  # take a shallow copy for further mutilation
695   $self->_connect_info([@args]); # copy for _connect_info
696
697
698   # combine/pre-parse arguments depending on invocation style
699
700   my %attrs;
701   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
702     %attrs = %{ $args[1] || {} };
703     @args = $args[0];
704   }
705   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
706     %attrs = %{$args[0]};
707     @args = ();
708     for (qw/password user dsn/) {
709       unshift @args, delete $attrs{$_};
710     }
711   }
712   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
713     %attrs = (
714       % { $args[3] || {} },
715       % { $args[4] || {} },
716     );
717     @args = @args[0,1,2];
718   }
719
720   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
721   #  the new set of options
722   $self->_sql_maker(undef);
723   $self->_sql_maker_opts({});
724
725   if(keys %attrs) {
726     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
727       if(my $value = delete $attrs{$storage_opt}) {
728         $self->$storage_opt($value);
729       }
730     }
731     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
732       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
733         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
734       }
735     }
736   }
737
738   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
739
740   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
741   $self->_connect_info;
742 }
743
744 =head2 on_connect_do
745
746 This method is deprecated in favour of setting via L</connect_info>.
747
748
749 =head2 dbh_do
750
751 Arguments: ($subref | $method_name), @extra_coderef_args?
752
753 Execute the given $subref or $method_name using the new exception-based
754 connection management.
755
756 The first two arguments will be the storage object that C<dbh_do> was called
757 on and a database handle to use.  Any additional arguments will be passed
758 verbatim to the called subref as arguments 2 and onwards.
759
760 Using this (instead of $self->_dbh or $self->dbh) ensures correct
761 exception handling and reconnection (or failover in future subclasses).
762
763 Your subref should have no side-effects outside of the database, as
764 there is the potential for your subref to be partially double-executed
765 if the database connection was stale/dysfunctional.
766
767 Example:
768
769   my @stuff = $schema->storage->dbh_do(
770     sub {
771       my ($storage, $dbh, @cols) = @_;
772       my $cols = join(q{, }, @cols);
773       $dbh->selectrow_array("SELECT $cols FROM foo");
774     },
775     @column_list
776   );
777
778 =cut
779
780 sub dbh_do {
781   my $self = shift;
782   my $code = shift;
783
784   my $dbh = $self->_dbh;
785
786   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
787       || $self->{transaction_depth};
788
789   local $self->{_in_dbh_do} = 1;
790
791   my @result;
792   my $want_array = wantarray;
793
794   eval {
795     $self->_verify_pid if $dbh;
796     if(!$self->_dbh) {
797         $self->_populate_dbh;
798         $dbh = $self->_dbh;
799     }
800
801     if($want_array) {
802         @result = $self->$code($dbh, @_);
803     }
804     elsif(defined $want_array) {
805         $result[0] = $self->$code($dbh, @_);
806     }
807     else {
808         $self->$code($dbh, @_);
809     }
810   };
811
812   my $exception = $@;
813   if(!$exception) { return $want_array ? @result : $result[0] }
814
815   $self->throw_exception($exception) if $self->connected;
816
817   # We were not connected - reconnect and retry, but let any
818   #  exception fall right through this time
819   $self->_populate_dbh;
820   $self->$code($self->_dbh, @_);
821 }
822
823 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
824 # It also informs dbh_do to bypass itself while under the direction of txn_do,
825 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
826 sub txn_do {
827   my $self = shift;
828   my $coderef = shift;
829
830   ref $coderef eq 'CODE' or $self->throw_exception
831     ('$coderef must be a CODE reference');
832
833   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
834
835   local $self->{_in_dbh_do} = 1;
836
837   my @result;
838   my $want_array = wantarray;
839
840   my $tried = 0;
841   while(1) {
842     eval {
843       $self->_verify_pid if $self->_dbh;
844       $self->_populate_dbh if !$self->_dbh;
845
846       $self->txn_begin;
847       if($want_array) {
848           @result = $coderef->(@_);
849       }
850       elsif(defined $want_array) {
851           $result[0] = $coderef->(@_);
852       }
853       else {
854           $coderef->(@_);
855       }
856       $self->txn_commit;
857     };
858
859     my $exception = $@;
860     if(!$exception) { return $want_array ? @result : $result[0] }
861
862     if($tried++ > 0 || $self->connected) {
863       eval { $self->txn_rollback };
864       my $rollback_exception = $@;
865       if($rollback_exception) {
866         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
867         $self->throw_exception($exception)  # propagate nested rollback
868           if $rollback_exception =~ /$exception_class/;
869
870         $self->throw_exception(
871           "Transaction aborted: ${exception}. "
872           . "Rollback failed: ${rollback_exception}"
873         );
874       }
875       $self->throw_exception($exception)
876     }
877
878     # We were not connected, and was first try - reconnect and retry
879     # via the while loop
880     $self->_populate_dbh;
881   }
882 }
883
884 =head2 disconnect
885
886 Our C<disconnect> method also performs a rollback first if the
887 database is not in C<AutoCommit> mode.
888
889 =cut
890
891 sub disconnect {
892   my ($self) = @_;
893
894   if( $self->connected ) {
895     my $connection_do = $self->on_disconnect_do;
896     $self->_do_connection_actions($connection_do) if ref($connection_do);
897
898     $self->_dbh->rollback unless $self->_dbh_autocommit;
899     $self->_dbh->disconnect;
900     $self->_dbh(undef);
901     $self->{_dbh_gen}++;
902   }
903 }
904
905 =head2 with_deferred_fk_checks
906
907 =over 4
908
909 =item Arguments: C<$coderef>
910
911 =item Return Value: The return value of $coderef
912
913 =back
914
915 Storage specific method to run the code ref with FK checks deferred or
916 in MySQL's case disabled entirely.
917
918 =cut
919
920 # Storage subclasses should override this
921 sub with_deferred_fk_checks {
922   my ($self, $sub) = @_;
923
924   $sub->();
925 }
926
927 sub connected {
928   my ($self) = @_;
929
930   if(my $dbh = $self->_dbh) {
931       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
932           $self->_dbh(undef);
933           $self->{_dbh_gen}++;
934           return;
935       }
936       else {
937           $self->_verify_pid;
938           return 0 if !$self->_dbh;
939       }
940       return ($dbh->FETCH('Active') && $dbh->ping);
941   }
942
943   return 0;
944 }
945
946 # handle pid changes correctly
947 #  NOTE: assumes $self->_dbh is a valid $dbh
948 sub _verify_pid {
949   my ($self) = @_;
950
951   return if defined $self->_conn_pid && $self->_conn_pid == $$;
952
953   $self->_dbh->{InactiveDestroy} = 1;
954   $self->_dbh(undef);
955   $self->{_dbh_gen}++;
956
957   return;
958 }
959
960 sub ensure_connected {
961   my ($self) = @_;
962
963   unless ($self->connected) {
964     $self->_populate_dbh;
965   }
966 }
967
968 =head2 dbh
969
970 Returns the dbh - a data base handle of class L<DBI>.
971
972 =cut
973
974 sub dbh {
975   my ($self) = @_;
976
977   $self->ensure_connected;
978   return $self->_dbh;
979 }
980
981 sub _sql_maker_args {
982     my ($self) = @_;
983     
984     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
985 }
986
987 sub sql_maker {
988   my ($self) = @_;
989   unless ($self->_sql_maker) {
990     my $sql_maker_class = $self->sql_maker_class;
991     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
992   }
993   return $self->_sql_maker;
994 }
995
996 sub _rebless {}
997
998 sub _populate_dbh {
999   my ($self) = @_;
1000   my @info = @{$self->_dbi_connect_info || []};
1001   $self->_dbh($self->_connect(@info));
1002
1003   # Always set the transaction depth on connect, since
1004   #  there is no transaction in progress by definition
1005   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1006
1007   if(ref $self eq 'DBIx::Class::Storage::DBI') {
1008     my $driver = $self->_dbh->{Driver}->{Name};
1009     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
1010       bless $self, "DBIx::Class::Storage::DBI::${driver}";
1011       $self->_rebless();
1012     }
1013   }
1014
1015   $self->_conn_pid($$);
1016   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1017
1018   my $connection_do = $self->on_connect_do;
1019   $self->_do_connection_actions($connection_do) if ref($connection_do);
1020 }
1021
1022 sub _do_connection_actions {
1023   my $self = shift;
1024   my $connection_do = shift;
1025
1026   if (ref $connection_do eq 'ARRAY') {
1027     $self->_do_query($_) foreach @$connection_do;
1028   }
1029   elsif (ref $connection_do eq 'CODE') {
1030     $connection_do->($self);
1031   }
1032
1033   return $self;
1034 }
1035
1036 sub _do_query {
1037   my ($self, $action) = @_;
1038
1039   if (ref $action eq 'CODE') {
1040     $action = $action->($self);
1041     $self->_do_query($_) foreach @$action;
1042   }
1043   else {
1044     # Most debuggers expect ($sql, @bind), so we need to exclude
1045     # the attribute hash which is the second argument to $dbh->do
1046     # furthermore the bind values are usually to be presented
1047     # as named arrayref pairs, so wrap those here too
1048     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1049     my $sql = shift @do_args;
1050     my $attrs = shift @do_args;
1051     my @bind = map { [ undef, $_ ] } @do_args;
1052
1053     $self->_query_start($sql, @bind);
1054     $self->_dbh->do($sql, $attrs, @do_args);
1055     $self->_query_end($sql, @bind);
1056   }
1057
1058   return $self;
1059 }
1060
1061 sub _connect {
1062   my ($self, @info) = @_;
1063
1064   $self->throw_exception("You failed to provide any connection info")
1065     if !@info;
1066
1067   my ($old_connect_via, $dbh);
1068
1069   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1070     $old_connect_via = $DBI::connect_via;
1071     $DBI::connect_via = 'connect';
1072   }
1073
1074   eval {
1075     if(ref $info[0] eq 'CODE') {
1076        $dbh = &{$info[0]}
1077     }
1078     else {
1079        $dbh = DBI->connect(@info);
1080     }
1081
1082     if($dbh && !$self->unsafe) {
1083       my $weak_self = $self;
1084       weaken($weak_self);
1085       $dbh->{HandleError} = sub {
1086           if ($weak_self) {
1087             $weak_self->throw_exception("DBI Exception: $_[0]");
1088           }
1089           else {
1090             croak ("DBI Exception: $_[0]");
1091           }
1092       };
1093       $dbh->{ShowErrorStatement} = 1;
1094       $dbh->{RaiseError} = 1;
1095       $dbh->{PrintError} = 0;
1096     }
1097   };
1098
1099   $DBI::connect_via = $old_connect_via if $old_connect_via;
1100
1101   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1102     if !$dbh || $@;
1103
1104   $self->_dbh_autocommit($dbh->{AutoCommit});
1105
1106   $dbh;
1107 }
1108
1109 sub svp_begin {
1110   my ($self, $name) = @_;
1111
1112   $name = $self->_svp_generate_name
1113     unless defined $name;
1114
1115   $self->throw_exception ("You can't use savepoints outside a transaction")
1116     if $self->{transaction_depth} == 0;
1117
1118   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1119     unless $self->can('_svp_begin');
1120   
1121   push @{ $self->{savepoints} }, $name;
1122
1123   $self->debugobj->svp_begin($name) if $self->debug;
1124   
1125   return $self->_svp_begin($name);
1126 }
1127
1128 sub svp_release {
1129   my ($self, $name) = @_;
1130
1131   $self->throw_exception ("You can't use savepoints outside a transaction")
1132     if $self->{transaction_depth} == 0;
1133
1134   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1135     unless $self->can('_svp_release');
1136
1137   if (defined $name) {
1138     $self->throw_exception ("Savepoint '$name' does not exist")
1139       unless grep { $_ eq $name } @{ $self->{savepoints} };
1140
1141     # Dig through the stack until we find the one we are releasing.  This keeps
1142     # the stack up to date.
1143     my $svp;
1144
1145     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1146   } else {
1147     $name = pop @{ $self->{savepoints} };
1148   }
1149
1150   $self->debugobj->svp_release($name) if $self->debug;
1151
1152   return $self->_svp_release($name);
1153 }
1154
1155 sub svp_rollback {
1156   my ($self, $name) = @_;
1157
1158   $self->throw_exception ("You can't use savepoints outside a transaction")
1159     if $self->{transaction_depth} == 0;
1160
1161   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1162     unless $self->can('_svp_rollback');
1163
1164   if (defined $name) {
1165       # If they passed us a name, verify that it exists in the stack
1166       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1167           $self->throw_exception("Savepoint '$name' does not exist!");
1168       }
1169
1170       # Dig through the stack until we find the one we are releasing.  This keeps
1171       # the stack up to date.
1172       while(my $s = pop(@{ $self->{savepoints} })) {
1173           last if($s eq $name);
1174       }
1175       # Add the savepoint back to the stack, as a rollback doesn't remove the
1176       # named savepoint, only everything after it.
1177       push(@{ $self->{savepoints} }, $name);
1178   } else {
1179       # We'll assume they want to rollback to the last savepoint
1180       $name = $self->{savepoints}->[-1];
1181   }
1182
1183   $self->debugobj->svp_rollback($name) if $self->debug;
1184   
1185   return $self->_svp_rollback($name);
1186 }
1187
1188 sub _svp_generate_name {
1189     my ($self) = @_;
1190
1191     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1192 }
1193
1194 sub txn_begin {
1195   my $self = shift;
1196   $self->ensure_connected();
1197   if($self->{transaction_depth} == 0) {
1198     $self->debugobj->txn_begin()
1199       if $self->debug;
1200     # this isn't ->_dbh-> because
1201     #  we should reconnect on begin_work
1202     #  for AutoCommit users
1203     $self->dbh->begin_work;
1204   } elsif ($self->auto_savepoint) {
1205     $self->svp_begin;
1206   }
1207   $self->{transaction_depth}++;
1208 }
1209
1210 sub txn_commit {
1211   my $self = shift;
1212   if ($self->{transaction_depth} == 1) {
1213     my $dbh = $self->_dbh;
1214     $self->debugobj->txn_commit()
1215       if ($self->debug);
1216     $dbh->commit;
1217     $self->{transaction_depth} = 0
1218       if $self->_dbh_autocommit;
1219   }
1220   elsif($self->{transaction_depth} > 1) {
1221     $self->{transaction_depth}--;
1222     $self->svp_release
1223       if $self->auto_savepoint;
1224   }
1225 }
1226
1227 sub txn_rollback {
1228   my $self = shift;
1229   my $dbh = $self->_dbh;
1230   eval {
1231     if ($self->{transaction_depth} == 1) {
1232       $self->debugobj->txn_rollback()
1233         if ($self->debug);
1234       $self->{transaction_depth} = 0
1235         if $self->_dbh_autocommit;
1236       $dbh->rollback;
1237     }
1238     elsif($self->{transaction_depth} > 1) {
1239       $self->{transaction_depth}--;
1240       if ($self->auto_savepoint) {
1241         $self->svp_rollback;
1242         $self->svp_release;
1243       }
1244     }
1245     else {
1246       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1247     }
1248   };
1249   if ($@) {
1250     my $error = $@;
1251     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1252     $error =~ /$exception_class/ and $self->throw_exception($error);
1253     # ensure that a failed rollback resets the transaction depth
1254     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1255     $self->throw_exception($error);
1256   }
1257 }
1258
1259 # This used to be the top-half of _execute.  It was split out to make it
1260 #  easier to override in NoBindVars without duping the rest.  It takes up
1261 #  all of _execute's args, and emits $sql, @bind.
1262 sub _prep_for_execute {
1263   my ($self, $op, $extra_bind, $ident, $args) = @_;
1264
1265   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1266     $ident = $ident->from();
1267   }
1268
1269   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1270
1271   unshift(@bind,
1272     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1273       if $extra_bind;
1274   return ($sql, \@bind);
1275 }
1276
1277 sub _fix_bind_params {
1278     my ($self, @bind) = @_;
1279
1280     ### Turn @bind from something like this:
1281     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1282     ### to this:
1283     ###   ( "'1'", "'1'", "'3'" )
1284     return
1285         map {
1286             if ( defined( $_ && $_->[1] ) ) {
1287                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1288             }
1289             else { q{'NULL'}; }
1290         } @bind;
1291 }
1292
1293 sub _query_start {
1294     my ( $self, $sql, @bind ) = @_;
1295
1296     if ( $self->debug ) {
1297         @bind = $self->_fix_bind_params(@bind);
1298         
1299         $self->debugobj->query_start( $sql, @bind );
1300     }
1301 }
1302
1303 sub _query_end {
1304     my ( $self, $sql, @bind ) = @_;
1305
1306     if ( $self->debug ) {
1307         @bind = $self->_fix_bind_params(@bind);
1308         $self->debugobj->query_end( $sql, @bind );
1309     }
1310 }
1311
1312 sub _dbh_execute {
1313   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1314
1315   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1316
1317   $self->_query_start( $sql, @$bind );
1318
1319   my $sth = $self->sth($sql,$op);
1320
1321   my $placeholder_index = 1; 
1322
1323   foreach my $bound (@$bind) {
1324     my $attributes = {};
1325     my($column_name, @data) = @$bound;
1326
1327     if ($bind_attributes) {
1328       $attributes = $bind_attributes->{$column_name}
1329       if defined $bind_attributes->{$column_name};
1330     }
1331
1332     foreach my $data (@data) {
1333       my $ref = ref $data;
1334       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1335
1336       $sth->bind_param($placeholder_index, $data, $attributes);
1337       $placeholder_index++;
1338     }
1339   }
1340
1341   # Can this fail without throwing an exception anyways???
1342   my $rv = $sth->execute();
1343   $self->throw_exception($sth->errstr) if !$rv;
1344
1345   $self->_query_end( $sql, @$bind );
1346
1347   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1348 }
1349
1350 sub _execute {
1351     my $self = shift;
1352     $self->dbh_do('_dbh_execute', @_)
1353 }
1354
1355 sub insert {
1356   my ($self, $source, $to_insert) = @_;
1357   
1358   my $ident = $source->from; 
1359   my $bind_attributes = $self->source_bind_attributes($source);
1360
1361   my $updated_cols = {};
1362
1363   $self->ensure_connected;
1364   foreach my $col ( $source->columns ) {
1365     if ( !defined $to_insert->{$col} ) {
1366       my $col_info = $source->column_info($col);
1367
1368       if ( $col_info->{auto_nextval} ) {
1369         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1370       }
1371     }
1372   }
1373
1374   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1375
1376   return $updated_cols;
1377 }
1378
1379 ## Still not quite perfect, and EXPERIMENTAL
1380 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1381 ## scalar refs, or at least, all the same type as the first set, the statement is
1382 ## only prepped once.
1383 sub insert_bulk {
1384   my ($self, $source, $cols, $data) = @_;
1385   my %colvalues;
1386   my $table = $source->from;
1387   @colvalues{@$cols} = (0..$#$cols);
1388   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1389   
1390   $self->_query_start( $sql, @bind );
1391   my $sth = $self->sth($sql);
1392
1393 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1394
1395   ## This must be an arrayref, else nothing works!
1396   
1397   my $tuple_status = [];
1398   
1399   ##use Data::Dumper;
1400   ##print STDERR Dumper( $data, $sql, [@bind] );
1401
1402   my $time = time();
1403
1404   ## Get the bind_attributes, if any exist
1405   my $bind_attributes = $self->source_bind_attributes($source);
1406
1407   ## Bind the values and execute
1408   my $placeholder_index = 1; 
1409
1410   foreach my $bound (@bind) {
1411
1412     my $attributes = {};
1413     my ($column_name, $data_index) = @$bound;
1414
1415     if( $bind_attributes ) {
1416       $attributes = $bind_attributes->{$column_name}
1417       if defined $bind_attributes->{$column_name};
1418     }
1419
1420     my @data = map { $_->[$data_index] } @$data;
1421
1422     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1423     $placeholder_index++;
1424   }
1425   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1426   $self->throw_exception($sth->errstr) if !$rv;
1427
1428   $self->_query_end( $sql, @bind );
1429   return (wantarray ? ($rv, $sth, @bind) : $rv);
1430 }
1431
1432 sub update {
1433   my $self = shift @_;
1434   my $source = shift @_;
1435   my $bind_attributes = $self->source_bind_attributes($source);
1436   
1437   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1438 }
1439
1440
1441 sub delete {
1442   my $self = shift @_;
1443   my $source = shift @_;
1444   
1445   my $bind_attrs = {}; ## If ever it's needed...
1446   
1447   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1448 }
1449
1450 sub _select {
1451   my $self = shift;
1452   my $sql_maker = $self->sql_maker;
1453   local $sql_maker->{for};
1454   return $self->_execute($self->_select_args(@_));
1455 }
1456
1457 sub _select_args {
1458   my ($self, $ident, $select, $condition, $attrs) = @_;
1459   my $order = $attrs->{order_by};
1460
1461   my $for = delete $attrs->{for};
1462   my $sql_maker = $self->sql_maker;
1463   $sql_maker->{for} = $for;
1464
1465   if (exists $attrs->{group_by} || $attrs->{having}) {
1466     $order = {
1467       group_by => $attrs->{group_by},
1468       having => $attrs->{having},
1469       ($order ? (order_by => $order) : ())
1470     };
1471   }
1472   my $bind_attrs = {}; ## Future support
1473   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1474   if ($attrs->{software_limit} ||
1475       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1476         $attrs->{software_limit} = 1;
1477   } else {
1478     $self->throw_exception("rows attribute must be positive if present")
1479       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1480
1481     # MySQL actually recommends this approach.  I cringe.
1482     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1483     push @args, $attrs->{rows}, $attrs->{offset};
1484   }
1485   return @args;
1486 }
1487
1488 sub source_bind_attributes {
1489   my ($self, $source) = @_;
1490   
1491   my $bind_attributes;
1492   foreach my $column ($source->columns) {
1493   
1494     my $data_type = $source->column_info($column)->{data_type} || '';
1495     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1496      if $data_type;
1497   }
1498
1499   return $bind_attributes;
1500 }
1501
1502 =head2 select
1503
1504 =over 4
1505
1506 =item Arguments: $ident, $select, $condition, $attrs
1507
1508 =back
1509
1510 Handle a SQL select statement.
1511
1512 =cut
1513
1514 sub select {
1515   my $self = shift;
1516   my ($ident, $select, $condition, $attrs) = @_;
1517   return $self->cursor_class->new($self, \@_, $attrs);
1518 }
1519
1520 sub select_single {
1521   my $self = shift;
1522   my ($rv, $sth, @bind) = $self->_select(@_);
1523   my @row = $sth->fetchrow_array;
1524   my @nextrow = $sth->fetchrow_array if @row;
1525   if(@row && @nextrow) {
1526     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1527   }
1528   # Need to call finish() to work round broken DBDs
1529   $sth->finish();
1530   return @row;
1531 }
1532
1533 =head2 sth
1534
1535 =over 4
1536
1537 =item Arguments: $sql
1538
1539 =back
1540
1541 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1542
1543 =cut
1544
1545 sub _dbh_sth {
1546   my ($self, $dbh, $sql) = @_;
1547
1548   # 3 is the if_active parameter which avoids active sth re-use
1549   my $sth = $self->disable_sth_caching
1550     ? $dbh->prepare($sql)
1551     : $dbh->prepare_cached($sql, {}, 3);
1552
1553   # XXX You would think RaiseError would make this impossible,
1554   #  but apparently that's not true :(
1555   $self->throw_exception($dbh->errstr) if !$sth;
1556
1557   $sth;
1558 }
1559
1560 sub sth {
1561   my ($self, $sql) = @_;
1562   $self->dbh_do('_dbh_sth', $sql);
1563 }
1564
1565 sub _dbh_columns_info_for {
1566   my ($self, $dbh, $table) = @_;
1567
1568   if ($dbh->can('column_info')) {
1569     my %result;
1570     eval {
1571       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1572       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1573       $sth->execute();
1574       while ( my $info = $sth->fetchrow_hashref() ){
1575         my %column_info;
1576         $column_info{data_type}   = $info->{TYPE_NAME};
1577         $column_info{size}      = $info->{COLUMN_SIZE};
1578         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1579         $column_info{default_value} = $info->{COLUMN_DEF};
1580         my $col_name = $info->{COLUMN_NAME};
1581         $col_name =~ s/^\"(.*)\"$/$1/;
1582
1583         $result{$col_name} = \%column_info;
1584       }
1585     };
1586     return \%result if !$@ && scalar keys %result;
1587   }
1588
1589   my %result;
1590   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1591   $sth->execute;
1592   my @columns = @{$sth->{NAME_lc}};
1593   for my $i ( 0 .. $#columns ){
1594     my %column_info;
1595     $column_info{data_type} = $sth->{TYPE}->[$i];
1596     $column_info{size} = $sth->{PRECISION}->[$i];
1597     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1598
1599     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1600       $column_info{data_type} = $1;
1601       $column_info{size}    = $2;
1602     }
1603
1604     $result{$columns[$i]} = \%column_info;
1605   }
1606   $sth->finish;
1607
1608   foreach my $col (keys %result) {
1609     my $colinfo = $result{$col};
1610     my $type_num = $colinfo->{data_type};
1611     my $type_name;
1612     if(defined $type_num && $dbh->can('type_info')) {
1613       my $type_info = $dbh->type_info($type_num);
1614       $type_name = $type_info->{TYPE_NAME} if $type_info;
1615       $colinfo->{data_type} = $type_name if $type_name;
1616     }
1617   }
1618
1619   return \%result;
1620 }
1621
1622 sub columns_info_for {
1623   my ($self, $table) = @_;
1624   $self->dbh_do('_dbh_columns_info_for', $table);
1625 }
1626
1627 =head2 last_insert_id
1628
1629 Return the row id of the last insert.
1630
1631 =cut
1632
1633 sub _dbh_last_insert_id {
1634     # All Storage's need to register their own _dbh_last_insert_id
1635     # the old SQLite-based method was highly inappropriate
1636
1637     my $self = shift;
1638     my $class = ref $self;
1639     $self->throw_exception (<<EOE);
1640
1641 No _dbh_last_insert_id() method found in $class.
1642 Since the method of obtaining the autoincrement id of the last insert
1643 operation varies greatly between different databases, this method must be
1644 individually implemented for every storage class.
1645 EOE
1646 }
1647
1648 sub last_insert_id {
1649   my $self = shift;
1650   $self->dbh_do('_dbh_last_insert_id', @_);
1651 }
1652
1653 =head2 sqlt_type
1654
1655 Returns the database driver name.
1656
1657 =cut
1658
1659 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1660
1661 =head2 bind_attribute_by_data_type
1662
1663 Given a datatype from column info, returns a database specific bind
1664 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1665 let the database planner just handle it.
1666
1667 Generally only needed for special case column types, like bytea in postgres.
1668
1669 =cut
1670
1671 sub bind_attribute_by_data_type {
1672     return;
1673 }
1674
1675 =head2 create_ddl_dir
1676
1677 =over 4
1678
1679 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1680
1681 =back
1682
1683 Creates a SQL file based on the Schema, for each of the specified
1684 database types, in the given directory.
1685
1686 By default, C<\%sqlt_args> will have
1687
1688  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1689
1690 merged with the hash passed in. To disable any of those features, pass in a 
1691 hashref like the following
1692
1693  { ignore_constraint_names => 0, # ... other options }
1694
1695 =cut
1696
1697 sub create_ddl_dir {
1698   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1699
1700   if(!$dir || !-d $dir) {
1701     warn "No directory given, using ./\n";
1702     $dir = "./";
1703   }
1704   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1705   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1706
1707   my $schema_version = $schema->schema_version || '1.x';
1708   $version ||= $schema_version;
1709
1710   $sqltargs = {
1711     add_drop_table => 1, 
1712     ignore_constraint_names => 1,
1713     ignore_index_names => 1,
1714     %{$sqltargs || {}}
1715   };
1716
1717   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1718       . $self->_check_sqlt_message . q{'})
1719           if !$self->_check_sqlt_version;
1720
1721   my $sqlt = SQL::Translator->new( $sqltargs );
1722
1723   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1724   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1725
1726   foreach my $db (@$databases) {
1727     $sqlt->reset();
1728     $sqlt->{schema} = $sqlt_schema;
1729     $sqlt->producer($db);
1730
1731     my $file;
1732     my $filename = $schema->ddl_filename($db, $version, $dir);
1733     if (-e $filename && ($version eq $schema_version )) {
1734       # if we are dumping the current version, overwrite the DDL
1735       warn "Overwriting existing DDL file - $filename";
1736       unlink($filename);
1737     }
1738
1739     my $output = $sqlt->translate;
1740     if(!$output) {
1741       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1742       next;
1743     }
1744     if(!open($file, ">$filename")) {
1745       $self->throw_exception("Can't open $filename for writing ($!)");
1746       next;
1747     }
1748     print $file $output;
1749     close($file);
1750   
1751     next unless ($preversion);
1752
1753     require SQL::Translator::Diff;
1754
1755     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1756     if(!-e $prefilename) {
1757       warn("No previous schema file found ($prefilename)");
1758       next;
1759     }
1760
1761     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1762     if(-e $difffile) {
1763       warn("Overwriting existing diff file - $difffile");
1764       unlink($difffile);
1765     }
1766     
1767     my $source_schema;
1768     {
1769       my $t = SQL::Translator->new($sqltargs);
1770       $t->debug( 0 );
1771       $t->trace( 0 );
1772       $t->parser( $db )                       or die $t->error;
1773       my $out = $t->translate( $prefilename ) or die $t->error;
1774       $source_schema = $t->schema;
1775       unless ( $source_schema->name ) {
1776         $source_schema->name( $prefilename );
1777       }
1778     }
1779
1780     # The "new" style of producers have sane normalization and can support 
1781     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1782     # And we have to diff parsed SQL against parsed SQL.
1783     my $dest_schema = $sqlt_schema;
1784     
1785     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1786       my $t = SQL::Translator->new($sqltargs);
1787       $t->debug( 0 );
1788       $t->trace( 0 );
1789       $t->parser( $db )                    or die $t->error;
1790       my $out = $t->translate( $filename ) or die $t->error;
1791       $dest_schema = $t->schema;
1792       $dest_schema->name( $filename )
1793         unless $dest_schema->name;
1794     }
1795     
1796     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1797                                                   $dest_schema,   $db,
1798                                                   $sqltargs
1799                                                  );
1800     if(!open $file, ">$difffile") { 
1801       $self->throw_exception("Can't write to $difffile ($!)");
1802       next;
1803     }
1804     print $file $diff;
1805     close($file);
1806   }
1807 }
1808
1809 =head2 deployment_statements
1810
1811 =over 4
1812
1813 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1814
1815 =back
1816
1817 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1818 The database driver name is given by C<$type>, though the value from
1819 L</sqlt_type> is used if it is not specified.
1820
1821 C<$directory> is used to return statements from files in a previously created
1822 L</create_ddl_dir> directory and is optional. The filenames are constructed
1823 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1824
1825 If no C<$directory> is specified then the statements are constructed on the
1826 fly using L<SQL::Translator> and C<$version> is ignored.
1827
1828 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1829
1830 =cut
1831
1832 sub deployment_statements {
1833   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1834   # Need to be connected to get the correct sqlt_type
1835   $self->ensure_connected() unless $type;
1836   $type ||= $self->sqlt_type;
1837   $version ||= $schema->schema_version || '1.x';
1838   $dir ||= './';
1839   my $filename = $schema->ddl_filename($type, $version, $dir);
1840   if(-f $filename)
1841   {
1842       my $file;
1843       open($file, "<$filename") 
1844         or $self->throw_exception("Can't open $filename ($!)");
1845       my @rows = <$file>;
1846       close($file);
1847       return join('', @rows);
1848   }
1849
1850   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1851       . $self->_check_sqlt_message . q{'})
1852           if !$self->_check_sqlt_version;
1853
1854   require SQL::Translator::Parser::DBIx::Class;
1855   eval qq{use SQL::Translator::Producer::${type}};
1856   $self->throw_exception($@) if $@;
1857
1858   # sources needs to be a parser arg, but for simplicty allow at top level 
1859   # coming in
1860   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1861       if exists $sqltargs->{sources};
1862
1863   my $tr = SQL::Translator->new(%$sqltargs);
1864   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1865   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1866 }
1867
1868 sub deploy {
1869   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1870   my $deploy = sub {
1871     my $line = shift;
1872     return if($line =~ /^--/);
1873     return if(!$line);
1874     # next if($line =~ /^DROP/m);
1875     return if($line =~ /^BEGIN TRANSACTION/m);
1876     return if($line =~ /^COMMIT/m);
1877     return if $line =~ /^\s+$/; # skip whitespace only
1878     $self->_query_start($line);
1879     eval {
1880       $self->dbh->do($line); # shouldn't be using ->dbh ?
1881     };
1882     if ($@) {
1883       warn qq{$@ (running "${line}")};
1884     }
1885     $self->_query_end($line);
1886   };
1887   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1888   if (@statements > 1) {
1889     foreach my $statement (@statements) {
1890       $deploy->( $statement );
1891     }
1892   }
1893   elsif (@statements == 1) {
1894     foreach my $line ( split(";\n", $statements[0])) {
1895       $deploy->( $line );
1896     }
1897   }
1898 }
1899
1900 =head2 datetime_parser
1901
1902 Returns the datetime parser class
1903
1904 =cut
1905
1906 sub datetime_parser {
1907   my $self = shift;
1908   return $self->{datetime_parser} ||= do {
1909     $self->ensure_connected;
1910     $self->build_datetime_parser(@_);
1911   };
1912 }
1913
1914 =head2 datetime_parser_type
1915
1916 Defines (returns) the datetime parser class - currently hardwired to
1917 L<DateTime::Format::MySQL>
1918
1919 =cut
1920
1921 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1922
1923 =head2 build_datetime_parser
1924
1925 See L</datetime_parser>
1926
1927 =cut
1928
1929 sub build_datetime_parser {
1930   my $self = shift;
1931   my $type = $self->datetime_parser_type(@_);
1932   eval "use ${type}";
1933   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1934   return $type;
1935 }
1936
1937 {
1938     my $_check_sqlt_version; # private
1939     my $_check_sqlt_message; # private
1940     sub _check_sqlt_version {
1941         return $_check_sqlt_version if defined $_check_sqlt_version;
1942         eval 'use SQL::Translator "0.09003"';
1943         $_check_sqlt_message = $@ || '';
1944         $_check_sqlt_version = !$@;
1945     }
1946
1947     sub _check_sqlt_message {
1948         _check_sqlt_version if !defined $_check_sqlt_message;
1949         $_check_sqlt_message;
1950     }
1951 }
1952
1953 =head2 is_replicating
1954
1955 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1956 replicate from a master database.  Default is undef, which is the result
1957 returned by databases that don't support replication.
1958
1959 =cut
1960
1961 sub is_replicating {
1962     return;
1963     
1964 }
1965
1966 =head2 lag_behind_master
1967
1968 Returns a number that represents a certain amount of lag behind a master db
1969 when a given storage is replicating.  The number is database dependent, but
1970 starts at zero and increases with the amount of lag. Default in undef
1971
1972 =cut
1973
1974 sub lag_behind_master {
1975     return;
1976 }
1977
1978 sub DESTROY {
1979   my $self = shift;
1980   return if !$self->_dbh;
1981   $self->_verify_pid;
1982   $self->_dbh(undef);
1983 }
1984
1985 1;
1986
1987 =head1 USAGE NOTES
1988
1989 =head2 DBIx::Class and AutoCommit
1990
1991 DBIx::Class can do some wonderful magic with handling exceptions,
1992 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1993 combined with C<txn_do> for transaction support.
1994
1995 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1996 in an assumed transaction between commits, and you're telling us you'd
1997 like to manage that manually.  A lot of the magic protections offered by
1998 this module will go away.  We can't protect you from exceptions due to database
1999 disconnects because we don't know anything about how to restart your
2000 transactions.  You're on your own for handling all sorts of exceptional
2001 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2002 be with raw DBI.
2003
2004
2005 =head1 SQL METHODS
2006
2007 The module defines a set of methods within the DBIC::SQL::Abstract
2008 namespace.  These build on L<SQL::Abstract::Limit> to provide the
2009 SQL query functions.
2010
2011 The following methods are extended:-
2012
2013 =over 4
2014
2015 =item delete
2016
2017 =item insert
2018
2019 =item select
2020
2021 =item update
2022
2023 =item limit_dialect
2024
2025 See L</connect_info> for details.
2026
2027 =item quote_char
2028
2029 See L</connect_info> for details.
2030
2031 =item name_sep
2032
2033 See L</connect_info> for details.
2034
2035 =back
2036
2037 =head1 AUTHORS
2038
2039 Matt S. Trout <mst@shadowcatsystems.co.uk>
2040
2041 Andy Grundman <andy@hybridized.org>
2042
2043 =head1 LICENSE
2044
2045 You may distribute this code under the same terms as Perl itself.
2046
2047 =cut