* change count with group_by (distinct) to use a subquery
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 # DB2 is the only remaining DB using this. Even though we are not sure if
54 # RowNumberOver is still needed here (should be part of SQLA) leave the 
55 # code in place
56 sub _RowNumberOver {
57   my ($self, $sql, $order, $rows, $offset ) = @_;
58
59   $offset += 1;
60   my $last = $rows + $offset;
61   my ( $order_by ) = $self->_order_by( $order );
62
63   $sql = <<"SQL";
64 SELECT * FROM
65 (
66    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
67       $sql
68       $order_by
69    ) Q1
70 ) Q2
71 WHERE ROW_NUM BETWEEN $offset AND $last
72
73 SQL
74
75   return $sql;
76 }
77
78
79 # While we're at it, this should make LIMIT queries more efficient,
80 #  without digging into things too deeply
81 use Scalar::Util 'blessed';
82 sub _find_syntax {
83   my ($self, $syntax) = @_;
84   
85   # DB2 is the only remaining DB using this. Even though we are not sure if
86   # RowNumberOver is still needed here (should be part of SQLA) leave the 
87   # code in place
88   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
89   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
90     return 'RowNumberOver';
91   }
92   
93   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
94 }
95
96 sub select {
97   my ($self, $table, $fields, $where, $order, @rest) = @_;
98   local $self->{having_bind} = [];
99   local $self->{from_bind} = [];
100
101   if (ref $table eq 'SCALAR') {
102     $table = $$table;
103   }
104   elsif (not ref $table) {
105     $table = $self->_quote($table);
106   }
107   local $self->{rownum_hack_count} = 1
108     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
109   @rest = (-1) unless defined $rest[0];
110   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
111     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
112   my ($sql, @where_bind) = $self->SUPER::select(
113     $table, $self->_recurse_fields($fields), $where, $order, @rest
114   );
115   $sql .= 
116     $self->{for} ?
117     (
118       $self->{for} eq 'update' ? ' FOR UPDATE' :
119       $self->{for} eq 'shared' ? ' FOR SHARE'  :
120       ''
121     ) :
122     ''
123   ;
124   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
125 }
126
127 sub insert {
128   my $self = shift;
129   my $table = shift;
130   $table = $self->_quote($table) unless ref($table);
131   $self->SUPER::insert($table, @_);
132 }
133
134 sub update {
135   my $self = shift;
136   my $table = shift;
137   $table = $self->_quote($table) unless ref($table);
138   $self->SUPER::update($table, @_);
139 }
140
141 sub delete {
142   my $self = shift;
143   my $table = shift;
144   $table = $self->_quote($table) unless ref($table);
145   $self->SUPER::delete($table, @_);
146 }
147
148 sub _emulate_limit {
149   my $self = shift;
150   if ($_[3] == -1) {
151     return $_[1].$self->_order_by($_[2]);
152   } else {
153     return $self->SUPER::_emulate_limit(@_);
154   }
155 }
156
157 sub _recurse_fields {
158   my ($self, $fields, $params) = @_;
159   my $ref = ref $fields;
160   return $self->_quote($fields) unless $ref;
161   return $$fields if $ref eq 'SCALAR';
162
163   if ($ref eq 'ARRAY') {
164     return join(', ', map {
165       $self->_recurse_fields($_)
166         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
167           ? ' AS col'.$self->{rownum_hack_count}++
168           : '')
169       } @$fields);
170   } elsif ($ref eq 'HASH') {
171     foreach my $func (keys %$fields) {
172       return $self->_sqlcase($func)
173         .'( '.$self->_recurse_fields($fields->{$func}).' )';
174     }
175   }
176   # Is the second check absolutely necessary?
177   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
178     return $self->_bind_to_sql( $fields );
179   }
180   else {
181     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
182   }
183 }
184
185 sub _order_by {
186   my $self = shift;
187   my $ret = '';
188   my @extra;
189   if (ref $_[0] eq 'HASH') {
190     if (defined $_[0]->{group_by}) {
191       $ret = $self->_sqlcase(' group by ')
192         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
193     }
194     if (defined $_[0]->{having}) {
195       my $frag;
196       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
197       push(@{$self->{having_bind}}, @extra);
198       $ret .= $self->_sqlcase(' having ').$frag;
199     }
200     if (defined $_[0]->{order_by}) {
201       $ret .= $self->_order_by($_[0]->{order_by});
202     }
203     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
204       return $self->SUPER::_order_by($_[0]);
205     }
206   } elsif (ref $_[0] eq 'SCALAR') {
207     $ret = $self->_sqlcase(' order by ').${ $_[0] };
208   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
209     my @order = @{+shift};
210     $ret = $self->_sqlcase(' order by ')
211           .join(', ', map {
212                         my $r = $self->_order_by($_, @_);
213                         $r =~ s/^ ?ORDER BY //i;
214                         $r;
215                       } @order);
216   } else {
217     $ret = $self->SUPER::_order_by(@_);
218   }
219   return $ret;
220 }
221
222 sub _order_directions {
223   my ($self, $order) = @_;
224   $order = $order->{order_by} if ref $order eq 'HASH';
225   return $self->SUPER::_order_directions($order);
226 }
227
228 sub _table {
229   my ($self, $from) = @_;
230   if (ref $from eq 'ARRAY') {
231     return $self->_recurse_from(@$from);
232   } elsif (ref $from eq 'HASH') {
233     return $self->_make_as($from);
234   } else {
235     return $from; # would love to quote here but _table ends up getting called
236                   # twice during an ->select without a limit clause due to
237                   # the way S::A::Limit->select works. should maybe consider
238                   # bypassing this and doing S::A::select($self, ...) in
239                   # our select method above. meantime, quoting shims have
240                   # been added to select/insert/update/delete here
241   }
242 }
243
244 sub _recurse_from {
245   my ($self, $from, @join) = @_;
246   my @sqlf;
247   push(@sqlf, $self->_make_as($from));
248   foreach my $j (@join) {
249     my ($to, $on) = @$j;
250
251     # check whether a join type exists
252     my $join_clause = '';
253     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
254     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
255       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
256     } else {
257       $join_clause = ' JOIN ';
258     }
259     push(@sqlf, $join_clause);
260
261     if (ref $to eq 'ARRAY') {
262       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
263     } else {
264       push(@sqlf, $self->_make_as($to));
265     }
266     push(@sqlf, ' ON ', $self->_join_condition($on));
267   }
268   return join('', @sqlf);
269 }
270
271 sub _bind_to_sql {
272   my ($self, $arr) = @_;
273   my ($sql, @bind) = @{${$arr}};
274   push (@{$self->{from_bind}}, @bind);
275   return $sql;
276 }
277
278 sub _make_as {
279   my ($self, $from) = @_;
280   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ 
281                         : ref $_ eq 'REF'    ? $self->_bind_to_sql($_) 
282                         : $self->_quote($_)) 
283                        } reverse each %{$self->_skip_options($from)});
284 }
285
286 sub _skip_options {
287   my ($self, $hash) = @_;
288   my $clean_hash = {};
289   $clean_hash->{$_} = $hash->{$_}
290     for grep {!/^-/} keys %$hash;
291   return $clean_hash;
292 }
293
294 sub _join_condition {
295   my ($self, $cond) = @_;
296   if (ref $cond eq 'HASH') {
297     my %j;
298     for (keys %$cond) {
299       my $v = $cond->{$_};
300       if (ref $v) {
301         # XXX no throw_exception() in this package and croak() fails with strange results
302         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
303             if ref($v) ne 'SCALAR';
304         $j{$_} = $v;
305       }
306       else {
307         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
308       }
309     };
310     return scalar($self->_recurse_where(\%j));
311   } elsif (ref $cond eq 'ARRAY') {
312     return join(' OR ', map { $self->_join_condition($_) } @$cond);
313   } else {
314     die "Can't handle this yet!";
315   }
316 }
317
318 sub _quote {
319   my ($self, $label) = @_;
320   return '' unless defined $label;
321   return "*" if $label eq '*';
322   return $label unless $self->{quote_char};
323   if(ref $self->{quote_char} eq "ARRAY"){
324     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
325       if !defined $self->{name_sep};
326     my $sep = $self->{name_sep};
327     return join($self->{name_sep},
328         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
329        split(/\Q$sep\E/,$label));
330   }
331   return $self->SUPER::_quote($label);
332 }
333
334 sub limit_dialect {
335     my $self = shift;
336     $self->{limit_dialect} = shift if @_;
337     return $self->{limit_dialect};
338 }
339
340 sub quote_char {
341     my $self = shift;
342     $self->{quote_char} = shift if @_;
343     return $self->{quote_char};
344 }
345
346 sub name_sep {
347     my $self = shift;
348     $self->{name_sep} = shift if @_;
349     return $self->{name_sep};
350 }
351
352 } # End of BEGIN block
353
354 =head1 NAME
355
356 DBIx::Class::Storage::DBI - DBI storage handler
357
358 =head1 SYNOPSIS
359
360   my $schema = MySchema->connect('dbi:SQLite:my.db');
361
362   $schema->storage->debug(1);
363   $schema->dbh_do("DROP TABLE authors");
364
365   $schema->resultset('Book')->search({
366      written_on => $schema->storage->datetime_parser(DateTime->now)
367   });
368
369 =head1 DESCRIPTION
370
371 This class represents the connection to an RDBMS via L<DBI>.  See
372 L<DBIx::Class::Storage> for general information.  This pod only
373 documents DBI-specific methods and behaviors.
374
375 =head1 METHODS
376
377 =cut
378
379 sub new {
380   my $new = shift->next::method(@_);
381
382   $new->transaction_depth(0);
383   $new->_sql_maker_opts({});
384   $new->{savepoints} = [];
385   $new->{_in_dbh_do} = 0;
386   $new->{_dbh_gen} = 0;
387
388   $new;
389 }
390
391 =head2 connect_info
392
393 This method is normally called by L<DBIx::Class::Schema/connection>, which
394 encapsulates its argument list in an arrayref before passing them here.
395
396 The argument list may contain:
397
398 =over
399
400 =item *
401
402 The same 4-element argument set one would normally pass to
403 L<DBI/connect>, optionally followed by
404 L<extra attributes|/DBIx::Class specific connection attributes>
405 recognized by DBIx::Class:
406
407   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
408
409 =item *
410
411 A single code reference which returns a connected 
412 L<DBI database handle|DBI/connect> optionally followed by 
413 L<extra attributes|/DBIx::Class specific connection attributes> recognized
414 by DBIx::Class:
415
416   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
417
418 =item *
419
420 A single hashref with all the attributes and the dsn/user/password
421 mixed together:
422
423   $connect_info_args = [{
424     dsn => $dsn,
425     user => $user,
426     password => $pass,
427     %dbi_attributes,
428     %extra_attributes,
429   }];
430
431 This is particularly useful for L<Catalyst> based applications, allowing the 
432 following config (L<Config::General> style):
433
434   <Model::DB>
435     schema_class   App::DB
436     <connect_info>
437       dsn          dbi:mysql:database=test
438       user         testuser
439       password     TestPass
440       AutoCommit   1
441     </connect_info>
442   </Model::DB>
443
444 =back
445
446 Please note that the L<DBI> docs recommend that you always explicitly
447 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
448 recommends that it be set to I<1>, and that you perform transactions
449 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
450 to I<1> if you do not do explicitly set it to zero.  This is the default 
451 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
452
453 =head3 DBIx::Class specific connection attributes
454
455 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
456 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
457 the following connection options. These options can be mixed in with your other
458 L<DBI> connection attributes, or placed in a seperate hashref
459 (C<\%extra_attributes>) as shown above.
460
461 Every time C<connect_info> is invoked, any previous settings for
462 these options will be cleared before setting the new ones, regardless of
463 whether any options are specified in the new C<connect_info>.
464
465
466 =over
467
468 =item on_connect_do
469
470 Specifies things to do immediately after connecting or re-connecting to
471 the database.  Its value may contain:
472
473 =over
474
475 =item an array reference
476
477 This contains SQL statements to execute in order.  Each element contains
478 a string or a code reference that returns a string.
479
480 =item a code reference
481
482 This contains some code to execute.  Unlike code references within an
483 array reference, its return value is ignored.
484
485 =back
486
487 =item on_disconnect_do
488
489 Takes arguments in the same form as L</on_connect_do> and executes them
490 immediately before disconnecting from the database.
491
492 Note, this only runs if you explicitly call L</disconnect> on the
493 storage object.
494
495 =item disable_sth_caching
496
497 If set to a true value, this option will disable the caching of
498 statement handles via L<DBI/prepare_cached>.
499
500 =item limit_dialect 
501
502 Sets the limit dialect. This is useful for JDBC-bridge among others
503 where the remote SQL-dialect cannot be determined by the name of the
504 driver alone. See also L<SQL::Abstract::Limit>.
505
506 =item quote_char
507
508 Specifies what characters to use to quote table and column names. If 
509 you use this you will want to specify L</name_sep> as well.
510
511 C<quote_char> expects either a single character, in which case is it
512 is placed on either side of the table/column name, or an arrayref of length
513 2 in which case the table/column name is placed between the elements.
514
515 For example under MySQL you should use C<< quote_char => '`' >>, and for
516 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
517
518 =item name_sep
519
520 This only needs to be used in conjunction with C<quote_char>, and is used to 
521 specify the charecter that seperates elements (schemas, tables, columns) from 
522 each other. In most cases this is simply a C<.>.
523
524 The consequences of not supplying this value is that L<SQL::Abstract>
525 will assume DBIx::Class' uses of aliases to be complete column
526 names. The output will look like I<"me.name"> when it should actually
527 be I<"me"."name">.
528
529 =item unsafe
530
531 This Storage driver normally installs its own C<HandleError>, sets
532 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
533 all database handles, including those supplied by a coderef.  It does this
534 so that it can have consistent and useful error behavior.
535
536 If you set this option to a true value, Storage will not do its usual
537 modifications to the database handle's attributes, and instead relies on
538 the settings in your connect_info DBI options (or the values you set in
539 your connection coderef, in the case that you are connecting via coderef).
540
541 Note that your custom settings can cause Storage to malfunction,
542 especially if you set a C<HandleError> handler that suppresses exceptions
543 and/or disable C<RaiseError>.
544
545 =item auto_savepoint
546
547 If this option is true, L<DBIx::Class> will use savepoints when nesting
548 transactions, making it possible to recover from failure in the inner
549 transaction without having to abort all outer transactions.
550
551 =item cursor_class
552
553 Use this argument to supply a cursor class other than the default
554 L<DBIx::Class::Storage::DBI::Cursor>.
555
556 =back
557
558 Some real-life examples of arguments to L</connect_info> and
559 L<DBIx::Class::Schema/connect>
560
561   # Simple SQLite connection
562   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
563
564   # Connect via subref
565   ->connect_info([ sub { DBI->connect(...) } ]);
566
567   # A bit more complicated
568   ->connect_info(
569     [
570       'dbi:Pg:dbname=foo',
571       'postgres',
572       'my_pg_password',
573       { AutoCommit => 1 },
574       { quote_char => q{"}, name_sep => q{.} },
575     ]
576   );
577
578   # Equivalent to the previous example
579   ->connect_info(
580     [
581       'dbi:Pg:dbname=foo',
582       'postgres',
583       'my_pg_password',
584       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
585     ]
586   );
587
588   # Same, but with hashref as argument
589   # See parse_connect_info for explanation
590   ->connect_info(
591     [{
592       dsn         => 'dbi:Pg:dbname=foo',
593       user        => 'postgres',
594       password    => 'my_pg_password',
595       AutoCommit  => 1,
596       quote_char  => q{"},
597       name_sep    => q{.},
598     }]
599   );
600
601   # Subref + DBIx::Class-specific connection options
602   ->connect_info(
603     [
604       sub { DBI->connect(...) },
605       {
606           quote_char => q{`},
607           name_sep => q{@},
608           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
609           disable_sth_caching => 1,
610       },
611     ]
612   );
613
614
615
616 =cut
617
618 sub connect_info {
619   my ($self, $info_arg) = @_;
620
621   return $self->_connect_info if !$info_arg;
622
623   my @args = @$info_arg;  # take a shallow copy for further mutilation
624   $self->_connect_info([@args]); # copy for _connect_info
625
626
627   # combine/pre-parse arguments depending on invocation style
628
629   my %attrs;
630   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
631     %attrs = %{ $args[1] || {} };
632     @args = $args[0];
633   }
634   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
635     %attrs = %{$args[0]};
636     @args = ();
637     for (qw/password user dsn/) {
638       unshift @args, delete $attrs{$_};
639     }
640   }
641   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
642     %attrs = (
643       % { $args[3] || {} },
644       % { $args[4] || {} },
645     );
646     @args = @args[0,1,2];
647   }
648
649   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
650   #  the new set of options
651   $self->_sql_maker(undef);
652   $self->_sql_maker_opts({});
653
654   if(keys %attrs) {
655     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
656       if(my $value = delete $attrs{$storage_opt}) {
657         $self->$storage_opt($value);
658       }
659     }
660     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
661       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
662         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
663       }
664     }
665   }
666
667   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
668
669   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
670   $self->_connect_info;
671 }
672
673 =head2 on_connect_do
674
675 This method is deprecated in favour of setting via L</connect_info>.
676
677
678 =head2 dbh_do
679
680 Arguments: ($subref | $method_name), @extra_coderef_args?
681
682 Execute the given $subref or $method_name using the new exception-based
683 connection management.
684
685 The first two arguments will be the storage object that C<dbh_do> was called
686 on and a database handle to use.  Any additional arguments will be passed
687 verbatim to the called subref as arguments 2 and onwards.
688
689 Using this (instead of $self->_dbh or $self->dbh) ensures correct
690 exception handling and reconnection (or failover in future subclasses).
691
692 Your subref should have no side-effects outside of the database, as
693 there is the potential for your subref to be partially double-executed
694 if the database connection was stale/dysfunctional.
695
696 Example:
697
698   my @stuff = $schema->storage->dbh_do(
699     sub {
700       my ($storage, $dbh, @cols) = @_;
701       my $cols = join(q{, }, @cols);
702       $dbh->selectrow_array("SELECT $cols FROM foo");
703     },
704     @column_list
705   );
706
707 =cut
708
709 sub dbh_do {
710   my $self = shift;
711   my $code = shift;
712
713   my $dbh = $self->_dbh;
714
715   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
716       || $self->{transaction_depth};
717
718   local $self->{_in_dbh_do} = 1;
719
720   my @result;
721   my $want_array = wantarray;
722
723   eval {
724     $self->_verify_pid if $dbh;
725     if(!$self->_dbh) {
726         $self->_populate_dbh;
727         $dbh = $self->_dbh;
728     }
729
730     if($want_array) {
731         @result = $self->$code($dbh, @_);
732     }
733     elsif(defined $want_array) {
734         $result[0] = $self->$code($dbh, @_);
735     }
736     else {
737         $self->$code($dbh, @_);
738     }
739   };
740
741   my $exception = $@;
742   if(!$exception) { return $want_array ? @result : $result[0] }
743
744   $self->throw_exception($exception) if $self->connected;
745
746   # We were not connected - reconnect and retry, but let any
747   #  exception fall right through this time
748   $self->_populate_dbh;
749   $self->$code($self->_dbh, @_);
750 }
751
752 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
753 # It also informs dbh_do to bypass itself while under the direction of txn_do,
754 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
755 sub txn_do {
756   my $self = shift;
757   my $coderef = shift;
758
759   ref $coderef eq 'CODE' or $self->throw_exception
760     ('$coderef must be a CODE reference');
761
762   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
763
764   local $self->{_in_dbh_do} = 1;
765
766   my @result;
767   my $want_array = wantarray;
768
769   my $tried = 0;
770   while(1) {
771     eval {
772       $self->_verify_pid if $self->_dbh;
773       $self->_populate_dbh if !$self->_dbh;
774
775       $self->txn_begin;
776       if($want_array) {
777           @result = $coderef->(@_);
778       }
779       elsif(defined $want_array) {
780           $result[0] = $coderef->(@_);
781       }
782       else {
783           $coderef->(@_);
784       }
785       $self->txn_commit;
786     };
787
788     my $exception = $@;
789     if(!$exception) { return $want_array ? @result : $result[0] }
790
791     if($tried++ > 0 || $self->connected) {
792       eval { $self->txn_rollback };
793       my $rollback_exception = $@;
794       if($rollback_exception) {
795         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
796         $self->throw_exception($exception)  # propagate nested rollback
797           if $rollback_exception =~ /$exception_class/;
798
799         $self->throw_exception(
800           "Transaction aborted: ${exception}. "
801           . "Rollback failed: ${rollback_exception}"
802         );
803       }
804       $self->throw_exception($exception)
805     }
806
807     # We were not connected, and was first try - reconnect and retry
808     # via the while loop
809     $self->_populate_dbh;
810   }
811 }
812
813 =head2 disconnect
814
815 Our C<disconnect> method also performs a rollback first if the
816 database is not in C<AutoCommit> mode.
817
818 =cut
819
820 sub disconnect {
821   my ($self) = @_;
822
823   if( $self->connected ) {
824     my $connection_do = $self->on_disconnect_do;
825     $self->_do_connection_actions($connection_do) if ref($connection_do);
826
827     $self->_dbh->rollback unless $self->_dbh_autocommit;
828     $self->_dbh->disconnect;
829     $self->_dbh(undef);
830     $self->{_dbh_gen}++;
831   }
832 }
833
834 =head2 with_deferred_fk_checks
835
836 =over 4
837
838 =item Arguments: C<$coderef>
839
840 =item Return Value: The return value of $coderef
841
842 =back
843
844 Storage specific method to run the code ref with FK checks deferred or
845 in MySQL's case disabled entirely.
846
847 =cut
848
849 # Storage subclasses should override this
850 sub with_deferred_fk_checks {
851   my ($self, $sub) = @_;
852
853   $sub->();
854 }
855
856 sub connected {
857   my ($self) = @_;
858
859   if(my $dbh = $self->_dbh) {
860       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
861           $self->_dbh(undef);
862           $self->{_dbh_gen}++;
863           return;
864       }
865       else {
866           $self->_verify_pid;
867           return 0 if !$self->_dbh;
868       }
869       return ($dbh->FETCH('Active') && $dbh->ping);
870   }
871
872   return 0;
873 }
874
875 # handle pid changes correctly
876 #  NOTE: assumes $self->_dbh is a valid $dbh
877 sub _verify_pid {
878   my ($self) = @_;
879
880   return if defined $self->_conn_pid && $self->_conn_pid == $$;
881
882   $self->_dbh->{InactiveDestroy} = 1;
883   $self->_dbh(undef);
884   $self->{_dbh_gen}++;
885
886   return;
887 }
888
889 sub ensure_connected {
890   my ($self) = @_;
891
892   unless ($self->connected) {
893     $self->_populate_dbh;
894   }
895 }
896
897 =head2 dbh
898
899 Returns the dbh - a data base handle of class L<DBI>.
900
901 =cut
902
903 sub dbh {
904   my ($self) = @_;
905
906   $self->ensure_connected;
907   return $self->_dbh;
908 }
909
910 sub _sql_maker_args {
911     my ($self) = @_;
912     
913     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
914 }
915
916 sub sql_maker {
917   my ($self) = @_;
918   unless ($self->_sql_maker) {
919     my $sql_maker_class = $self->sql_maker_class;
920     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
921   }
922   return $self->_sql_maker;
923 }
924
925 sub _rebless {}
926
927 sub _populate_dbh {
928   my ($self) = @_;
929   my @info = @{$self->_dbi_connect_info || []};
930   $self->_dbh($self->_connect(@info));
931
932   # Always set the transaction depth on connect, since
933   #  there is no transaction in progress by definition
934   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
935
936   if(ref $self eq 'DBIx::Class::Storage::DBI') {
937     my $driver = $self->_dbh->{Driver}->{Name};
938     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
939       bless $self, "DBIx::Class::Storage::DBI::${driver}";
940       $self->_rebless();
941     }
942   }
943
944   $self->_conn_pid($$);
945   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
946
947   my $connection_do = $self->on_connect_do;
948   $self->_do_connection_actions($connection_do) if ref($connection_do);
949 }
950
951 sub _do_connection_actions {
952   my $self = shift;
953   my $connection_do = shift;
954
955   if (ref $connection_do eq 'ARRAY') {
956     $self->_do_query($_) foreach @$connection_do;
957   }
958   elsif (ref $connection_do eq 'CODE') {
959     $connection_do->($self);
960   }
961
962   return $self;
963 }
964
965 sub _do_query {
966   my ($self, $action) = @_;
967
968   if (ref $action eq 'CODE') {
969     $action = $action->($self);
970     $self->_do_query($_) foreach @$action;
971   }
972   else {
973     # Most debuggers expect ($sql, @bind), so we need to exclude
974     # the attribute hash which is the second argument to $dbh->do
975     # furthermore the bind values are usually to be presented
976     # as named arrayref pairs, so wrap those here too
977     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
978     my $sql = shift @do_args;
979     my $attrs = shift @do_args;
980     my @bind = map { [ undef, $_ ] } @do_args;
981
982     $self->_query_start($sql, @bind);
983     $self->_dbh->do($sql, $attrs, @do_args);
984     $self->_query_end($sql, @bind);
985   }
986
987   return $self;
988 }
989
990 sub _connect {
991   my ($self, @info) = @_;
992
993   $self->throw_exception("You failed to provide any connection info")
994     if !@info;
995
996   my ($old_connect_via, $dbh);
997
998   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
999     $old_connect_via = $DBI::connect_via;
1000     $DBI::connect_via = 'connect';
1001   }
1002
1003   eval {
1004     if(ref $info[0] eq 'CODE') {
1005        $dbh = &{$info[0]}
1006     }
1007     else {
1008        $dbh = DBI->connect(@info);
1009     }
1010
1011     if($dbh && !$self->unsafe) {
1012       my $weak_self = $self;
1013       weaken($weak_self);
1014       $dbh->{HandleError} = sub {
1015           if ($weak_self) {
1016             $weak_self->throw_exception("DBI Exception: $_[0]");
1017           }
1018           else {
1019             croak ("DBI Exception: $_[0]");
1020           }
1021       };
1022       $dbh->{ShowErrorStatement} = 1;
1023       $dbh->{RaiseError} = 1;
1024       $dbh->{PrintError} = 0;
1025     }
1026   };
1027
1028   $DBI::connect_via = $old_connect_via if $old_connect_via;
1029
1030   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1031     if !$dbh || $@;
1032
1033   $self->_dbh_autocommit($dbh->{AutoCommit});
1034
1035   $dbh;
1036 }
1037
1038 sub svp_begin {
1039   my ($self, $name) = @_;
1040
1041   $name = $self->_svp_generate_name
1042     unless defined $name;
1043
1044   $self->throw_exception ("You can't use savepoints outside a transaction")
1045     if $self->{transaction_depth} == 0;
1046
1047   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1048     unless $self->can('_svp_begin');
1049   
1050   push @{ $self->{savepoints} }, $name;
1051
1052   $self->debugobj->svp_begin($name) if $self->debug;
1053   
1054   return $self->_svp_begin($name);
1055 }
1056
1057 sub svp_release {
1058   my ($self, $name) = @_;
1059
1060   $self->throw_exception ("You can't use savepoints outside a transaction")
1061     if $self->{transaction_depth} == 0;
1062
1063   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1064     unless $self->can('_svp_release');
1065
1066   if (defined $name) {
1067     $self->throw_exception ("Savepoint '$name' does not exist")
1068       unless grep { $_ eq $name } @{ $self->{savepoints} };
1069
1070     # Dig through the stack until we find the one we are releasing.  This keeps
1071     # the stack up to date.
1072     my $svp;
1073
1074     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1075   } else {
1076     $name = pop @{ $self->{savepoints} };
1077   }
1078
1079   $self->debugobj->svp_release($name) if $self->debug;
1080
1081   return $self->_svp_release($name);
1082 }
1083
1084 sub svp_rollback {
1085   my ($self, $name) = @_;
1086
1087   $self->throw_exception ("You can't use savepoints outside a transaction")
1088     if $self->{transaction_depth} == 0;
1089
1090   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1091     unless $self->can('_svp_rollback');
1092
1093   if (defined $name) {
1094       # If they passed us a name, verify that it exists in the stack
1095       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1096           $self->throw_exception("Savepoint '$name' does not exist!");
1097       }
1098
1099       # Dig through the stack until we find the one we are releasing.  This keeps
1100       # the stack up to date.
1101       while(my $s = pop(@{ $self->{savepoints} })) {
1102           last if($s eq $name);
1103       }
1104       # Add the savepoint back to the stack, as a rollback doesn't remove the
1105       # named savepoint, only everything after it.
1106       push(@{ $self->{savepoints} }, $name);
1107   } else {
1108       # We'll assume they want to rollback to the last savepoint
1109       $name = $self->{savepoints}->[-1];
1110   }
1111
1112   $self->debugobj->svp_rollback($name) if $self->debug;
1113   
1114   return $self->_svp_rollback($name);
1115 }
1116
1117 sub _svp_generate_name {
1118     my ($self) = @_;
1119
1120     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1121 }
1122
1123 sub txn_begin {
1124   my $self = shift;
1125   $self->ensure_connected();
1126   if($self->{transaction_depth} == 0) {
1127     $self->debugobj->txn_begin()
1128       if $self->debug;
1129     # this isn't ->_dbh-> because
1130     #  we should reconnect on begin_work
1131     #  for AutoCommit users
1132     $self->dbh->begin_work;
1133   } elsif ($self->auto_savepoint) {
1134     $self->svp_begin;
1135   }
1136   $self->{transaction_depth}++;
1137 }
1138
1139 sub txn_commit {
1140   my $self = shift;
1141   if ($self->{transaction_depth} == 1) {
1142     my $dbh = $self->_dbh;
1143     $self->debugobj->txn_commit()
1144       if ($self->debug);
1145     $dbh->commit;
1146     $self->{transaction_depth} = 0
1147       if $self->_dbh_autocommit;
1148   }
1149   elsif($self->{transaction_depth} > 1) {
1150     $self->{transaction_depth}--;
1151     $self->svp_release
1152       if $self->auto_savepoint;
1153   }
1154 }
1155
1156 sub txn_rollback {
1157   my $self = shift;
1158   my $dbh = $self->_dbh;
1159   eval {
1160     if ($self->{transaction_depth} == 1) {
1161       $self->debugobj->txn_rollback()
1162         if ($self->debug);
1163       $self->{transaction_depth} = 0
1164         if $self->_dbh_autocommit;
1165       $dbh->rollback;
1166     }
1167     elsif($self->{transaction_depth} > 1) {
1168       $self->{transaction_depth}--;
1169       if ($self->auto_savepoint) {
1170         $self->svp_rollback;
1171         $self->svp_release;
1172       }
1173     }
1174     else {
1175       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1176     }
1177   };
1178   if ($@) {
1179     my $error = $@;
1180     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1181     $error =~ /$exception_class/ and $self->throw_exception($error);
1182     # ensure that a failed rollback resets the transaction depth
1183     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1184     $self->throw_exception($error);
1185   }
1186 }
1187
1188 # This used to be the top-half of _execute.  It was split out to make it
1189 #  easier to override in NoBindVars without duping the rest.  It takes up
1190 #  all of _execute's args, and emits $sql, @bind.
1191 sub _prep_for_execute {
1192   my ($self, $op, $extra_bind, $ident, $args) = @_;
1193
1194   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1195     $ident = $ident->from();
1196   }
1197
1198   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1199
1200   unshift(@bind,
1201     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1202       if $extra_bind;
1203   return ($sql, \@bind);
1204 }
1205
1206 sub _fix_bind_params {
1207     my ($self, @bind) = @_;
1208
1209     ### Turn @bind from something like this:
1210     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1211     ### to this:
1212     ###   ( "'1'", "'1'", "'3'" )
1213     return
1214         map {
1215             if ( defined( $_ && $_->[1] ) ) {
1216                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1217             }
1218             else { q{'NULL'}; }
1219         } @bind;
1220 }
1221
1222 sub _query_start {
1223     my ( $self, $sql, @bind ) = @_;
1224
1225     if ( $self->debug ) {
1226         @bind = $self->_fix_bind_params(@bind);
1227         
1228         $self->debugobj->query_start( $sql, @bind );
1229     }
1230 }
1231
1232 sub _query_end {
1233     my ( $self, $sql, @bind ) = @_;
1234
1235     if ( $self->debug ) {
1236         @bind = $self->_fix_bind_params(@bind);
1237         $self->debugobj->query_end( $sql, @bind );
1238     }
1239 }
1240
1241 sub _dbh_execute {
1242   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1243
1244   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1245
1246   $self->_query_start( $sql, @$bind );
1247
1248   my $sth = $self->sth($sql,$op);
1249
1250   my $placeholder_index = 1; 
1251
1252   foreach my $bound (@$bind) {
1253     my $attributes = {};
1254     my($column_name, @data) = @$bound;
1255
1256     if ($bind_attributes) {
1257       $attributes = $bind_attributes->{$column_name}
1258       if defined $bind_attributes->{$column_name};
1259     }
1260
1261     foreach my $data (@data) {
1262       my $ref = ref $data;
1263       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1264
1265       $sth->bind_param($placeholder_index, $data, $attributes);
1266       $placeholder_index++;
1267     }
1268   }
1269
1270   # Can this fail without throwing an exception anyways???
1271   my $rv = $sth->execute();
1272   $self->throw_exception($sth->errstr) if !$rv;
1273
1274   $self->_query_end( $sql, @$bind );
1275
1276   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1277 }
1278
1279 sub _execute {
1280     my $self = shift;
1281     $self->dbh_do('_dbh_execute', @_)
1282 }
1283
1284 sub insert {
1285   my ($self, $source, $to_insert) = @_;
1286   
1287   my $ident = $source->from; 
1288   my $bind_attributes = $self->source_bind_attributes($source);
1289
1290   $self->ensure_connected;
1291   foreach my $col ( $source->columns ) {
1292     if ( !defined $to_insert->{$col} ) {
1293       my $col_info = $source->column_info($col);
1294
1295       if ( $col_info->{auto_nextval} ) {
1296         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1297       }
1298     }
1299   }
1300
1301   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1302
1303   return $to_insert;
1304 }
1305
1306 ## Still not quite perfect, and EXPERIMENTAL
1307 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1308 ## scalar refs, or at least, all the same type as the first set, the statement is
1309 ## only prepped once.
1310 sub insert_bulk {
1311   my ($self, $source, $cols, $data) = @_;
1312   my %colvalues;
1313   my $table = $source->from;
1314   @colvalues{@$cols} = (0..$#$cols);
1315   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1316   
1317   $self->_query_start( $sql, @bind );
1318   my $sth = $self->sth($sql);
1319
1320 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1321
1322   ## This must be an arrayref, else nothing works!
1323   
1324   my $tuple_status = [];
1325   
1326   ##use Data::Dumper;
1327   ##print STDERR Dumper( $data, $sql, [@bind] );
1328
1329   my $time = time();
1330
1331   ## Get the bind_attributes, if any exist
1332   my $bind_attributes = $self->source_bind_attributes($source);
1333
1334   ## Bind the values and execute
1335   my $placeholder_index = 1; 
1336
1337   foreach my $bound (@bind) {
1338
1339     my $attributes = {};
1340     my ($column_name, $data_index) = @$bound;
1341
1342     if( $bind_attributes ) {
1343       $attributes = $bind_attributes->{$column_name}
1344       if defined $bind_attributes->{$column_name};
1345     }
1346
1347     my @data = map { $_->[$data_index] } @$data;
1348
1349     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1350     $placeholder_index++;
1351   }
1352   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1353   $self->throw_exception($sth->errstr) if !$rv;
1354
1355   $self->_query_end( $sql, @bind );
1356   return (wantarray ? ($rv, $sth, @bind) : $rv);
1357 }
1358
1359 sub update {
1360   my $self = shift @_;
1361   my $source = shift @_;
1362   my $bind_attributes = $self->source_bind_attributes($source);
1363   
1364   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1365 }
1366
1367
1368 sub delete {
1369   my $self = shift @_;
1370   my $source = shift @_;
1371   
1372   my $bind_attrs = {}; ## If ever it's needed...
1373   
1374   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1375 }
1376
1377 sub _select {
1378   my $self = shift;
1379   my $sql_maker = $self->sql_maker;
1380   local $sql_maker->{for};
1381   return $self->_execute($self->_select_args(@_));
1382 }
1383
1384 sub _select_args {
1385   my ($self, $ident, $select, $condition, $attrs) = @_;
1386   my $order = $attrs->{order_by};
1387
1388   if (ref $condition eq 'SCALAR') {
1389     my $unwrap = ${$condition};
1390     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1391       $order = $1;
1392       $condition = \$unwrap;
1393     }
1394   }
1395
1396   my $for = delete $attrs->{for};
1397   my $sql_maker = $self->sql_maker;
1398   $sql_maker->{for} = $for;
1399
1400   if (exists $attrs->{group_by} || $attrs->{having}) {
1401     $order = {
1402       group_by => $attrs->{group_by},
1403       having => $attrs->{having},
1404       ($order ? (order_by => $order) : ())
1405     };
1406   }
1407   my $bind_attrs = {}; ## Future support
1408   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1409   if ($attrs->{software_limit} ||
1410       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1411         $attrs->{software_limit} = 1;
1412   } else {
1413     $self->throw_exception("rows attribute must be positive if present")
1414       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1415
1416     # MySQL actually recommends this approach.  I cringe.
1417     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1418     push @args, $attrs->{rows}, $attrs->{offset};
1419   }
1420   return @args;
1421 }
1422
1423 sub source_bind_attributes {
1424   my ($self, $source) = @_;
1425   
1426   my $bind_attributes;
1427   foreach my $column ($source->columns) {
1428   
1429     my $data_type = $source->column_info($column)->{data_type} || '';
1430     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1431      if $data_type;
1432   }
1433
1434   return $bind_attributes;
1435 }
1436
1437 =head2 select
1438
1439 =over 4
1440
1441 =item Arguments: $ident, $select, $condition, $attrs
1442
1443 =back
1444
1445 Handle a SQL select statement.
1446
1447 =cut
1448
1449 sub select {
1450   my $self = shift;
1451   my ($ident, $select, $condition, $attrs) = @_;
1452   return $self->cursor_class->new($self, \@_, $attrs);
1453 }
1454
1455 sub select_single {
1456   my $self = shift;
1457   my ($rv, $sth, @bind) = $self->_select(@_);
1458   my @row = $sth->fetchrow_array;
1459   my @nextrow = $sth->fetchrow_array if @row;
1460   if(@row && @nextrow) {
1461     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1462   }
1463   # Need to call finish() to work round broken DBDs
1464   $sth->finish();
1465   return @row;
1466 }
1467
1468 =head2 sth
1469
1470 =over 4
1471
1472 =item Arguments: $sql
1473
1474 =back
1475
1476 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1477
1478 =cut
1479
1480 sub _dbh_sth {
1481   my ($self, $dbh, $sql) = @_;
1482
1483   # 3 is the if_active parameter which avoids active sth re-use
1484   my $sth = $self->disable_sth_caching
1485     ? $dbh->prepare($sql)
1486     : $dbh->prepare_cached($sql, {}, 3);
1487
1488   # XXX You would think RaiseError would make this impossible,
1489   #  but apparently that's not true :(
1490   $self->throw_exception($dbh->errstr) if !$sth;
1491
1492   $sth;
1493 }
1494
1495 sub sth {
1496   my ($self, $sql) = @_;
1497   $self->dbh_do('_dbh_sth', $sql);
1498 }
1499
1500 sub _dbh_columns_info_for {
1501   my ($self, $dbh, $table) = @_;
1502
1503   if ($dbh->can('column_info')) {
1504     my %result;
1505     eval {
1506       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1507       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1508       $sth->execute();
1509       while ( my $info = $sth->fetchrow_hashref() ){
1510         my %column_info;
1511         $column_info{data_type}   = $info->{TYPE_NAME};
1512         $column_info{size}      = $info->{COLUMN_SIZE};
1513         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1514         $column_info{default_value} = $info->{COLUMN_DEF};
1515         my $col_name = $info->{COLUMN_NAME};
1516         $col_name =~ s/^\"(.*)\"$/$1/;
1517
1518         $result{$col_name} = \%column_info;
1519       }
1520     };
1521     return \%result if !$@ && scalar keys %result;
1522   }
1523
1524   my %result;
1525   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1526   $sth->execute;
1527   my @columns = @{$sth->{NAME_lc}};
1528   for my $i ( 0 .. $#columns ){
1529     my %column_info;
1530     $column_info{data_type} = $sth->{TYPE}->[$i];
1531     $column_info{size} = $sth->{PRECISION}->[$i];
1532     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1533
1534     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1535       $column_info{data_type} = $1;
1536       $column_info{size}    = $2;
1537     }
1538
1539     $result{$columns[$i]} = \%column_info;
1540   }
1541   $sth->finish;
1542
1543   foreach my $col (keys %result) {
1544     my $colinfo = $result{$col};
1545     my $type_num = $colinfo->{data_type};
1546     my $type_name;
1547     if(defined $type_num && $dbh->can('type_info')) {
1548       my $type_info = $dbh->type_info($type_num);
1549       $type_name = $type_info->{TYPE_NAME} if $type_info;
1550       $colinfo->{data_type} = $type_name if $type_name;
1551     }
1552   }
1553
1554   return \%result;
1555 }
1556
1557 sub columns_info_for {
1558   my ($self, $table) = @_;
1559   $self->dbh_do('_dbh_columns_info_for', $table);
1560 }
1561
1562 =head2 last_insert_id
1563
1564 Return the row id of the last insert.
1565
1566 =cut
1567
1568 sub _dbh_last_insert_id {
1569     # All Storage's need to register their own _dbh_last_insert_id
1570     # the old SQLite-based method was highly inappropriate
1571
1572     my $self = shift;
1573     my $class = ref $self;
1574     $self->throw_exception (<<EOE);
1575
1576 No _dbh_last_insert_id() method found in $class.
1577 Since the method of obtaining the autoincrement id of the last insert
1578 operation varies greatly between different databases, this method must be
1579 individually implemented for every storage class.
1580 EOE
1581 }
1582
1583 sub last_insert_id {
1584   my $self = shift;
1585   $self->dbh_do('_dbh_last_insert_id', @_);
1586 }
1587
1588 =head2 sqlt_type
1589
1590 Returns the database driver name.
1591
1592 =cut
1593
1594 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1595
1596 =head2 bind_attribute_by_data_type
1597
1598 Given a datatype from column info, returns a database specific bind
1599 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1600 let the database planner just handle it.
1601
1602 Generally only needed for special case column types, like bytea in postgres.
1603
1604 =cut
1605
1606 sub bind_attribute_by_data_type {
1607     return;
1608 }
1609
1610 =head2 create_ddl_dir
1611
1612 =over 4
1613
1614 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1615
1616 =back
1617
1618 Creates a SQL file based on the Schema, for each of the specified
1619 database types, in the given directory.
1620
1621 By default, C<\%sqlt_args> will have
1622
1623  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1624
1625 merged with the hash passed in. To disable any of those features, pass in a 
1626 hashref like the following
1627
1628  { ignore_constraint_names => 0, # ... other options }
1629
1630 =cut
1631
1632 sub create_ddl_dir {
1633   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1634
1635   if(!$dir || !-d $dir) {
1636     warn "No directory given, using ./\n";
1637     $dir = "./";
1638   }
1639   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1640   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1641
1642   my $schema_version = $schema->schema_version || '1.x';
1643   $version ||= $schema_version;
1644
1645   $sqltargs = {
1646     add_drop_table => 1, 
1647     ignore_constraint_names => 1,
1648     ignore_index_names => 1,
1649     %{$sqltargs || {}}
1650   };
1651
1652   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1653       . $self->_check_sqlt_message . q{'})
1654           if !$self->_check_sqlt_version;
1655
1656   my $sqlt = SQL::Translator->new( $sqltargs );
1657
1658   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1659   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1660
1661   foreach my $db (@$databases) {
1662     $sqlt->reset();
1663     $sqlt->{schema} = $sqlt_schema;
1664     $sqlt->producer($db);
1665
1666     my $file;
1667     my $filename = $schema->ddl_filename($db, $version, $dir);
1668     if (-e $filename && ($version eq $schema_version )) {
1669       # if we are dumping the current version, overwrite the DDL
1670       warn "Overwriting existing DDL file - $filename";
1671       unlink($filename);
1672     }
1673
1674     my $output = $sqlt->translate;
1675     if(!$output) {
1676       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1677       next;
1678     }
1679     if(!open($file, ">$filename")) {
1680       $self->throw_exception("Can't open $filename for writing ($!)");
1681       next;
1682     }
1683     print $file $output;
1684     close($file);
1685   
1686     next unless ($preversion);
1687
1688     require SQL::Translator::Diff;
1689
1690     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1691     if(!-e $prefilename) {
1692       warn("No previous schema file found ($prefilename)");
1693       next;
1694     }
1695
1696     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1697     if(-e $difffile) {
1698       warn("Overwriting existing diff file - $difffile");
1699       unlink($difffile);
1700     }
1701     
1702     my $source_schema;
1703     {
1704       my $t = SQL::Translator->new($sqltargs);
1705       $t->debug( 0 );
1706       $t->trace( 0 );
1707       $t->parser( $db )                       or die $t->error;
1708       my $out = $t->translate( $prefilename ) or die $t->error;
1709       $source_schema = $t->schema;
1710       unless ( $source_schema->name ) {
1711         $source_schema->name( $prefilename );
1712       }
1713     }
1714
1715     # The "new" style of producers have sane normalization and can support 
1716     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1717     # And we have to diff parsed SQL against parsed SQL.
1718     my $dest_schema = $sqlt_schema;
1719     
1720     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1721       my $t = SQL::Translator->new($sqltargs);
1722       $t->debug( 0 );
1723       $t->trace( 0 );
1724       $t->parser( $db )                    or die $t->error;
1725       my $out = $t->translate( $filename ) or die $t->error;
1726       $dest_schema = $t->schema;
1727       $dest_schema->name( $filename )
1728         unless $dest_schema->name;
1729     }
1730     
1731     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1732                                                   $dest_schema,   $db,
1733                                                   $sqltargs
1734                                                  );
1735     if(!open $file, ">$difffile") { 
1736       $self->throw_exception("Can't write to $difffile ($!)");
1737       next;
1738     }
1739     print $file $diff;
1740     close($file);
1741   }
1742 }
1743
1744 =head2 deployment_statements
1745
1746 =over 4
1747
1748 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1749
1750 =back
1751
1752 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1753 The database driver name is given by C<$type>, though the value from
1754 L</sqlt_type> is used if it is not specified.
1755
1756 C<$directory> is used to return statements from files in a previously created
1757 L</create_ddl_dir> directory and is optional. The filenames are constructed
1758 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1759
1760 If no C<$directory> is specified then the statements are constructed on the
1761 fly using L<SQL::Translator> and C<$version> is ignored.
1762
1763 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1764
1765 =cut
1766
1767 sub deployment_statements {
1768   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1769   # Need to be connected to get the correct sqlt_type
1770   $self->ensure_connected() unless $type;
1771   $type ||= $self->sqlt_type;
1772   $version ||= $schema->schema_version || '1.x';
1773   $dir ||= './';
1774   my $filename = $schema->ddl_filename($type, $version, $dir);
1775   if(-f $filename)
1776   {
1777       my $file;
1778       open($file, "<$filename") 
1779         or $self->throw_exception("Can't open $filename ($!)");
1780       my @rows = <$file>;
1781       close($file);
1782       return join('', @rows);
1783   }
1784
1785   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1786       . $self->_check_sqlt_message . q{'})
1787           if !$self->_check_sqlt_version;
1788
1789   require SQL::Translator::Parser::DBIx::Class;
1790   eval qq{use SQL::Translator::Producer::${type}};
1791   $self->throw_exception($@) if $@;
1792
1793   # sources needs to be a parser arg, but for simplicty allow at top level 
1794   # coming in
1795   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1796       if exists $sqltargs->{sources};
1797
1798   my $tr = SQL::Translator->new(%$sqltargs);
1799   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1800   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1801 }
1802
1803 sub deploy {
1804   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1805   my $deploy = sub {
1806     my $line = shift;
1807     return if($line =~ /^--/);
1808     return if(!$line);
1809     # next if($line =~ /^DROP/m);
1810     return if($line =~ /^BEGIN TRANSACTION/m);
1811     return if($line =~ /^COMMIT/m);
1812     return if $line =~ /^\s+$/; # skip whitespace only
1813     $self->_query_start($line);
1814     eval {
1815       $self->dbh->do($line); # shouldn't be using ->dbh ?
1816     };
1817     if ($@) {
1818       warn qq{$@ (running "${line}")};
1819     }
1820     $self->_query_end($line);
1821   };
1822   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1823   if (@statements > 1) {
1824     foreach my $statement (@statements) {
1825       $deploy->( $statement );
1826     }
1827   }
1828   elsif (@statements == 1) {
1829     foreach my $line ( split(";\n", $statements[0])) {
1830       $deploy->( $line );
1831     }
1832   }
1833 }
1834
1835 =head2 datetime_parser
1836
1837 Returns the datetime parser class
1838
1839 =cut
1840
1841 sub datetime_parser {
1842   my $self = shift;
1843   return $self->{datetime_parser} ||= do {
1844     $self->ensure_connected;
1845     $self->build_datetime_parser(@_);
1846   };
1847 }
1848
1849 =head2 datetime_parser_type
1850
1851 Defines (returns) the datetime parser class - currently hardwired to
1852 L<DateTime::Format::MySQL>
1853
1854 =cut
1855
1856 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1857
1858 =head2 build_datetime_parser
1859
1860 See L</datetime_parser>
1861
1862 =cut
1863
1864 sub build_datetime_parser {
1865   my $self = shift;
1866   my $type = $self->datetime_parser_type(@_);
1867   eval "use ${type}";
1868   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1869   return $type;
1870 }
1871
1872 {
1873     my $_check_sqlt_version; # private
1874     my $_check_sqlt_message; # private
1875     sub _check_sqlt_version {
1876         return $_check_sqlt_version if defined $_check_sqlt_version;
1877         eval 'use SQL::Translator "0.09003"';
1878         $_check_sqlt_message = $@ || '';
1879         $_check_sqlt_version = !$@;
1880     }
1881
1882     sub _check_sqlt_message {
1883         _check_sqlt_version if !defined $_check_sqlt_message;
1884         $_check_sqlt_message;
1885     }
1886 }
1887
1888 =head2 is_replicating
1889
1890 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1891 replicate from a master database.  Default is undef, which is the result
1892 returned by databases that don't support replication.
1893
1894 =cut
1895
1896 sub is_replicating {
1897     return;
1898     
1899 }
1900
1901 =head2 lag_behind_master
1902
1903 Returns a number that represents a certain amount of lag behind a master db
1904 when a given storage is replicating.  The number is database dependent, but
1905 starts at zero and increases with the amount of lag. Default in undef
1906
1907 =cut
1908
1909 sub lag_behind_master {
1910     return;
1911 }
1912
1913 sub DESTROY {
1914   my $self = shift;
1915   return if !$self->_dbh;
1916   $self->_verify_pid;
1917   $self->_dbh(undef);
1918 }
1919
1920 1;
1921
1922 =head1 USAGE NOTES
1923
1924 =head2 DBIx::Class and AutoCommit
1925
1926 DBIx::Class can do some wonderful magic with handling exceptions,
1927 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1928 combined with C<txn_do> for transaction support.
1929
1930 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1931 in an assumed transaction between commits, and you're telling us you'd
1932 like to manage that manually.  A lot of the magic protections offered by
1933 this module will go away.  We can't protect you from exceptions due to database
1934 disconnects because we don't know anything about how to restart your
1935 transactions.  You're on your own for handling all sorts of exceptional
1936 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1937 be with raw DBI.
1938
1939
1940 =head1 SQL METHODS
1941
1942 The module defines a set of methods within the DBIC::SQL::Abstract
1943 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1944 SQL query functions.
1945
1946 The following methods are extended:-
1947
1948 =over 4
1949
1950 =item delete
1951
1952 =item insert
1953
1954 =item select
1955
1956 =item update
1957
1958 =item limit_dialect
1959
1960 See L</connect_info> for details.
1961
1962 =item quote_char
1963
1964 See L</connect_info> for details.
1965
1966 =item name_sep
1967
1968 See L</connect_info> for details.
1969
1970 =back
1971
1972 =head1 AUTHORS
1973
1974 Matt S. Trout <mst@shadowcatsystems.co.uk>
1975
1976 Andy Grundman <andy@hybridized.org>
1977
1978 =head1 LICENSE
1979
1980 You may distribute this code under the same terms as Perl itself.
1981
1982 =cut