Merge 'trunk' into 'count_distinct'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 # DB2 is the only remaining DB using this. Even though we are not sure if
54 # RowNumberOver is still needed here (should be part of SQLA) leave the 
55 # code in place
56 sub _RowNumberOver {
57   my ($self, $sql, $order, $rows, $offset ) = @_;
58
59   $offset += 1;
60   my $last = $rows + $offset;
61   my ( $order_by ) = $self->_order_by( $order );
62
63   $sql = <<"SQL";
64 SELECT * FROM
65 (
66    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
67       $sql
68       $order_by
69    ) Q1
70 ) Q2
71 WHERE ROW_NUM BETWEEN $offset AND $last
72
73 SQL
74
75   return $sql;
76 }
77
78
79 # While we're at it, this should make LIMIT queries more efficient,
80 #  without digging into things too deeply
81 use Scalar::Util 'blessed';
82 sub _find_syntax {
83   my ($self, $syntax) = @_;
84   
85   # DB2 is the only remaining DB using this. Even though we are not sure if
86   # RowNumberOver is still needed here (should be part of SQLA) leave the 
87   # code in place
88   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
89   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
90     return 'RowNumberOver';
91   }
92   
93   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
94 }
95
96 sub select {
97   my ($self, $table, $fields, $where, $order, @rest) = @_;
98   local $self->{having_bind} = [];
99   local $self->{from_bind} = [];
100
101   if (ref $table eq 'SCALAR') {
102     $table = $$table;
103   }
104   elsif (not ref $table) {
105     $table = $self->_quote($table);
106   }
107   local $self->{rownum_hack_count} = 1
108     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
109   @rest = (-1) unless defined $rest[0];
110   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
111     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
112   my ($sql, @where_bind) = $self->SUPER::select(
113     $table, $self->_recurse_fields($fields), $where, $order, @rest
114   );
115   $sql .= 
116     $self->{for} ?
117     (
118       $self->{for} eq 'update' ? ' FOR UPDATE' :
119       $self->{for} eq 'shared' ? ' FOR SHARE'  :
120       ''
121     ) :
122     ''
123   ;
124   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
125 }
126
127 sub insert {
128   my $self = shift;
129   my $table = shift;
130   $table = $self->_quote($table) unless ref($table);
131   $self->SUPER::insert($table, @_);
132 }
133
134 sub update {
135   my $self = shift;
136   my $table = shift;
137   $table = $self->_quote($table) unless ref($table);
138   $self->SUPER::update($table, @_);
139 }
140
141 sub delete {
142   my $self = shift;
143   my $table = shift;
144   $table = $self->_quote($table) unless ref($table);
145   $self->SUPER::delete($table, @_);
146 }
147
148 sub _emulate_limit {
149   my $self = shift;
150   if ($_[3] == -1) {
151     return $_[1].$self->_order_by($_[2]);
152   } else {
153     return $self->SUPER::_emulate_limit(@_);
154   }
155 }
156
157 sub _recurse_fields {
158   my ($self, $fields, $params) = @_;
159   my $ref = ref $fields;
160   return $self->_quote($fields) unless $ref;
161   return $$fields if $ref eq 'SCALAR';
162
163   if ($ref eq 'ARRAY') {
164     return join(', ', map {
165       $self->_recurse_fields($_)
166         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
167           ? ' AS col'.$self->{rownum_hack_count}++
168           : '')
169       } @$fields);
170   } elsif ($ref eq 'HASH') {
171     foreach my $func (keys %$fields) {
172       return $self->_sqlcase($func)
173         .'( '.$self->_recurse_fields($fields->{$func}).' )';
174     }
175   }
176   # Is the second check absolutely necessary?
177   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
178     return $self->_bind_to_sql( $fields );
179   }
180   else {
181     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
182   }
183 }
184
185 sub _order_by {
186   my $self = shift;
187   my $ret = '';
188   my @extra;
189   if (ref $_[0] eq 'HASH') {
190     if (defined $_[0]->{group_by}) {
191       $ret = $self->_sqlcase(' group by ')
192         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
193     }
194     if (defined $_[0]->{having}) {
195       my $frag;
196       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
197       push(@{$self->{having_bind}}, @extra);
198       $ret .= $self->_sqlcase(' having ').$frag;
199     }
200     if (defined $_[0]->{order_by}) {
201       $ret .= $self->_order_by($_[0]->{order_by});
202     }
203     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
204       return $self->SUPER::_order_by($_[0]);
205     }
206   } elsif (ref $_[0] eq 'SCALAR') {
207     $ret = $self->_sqlcase(' order by ').${ $_[0] };
208   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
209     my @order = @{+shift};
210     $ret = $self->_sqlcase(' order by ')
211           .join(', ', map {
212                         my $r = $self->_order_by($_, @_);
213                         $r =~ s/^ ?ORDER BY //i;
214                         $r;
215                       } @order);
216   } else {
217     $ret = $self->SUPER::_order_by(@_);
218   }
219   return $ret;
220 }
221
222 sub _order_directions {
223   my ($self, $order) = @_;
224   $order = $order->{order_by} if ref $order eq 'HASH';
225   return $self->SUPER::_order_directions($order);
226 }
227
228 sub _table {
229   my ($self, $from) = @_;
230   if (ref $from eq 'ARRAY') {
231     return $self->_recurse_from(@$from);
232   } elsif (ref $from eq 'HASH') {
233     return $self->_make_as($from);
234   } else {
235     return $from; # would love to quote here but _table ends up getting called
236                   # twice during an ->select without a limit clause due to
237                   # the way S::A::Limit->select works. should maybe consider
238                   # bypassing this and doing S::A::select($self, ...) in
239                   # our select method above. meantime, quoting shims have
240                   # been added to select/insert/update/delete here
241   }
242 }
243
244 sub _recurse_from {
245   my ($self, $from, @join) = @_;
246   my @sqlf;
247   push(@sqlf, $self->_make_as($from));
248   foreach my $j (@join) {
249     my ($to, $on) = @$j;
250
251     # check whether a join type exists
252     my $join_clause = '';
253     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
254     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
255       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
256     } else {
257       $join_clause = ' JOIN ';
258     }
259     push(@sqlf, $join_clause);
260
261     if (ref $to eq 'ARRAY') {
262       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
263     } else {
264       push(@sqlf, $self->_make_as($to));
265     }
266     push(@sqlf, ' ON ', $self->_join_condition($on));
267   }
268   return join('', @sqlf);
269 }
270
271 sub _bind_to_sql {
272   my ($self, $arr) = @_;
273   my ($sql, @bind) = @{${$arr}};
274   push (@{$self->{from_bind}}, @bind);
275   return $sql;
276 }
277
278 sub _make_as {
279   my ($self, $from) = @_;
280   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ 
281                         : ref $_ eq 'REF'    ? $self->_bind_to_sql($_) 
282                         : $self->_quote($_)) 
283                        } reverse each %{$self->_skip_options($from)});
284 }
285
286 sub _skip_options {
287   my ($self, $hash) = @_;
288   my $clean_hash = {};
289   $clean_hash->{$_} = $hash->{$_}
290     for grep {!/^-/} keys %$hash;
291   return $clean_hash;
292 }
293
294 sub _join_condition {
295   my ($self, $cond) = @_;
296   if (ref $cond eq 'HASH') {
297     my %j;
298     for (keys %$cond) {
299       my $v = $cond->{$_};
300       if (ref $v) {
301         # XXX no throw_exception() in this package and croak() fails with strange results
302         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
303             if ref($v) ne 'SCALAR';
304         $j{$_} = $v;
305       }
306       else {
307         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
308       }
309     };
310     return scalar($self->_recurse_where(\%j));
311   } elsif (ref $cond eq 'ARRAY') {
312     return join(' OR ', map { $self->_join_condition($_) } @$cond);
313   } else {
314     die "Can't handle this yet!";
315   }
316 }
317
318 sub _quote {
319   my ($self, $label) = @_;
320   return '' unless defined $label;
321   return "*" if $label eq '*';
322   return $label unless $self->{quote_char};
323   if(ref $self->{quote_char} eq "ARRAY"){
324     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
325       if !defined $self->{name_sep};
326     my $sep = $self->{name_sep};
327     return join($self->{name_sep},
328         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
329        split(/\Q$sep\E/,$label));
330   }
331   return $self->SUPER::_quote($label);
332 }
333
334 sub limit_dialect {
335     my $self = shift;
336     $self->{limit_dialect} = shift if @_;
337     return $self->{limit_dialect};
338 }
339
340 sub quote_char {
341     my $self = shift;
342     $self->{quote_char} = shift if @_;
343     return $self->{quote_char};
344 }
345
346 sub name_sep {
347     my $self = shift;
348     $self->{name_sep} = shift if @_;
349     return $self->{name_sep};
350 }
351
352 } # End of BEGIN block
353
354 =head1 NAME
355
356 DBIx::Class::Storage::DBI - DBI storage handler
357
358 =head1 SYNOPSIS
359
360   my $schema = MySchema->connect('dbi:SQLite:my.db');
361
362   $schema->storage->debug(1);
363   $schema->dbh_do("DROP TABLE authors");
364
365   $schema->resultset('Book')->search({
366      written_on => $schema->storage->datetime_parser(DateTime->now)
367   });
368
369 =head1 DESCRIPTION
370
371 This class represents the connection to an RDBMS via L<DBI>.  See
372 L<DBIx::Class::Storage> for general information.  This pod only
373 documents DBI-specific methods and behaviors.
374
375 =head1 METHODS
376
377 =cut
378
379 sub new {
380   my $new = shift->next::method(@_);
381
382   $new->transaction_depth(0);
383   $new->_sql_maker_opts({});
384   $new->{savepoints} = [];
385   $new->{_in_dbh_do} = 0;
386   $new->{_dbh_gen} = 0;
387
388   $new;
389 }
390
391 =head2 connect_info
392
393 This method is normally called by L<DBIx::Class::Schema/connection>, which
394 encapsulates its argument list in an arrayref before passing them here.
395
396 The argument list may contain:
397
398 =over
399
400 =item *
401
402 The same 4-element argument set one would normally pass to
403 L<DBI/connect>, optionally followed by
404 L<extra attributes|/DBIx::Class specific connection attributes>
405 recognized by DBIx::Class:
406
407   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
408
409 =item *
410
411 A single code reference which returns a connected 
412 L<DBI database handle|DBI/connect> optionally followed by 
413 L<extra attributes|/DBIx::Class specific connection attributes> recognized
414 by DBIx::Class:
415
416   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
417
418 =item *
419
420 A single hashref with all the attributes and the dsn/user/password
421 mixed together:
422
423   $connect_info_args = [{
424     dsn => $dsn,
425     user => $user,
426     password => $pass,
427     %dbi_attributes,
428     %extra_attributes,
429   }];
430
431 This is particularly useful for L<Catalyst> based applications, allowing the 
432 following config (L<Config::General> style):
433
434   <Model::DB>
435     schema_class   App::DB
436     <connect_info>
437       dsn          dbi:mysql:database=test
438       user         testuser
439       password     TestPass
440       AutoCommit   1
441     </connect_info>
442   </Model::DB>
443
444 =back
445
446 Please note that the L<DBI> docs recommend that you always explicitly
447 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
448 recommends that it be set to I<1>, and that you perform transactions
449 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
450 to I<1> if you do not do explicitly set it to zero.  This is the default 
451 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
452
453 =head3 DBIx::Class specific connection attributes
454
455 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
456 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
457 the following connection options. These options can be mixed in with your other
458 L<DBI> connection attributes, or placed in a seperate hashref
459 (C<\%extra_attributes>) as shown above.
460
461 Every time C<connect_info> is invoked, any previous settings for
462 these options will be cleared before setting the new ones, regardless of
463 whether any options are specified in the new C<connect_info>.
464
465
466 =over
467
468 =item on_connect_do
469
470 Specifies things to do immediately after connecting or re-connecting to
471 the database.  Its value may contain:
472
473 =over
474
475 =item an array reference
476
477 This contains SQL statements to execute in order.  Each element contains
478 a string or a code reference that returns a string.
479
480 =item a code reference
481
482 This contains some code to execute.  Unlike code references within an
483 array reference, its return value is ignored.
484
485 =back
486
487 =item on_disconnect_do
488
489 Takes arguments in the same form as L</on_connect_do> and executes them
490 immediately before disconnecting from the database.
491
492 Note, this only runs if you explicitly call L</disconnect> on the
493 storage object.
494
495 =item disable_sth_caching
496
497 If set to a true value, this option will disable the caching of
498 statement handles via L<DBI/prepare_cached>.
499
500 =item limit_dialect 
501
502 Sets the limit dialect. This is useful for JDBC-bridge among others
503 where the remote SQL-dialect cannot be determined by the name of the
504 driver alone. See also L<SQL::Abstract::Limit>.
505
506 =item quote_char
507
508 Specifies what characters to use to quote table and column names. If 
509 you use this you will want to specify L</name_sep> as well.
510
511 C<quote_char> expects either a single character, in which case is it
512 is placed on either side of the table/column name, or an arrayref of length
513 2 in which case the table/column name is placed between the elements.
514
515 For example under MySQL you should use C<< quote_char => '`' >>, and for
516 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
517
518 =item name_sep
519
520 This only needs to be used in conjunction with C<quote_char>, and is used to 
521 specify the charecter that seperates elements (schemas, tables, columns) from 
522 each other. In most cases this is simply a C<.>.
523
524 The consequences of not supplying this value is that L<SQL::Abstract>
525 will assume DBIx::Class' uses of aliases to be complete column
526 names. The output will look like I<"me.name"> when it should actually
527 be I<"me"."name">.
528
529 =item unsafe
530
531 This Storage driver normally installs its own C<HandleError>, sets
532 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
533 all database handles, including those supplied by a coderef.  It does this
534 so that it can have consistent and useful error behavior.
535
536 If you set this option to a true value, Storage will not do its usual
537 modifications to the database handle's attributes, and instead relies on
538 the settings in your connect_info DBI options (or the values you set in
539 your connection coderef, in the case that you are connecting via coderef).
540
541 Note that your custom settings can cause Storage to malfunction,
542 especially if you set a C<HandleError> handler that suppresses exceptions
543 and/or disable C<RaiseError>.
544
545 =item auto_savepoint
546
547 If this option is true, L<DBIx::Class> will use savepoints when nesting
548 transactions, making it possible to recover from failure in the inner
549 transaction without having to abort all outer transactions.
550
551 =item cursor_class
552
553 Use this argument to supply a cursor class other than the default
554 L<DBIx::Class::Storage::DBI::Cursor>.
555
556 =back
557
558 Some real-life examples of arguments to L</connect_info> and
559 L<DBIx::Class::Schema/connect>
560
561   # Simple SQLite connection
562   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
563
564   # Connect via subref
565   ->connect_info([ sub { DBI->connect(...) } ]);
566
567   # A bit more complicated
568   ->connect_info(
569     [
570       'dbi:Pg:dbname=foo',
571       'postgres',
572       'my_pg_password',
573       { AutoCommit => 1 },
574       { quote_char => q{"}, name_sep => q{.} },
575     ]
576   );
577
578   # Equivalent to the previous example
579   ->connect_info(
580     [
581       'dbi:Pg:dbname=foo',
582       'postgres',
583       'my_pg_password',
584       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
585     ]
586   );
587
588   # Same, but with hashref as argument
589   # See parse_connect_info for explanation
590   ->connect_info(
591     [{
592       dsn         => 'dbi:Pg:dbname=foo',
593       user        => 'postgres',
594       password    => 'my_pg_password',
595       AutoCommit  => 1,
596       quote_char  => q{"},
597       name_sep    => q{.},
598     }]
599   );
600
601   # Subref + DBIx::Class-specific connection options
602   ->connect_info(
603     [
604       sub { DBI->connect(...) },
605       {
606           quote_char => q{`},
607           name_sep => q{@},
608           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
609           disable_sth_caching => 1,
610       },
611     ]
612   );
613
614
615
616 =cut
617
618 sub connect_info {
619   my ($self, $info_arg) = @_;
620
621   return $self->_connect_info if !$info_arg;
622
623   my @args = @$info_arg;  # take a shallow copy for further mutilation
624   $self->_connect_info([@args]); # copy for _connect_info
625
626
627   # combine/pre-parse arguments depending on invocation style
628
629   my %attrs;
630   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
631     %attrs = %{ $args[1] || {} };
632     @args = $args[0];
633   }
634   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
635     %attrs = %{$args[0]};
636     @args = ();
637     for (qw/password user dsn/) {
638       unshift @args, delete $attrs{$_};
639     }
640   }
641   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
642     %attrs = (
643       % { $args[3] || {} },
644       % { $args[4] || {} },
645     );
646     @args = @args[0,1,2];
647   }
648
649   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
650   #  the new set of options
651   $self->_sql_maker(undef);
652   $self->_sql_maker_opts({});
653
654   if(keys %attrs) {
655     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
656       if(my $value = delete $attrs{$storage_opt}) {
657         $self->$storage_opt($value);
658       }
659     }
660     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
661       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
662         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
663       }
664     }
665   }
666
667   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
668
669   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
670   $self->_connect_info;
671 }
672
673 =head2 on_connect_do
674
675 This method is deprecated in favour of setting via L</connect_info>.
676
677
678 =head2 dbh_do
679
680 Arguments: ($subref | $method_name), @extra_coderef_args?
681
682 Execute the given $subref or $method_name using the new exception-based
683 connection management.
684
685 The first two arguments will be the storage object that C<dbh_do> was called
686 on and a database handle to use.  Any additional arguments will be passed
687 verbatim to the called subref as arguments 2 and onwards.
688
689 Using this (instead of $self->_dbh or $self->dbh) ensures correct
690 exception handling and reconnection (or failover in future subclasses).
691
692 Your subref should have no side-effects outside of the database, as
693 there is the potential for your subref to be partially double-executed
694 if the database connection was stale/dysfunctional.
695
696 Example:
697
698   my @stuff = $schema->storage->dbh_do(
699     sub {
700       my ($storage, $dbh, @cols) = @_;
701       my $cols = join(q{, }, @cols);
702       $dbh->selectrow_array("SELECT $cols FROM foo");
703     },
704     @column_list
705   );
706
707 =cut
708
709 sub dbh_do {
710   my $self = shift;
711   my $code = shift;
712
713   my $dbh = $self->_dbh;
714
715   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
716       || $self->{transaction_depth};
717
718   local $self->{_in_dbh_do} = 1;
719
720   my @result;
721   my $want_array = wantarray;
722
723   eval {
724     $self->_verify_pid if $dbh;
725     if(!$self->_dbh) {
726         $self->_populate_dbh;
727         $dbh = $self->_dbh;
728     }
729
730     if($want_array) {
731         @result = $self->$code($dbh, @_);
732     }
733     elsif(defined $want_array) {
734         $result[0] = $self->$code($dbh, @_);
735     }
736     else {
737         $self->$code($dbh, @_);
738     }
739   };
740
741   my $exception = $@;
742   if(!$exception) { return $want_array ? @result : $result[0] }
743
744   $self->throw_exception($exception) if $self->connected;
745
746   # We were not connected - reconnect and retry, but let any
747   #  exception fall right through this time
748   $self->_populate_dbh;
749   $self->$code($self->_dbh, @_);
750 }
751
752 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
753 # It also informs dbh_do to bypass itself while under the direction of txn_do,
754 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
755 sub txn_do {
756   my $self = shift;
757   my $coderef = shift;
758
759   ref $coderef eq 'CODE' or $self->throw_exception
760     ('$coderef must be a CODE reference');
761
762   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
763
764   local $self->{_in_dbh_do} = 1;
765
766   my @result;
767   my $want_array = wantarray;
768
769   my $tried = 0;
770   while(1) {
771     eval {
772       $self->_verify_pid if $self->_dbh;
773       $self->_populate_dbh if !$self->_dbh;
774
775       $self->txn_begin;
776       if($want_array) {
777           @result = $coderef->(@_);
778       }
779       elsif(defined $want_array) {
780           $result[0] = $coderef->(@_);
781       }
782       else {
783           $coderef->(@_);
784       }
785       $self->txn_commit;
786     };
787
788     my $exception = $@;
789     if(!$exception) { return $want_array ? @result : $result[0] }
790
791     if($tried++ > 0 || $self->connected) {
792       eval { $self->txn_rollback };
793       my $rollback_exception = $@;
794       if($rollback_exception) {
795         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
796         $self->throw_exception($exception)  # propagate nested rollback
797           if $rollback_exception =~ /$exception_class/;
798
799         $self->throw_exception(
800           "Transaction aborted: ${exception}. "
801           . "Rollback failed: ${rollback_exception}"
802         );
803       }
804       $self->throw_exception($exception)
805     }
806
807     # We were not connected, and was first try - reconnect and retry
808     # via the while loop
809     $self->_populate_dbh;
810   }
811 }
812
813 =head2 disconnect
814
815 Our C<disconnect> method also performs a rollback first if the
816 database is not in C<AutoCommit> mode.
817
818 =cut
819
820 sub disconnect {
821   my ($self) = @_;
822
823   if( $self->connected ) {
824     my $connection_do = $self->on_disconnect_do;
825     $self->_do_connection_actions($connection_do) if ref($connection_do);
826
827     $self->_dbh->rollback unless $self->_dbh_autocommit;
828     $self->_dbh->disconnect;
829     $self->_dbh(undef);
830     $self->{_dbh_gen}++;
831   }
832 }
833
834 =head2 with_deferred_fk_checks
835
836 =over 4
837
838 =item Arguments: C<$coderef>
839
840 =item Return Value: The return value of $coderef
841
842 =back
843
844 Storage specific method to run the code ref with FK checks deferred or
845 in MySQL's case disabled entirely.
846
847 =cut
848
849 # Storage subclasses should override this
850 sub with_deferred_fk_checks {
851   my ($self, $sub) = @_;
852
853   $sub->();
854 }
855
856 sub connected {
857   my ($self) = @_;
858
859   if(my $dbh = $self->_dbh) {
860       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
861           $self->_dbh(undef);
862           $self->{_dbh_gen}++;
863           return;
864       }
865       else {
866           $self->_verify_pid;
867           return 0 if !$self->_dbh;
868       }
869       return ($dbh->FETCH('Active') && $dbh->ping);
870   }
871
872   return 0;
873 }
874
875 # handle pid changes correctly
876 #  NOTE: assumes $self->_dbh is a valid $dbh
877 sub _verify_pid {
878   my ($self) = @_;
879
880   return if defined $self->_conn_pid && $self->_conn_pid == $$;
881
882   $self->_dbh->{InactiveDestroy} = 1;
883   $self->_dbh(undef);
884   $self->{_dbh_gen}++;
885
886   return;
887 }
888
889 sub ensure_connected {
890   my ($self) = @_;
891
892   unless ($self->connected) {
893     $self->_populate_dbh;
894   }
895 }
896
897 =head2 dbh
898
899 Returns the dbh - a data base handle of class L<DBI>.
900
901 =cut
902
903 sub dbh {
904   my ($self) = @_;
905
906   $self->ensure_connected;
907   return $self->_dbh;
908 }
909
910 sub _sql_maker_args {
911     my ($self) = @_;
912     
913     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
914 }
915
916 sub sql_maker {
917   my ($self) = @_;
918   unless ($self->_sql_maker) {
919     my $sql_maker_class = $self->sql_maker_class;
920     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
921   }
922   return $self->_sql_maker;
923 }
924
925 sub _rebless {}
926
927 sub _populate_dbh {
928   my ($self) = @_;
929   my @info = @{$self->_dbi_connect_info || []};
930   $self->_dbh($self->_connect(@info));
931
932   # Always set the transaction depth on connect, since
933   #  there is no transaction in progress by definition
934   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
935
936   if(ref $self eq 'DBIx::Class::Storage::DBI') {
937     my $driver = $self->_dbh->{Driver}->{Name};
938     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
939       bless $self, "DBIx::Class::Storage::DBI::${driver}";
940       $self->_rebless();
941     }
942   }
943
944   $self->_conn_pid($$);
945   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
946
947   my $connection_do = $self->on_connect_do;
948   $self->_do_connection_actions($connection_do) if ref($connection_do);
949 }
950
951 sub _do_connection_actions {
952   my $self = shift;
953   my $connection_do = shift;
954
955   if (ref $connection_do eq 'ARRAY') {
956     $self->_do_query($_) foreach @$connection_do;
957   }
958   elsif (ref $connection_do eq 'CODE') {
959     $connection_do->($self);
960   }
961
962   return $self;
963 }
964
965 sub _do_query {
966   my ($self, $action) = @_;
967
968   if (ref $action eq 'CODE') {
969     $action = $action->($self);
970     $self->_do_query($_) foreach @$action;
971   }
972   else {
973     # Most debuggers expect ($sql, @bind), so we need to exclude
974     # the attribute hash which is the second argument to $dbh->do
975     # furthermore the bind values are usually to be presented
976     # as named arrayref pairs, so wrap those here too
977     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
978     my $sql = shift @do_args;
979     my $attrs = shift @do_args;
980     my @bind = map { [ undef, $_ ] } @do_args;
981
982     $self->_query_start($sql, @bind);
983     $self->_dbh->do($sql, $attrs, @do_args);
984     $self->_query_end($sql, @bind);
985   }
986
987   return $self;
988 }
989
990 sub _connect {
991   my ($self, @info) = @_;
992
993   $self->throw_exception("You failed to provide any connection info")
994     if !@info;
995
996   my ($old_connect_via, $dbh);
997
998   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
999     $old_connect_via = $DBI::connect_via;
1000     $DBI::connect_via = 'connect';
1001   }
1002
1003   eval {
1004     if(ref $info[0] eq 'CODE') {
1005        $dbh = &{$info[0]}
1006     }
1007     else {
1008        $dbh = DBI->connect(@info);
1009     }
1010
1011     if($dbh && !$self->unsafe) {
1012       my $weak_self = $self;
1013       weaken($weak_self);
1014       $dbh->{HandleError} = sub {
1015           if ($weak_self) {
1016             $weak_self->throw_exception("DBI Exception: $_[0]");
1017           }
1018           else {
1019             croak ("DBI Exception: $_[0]");
1020           }
1021       };
1022       $dbh->{ShowErrorStatement} = 1;
1023       $dbh->{RaiseError} = 1;
1024       $dbh->{PrintError} = 0;
1025     }
1026   };
1027
1028   $DBI::connect_via = $old_connect_via if $old_connect_via;
1029
1030   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1031     if !$dbh || $@;
1032
1033   $self->_dbh_autocommit($dbh->{AutoCommit});
1034
1035   $dbh;
1036 }
1037
1038 sub svp_begin {
1039   my ($self, $name) = @_;
1040
1041   $name = $self->_svp_generate_name
1042     unless defined $name;
1043
1044   $self->throw_exception ("You can't use savepoints outside a transaction")
1045     if $self->{transaction_depth} == 0;
1046
1047   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1048     unless $self->can('_svp_begin');
1049   
1050   push @{ $self->{savepoints} }, $name;
1051
1052   $self->debugobj->svp_begin($name) if $self->debug;
1053   
1054   return $self->_svp_begin($name);
1055 }
1056
1057 sub svp_release {
1058   my ($self, $name) = @_;
1059
1060   $self->throw_exception ("You can't use savepoints outside a transaction")
1061     if $self->{transaction_depth} == 0;
1062
1063   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1064     unless $self->can('_svp_release');
1065
1066   if (defined $name) {
1067     $self->throw_exception ("Savepoint '$name' does not exist")
1068       unless grep { $_ eq $name } @{ $self->{savepoints} };
1069
1070     # Dig through the stack until we find the one we are releasing.  This keeps
1071     # the stack up to date.
1072     my $svp;
1073
1074     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1075   } else {
1076     $name = pop @{ $self->{savepoints} };
1077   }
1078
1079   $self->debugobj->svp_release($name) if $self->debug;
1080
1081   return $self->_svp_release($name);
1082 }
1083
1084 sub svp_rollback {
1085   my ($self, $name) = @_;
1086
1087   $self->throw_exception ("You can't use savepoints outside a transaction")
1088     if $self->{transaction_depth} == 0;
1089
1090   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1091     unless $self->can('_svp_rollback');
1092
1093   if (defined $name) {
1094       # If they passed us a name, verify that it exists in the stack
1095       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1096           $self->throw_exception("Savepoint '$name' does not exist!");
1097       }
1098
1099       # Dig through the stack until we find the one we are releasing.  This keeps
1100       # the stack up to date.
1101       while(my $s = pop(@{ $self->{savepoints} })) {
1102           last if($s eq $name);
1103       }
1104       # Add the savepoint back to the stack, as a rollback doesn't remove the
1105       # named savepoint, only everything after it.
1106       push(@{ $self->{savepoints} }, $name);
1107   } else {
1108       # We'll assume they want to rollback to the last savepoint
1109       $name = $self->{savepoints}->[-1];
1110   }
1111
1112   $self->debugobj->svp_rollback($name) if $self->debug;
1113   
1114   return $self->_svp_rollback($name);
1115 }
1116
1117 sub _svp_generate_name {
1118     my ($self) = @_;
1119
1120     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1121 }
1122
1123 sub txn_begin {
1124   my $self = shift;
1125   $self->ensure_connected();
1126   if($self->{transaction_depth} == 0) {
1127     $self->debugobj->txn_begin()
1128       if $self->debug;
1129     # this isn't ->_dbh-> because
1130     #  we should reconnect on begin_work
1131     #  for AutoCommit users
1132     $self->dbh->begin_work;
1133   } elsif ($self->auto_savepoint) {
1134     $self->svp_begin;
1135   }
1136   $self->{transaction_depth}++;
1137 }
1138
1139 sub txn_commit {
1140   my $self = shift;
1141   if ($self->{transaction_depth} == 1) {
1142     my $dbh = $self->_dbh;
1143     $self->debugobj->txn_commit()
1144       if ($self->debug);
1145     $dbh->commit;
1146     $self->{transaction_depth} = 0
1147       if $self->_dbh_autocommit;
1148   }
1149   elsif($self->{transaction_depth} > 1) {
1150     $self->{transaction_depth}--;
1151     $self->svp_release
1152       if $self->auto_savepoint;
1153   }
1154 }
1155
1156 sub txn_rollback {
1157   my $self = shift;
1158   my $dbh = $self->_dbh;
1159   eval {
1160     if ($self->{transaction_depth} == 1) {
1161       $self->debugobj->txn_rollback()
1162         if ($self->debug);
1163       $self->{transaction_depth} = 0
1164         if $self->_dbh_autocommit;
1165       $dbh->rollback;
1166     }
1167     elsif($self->{transaction_depth} > 1) {
1168       $self->{transaction_depth}--;
1169       if ($self->auto_savepoint) {
1170         $self->svp_rollback;
1171         $self->svp_release;
1172       }
1173     }
1174     else {
1175       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1176     }
1177   };
1178   if ($@) {
1179     my $error = $@;
1180     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1181     $error =~ /$exception_class/ and $self->throw_exception($error);
1182     # ensure that a failed rollback resets the transaction depth
1183     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1184     $self->throw_exception($error);
1185   }
1186 }
1187
1188 # This used to be the top-half of _execute.  It was split out to make it
1189 #  easier to override in NoBindVars without duping the rest.  It takes up
1190 #  all of _execute's args, and emits $sql, @bind.
1191 sub _prep_for_execute {
1192   my ($self, $op, $extra_bind, $ident, $args) = @_;
1193
1194   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1195     $ident = $ident->from();
1196   }
1197
1198   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1199
1200   unshift(@bind,
1201     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1202       if $extra_bind;
1203   return ($sql, \@bind);
1204 }
1205
1206 sub _fix_bind_params {
1207     my ($self, @bind) = @_;
1208
1209     ### Turn @bind from something like this:
1210     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1211     ### to this:
1212     ###   ( "'1'", "'1'", "'3'" )
1213     return
1214         map {
1215             if ( defined( $_ && $_->[1] ) ) {
1216                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1217             }
1218             else { q{'NULL'}; }
1219         } @bind;
1220 }
1221
1222 sub _query_start {
1223     my ( $self, $sql, @bind ) = @_;
1224
1225     if ( $self->debug ) {
1226         @bind = $self->_fix_bind_params(@bind);
1227         
1228         $self->debugobj->query_start( $sql, @bind );
1229     }
1230 }
1231
1232 sub _query_end {
1233     my ( $self, $sql, @bind ) = @_;
1234
1235     if ( $self->debug ) {
1236         @bind = $self->_fix_bind_params(@bind);
1237         $self->debugobj->query_end( $sql, @bind );
1238     }
1239 }
1240
1241 sub _dbh_execute {
1242   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1243
1244   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1245
1246   $self->_query_start( $sql, @$bind );
1247
1248   my $sth = $self->sth($sql,$op);
1249
1250   my $placeholder_index = 1; 
1251
1252   foreach my $bound (@$bind) {
1253     my $attributes = {};
1254     my($column_name, @data) = @$bound;
1255
1256     if ($bind_attributes) {
1257       $attributes = $bind_attributes->{$column_name}
1258       if defined $bind_attributes->{$column_name};
1259     }
1260
1261     foreach my $data (@data) {
1262       my $ref = ref $data;
1263       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1264
1265       $sth->bind_param($placeholder_index, $data, $attributes);
1266       $placeholder_index++;
1267     }
1268   }
1269
1270   # Can this fail without throwing an exception anyways???
1271   my $rv = $sth->execute();
1272   $self->throw_exception($sth->errstr) if !$rv;
1273
1274   $self->_query_end( $sql, @$bind );
1275
1276   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1277 }
1278
1279 sub _execute {
1280     my $self = shift;
1281     $self->dbh_do('_dbh_execute', @_)
1282 }
1283
1284 sub insert {
1285   my ($self, $source, $to_insert) = @_;
1286   
1287   my $ident = $source->from; 
1288   my $bind_attributes = $self->source_bind_attributes($source);
1289
1290   my $updated_cols = {};
1291
1292   $self->ensure_connected;
1293   foreach my $col ( $source->columns ) {
1294     if ( !defined $to_insert->{$col} ) {
1295       my $col_info = $source->column_info($col);
1296
1297       if ( $col_info->{auto_nextval} ) {
1298         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1299       }
1300     }
1301   }
1302
1303   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1304
1305   return $updated_cols;
1306 }
1307
1308 ## Still not quite perfect, and EXPERIMENTAL
1309 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1310 ## scalar refs, or at least, all the same type as the first set, the statement is
1311 ## only prepped once.
1312 sub insert_bulk {
1313   my ($self, $source, $cols, $data) = @_;
1314   my %colvalues;
1315   my $table = $source->from;
1316   @colvalues{@$cols} = (0..$#$cols);
1317   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1318   
1319   $self->_query_start( $sql, @bind );
1320   my $sth = $self->sth($sql);
1321
1322 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1323
1324   ## This must be an arrayref, else nothing works!
1325   
1326   my $tuple_status = [];
1327   
1328   ##use Data::Dumper;
1329   ##print STDERR Dumper( $data, $sql, [@bind] );
1330
1331   my $time = time();
1332
1333   ## Get the bind_attributes, if any exist
1334   my $bind_attributes = $self->source_bind_attributes($source);
1335
1336   ## Bind the values and execute
1337   my $placeholder_index = 1; 
1338
1339   foreach my $bound (@bind) {
1340
1341     my $attributes = {};
1342     my ($column_name, $data_index) = @$bound;
1343
1344     if( $bind_attributes ) {
1345       $attributes = $bind_attributes->{$column_name}
1346       if defined $bind_attributes->{$column_name};
1347     }
1348
1349     my @data = map { $_->[$data_index] } @$data;
1350
1351     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1352     $placeholder_index++;
1353   }
1354   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1355   $self->throw_exception($sth->errstr) if !$rv;
1356
1357   $self->_query_end( $sql, @bind );
1358   return (wantarray ? ($rv, $sth, @bind) : $rv);
1359 }
1360
1361 sub update {
1362   my $self = shift @_;
1363   my $source = shift @_;
1364   my $bind_attributes = $self->source_bind_attributes($source);
1365   
1366   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1367 }
1368
1369
1370 sub delete {
1371   my $self = shift @_;
1372   my $source = shift @_;
1373   
1374   my $bind_attrs = {}; ## If ever it's needed...
1375   
1376   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1377 }
1378
1379 sub _select {
1380   my $self = shift;
1381   my $sql_maker = $self->sql_maker;
1382   local $sql_maker->{for};
1383   return $self->_execute($self->_select_args(@_));
1384 }
1385
1386 sub _select_args {
1387   my ($self, $ident, $select, $condition, $attrs) = @_;
1388   my $order = $attrs->{order_by};
1389
1390   my $for = delete $attrs->{for};
1391   my $sql_maker = $self->sql_maker;
1392   $sql_maker->{for} = $for;
1393
1394   if (exists $attrs->{group_by} || $attrs->{having}) {
1395     $order = {
1396       group_by => $attrs->{group_by},
1397       having => $attrs->{having},
1398       ($order ? (order_by => $order) : ())
1399     };
1400   }
1401   my $bind_attrs = {}; ## Future support
1402   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1403   if ($attrs->{software_limit} ||
1404       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1405         $attrs->{software_limit} = 1;
1406   } else {
1407     $self->throw_exception("rows attribute must be positive if present")
1408       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1409
1410     # MySQL actually recommends this approach.  I cringe.
1411     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1412     push @args, $attrs->{rows}, $attrs->{offset};
1413   }
1414   return @args;
1415 }
1416
1417 sub source_bind_attributes {
1418   my ($self, $source) = @_;
1419   
1420   my $bind_attributes;
1421   foreach my $column ($source->columns) {
1422   
1423     my $data_type = $source->column_info($column)->{data_type} || '';
1424     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1425      if $data_type;
1426   }
1427
1428   return $bind_attributes;
1429 }
1430
1431 =head2 select
1432
1433 =over 4
1434
1435 =item Arguments: $ident, $select, $condition, $attrs
1436
1437 =back
1438
1439 Handle a SQL select statement.
1440
1441 =cut
1442
1443 sub select {
1444   my $self = shift;
1445   my ($ident, $select, $condition, $attrs) = @_;
1446   return $self->cursor_class->new($self, \@_, $attrs);
1447 }
1448
1449 sub select_single {
1450   my $self = shift;
1451   my ($rv, $sth, @bind) = $self->_select(@_);
1452   my @row = $sth->fetchrow_array;
1453   my @nextrow = $sth->fetchrow_array if @row;
1454   if(@row && @nextrow) {
1455     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1456   }
1457   # Need to call finish() to work round broken DBDs
1458   $sth->finish();
1459   return @row;
1460 }
1461
1462 =head2 sth
1463
1464 =over 4
1465
1466 =item Arguments: $sql
1467
1468 =back
1469
1470 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1471
1472 =cut
1473
1474 sub _dbh_sth {
1475   my ($self, $dbh, $sql) = @_;
1476
1477   # 3 is the if_active parameter which avoids active sth re-use
1478   my $sth = $self->disable_sth_caching
1479     ? $dbh->prepare($sql)
1480     : $dbh->prepare_cached($sql, {}, 3);
1481
1482   # XXX You would think RaiseError would make this impossible,
1483   #  but apparently that's not true :(
1484   $self->throw_exception($dbh->errstr) if !$sth;
1485
1486   $sth;
1487 }
1488
1489 sub sth {
1490   my ($self, $sql) = @_;
1491   $self->dbh_do('_dbh_sth', $sql);
1492 }
1493
1494 sub _dbh_columns_info_for {
1495   my ($self, $dbh, $table) = @_;
1496
1497   if ($dbh->can('column_info')) {
1498     my %result;
1499     eval {
1500       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1501       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1502       $sth->execute();
1503       while ( my $info = $sth->fetchrow_hashref() ){
1504         my %column_info;
1505         $column_info{data_type}   = $info->{TYPE_NAME};
1506         $column_info{size}      = $info->{COLUMN_SIZE};
1507         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1508         $column_info{default_value} = $info->{COLUMN_DEF};
1509         my $col_name = $info->{COLUMN_NAME};
1510         $col_name =~ s/^\"(.*)\"$/$1/;
1511
1512         $result{$col_name} = \%column_info;
1513       }
1514     };
1515     return \%result if !$@ && scalar keys %result;
1516   }
1517
1518   my %result;
1519   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1520   $sth->execute;
1521   my @columns = @{$sth->{NAME_lc}};
1522   for my $i ( 0 .. $#columns ){
1523     my %column_info;
1524     $column_info{data_type} = $sth->{TYPE}->[$i];
1525     $column_info{size} = $sth->{PRECISION}->[$i];
1526     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1527
1528     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1529       $column_info{data_type} = $1;
1530       $column_info{size}    = $2;
1531     }
1532
1533     $result{$columns[$i]} = \%column_info;
1534   }
1535   $sth->finish;
1536
1537   foreach my $col (keys %result) {
1538     my $colinfo = $result{$col};
1539     my $type_num = $colinfo->{data_type};
1540     my $type_name;
1541     if(defined $type_num && $dbh->can('type_info')) {
1542       my $type_info = $dbh->type_info($type_num);
1543       $type_name = $type_info->{TYPE_NAME} if $type_info;
1544       $colinfo->{data_type} = $type_name if $type_name;
1545     }
1546   }
1547
1548   return \%result;
1549 }
1550
1551 sub columns_info_for {
1552   my ($self, $table) = @_;
1553   $self->dbh_do('_dbh_columns_info_for', $table);
1554 }
1555
1556 =head2 last_insert_id
1557
1558 Return the row id of the last insert.
1559
1560 =cut
1561
1562 sub _dbh_last_insert_id {
1563     # All Storage's need to register their own _dbh_last_insert_id
1564     # the old SQLite-based method was highly inappropriate
1565
1566     my $self = shift;
1567     my $class = ref $self;
1568     $self->throw_exception (<<EOE);
1569
1570 No _dbh_last_insert_id() method found in $class.
1571 Since the method of obtaining the autoincrement id of the last insert
1572 operation varies greatly between different databases, this method must be
1573 individually implemented for every storage class.
1574 EOE
1575 }
1576
1577 sub last_insert_id {
1578   my $self = shift;
1579   $self->dbh_do('_dbh_last_insert_id', @_);
1580 }
1581
1582 =head2 sqlt_type
1583
1584 Returns the database driver name.
1585
1586 =cut
1587
1588 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1589
1590 =head2 bind_attribute_by_data_type
1591
1592 Given a datatype from column info, returns a database specific bind
1593 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1594 let the database planner just handle it.
1595
1596 Generally only needed for special case column types, like bytea in postgres.
1597
1598 =cut
1599
1600 sub bind_attribute_by_data_type {
1601     return;
1602 }
1603
1604 =head2 create_ddl_dir
1605
1606 =over 4
1607
1608 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1609
1610 =back
1611
1612 Creates a SQL file based on the Schema, for each of the specified
1613 database types, in the given directory.
1614
1615 By default, C<\%sqlt_args> will have
1616
1617  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1618
1619 merged with the hash passed in. To disable any of those features, pass in a 
1620 hashref like the following
1621
1622  { ignore_constraint_names => 0, # ... other options }
1623
1624 =cut
1625
1626 sub create_ddl_dir {
1627   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1628
1629   if(!$dir || !-d $dir) {
1630     warn "No directory given, using ./\n";
1631     $dir = "./";
1632   }
1633   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1634   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1635
1636   my $schema_version = $schema->schema_version || '1.x';
1637   $version ||= $schema_version;
1638
1639   $sqltargs = {
1640     add_drop_table => 1, 
1641     ignore_constraint_names => 1,
1642     ignore_index_names => 1,
1643     %{$sqltargs || {}}
1644   };
1645
1646   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1647       . $self->_check_sqlt_message . q{'})
1648           if !$self->_check_sqlt_version;
1649
1650   my $sqlt = SQL::Translator->new( $sqltargs );
1651
1652   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1653   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1654
1655   foreach my $db (@$databases) {
1656     $sqlt->reset();
1657     $sqlt->{schema} = $sqlt_schema;
1658     $sqlt->producer($db);
1659
1660     my $file;
1661     my $filename = $schema->ddl_filename($db, $version, $dir);
1662     if (-e $filename && ($version eq $schema_version )) {
1663       # if we are dumping the current version, overwrite the DDL
1664       warn "Overwriting existing DDL file - $filename";
1665       unlink($filename);
1666     }
1667
1668     my $output = $sqlt->translate;
1669     if(!$output) {
1670       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1671       next;
1672     }
1673     if(!open($file, ">$filename")) {
1674       $self->throw_exception("Can't open $filename for writing ($!)");
1675       next;
1676     }
1677     print $file $output;
1678     close($file);
1679   
1680     next unless ($preversion);
1681
1682     require SQL::Translator::Diff;
1683
1684     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1685     if(!-e $prefilename) {
1686       warn("No previous schema file found ($prefilename)");
1687       next;
1688     }
1689
1690     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1691     if(-e $difffile) {
1692       warn("Overwriting existing diff file - $difffile");
1693       unlink($difffile);
1694     }
1695     
1696     my $source_schema;
1697     {
1698       my $t = SQL::Translator->new($sqltargs);
1699       $t->debug( 0 );
1700       $t->trace( 0 );
1701       $t->parser( $db )                       or die $t->error;
1702       my $out = $t->translate( $prefilename ) or die $t->error;
1703       $source_schema = $t->schema;
1704       unless ( $source_schema->name ) {
1705         $source_schema->name( $prefilename );
1706       }
1707     }
1708
1709     # The "new" style of producers have sane normalization and can support 
1710     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1711     # And we have to diff parsed SQL against parsed SQL.
1712     my $dest_schema = $sqlt_schema;
1713     
1714     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1715       my $t = SQL::Translator->new($sqltargs);
1716       $t->debug( 0 );
1717       $t->trace( 0 );
1718       $t->parser( $db )                    or die $t->error;
1719       my $out = $t->translate( $filename ) or die $t->error;
1720       $dest_schema = $t->schema;
1721       $dest_schema->name( $filename )
1722         unless $dest_schema->name;
1723     }
1724     
1725     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1726                                                   $dest_schema,   $db,
1727                                                   $sqltargs
1728                                                  );
1729     if(!open $file, ">$difffile") { 
1730       $self->throw_exception("Can't write to $difffile ($!)");
1731       next;
1732     }
1733     print $file $diff;
1734     close($file);
1735   }
1736 }
1737
1738 =head2 deployment_statements
1739
1740 =over 4
1741
1742 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1743
1744 =back
1745
1746 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1747 The database driver name is given by C<$type>, though the value from
1748 L</sqlt_type> is used if it is not specified.
1749
1750 C<$directory> is used to return statements from files in a previously created
1751 L</create_ddl_dir> directory and is optional. The filenames are constructed
1752 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1753
1754 If no C<$directory> is specified then the statements are constructed on the
1755 fly using L<SQL::Translator> and C<$version> is ignored.
1756
1757 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1758
1759 =cut
1760
1761 sub deployment_statements {
1762   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1763   # Need to be connected to get the correct sqlt_type
1764   $self->ensure_connected() unless $type;
1765   $type ||= $self->sqlt_type;
1766   $version ||= $schema->schema_version || '1.x';
1767   $dir ||= './';
1768   my $filename = $schema->ddl_filename($type, $version, $dir);
1769   if(-f $filename)
1770   {
1771       my $file;
1772       open($file, "<$filename") 
1773         or $self->throw_exception("Can't open $filename ($!)");
1774       my @rows = <$file>;
1775       close($file);
1776       return join('', @rows);
1777   }
1778
1779   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1780       . $self->_check_sqlt_message . q{'})
1781           if !$self->_check_sqlt_version;
1782
1783   require SQL::Translator::Parser::DBIx::Class;
1784   eval qq{use SQL::Translator::Producer::${type}};
1785   $self->throw_exception($@) if $@;
1786
1787   # sources needs to be a parser arg, but for simplicty allow at top level 
1788   # coming in
1789   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1790       if exists $sqltargs->{sources};
1791
1792   my $tr = SQL::Translator->new(%$sqltargs);
1793   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1794   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1795 }
1796
1797 sub deploy {
1798   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1799   my $deploy = sub {
1800     my $line = shift;
1801     return if($line =~ /^--/);
1802     return if(!$line);
1803     # next if($line =~ /^DROP/m);
1804     return if($line =~ /^BEGIN TRANSACTION/m);
1805     return if($line =~ /^COMMIT/m);
1806     return if $line =~ /^\s+$/; # skip whitespace only
1807     $self->_query_start($line);
1808     eval {
1809       $self->dbh->do($line); # shouldn't be using ->dbh ?
1810     };
1811     if ($@) {
1812       warn qq{$@ (running "${line}")};
1813     }
1814     $self->_query_end($line);
1815   };
1816   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1817   if (@statements > 1) {
1818     foreach my $statement (@statements) {
1819       $deploy->( $statement );
1820     }
1821   }
1822   elsif (@statements == 1) {
1823     foreach my $line ( split(";\n", $statements[0])) {
1824       $deploy->( $line );
1825     }
1826   }
1827 }
1828
1829 =head2 datetime_parser
1830
1831 Returns the datetime parser class
1832
1833 =cut
1834
1835 sub datetime_parser {
1836   my $self = shift;
1837   return $self->{datetime_parser} ||= do {
1838     $self->ensure_connected;
1839     $self->build_datetime_parser(@_);
1840   };
1841 }
1842
1843 =head2 datetime_parser_type
1844
1845 Defines (returns) the datetime parser class - currently hardwired to
1846 L<DateTime::Format::MySQL>
1847
1848 =cut
1849
1850 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1851
1852 =head2 build_datetime_parser
1853
1854 See L</datetime_parser>
1855
1856 =cut
1857
1858 sub build_datetime_parser {
1859   my $self = shift;
1860   my $type = $self->datetime_parser_type(@_);
1861   eval "use ${type}";
1862   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1863   return $type;
1864 }
1865
1866 {
1867     my $_check_sqlt_version; # private
1868     my $_check_sqlt_message; # private
1869     sub _check_sqlt_version {
1870         return $_check_sqlt_version if defined $_check_sqlt_version;
1871         eval 'use SQL::Translator "0.09003"';
1872         $_check_sqlt_message = $@ || '';
1873         $_check_sqlt_version = !$@;
1874     }
1875
1876     sub _check_sqlt_message {
1877         _check_sqlt_version if !defined $_check_sqlt_message;
1878         $_check_sqlt_message;
1879     }
1880 }
1881
1882 =head2 is_replicating
1883
1884 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1885 replicate from a master database.  Default is undef, which is the result
1886 returned by databases that don't support replication.
1887
1888 =cut
1889
1890 sub is_replicating {
1891     return;
1892     
1893 }
1894
1895 =head2 lag_behind_master
1896
1897 Returns a number that represents a certain amount of lag behind a master db
1898 when a given storage is replicating.  The number is database dependent, but
1899 starts at zero and increases with the amount of lag. Default in undef
1900
1901 =cut
1902
1903 sub lag_behind_master {
1904     return;
1905 }
1906
1907 sub DESTROY {
1908   my $self = shift;
1909   return if !$self->_dbh;
1910   $self->_verify_pid;
1911   $self->_dbh(undef);
1912 }
1913
1914 1;
1915
1916 =head1 USAGE NOTES
1917
1918 =head2 DBIx::Class and AutoCommit
1919
1920 DBIx::Class can do some wonderful magic with handling exceptions,
1921 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1922 combined with C<txn_do> for transaction support.
1923
1924 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1925 in an assumed transaction between commits, and you're telling us you'd
1926 like to manage that manually.  A lot of the magic protections offered by
1927 this module will go away.  We can't protect you from exceptions due to database
1928 disconnects because we don't know anything about how to restart your
1929 transactions.  You're on your own for handling all sorts of exceptional
1930 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1931 be with raw DBI.
1932
1933
1934 =head1 SQL METHODS
1935
1936 The module defines a set of methods within the DBIC::SQL::Abstract
1937 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1938 SQL query functions.
1939
1940 The following methods are extended:-
1941
1942 =over 4
1943
1944 =item delete
1945
1946 =item insert
1947
1948 =item select
1949
1950 =item update
1951
1952 =item limit_dialect
1953
1954 See L</connect_info> for details.
1955
1956 =item quote_char
1957
1958 See L</connect_info> for details.
1959
1960 =item name_sep
1961
1962 See L</connect_info> for details.
1963
1964 =back
1965
1966 =head1 AUTHORS
1967
1968 Matt S. Trout <mst@shadowcatsystems.co.uk>
1969
1970 Andy Grundman <andy@hybridized.org>
1971
1972 =head1 LICENSE
1973
1974 You may distribute this code under the same terms as Perl itself.
1975
1976 =cut