09dc92a78698a3a62b71b9c04e335397dd7ea3d5
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 # DB2 is the only remaining DB using this. Even though we are not sure if
54 # RowNumberOver is still needed here (should be part of SQLA) leave the 
55 # code in place
56 sub _RowNumberOver {
57   my ($self, $sql, $order, $rows, $offset ) = @_;
58
59   $offset += 1;
60   my $last = $rows + $offset;
61   my ( $order_by ) = $self->_order_by( $order );
62
63   $sql = <<"SQL";
64 SELECT * FROM
65 (
66    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
67       $sql
68       $order_by
69    ) Q1
70 ) Q2
71 WHERE ROW_NUM BETWEEN $offset AND $last
72
73 SQL
74
75   return $sql;
76 }
77
78
79 # While we're at it, this should make LIMIT queries more efficient,
80 #  without digging into things too deeply
81 use Scalar::Util 'blessed';
82 sub _find_syntax {
83   my ($self, $syntax) = @_;
84   
85   # DB2 is the only remaining DB using this. Even though we are not sure if
86   # RowNumberOver is still needed here (should be part of SQLA) leave the 
87   # code in place
88   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
89   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
90     return 'RowNumberOver';
91   }
92   
93   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
94 }
95
96 sub select {
97   my ($self, $table, $fields, $where, $order, @rest) = @_;
98   local $self->{having_bind} = [];
99   if (ref $table eq 'SCALAR') {
100     $table = $$table;
101   }
102   elsif (ref $table eq 'HASH') {
103     ## what if they want to alias a sub query?
104   }
105   elsif (ref $table eq 'REF') {
106     #my ($sql, @bind) = @{${$t}}; push(@{$self->{having_bind}}, @bind;);
107     my $t = $table; 
108     $table = shift @$$t;
109     while (my $b = shift @$$t) { push @{$self->{having_bind}}, $b; }
110   }
111   elsif (not ref $table) {
112     $table = $self->_quote($table);
113   }
114   local $self->{rownum_hack_count} = 1
115     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
116   @rest = (-1) unless defined $rest[0];
117   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
118     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
119   my ($sql, @ret) = $self->SUPER::select(
120     $table, $self->_recurse_fields($fields), $where, $order, @rest
121   );
122   $sql .= 
123     $self->{for} ?
124     (
125       $self->{for} eq 'update' ? ' FOR UPDATE' :
126       $self->{for} eq 'shared' ? ' FOR SHARE'  :
127       ''
128     ) :
129     ''
130   ;
131   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
132 }
133
134 sub insert {
135   my $self = shift;
136   my $table = shift;
137   $table = $self->_quote($table) unless ref($table);
138   $self->SUPER::insert($table, @_);
139 }
140
141 sub update {
142   my $self = shift;
143   my $table = shift;
144   $table = $self->_quote($table) unless ref($table);
145   $self->SUPER::update($table, @_);
146 }
147
148 sub delete {
149   my $self = shift;
150   my $table = shift;
151   $table = $self->_quote($table) unless ref($table);
152   $self->SUPER::delete($table, @_);
153 }
154
155 sub _emulate_limit {
156   my $self = shift;
157   if ($_[3] == -1) {
158     return $_[1].$self->_order_by($_[2]);
159   } else {
160     return $self->SUPER::_emulate_limit(@_);
161   }
162 }
163
164 sub _recurse_fields {
165   my ($self, $fields, $params) = @_;
166   my $ref = ref $fields;
167   return $self->_quote($fields) unless $ref;
168   return $$fields if $ref eq 'SCALAR';
169
170   if ($ref eq 'ARRAY') {
171     return join(', ', map {
172       $self->_recurse_fields($_)
173         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
174           ? ' AS col'.$self->{rownum_hack_count}++
175           : '')
176       } @$fields);
177   } elsif ($ref eq 'HASH') {
178     foreach my $func (keys %$fields) {
179       return $self->_sqlcase($func)
180         .'( '.$self->_recurse_fields($fields->{$func}).' )';
181     }
182   }
183   # Is the second check absolutely necessary?
184   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
185     return $self->_bind_to_sql( $fields );
186   }
187   else {
188     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
189   }
190 }
191
192 sub _order_by {
193   my $self = shift;
194   my $ret = '';
195   my @extra;
196   if (ref $_[0] eq 'HASH') {
197     if (defined $_[0]->{group_by}) {
198       $ret = $self->_sqlcase(' group by ')
199         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
200     }
201     if (defined $_[0]->{having}) {
202       my $frag;
203       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
204       push(@{$self->{having_bind}}, @extra);
205       $ret .= $self->_sqlcase(' having ').$frag;
206     }
207     if (defined $_[0]->{order_by}) {
208       $ret .= $self->_order_by($_[0]->{order_by});
209     }
210     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
211       return $self->SUPER::_order_by($_[0]);
212     }
213   } elsif (ref $_[0] eq 'SCALAR') {
214     $ret = $self->_sqlcase(' order by ').${ $_[0] };
215   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
216     my @order = @{+shift};
217     $ret = $self->_sqlcase(' order by ')
218           .join(', ', map {
219                         my $r = $self->_order_by($_, @_);
220                         $r =~ s/^ ?ORDER BY //i;
221                         $r;
222                       } @order);
223   } else {
224     $ret = $self->SUPER::_order_by(@_);
225   }
226   return $ret;
227 }
228
229 sub _order_directions {
230   my ($self, $order) = @_;
231   $order = $order->{order_by} if ref $order eq 'HASH';
232   return $self->SUPER::_order_directions($order);
233 }
234
235 sub _table {
236   my ($self, $from) = @_;
237   if (ref $from eq 'ARRAY') {
238     return $self->_recurse_from(@$from);
239   } elsif (ref $from eq 'HASH') {
240     return $self->_make_as($from);
241   } else {
242     return $from; # would love to quote here but _table ends up getting called
243                   # twice during an ->select without a limit clause due to
244                   # the way S::A::Limit->select works. should maybe consider
245                   # bypassing this and doing S::A::select($self, ...) in
246                   # our select method above. meantime, quoting shims have
247                   # been added to select/insert/update/delete here
248   }
249 }
250
251 sub _recurse_from {
252   my ($self, $from, @join) = @_;
253   my @sqlf;
254   push(@sqlf, $self->_make_as($from));
255   foreach my $j (@join) {
256     my ($to, $on) = @$j;
257
258     # check whether a join type exists
259     my $join_clause = '';
260     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
261     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
262       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
263     } else {
264       $join_clause = ' JOIN ';
265     }
266     push(@sqlf, $join_clause);
267
268     if (ref $to eq 'ARRAY') {
269       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
270     } else {
271       push(@sqlf, $self->_make_as($to));
272     }
273     push(@sqlf, ' ON ', $self->_join_condition($on));
274   }
275   return join('', @sqlf);
276 }
277
278 sub _bind_to_sql {
279   my $self = shift;
280   my $arr  = shift;
281   my $sql = shift @$$arr;
282   $sql =~ s/\?/$self->_quote((shift @$$arr)->[1])/eg;
283   return $sql
284 }
285
286 sub _make_as {
287   my ($self, $from) = @_;
288   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ 
289                         : ref $_ eq 'REF'    ? $self->_bind_to_sql($_) 
290                         : $self->_quote($_)) 
291                        } reverse each %{$self->_skip_options($from)});
292 }
293
294 sub _skip_options {
295   my ($self, $hash) = @_;
296   my $clean_hash = {};
297   $clean_hash->{$_} = $hash->{$_}
298     for grep {!/^-/} keys %$hash;
299   return $clean_hash;
300 }
301
302 sub _join_condition {
303   my ($self, $cond) = @_;
304   if (ref $cond eq 'HASH') {
305     my %j;
306     for (keys %$cond) {
307       my $v = $cond->{$_};
308       if (ref $v) {
309         # XXX no throw_exception() in this package and croak() fails with strange results
310         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
311             if ref($v) ne 'SCALAR';
312         $j{$_} = $v;
313       }
314       else {
315         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
316       }
317     };
318     return scalar($self->_recurse_where(\%j));
319   } elsif (ref $cond eq 'ARRAY') {
320     return join(' OR ', map { $self->_join_condition($_) } @$cond);
321   } else {
322     die "Can't handle this yet!";
323   }
324 }
325
326 sub _quote {
327   my ($self, $label) = @_;
328   return '' unless defined $label;
329   return "*" if $label eq '*';
330   return $label unless $self->{quote_char};
331   if(ref $self->{quote_char} eq "ARRAY"){
332     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
333       if !defined $self->{name_sep};
334     my $sep = $self->{name_sep};
335     return join($self->{name_sep},
336         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
337        split(/\Q$sep\E/,$label));
338   }
339   return $self->SUPER::_quote($label);
340 }
341
342 sub limit_dialect {
343     my $self = shift;
344     $self->{limit_dialect} = shift if @_;
345     return $self->{limit_dialect};
346 }
347
348 sub quote_char {
349     my $self = shift;
350     $self->{quote_char} = shift if @_;
351     return $self->{quote_char};
352 }
353
354 sub name_sep {
355     my $self = shift;
356     $self->{name_sep} = shift if @_;
357     return $self->{name_sep};
358 }
359
360 } # End of BEGIN block
361
362 =head1 NAME
363
364 DBIx::Class::Storage::DBI - DBI storage handler
365
366 =head1 SYNOPSIS
367
368   my $schema = MySchema->connect('dbi:SQLite:my.db');
369
370   $schema->storage->debug(1);
371   $schema->dbh_do("DROP TABLE authors");
372
373   $schema->resultset('Book')->search({
374      written_on => $schema->storage->datetime_parser(DateTime->now)
375   });
376
377 =head1 DESCRIPTION
378
379 This class represents the connection to an RDBMS via L<DBI>.  See
380 L<DBIx::Class::Storage> for general information.  This pod only
381 documents DBI-specific methods and behaviors.
382
383 =head1 METHODS
384
385 =cut
386
387 sub new {
388   my $new = shift->next::method(@_);
389
390   $new->transaction_depth(0);
391   $new->_sql_maker_opts({});
392   $new->{savepoints} = [];
393   $new->{_in_dbh_do} = 0;
394   $new->{_dbh_gen} = 0;
395
396   $new;
397 }
398
399 =head2 connect_info
400
401 This method is normally called by L<DBIx::Class::Schema/connection>, which
402 encapsulates its argument list in an arrayref before passing them here.
403
404 The argument list may contain:
405
406 =over
407
408 =item *
409
410 The same 4-element argument set one would normally pass to
411 L<DBI/connect>, optionally followed by
412 L<extra attributes|/DBIx::Class specific connection attributes>
413 recognized by DBIx::Class:
414
415   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
416
417 =item *
418
419 A single code reference which returns a connected 
420 L<DBI database handle|DBI/connect> optionally followed by 
421 L<extra attributes|/DBIx::Class specific connection attributes> recognized
422 by DBIx::Class:
423
424   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
425
426 =item *
427
428 A single hashref with all the attributes and the dsn/user/password
429 mixed together:
430
431   $connect_info_args = [{
432     dsn => $dsn,
433     user => $user,
434     password => $pass,
435     %dbi_attributes,
436     %extra_attributes,
437   }];
438
439 This is particularly useful for L<Catalyst> based applications, allowing the 
440 following config (L<Config::General> style):
441
442   <Model::DB>
443     schema_class   App::DB
444     <connect_info>
445       dsn          dbi:mysql:database=test
446       user         testuser
447       password     TestPass
448       AutoCommit   1
449     </connect_info>
450   </Model::DB>
451
452 =back
453
454 Please note that the L<DBI> docs recommend that you always explicitly
455 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
456 recommends that it be set to I<1>, and that you perform transactions
457 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
458 to I<1> if you do not do explicitly set it to zero.  This is the default 
459 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
460
461 =head3 DBIx::Class specific connection attributes
462
463 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
464 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
465 the following connection options. These options can be mixed in with your other
466 L<DBI> connection attributes, or placed in a seperate hashref
467 (C<\%extra_attributes>) as shown above.
468
469 Every time C<connect_info> is invoked, any previous settings for
470 these options will be cleared before setting the new ones, regardless of
471 whether any options are specified in the new C<connect_info>.
472
473
474 =over
475
476 =item on_connect_do
477
478 Specifies things to do immediately after connecting or re-connecting to
479 the database.  Its value may contain:
480
481 =over
482
483 =item an array reference
484
485 This contains SQL statements to execute in order.  Each element contains
486 a string or a code reference that returns a string.
487
488 =item a code reference
489
490 This contains some code to execute.  Unlike code references within an
491 array reference, its return value is ignored.
492
493 =back
494
495 =item on_disconnect_do
496
497 Takes arguments in the same form as L</on_connect_do> and executes them
498 immediately before disconnecting from the database.
499
500 Note, this only runs if you explicitly call L</disconnect> on the
501 storage object.
502
503 =item disable_sth_caching
504
505 If set to a true value, this option will disable the caching of
506 statement handles via L<DBI/prepare_cached>.
507
508 =item limit_dialect 
509
510 Sets the limit dialect. This is useful for JDBC-bridge among others
511 where the remote SQL-dialect cannot be determined by the name of the
512 driver alone. See also L<SQL::Abstract::Limit>.
513
514 =item quote_char
515
516 Specifies what characters to use to quote table and column names. If 
517 you use this you will want to specify L</name_sep> as well.
518
519 C<quote_char> expects either a single character, in which case is it
520 is placed on either side of the table/column name, or an arrayref of length
521 2 in which case the table/column name is placed between the elements.
522
523 For example under MySQL you should use C<< quote_char => '`' >>, and for
524 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
525
526 =item name_sep
527
528 This only needs to be used in conjunction with C<quote_char>, and is used to 
529 specify the charecter that seperates elements (schemas, tables, columns) from 
530 each other. In most cases this is simply a C<.>.
531
532 The consequences of not supplying this value is that L<SQL::Abstract>
533 will assume DBIx::Class' uses of aliases to be complete column
534 names. The output will look like I<"me.name"> when it should actually
535 be I<"me"."name">.
536
537 =item unsafe
538
539 This Storage driver normally installs its own C<HandleError>, sets
540 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
541 all database handles, including those supplied by a coderef.  It does this
542 so that it can have consistent and useful error behavior.
543
544 If you set this option to a true value, Storage will not do its usual
545 modifications to the database handle's attributes, and instead relies on
546 the settings in your connect_info DBI options (or the values you set in
547 your connection coderef, in the case that you are connecting via coderef).
548
549 Note that your custom settings can cause Storage to malfunction,
550 especially if you set a C<HandleError> handler that suppresses exceptions
551 and/or disable C<RaiseError>.
552
553 =item auto_savepoint
554
555 If this option is true, L<DBIx::Class> will use savepoints when nesting
556 transactions, making it possible to recover from failure in the inner
557 transaction without having to abort all outer transactions.
558
559 =item cursor_class
560
561 Use this argument to supply a cursor class other than the default
562 L<DBIx::Class::Storage::DBI::Cursor>.
563
564 =back
565
566 Some real-life examples of arguments to L</connect_info> and
567 L<DBIx::Class::Schema/connect>
568
569   # Simple SQLite connection
570   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
571
572   # Connect via subref
573   ->connect_info([ sub { DBI->connect(...) } ]);
574
575   # A bit more complicated
576   ->connect_info(
577     [
578       'dbi:Pg:dbname=foo',
579       'postgres',
580       'my_pg_password',
581       { AutoCommit => 1 },
582       { quote_char => q{"}, name_sep => q{.} },
583     ]
584   );
585
586   # Equivalent to the previous example
587   ->connect_info(
588     [
589       'dbi:Pg:dbname=foo',
590       'postgres',
591       'my_pg_password',
592       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
593     ]
594   );
595
596   # Same, but with hashref as argument
597   # See parse_connect_info for explanation
598   ->connect_info(
599     [{
600       dsn         => 'dbi:Pg:dbname=foo',
601       user        => 'postgres',
602       password    => 'my_pg_password',
603       AutoCommit  => 1,
604       quote_char  => q{"},
605       name_sep    => q{.},
606     }]
607   );
608
609   # Subref + DBIx::Class-specific connection options
610   ->connect_info(
611     [
612       sub { DBI->connect(...) },
613       {
614           quote_char => q{`},
615           name_sep => q{@},
616           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
617           disable_sth_caching => 1,
618       },
619     ]
620   );
621
622
623
624 =cut
625
626 sub connect_info {
627   my ($self, $info_arg) = @_;
628
629   return $self->_connect_info if !$info_arg;
630
631   my @args = @$info_arg;  # take a shallow copy for further mutilation
632   $self->_connect_info([@args]); # copy for _connect_info
633
634
635   # combine/pre-parse arguments depending on invocation style
636
637   my %attrs;
638   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
639     %attrs = %{ $args[1] || {} };
640     @args = $args[0];
641   }
642   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
643     %attrs = %{$args[0]};
644     @args = ();
645     for (qw/password user dsn/) {
646       unshift @args, delete $attrs{$_};
647     }
648   }
649   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
650     %attrs = (
651       % { $args[3] || {} },
652       % { $args[4] || {} },
653     );
654     @args = @args[0,1,2];
655   }
656
657   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
658   #  the new set of options
659   $self->_sql_maker(undef);
660   $self->_sql_maker_opts({});
661
662   if(keys %attrs) {
663     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
664       if(my $value = delete $attrs{$storage_opt}) {
665         $self->$storage_opt($value);
666       }
667     }
668     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
669       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
670         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
671       }
672     }
673   }
674
675   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
676
677   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
678   $self->_connect_info;
679 }
680
681 =head2 on_connect_do
682
683 This method is deprecated in favour of setting via L</connect_info>.
684
685
686 =head2 dbh_do
687
688 Arguments: ($subref | $method_name), @extra_coderef_args?
689
690 Execute the given $subref or $method_name using the new exception-based
691 connection management.
692
693 The first two arguments will be the storage object that C<dbh_do> was called
694 on and a database handle to use.  Any additional arguments will be passed
695 verbatim to the called subref as arguments 2 and onwards.
696
697 Using this (instead of $self->_dbh or $self->dbh) ensures correct
698 exception handling and reconnection (or failover in future subclasses).
699
700 Your subref should have no side-effects outside of the database, as
701 there is the potential for your subref to be partially double-executed
702 if the database connection was stale/dysfunctional.
703
704 Example:
705
706   my @stuff = $schema->storage->dbh_do(
707     sub {
708       my ($storage, $dbh, @cols) = @_;
709       my $cols = join(q{, }, @cols);
710       $dbh->selectrow_array("SELECT $cols FROM foo");
711     },
712     @column_list
713   );
714
715 =cut
716
717 sub dbh_do {
718   my $self = shift;
719   my $code = shift;
720
721   my $dbh = $self->_dbh;
722
723   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
724       || $self->{transaction_depth};
725
726   local $self->{_in_dbh_do} = 1;
727
728   my @result;
729   my $want_array = wantarray;
730
731   eval {
732     $self->_verify_pid if $dbh;
733     if(!$self->_dbh) {
734         $self->_populate_dbh;
735         $dbh = $self->_dbh;
736     }
737
738     if($want_array) {
739         @result = $self->$code($dbh, @_);
740     }
741     elsif(defined $want_array) {
742         $result[0] = $self->$code($dbh, @_);
743     }
744     else {
745         $self->$code($dbh, @_);
746     }
747   };
748
749   my $exception = $@;
750   if(!$exception) { return $want_array ? @result : $result[0] }
751
752   $self->throw_exception($exception) if $self->connected;
753
754   # We were not connected - reconnect and retry, but let any
755   #  exception fall right through this time
756   $self->_populate_dbh;
757   $self->$code($self->_dbh, @_);
758 }
759
760 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
761 # It also informs dbh_do to bypass itself while under the direction of txn_do,
762 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
763 sub txn_do {
764   my $self = shift;
765   my $coderef = shift;
766
767   ref $coderef eq 'CODE' or $self->throw_exception
768     ('$coderef must be a CODE reference');
769
770   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
771
772   local $self->{_in_dbh_do} = 1;
773
774   my @result;
775   my $want_array = wantarray;
776
777   my $tried = 0;
778   while(1) {
779     eval {
780       $self->_verify_pid if $self->_dbh;
781       $self->_populate_dbh if !$self->_dbh;
782
783       $self->txn_begin;
784       if($want_array) {
785           @result = $coderef->(@_);
786       }
787       elsif(defined $want_array) {
788           $result[0] = $coderef->(@_);
789       }
790       else {
791           $coderef->(@_);
792       }
793       $self->txn_commit;
794     };
795
796     my $exception = $@;
797     if(!$exception) { return $want_array ? @result : $result[0] }
798
799     if($tried++ > 0 || $self->connected) {
800       eval { $self->txn_rollback };
801       my $rollback_exception = $@;
802       if($rollback_exception) {
803         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
804         $self->throw_exception($exception)  # propagate nested rollback
805           if $rollback_exception =~ /$exception_class/;
806
807         $self->throw_exception(
808           "Transaction aborted: ${exception}. "
809           . "Rollback failed: ${rollback_exception}"
810         );
811       }
812       $self->throw_exception($exception)
813     }
814
815     # We were not connected, and was first try - reconnect and retry
816     # via the while loop
817     $self->_populate_dbh;
818   }
819 }
820
821 =head2 disconnect
822
823 Our C<disconnect> method also performs a rollback first if the
824 database is not in C<AutoCommit> mode.
825
826 =cut
827
828 sub disconnect {
829   my ($self) = @_;
830
831   if( $self->connected ) {
832     my $connection_do = $self->on_disconnect_do;
833     $self->_do_connection_actions($connection_do) if ref($connection_do);
834
835     $self->_dbh->rollback unless $self->_dbh_autocommit;
836     $self->_dbh->disconnect;
837     $self->_dbh(undef);
838     $self->{_dbh_gen}++;
839   }
840 }
841
842 =head2 with_deferred_fk_checks
843
844 =over 4
845
846 =item Arguments: C<$coderef>
847
848 =item Return Value: The return value of $coderef
849
850 =back
851
852 Storage specific method to run the code ref with FK checks deferred or
853 in MySQL's case disabled entirely.
854
855 =cut
856
857 # Storage subclasses should override this
858 sub with_deferred_fk_checks {
859   my ($self, $sub) = @_;
860
861   $sub->();
862 }
863
864 sub connected {
865   my ($self) = @_;
866
867   if(my $dbh = $self->_dbh) {
868       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
869           $self->_dbh(undef);
870           $self->{_dbh_gen}++;
871           return;
872       }
873       else {
874           $self->_verify_pid;
875           return 0 if !$self->_dbh;
876       }
877       return ($dbh->FETCH('Active') && $dbh->ping);
878   }
879
880   return 0;
881 }
882
883 # handle pid changes correctly
884 #  NOTE: assumes $self->_dbh is a valid $dbh
885 sub _verify_pid {
886   my ($self) = @_;
887
888   return if defined $self->_conn_pid && $self->_conn_pid == $$;
889
890   $self->_dbh->{InactiveDestroy} = 1;
891   $self->_dbh(undef);
892   $self->{_dbh_gen}++;
893
894   return;
895 }
896
897 sub ensure_connected {
898   my ($self) = @_;
899
900   unless ($self->connected) {
901     $self->_populate_dbh;
902   }
903 }
904
905 =head2 dbh
906
907 Returns the dbh - a data base handle of class L<DBI>.
908
909 =cut
910
911 sub dbh {
912   my ($self) = @_;
913
914   $self->ensure_connected;
915   return $self->_dbh;
916 }
917
918 sub _sql_maker_args {
919     my ($self) = @_;
920     
921     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
922 }
923
924 sub sql_maker {
925   my ($self) = @_;
926   unless ($self->_sql_maker) {
927     my $sql_maker_class = $self->sql_maker_class;
928     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
929   }
930   return $self->_sql_maker;
931 }
932
933 sub _rebless {}
934
935 sub _populate_dbh {
936   my ($self) = @_;
937   my @info = @{$self->_dbi_connect_info || []};
938   $self->_dbh($self->_connect(@info));
939
940   # Always set the transaction depth on connect, since
941   #  there is no transaction in progress by definition
942   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
943
944   if(ref $self eq 'DBIx::Class::Storage::DBI') {
945     my $driver = $self->_dbh->{Driver}->{Name};
946     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
947       bless $self, "DBIx::Class::Storage::DBI::${driver}";
948       $self->_rebless();
949     }
950   }
951
952   $self->_conn_pid($$);
953   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
954
955   my $connection_do = $self->on_connect_do;
956   $self->_do_connection_actions($connection_do) if ref($connection_do);
957 }
958
959 sub _do_connection_actions {
960   my $self = shift;
961   my $connection_do = shift;
962
963   if (ref $connection_do eq 'ARRAY') {
964     $self->_do_query($_) foreach @$connection_do;
965   }
966   elsif (ref $connection_do eq 'CODE') {
967     $connection_do->($self);
968   }
969
970   return $self;
971 }
972
973 sub _do_query {
974   my ($self, $action) = @_;
975
976   if (ref $action eq 'CODE') {
977     $action = $action->($self);
978     $self->_do_query($_) foreach @$action;
979   }
980   else {
981     # Most debuggers expect ($sql, @bind), so we need to exclude
982     # the attribute hash which is the second argument to $dbh->do
983     # furthermore the bind values are usually to be presented
984     # as named arrayref pairs, so wrap those here too
985     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
986     my $sql = shift @do_args;
987     my $attrs = shift @do_args;
988     my @bind = map { [ undef, $_ ] } @do_args;
989
990     $self->_query_start($sql, @bind);
991     $self->_dbh->do($sql, $attrs, @do_args);
992     $self->_query_end($sql, @bind);
993   }
994
995   return $self;
996 }
997
998 sub _connect {
999   my ($self, @info) = @_;
1000
1001   $self->throw_exception("You failed to provide any connection info")
1002     if !@info;
1003
1004   my ($old_connect_via, $dbh);
1005
1006   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1007     $old_connect_via = $DBI::connect_via;
1008     $DBI::connect_via = 'connect';
1009   }
1010
1011   eval {
1012     if(ref $info[0] eq 'CODE') {
1013        $dbh = &{$info[0]}
1014     }
1015     else {
1016        $dbh = DBI->connect(@info);
1017     }
1018
1019     if($dbh && !$self->unsafe) {
1020       my $weak_self = $self;
1021       weaken($weak_self);
1022       $dbh->{HandleError} = sub {
1023           if ($weak_self) {
1024             $weak_self->throw_exception("DBI Exception: $_[0]");
1025           }
1026           else {
1027             croak ("DBI Exception: $_[0]");
1028           }
1029       };
1030       $dbh->{ShowErrorStatement} = 1;
1031       $dbh->{RaiseError} = 1;
1032       $dbh->{PrintError} = 0;
1033     }
1034   };
1035
1036   $DBI::connect_via = $old_connect_via if $old_connect_via;
1037
1038   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1039     if !$dbh || $@;
1040
1041   $self->_dbh_autocommit($dbh->{AutoCommit});
1042
1043   $dbh;
1044 }
1045
1046 sub svp_begin {
1047   my ($self, $name) = @_;
1048
1049   $name = $self->_svp_generate_name
1050     unless defined $name;
1051
1052   $self->throw_exception ("You can't use savepoints outside a transaction")
1053     if $self->{transaction_depth} == 0;
1054
1055   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1056     unless $self->can('_svp_begin');
1057   
1058   push @{ $self->{savepoints} }, $name;
1059
1060   $self->debugobj->svp_begin($name) if $self->debug;
1061   
1062   return $self->_svp_begin($name);
1063 }
1064
1065 sub svp_release {
1066   my ($self, $name) = @_;
1067
1068   $self->throw_exception ("You can't use savepoints outside a transaction")
1069     if $self->{transaction_depth} == 0;
1070
1071   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1072     unless $self->can('_svp_release');
1073
1074   if (defined $name) {
1075     $self->throw_exception ("Savepoint '$name' does not exist")
1076       unless grep { $_ eq $name } @{ $self->{savepoints} };
1077
1078     # Dig through the stack until we find the one we are releasing.  This keeps
1079     # the stack up to date.
1080     my $svp;
1081
1082     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1083   } else {
1084     $name = pop @{ $self->{savepoints} };
1085   }
1086
1087   $self->debugobj->svp_release($name) if $self->debug;
1088
1089   return $self->_svp_release($name);
1090 }
1091
1092 sub svp_rollback {
1093   my ($self, $name) = @_;
1094
1095   $self->throw_exception ("You can't use savepoints outside a transaction")
1096     if $self->{transaction_depth} == 0;
1097
1098   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1099     unless $self->can('_svp_rollback');
1100
1101   if (defined $name) {
1102       # If they passed us a name, verify that it exists in the stack
1103       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1104           $self->throw_exception("Savepoint '$name' does not exist!");
1105       }
1106
1107       # Dig through the stack until we find the one we are releasing.  This keeps
1108       # the stack up to date.
1109       while(my $s = pop(@{ $self->{savepoints} })) {
1110           last if($s eq $name);
1111       }
1112       # Add the savepoint back to the stack, as a rollback doesn't remove the
1113       # named savepoint, only everything after it.
1114       push(@{ $self->{savepoints} }, $name);
1115   } else {
1116       # We'll assume they want to rollback to the last savepoint
1117       $name = $self->{savepoints}->[-1];
1118   }
1119
1120   $self->debugobj->svp_rollback($name) if $self->debug;
1121   
1122   return $self->_svp_rollback($name);
1123 }
1124
1125 sub _svp_generate_name {
1126     my ($self) = @_;
1127
1128     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1129 }
1130
1131 sub txn_begin {
1132   my $self = shift;
1133   $self->ensure_connected();
1134   if($self->{transaction_depth} == 0) {
1135     $self->debugobj->txn_begin()
1136       if $self->debug;
1137     # this isn't ->_dbh-> because
1138     #  we should reconnect on begin_work
1139     #  for AutoCommit users
1140     $self->dbh->begin_work;
1141   } elsif ($self->auto_savepoint) {
1142     $self->svp_begin;
1143   }
1144   $self->{transaction_depth}++;
1145 }
1146
1147 sub txn_commit {
1148   my $self = shift;
1149   if ($self->{transaction_depth} == 1) {
1150     my $dbh = $self->_dbh;
1151     $self->debugobj->txn_commit()
1152       if ($self->debug);
1153     $dbh->commit;
1154     $self->{transaction_depth} = 0
1155       if $self->_dbh_autocommit;
1156   }
1157   elsif($self->{transaction_depth} > 1) {
1158     $self->{transaction_depth}--;
1159     $self->svp_release
1160       if $self->auto_savepoint;
1161   }
1162 }
1163
1164 sub txn_rollback {
1165   my $self = shift;
1166   my $dbh = $self->_dbh;
1167   eval {
1168     if ($self->{transaction_depth} == 1) {
1169       $self->debugobj->txn_rollback()
1170         if ($self->debug);
1171       $self->{transaction_depth} = 0
1172         if $self->_dbh_autocommit;
1173       $dbh->rollback;
1174     }
1175     elsif($self->{transaction_depth} > 1) {
1176       $self->{transaction_depth}--;
1177       if ($self->auto_savepoint) {
1178         $self->svp_rollback;
1179         $self->svp_release;
1180       }
1181     }
1182     else {
1183       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1184     }
1185   };
1186   if ($@) {
1187     my $error = $@;
1188     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1189     $error =~ /$exception_class/ and $self->throw_exception($error);
1190     # ensure that a failed rollback resets the transaction depth
1191     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1192     $self->throw_exception($error);
1193   }
1194 }
1195
1196 # This used to be the top-half of _execute.  It was split out to make it
1197 #  easier to override in NoBindVars without duping the rest.  It takes up
1198 #  all of _execute's args, and emits $sql, @bind.
1199 sub _prep_for_execute {
1200   my ($self, $op, $extra_bind, $ident, $args) = @_;
1201
1202   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1203     $ident = $ident->from();
1204   }
1205
1206   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1207
1208   unshift(@bind,
1209     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1210       if $extra_bind;
1211   return ($sql, \@bind);
1212 }
1213
1214 sub _fix_bind_params {
1215     my ($self, @bind) = @_;
1216
1217     ### Turn @bind from something like this:
1218     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1219     ### to this:
1220     ###   ( "'1'", "'1'", "'3'" )
1221     return
1222         map {
1223             if ( defined( $_ && $_->[1] ) ) {
1224                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1225             }
1226             else { q{'NULL'}; }
1227         } @bind;
1228 }
1229
1230 sub _query_start {
1231     my ( $self, $sql, @bind ) = @_;
1232
1233     if ( $self->debug ) {
1234         @bind = $self->_fix_bind_params(@bind);
1235         
1236         $self->debugobj->query_start( $sql, @bind );
1237     }
1238 }
1239
1240 sub _query_end {
1241     my ( $self, $sql, @bind ) = @_;
1242
1243     if ( $self->debug ) {
1244         @bind = $self->_fix_bind_params(@bind);
1245         $self->debugobj->query_end( $sql, @bind );
1246     }
1247 }
1248
1249 sub _dbh_execute {
1250   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1251
1252   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1253
1254   $self->_query_start( $sql, @$bind );
1255
1256   my $sth = $self->sth($sql,$op);
1257
1258   my $placeholder_index = 1; 
1259
1260   foreach my $bound (@$bind) {
1261     my $attributes = {};
1262     my($column_name, @data) = @$bound;
1263
1264     if ($bind_attributes) {
1265       $attributes = $bind_attributes->{$column_name}
1266       if defined $bind_attributes->{$column_name};
1267     }
1268
1269     foreach my $data (@data) {
1270       my $ref = ref $data;
1271       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1272
1273       $sth->bind_param($placeholder_index, $data, $attributes);
1274       $placeholder_index++;
1275     }
1276   }
1277
1278   # Can this fail without throwing an exception anyways???
1279   my $rv = $sth->execute();
1280   $self->throw_exception($sth->errstr) if !$rv;
1281
1282   $self->_query_end( $sql, @$bind );
1283
1284   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1285 }
1286
1287 sub _execute {
1288     my $self = shift;
1289     $self->dbh_do('_dbh_execute', @_)
1290 }
1291
1292 sub insert {
1293   my ($self, $source, $to_insert) = @_;
1294   
1295   my $ident = $source->from; 
1296   my $bind_attributes = $self->source_bind_attributes($source);
1297
1298   $self->ensure_connected;
1299   foreach my $col ( $source->columns ) {
1300     if ( !defined $to_insert->{$col} ) {
1301       my $col_info = $source->column_info($col);
1302
1303       if ( $col_info->{auto_nextval} ) {
1304         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1305       }
1306     }
1307   }
1308
1309   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1310
1311   return $to_insert;
1312 }
1313
1314 ## Still not quite perfect, and EXPERIMENTAL
1315 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1316 ## scalar refs, or at least, all the same type as the first set, the statement is
1317 ## only prepped once.
1318 sub insert_bulk {
1319   my ($self, $source, $cols, $data) = @_;
1320   my %colvalues;
1321   my $table = $source->from;
1322   @colvalues{@$cols} = (0..$#$cols);
1323   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1324   
1325   $self->_query_start( $sql, @bind );
1326   my $sth = $self->sth($sql);
1327
1328 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1329
1330   ## This must be an arrayref, else nothing works!
1331   
1332   my $tuple_status = [];
1333   
1334   ##use Data::Dumper;
1335   ##print STDERR Dumper( $data, $sql, [@bind] );
1336
1337   my $time = time();
1338
1339   ## Get the bind_attributes, if any exist
1340   my $bind_attributes = $self->source_bind_attributes($source);
1341
1342   ## Bind the values and execute
1343   my $placeholder_index = 1; 
1344
1345   foreach my $bound (@bind) {
1346
1347     my $attributes = {};
1348     my ($column_name, $data_index) = @$bound;
1349
1350     if( $bind_attributes ) {
1351       $attributes = $bind_attributes->{$column_name}
1352       if defined $bind_attributes->{$column_name};
1353     }
1354
1355     my @data = map { $_->[$data_index] } @$data;
1356
1357     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1358     $placeholder_index++;
1359   }
1360   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1361   $self->throw_exception($sth->errstr) if !$rv;
1362
1363   $self->_query_end( $sql, @bind );
1364   return (wantarray ? ($rv, $sth, @bind) : $rv);
1365 }
1366
1367 sub update {
1368   my $self = shift @_;
1369   my $source = shift @_;
1370   my $bind_attributes = $self->source_bind_attributes($source);
1371   
1372   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1373 }
1374
1375
1376 sub delete {
1377   my $self = shift @_;
1378   my $source = shift @_;
1379   
1380   my $bind_attrs = {}; ## If ever it's needed...
1381   
1382   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1383 }
1384
1385 sub _select {
1386   my $self = shift;
1387   my $sql_maker = $self->sql_maker;
1388   local $sql_maker->{for};
1389   return $self->_execute($self->_select_args(@_));
1390 }
1391
1392 sub _select_args {
1393   my ($self, $ident, $select, $condition, $attrs) = @_;
1394   my $order = $attrs->{order_by};
1395
1396   if (ref $condition eq 'SCALAR') {
1397     my $unwrap = ${$condition};
1398     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1399       $order = $1;
1400       $condition = \$unwrap;
1401     }
1402   }
1403
1404   my $for = delete $attrs->{for};
1405   my $sql_maker = $self->sql_maker;
1406   $sql_maker->{for} = $for;
1407
1408   if (exists $attrs->{group_by} || $attrs->{having}) {
1409     $order = {
1410       group_by => $attrs->{group_by},
1411       having => $attrs->{having},
1412       ($order ? (order_by => $order) : ())
1413     };
1414   }
1415   my $bind_attrs = {}; ## Future support
1416   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1417   if ($attrs->{software_limit} ||
1418       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1419         $attrs->{software_limit} = 1;
1420   } else {
1421     $self->throw_exception("rows attribute must be positive if present")
1422       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1423
1424     # MySQL actually recommends this approach.  I cringe.
1425     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1426     push @args, $attrs->{rows}, $attrs->{offset};
1427   }
1428   return @args;
1429 }
1430
1431 sub source_bind_attributes {
1432   my ($self, $source) = @_;
1433   
1434   my $bind_attributes;
1435   foreach my $column ($source->columns) {
1436   
1437     my $data_type = $source->column_info($column)->{data_type} || '';
1438     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1439      if $data_type;
1440   }
1441
1442   return $bind_attributes;
1443 }
1444
1445 =head2 select
1446
1447 =over 4
1448
1449 =item Arguments: $ident, $select, $condition, $attrs
1450
1451 =back
1452
1453 Handle a SQL select statement.
1454
1455 =cut
1456
1457 sub select {
1458   my $self = shift;
1459   my ($ident, $select, $condition, $attrs) = @_;
1460   return $self->cursor_class->new($self, \@_, $attrs);
1461 }
1462
1463 sub select_single {
1464   my $self = shift;
1465   my ($rv, $sth, @bind) = $self->_select(@_);
1466   my @row = $sth->fetchrow_array;
1467   my @nextrow = $sth->fetchrow_array if @row;
1468   if(@row && @nextrow) {
1469     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1470   }
1471   # Need to call finish() to work round broken DBDs
1472   $sth->finish();
1473   return @row;
1474 }
1475
1476 =head2 sth
1477
1478 =over 4
1479
1480 =item Arguments: $sql
1481
1482 =back
1483
1484 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1485
1486 =cut
1487
1488 sub _dbh_sth {
1489   my ($self, $dbh, $sql) = @_;
1490
1491   # 3 is the if_active parameter which avoids active sth re-use
1492   my $sth = $self->disable_sth_caching
1493     ? $dbh->prepare($sql)
1494     : $dbh->prepare_cached($sql, {}, 3);
1495
1496   # XXX You would think RaiseError would make this impossible,
1497   #  but apparently that's not true :(
1498   $self->throw_exception($dbh->errstr) if !$sth;
1499
1500   $sth;
1501 }
1502
1503 sub sth {
1504   my ($self, $sql) = @_;
1505   $self->dbh_do('_dbh_sth', $sql);
1506 }
1507
1508 sub _dbh_columns_info_for {
1509   my ($self, $dbh, $table) = @_;
1510
1511   if ($dbh->can('column_info')) {
1512     my %result;
1513     eval {
1514       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1515       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1516       $sth->execute();
1517       while ( my $info = $sth->fetchrow_hashref() ){
1518         my %column_info;
1519         $column_info{data_type}   = $info->{TYPE_NAME};
1520         $column_info{size}      = $info->{COLUMN_SIZE};
1521         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1522         $column_info{default_value} = $info->{COLUMN_DEF};
1523         my $col_name = $info->{COLUMN_NAME};
1524         $col_name =~ s/^\"(.*)\"$/$1/;
1525
1526         $result{$col_name} = \%column_info;
1527       }
1528     };
1529     return \%result if !$@ && scalar keys %result;
1530   }
1531
1532   my %result;
1533   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1534   $sth->execute;
1535   my @columns = @{$sth->{NAME_lc}};
1536   for my $i ( 0 .. $#columns ){
1537     my %column_info;
1538     $column_info{data_type} = $sth->{TYPE}->[$i];
1539     $column_info{size} = $sth->{PRECISION}->[$i];
1540     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1541
1542     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1543       $column_info{data_type} = $1;
1544       $column_info{size}    = $2;
1545     }
1546
1547     $result{$columns[$i]} = \%column_info;
1548   }
1549   $sth->finish;
1550
1551   foreach my $col (keys %result) {
1552     my $colinfo = $result{$col};
1553     my $type_num = $colinfo->{data_type};
1554     my $type_name;
1555     if(defined $type_num && $dbh->can('type_info')) {
1556       my $type_info = $dbh->type_info($type_num);
1557       $type_name = $type_info->{TYPE_NAME} if $type_info;
1558       $colinfo->{data_type} = $type_name if $type_name;
1559     }
1560   }
1561
1562   return \%result;
1563 }
1564
1565 sub columns_info_for {
1566   my ($self, $table) = @_;
1567   $self->dbh_do('_dbh_columns_info_for', $table);
1568 }
1569
1570 =head2 last_insert_id
1571
1572 Return the row id of the last insert.
1573
1574 =cut
1575
1576 sub _dbh_last_insert_id {
1577     # All Storage's need to register their own _dbh_last_insert_id
1578     # the old SQLite-based method was highly inappropriate
1579
1580     my $self = shift;
1581     my $class = ref $self;
1582     $self->throw_exception (<<EOE);
1583
1584 No _dbh_last_insert_id() method found in $class.
1585 Since the method of obtaining the autoincrement id of the last insert
1586 operation varies greatly between different databases, this method must be
1587 individually implemented for every storage class.
1588 EOE
1589 }
1590
1591 sub last_insert_id {
1592   my $self = shift;
1593   $self->dbh_do('_dbh_last_insert_id', @_);
1594 }
1595
1596 =head2 sqlt_type
1597
1598 Returns the database driver name.
1599
1600 =cut
1601
1602 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1603
1604 =head2 bind_attribute_by_data_type
1605
1606 Given a datatype from column info, returns a database specific bind
1607 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1608 let the database planner just handle it.
1609
1610 Generally only needed for special case column types, like bytea in postgres.
1611
1612 =cut
1613
1614 sub bind_attribute_by_data_type {
1615     return;
1616 }
1617
1618 =head2 create_ddl_dir
1619
1620 =over 4
1621
1622 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1623
1624 =back
1625
1626 Creates a SQL file based on the Schema, for each of the specified
1627 database types, in the given directory.
1628
1629 By default, C<\%sqlt_args> will have
1630
1631  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1632
1633 merged with the hash passed in. To disable any of those features, pass in a 
1634 hashref like the following
1635
1636  { ignore_constraint_names => 0, # ... other options }
1637
1638 =cut
1639
1640 sub create_ddl_dir {
1641   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1642
1643   if(!$dir || !-d $dir) {
1644     warn "No directory given, using ./\n";
1645     $dir = "./";
1646   }
1647   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1648   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1649
1650   my $schema_version = $schema->schema_version || '1.x';
1651   $version ||= $schema_version;
1652
1653   $sqltargs = {
1654     add_drop_table => 1, 
1655     ignore_constraint_names => 1,
1656     ignore_index_names => 1,
1657     %{$sqltargs || {}}
1658   };
1659
1660   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1661       . $self->_check_sqlt_message . q{'})
1662           if !$self->_check_sqlt_version;
1663
1664   my $sqlt = SQL::Translator->new( $sqltargs );
1665
1666   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1667   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1668
1669   foreach my $db (@$databases) {
1670     $sqlt->reset();
1671     $sqlt->{schema} = $sqlt_schema;
1672     $sqlt->producer($db);
1673
1674     my $file;
1675     my $filename = $schema->ddl_filename($db, $version, $dir);
1676     if (-e $filename && ($version eq $schema_version )) {
1677       # if we are dumping the current version, overwrite the DDL
1678       warn "Overwriting existing DDL file - $filename";
1679       unlink($filename);
1680     }
1681
1682     my $output = $sqlt->translate;
1683     if(!$output) {
1684       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1685       next;
1686     }
1687     if(!open($file, ">$filename")) {
1688       $self->throw_exception("Can't open $filename for writing ($!)");
1689       next;
1690     }
1691     print $file $output;
1692     close($file);
1693   
1694     next unless ($preversion);
1695
1696     require SQL::Translator::Diff;
1697
1698     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1699     if(!-e $prefilename) {
1700       warn("No previous schema file found ($prefilename)");
1701       next;
1702     }
1703
1704     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1705     if(-e $difffile) {
1706       warn("Overwriting existing diff file - $difffile");
1707       unlink($difffile);
1708     }
1709     
1710     my $source_schema;
1711     {
1712       my $t = SQL::Translator->new($sqltargs);
1713       $t->debug( 0 );
1714       $t->trace( 0 );
1715       $t->parser( $db )                       or die $t->error;
1716       my $out = $t->translate( $prefilename ) or die $t->error;
1717       $source_schema = $t->schema;
1718       unless ( $source_schema->name ) {
1719         $source_schema->name( $prefilename );
1720       }
1721     }
1722
1723     # The "new" style of producers have sane normalization and can support 
1724     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1725     # And we have to diff parsed SQL against parsed SQL.
1726     my $dest_schema = $sqlt_schema;
1727     
1728     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1729       my $t = SQL::Translator->new($sqltargs);
1730       $t->debug( 0 );
1731       $t->trace( 0 );
1732       $t->parser( $db )                    or die $t->error;
1733       my $out = $t->translate( $filename ) or die $t->error;
1734       $dest_schema = $t->schema;
1735       $dest_schema->name( $filename )
1736         unless $dest_schema->name;
1737     }
1738     
1739     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1740                                                   $dest_schema,   $db,
1741                                                   $sqltargs
1742                                                  );
1743     if(!open $file, ">$difffile") { 
1744       $self->throw_exception("Can't write to $difffile ($!)");
1745       next;
1746     }
1747     print $file $diff;
1748     close($file);
1749   }
1750 }
1751
1752 =head2 deployment_statements
1753
1754 =over 4
1755
1756 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1757
1758 =back
1759
1760 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1761 The database driver name is given by C<$type>, though the value from
1762 L</sqlt_type> is used if it is not specified.
1763
1764 C<$directory> is used to return statements from files in a previously created
1765 L</create_ddl_dir> directory and is optional. The filenames are constructed
1766 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1767
1768 If no C<$directory> is specified then the statements are constructed on the
1769 fly using L<SQL::Translator> and C<$version> is ignored.
1770
1771 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1772
1773 =cut
1774
1775 sub deployment_statements {
1776   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1777   # Need to be connected to get the correct sqlt_type
1778   $self->ensure_connected() unless $type;
1779   $type ||= $self->sqlt_type;
1780   $version ||= $schema->schema_version || '1.x';
1781   $dir ||= './';
1782   my $filename = $schema->ddl_filename($type, $version, $dir);
1783   if(-f $filename)
1784   {
1785       my $file;
1786       open($file, "<$filename") 
1787         or $self->throw_exception("Can't open $filename ($!)");
1788       my @rows = <$file>;
1789       close($file);
1790       return join('', @rows);
1791   }
1792
1793   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1794       . $self->_check_sqlt_message . q{'})
1795           if !$self->_check_sqlt_version;
1796
1797   require SQL::Translator::Parser::DBIx::Class;
1798   eval qq{use SQL::Translator::Producer::${type}};
1799   $self->throw_exception($@) if $@;
1800
1801   # sources needs to be a parser arg, but for simplicty allow at top level 
1802   # coming in
1803   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1804       if exists $sqltargs->{sources};
1805
1806   my $tr = SQL::Translator->new(%$sqltargs);
1807   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1808   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1809 }
1810
1811 sub deploy {
1812   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1813   my $deploy = sub {
1814     my $line = shift;
1815     return if($line =~ /^--/);
1816     return if(!$line);
1817     # next if($line =~ /^DROP/m);
1818     return if($line =~ /^BEGIN TRANSACTION/m);
1819     return if($line =~ /^COMMIT/m);
1820     return if $line =~ /^\s+$/; # skip whitespace only
1821     $self->_query_start($line);
1822     eval {
1823       $self->dbh->do($line); # shouldn't be using ->dbh ?
1824     };
1825     if ($@) {
1826       warn qq{$@ (running "${line}")};
1827     }
1828     $self->_query_end($line);
1829   };
1830   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1831   if (@statements > 1) {
1832     foreach my $statement (@statements) {
1833       $deploy->( $statement );
1834     }
1835   }
1836   elsif (@statements == 1) {
1837     foreach my $line ( split(";\n", $statements[0])) {
1838       $deploy->( $line );
1839     }
1840   }
1841 }
1842
1843 =head2 datetime_parser
1844
1845 Returns the datetime parser class
1846
1847 =cut
1848
1849 sub datetime_parser {
1850   my $self = shift;
1851   return $self->{datetime_parser} ||= do {
1852     $self->ensure_connected;
1853     $self->build_datetime_parser(@_);
1854   };
1855 }
1856
1857 =head2 datetime_parser_type
1858
1859 Defines (returns) the datetime parser class - currently hardwired to
1860 L<DateTime::Format::MySQL>
1861
1862 =cut
1863
1864 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1865
1866 =head2 build_datetime_parser
1867
1868 See L</datetime_parser>
1869
1870 =cut
1871
1872 sub build_datetime_parser {
1873   my $self = shift;
1874   my $type = $self->datetime_parser_type(@_);
1875   eval "use ${type}";
1876   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1877   return $type;
1878 }
1879
1880 {
1881     my $_check_sqlt_version; # private
1882     my $_check_sqlt_message; # private
1883     sub _check_sqlt_version {
1884         return $_check_sqlt_version if defined $_check_sqlt_version;
1885         eval 'use SQL::Translator "0.09003"';
1886         $_check_sqlt_message = $@ || '';
1887         $_check_sqlt_version = !$@;
1888     }
1889
1890     sub _check_sqlt_message {
1891         _check_sqlt_version if !defined $_check_sqlt_message;
1892         $_check_sqlt_message;
1893     }
1894 }
1895
1896 =head2 is_replicating
1897
1898 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1899 replicate from a master database.  Default is undef, which is the result
1900 returned by databases that don't support replication.
1901
1902 =cut
1903
1904 sub is_replicating {
1905     return;
1906     
1907 }
1908
1909 =head2 lag_behind_master
1910
1911 Returns a number that represents a certain amount of lag behind a master db
1912 when a given storage is replicating.  The number is database dependent, but
1913 starts at zero and increases with the amount of lag. Default in undef
1914
1915 =cut
1916
1917 sub lag_behind_master {
1918     return;
1919 }
1920
1921 sub DESTROY {
1922   my $self = shift;
1923   return if !$self->_dbh;
1924   $self->_verify_pid;
1925   $self->_dbh(undef);
1926 }
1927
1928 1;
1929
1930 =head1 USAGE NOTES
1931
1932 =head2 DBIx::Class and AutoCommit
1933
1934 DBIx::Class can do some wonderful magic with handling exceptions,
1935 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1936 combined with C<txn_do> for transaction support.
1937
1938 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1939 in an assumed transaction between commits, and you're telling us you'd
1940 like to manage that manually.  A lot of the magic protections offered by
1941 this module will go away.  We can't protect you from exceptions due to database
1942 disconnects because we don't know anything about how to restart your
1943 transactions.  You're on your own for handling all sorts of exceptional
1944 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1945 be with raw DBI.
1946
1947
1948 =head1 SQL METHODS
1949
1950 The module defines a set of methods within the DBIC::SQL::Abstract
1951 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1952 SQL query functions.
1953
1954 The following methods are extended:-
1955
1956 =over 4
1957
1958 =item delete
1959
1960 =item insert
1961
1962 =item select
1963
1964 =item update
1965
1966 =item limit_dialect
1967
1968 See L</connect_info> for details.
1969
1970 =item quote_char
1971
1972 See L</connect_info> for details.
1973
1974 =item name_sep
1975
1976 See L</connect_info> for details.
1977
1978 =back
1979
1980 =head1 AUTHORS
1981
1982 Matt S. Trout <mst@shadowcatsystems.co.uk>
1983
1984 Andy Grundman <andy@hybridized.org>
1985
1986 =head1 LICENSE
1987
1988 You may distribute this code under the same terms as Perl itself.
1989
1990 =cut