added support for from => $rs->as_query
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 # DB2 is the only remaining DB using this. Even though we are not sure if
54 # RowNumberOver is still needed here (should be part of SQLA) leave the 
55 # code in place
56 sub _RowNumberOver {
57   my ($self, $sql, $order, $rows, $offset ) = @_;
58
59   $offset += 1;
60   my $last = $rows + $offset;
61   my ( $order_by ) = $self->_order_by( $order );
62
63   $sql = <<"SQL";
64 SELECT * FROM
65 (
66    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
67       $sql
68       $order_by
69    ) Q1
70 ) Q2
71 WHERE ROW_NUM BETWEEN $offset AND $last
72
73 SQL
74
75   return $sql;
76 }
77
78
79 # While we're at it, this should make LIMIT queries more efficient,
80 #  without digging into things too deeply
81 use Scalar::Util 'blessed';
82 sub _find_syntax {
83   my ($self, $syntax) = @_;
84   
85   # DB2 is the only remaining DB using this. Even though we are not sure if
86   # RowNumberOver is still needed here (should be part of SQLA) leave the 
87   # code in place
88   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
89   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
90     return 'RowNumberOver';
91   }
92   
93   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
94 }
95
96 sub select {
97   my ($self, $table, $fields, $where, $order, @rest) = @_;
98   if (ref $table eq 'SCALAR') {
99     $table = $$table;
100   }
101   elsif (not ref $table) {
102     $table = $self->_quote($table);
103   }
104   local $self->{rownum_hack_count} = 1
105     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
106   @rest = (-1) unless defined $rest[0];
107   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
108     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
109   local $self->{having_bind} = [];
110   my ($sql, @ret) = $self->SUPER::select(
111     $table, $self->_recurse_fields($fields), $where, $order, @rest
112   );
113   $sql .= 
114     $self->{for} ?
115     (
116       $self->{for} eq 'update' ? ' FOR UPDATE' :
117       $self->{for} eq 'shared' ? ' FOR SHARE'  :
118       ''
119     ) :
120     ''
121   ;
122   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
123 }
124
125 sub insert {
126   my $self = shift;
127   my $table = shift;
128   $table = $self->_quote($table) unless ref($table);
129   $self->SUPER::insert($table, @_);
130 }
131
132 sub update {
133   my $self = shift;
134   my $table = shift;
135   $table = $self->_quote($table) unless ref($table);
136   $self->SUPER::update($table, @_);
137 }
138
139 sub delete {
140   my $self = shift;
141   my $table = shift;
142   $table = $self->_quote($table) unless ref($table);
143   $self->SUPER::delete($table, @_);
144 }
145
146 sub _emulate_limit {
147   my $self = shift;
148   if ($_[3] == -1) {
149     return $_[1].$self->_order_by($_[2]);
150   } else {
151     return $self->SUPER::_emulate_limit(@_);
152   }
153 }
154
155 sub _recurse_fields {
156   my ($self, $fields, $params) = @_;
157   my $ref = ref $fields;
158   return $self->_quote($fields) unless $ref;
159   return $$fields if $ref eq 'SCALAR';
160
161   if ($ref eq 'ARRAY') {
162     return join(', ', map {
163       $self->_recurse_fields($_)
164         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
165           ? ' AS col'.$self->{rownum_hack_count}++
166           : '')
167       } @$fields);
168   } elsif ($ref eq 'HASH') {
169     foreach my $func (keys %$fields) {
170       return $self->_sqlcase($func)
171         .'( '.$self->_recurse_fields($fields->{$func}).' )';
172     }
173   }
174 }
175
176 sub _order_by {
177   my $self = shift;
178   my $ret = '';
179   my @extra;
180   if (ref $_[0] eq 'HASH') {
181     if (defined $_[0]->{group_by}) {
182       $ret = $self->_sqlcase(' group by ')
183         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
184     }
185     if (defined $_[0]->{having}) {
186       my $frag;
187       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
188       push(@{$self->{having_bind}}, @extra);
189       $ret .= $self->_sqlcase(' having ').$frag;
190     }
191     if (defined $_[0]->{order_by}) {
192       $ret .= $self->_order_by($_[0]->{order_by});
193     }
194     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
195       return $self->SUPER::_order_by($_[0]);
196     }
197   } elsif (ref $_[0] eq 'SCALAR') {
198     $ret = $self->_sqlcase(' order by ').${ $_[0] };
199   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
200     my @order = @{+shift};
201     $ret = $self->_sqlcase(' order by ')
202           .join(', ', map {
203                         my $r = $self->_order_by($_, @_);
204                         $r =~ s/^ ?ORDER BY //i;
205                         $r;
206                       } @order);
207   } else {
208     $ret = $self->SUPER::_order_by(@_);
209   }
210   return $ret;
211 }
212
213 sub _order_directions {
214   my ($self, $order) = @_;
215   $order = $order->{order_by} if ref $order eq 'HASH';
216   return $self->SUPER::_order_directions($order);
217 }
218
219 sub _table {
220   my ($self, $from) = @_;
221   if (ref $from eq 'ARRAY') {
222     return $self->_recurse_from(@$from);
223   } elsif (ref $from eq 'HASH') {
224     return $self->_make_as($from);
225   } else {
226     return $from; # would love to quote here but _table ends up getting called
227                   # twice during an ->select without a limit clause due to
228                   # the way S::A::Limit->select works. should maybe consider
229                   # bypassing this and doing S::A::select($self, ...) in
230                   # our select method above. meantime, quoting shims have
231                   # been added to select/insert/update/delete here
232   }
233 }
234
235 sub _recurse_from {
236   my ($self, $from, @join) = @_;
237   my @sqlf;
238   push(@sqlf, $self->_make_as($from));
239   foreach my $j (@join) {
240     my ($to, $on) = @$j;
241
242     # check whether a join type exists
243     my $join_clause = '';
244     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
245     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
246       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
247     } else {
248       $join_clause = ' JOIN ';
249     }
250     push(@sqlf, $join_clause);
251
252     if (ref $to eq 'ARRAY') {
253       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
254     } else {
255       push(@sqlf, $self->_make_as($to));
256     }
257     push(@sqlf, ' ON ', $self->_join_condition($on));
258   }
259   return join('', @sqlf);
260 }
261
262 sub _bind_to_sql {
263   my $self = shift;
264   my $arr  = shift;
265   my $sql = shift @$$arr;
266   $sql =~ s/\?/$self->_quote((shift @$$arr)->[1])/eg;
267   return $sql
268 }
269
270 sub _make_as {
271   my ($self, $from) = @_;
272   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ 
273                         : ref $_ eq 'REF'    ? $self->_bind_to_sql($_) 
274                         : $self->_quote($_)) 
275                        } reverse each %{$self->_skip_options($from)});
276 }
277
278 sub _skip_options {
279   my ($self, $hash) = @_;
280   my $clean_hash = {};
281   $clean_hash->{$_} = $hash->{$_}
282     for grep {!/^-/} keys %$hash;
283   return $clean_hash;
284 }
285
286 sub _join_condition {
287   my ($self, $cond) = @_;
288   if (ref $cond eq 'HASH') {
289     my %j;
290     for (keys %$cond) {
291       my $v = $cond->{$_};
292       if (ref $v) {
293         # XXX no throw_exception() in this package and croak() fails with strange results
294         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
295             if ref($v) ne 'SCALAR';
296         $j{$_} = $v;
297       }
298       else {
299         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
300       }
301     };
302     return scalar($self->_recurse_where(\%j));
303   } elsif (ref $cond eq 'ARRAY') {
304     return join(' OR ', map { $self->_join_condition($_) } @$cond);
305   } else {
306     die "Can't handle this yet!";
307   }
308 }
309
310 sub _quote {
311   my ($self, $label) = @_;
312   return '' unless defined $label;
313   return "*" if $label eq '*';
314   return $label unless $self->{quote_char};
315   if(ref $self->{quote_char} eq "ARRAY"){
316     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
317       if !defined $self->{name_sep};
318     my $sep = $self->{name_sep};
319     return join($self->{name_sep},
320         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
321        split(/\Q$sep\E/,$label));
322   }
323   return $self->SUPER::_quote($label);
324 }
325
326 sub limit_dialect {
327     my $self = shift;
328     $self->{limit_dialect} = shift if @_;
329     return $self->{limit_dialect};
330 }
331
332 sub quote_char {
333     my $self = shift;
334     $self->{quote_char} = shift if @_;
335     return $self->{quote_char};
336 }
337
338 sub name_sep {
339     my $self = shift;
340     $self->{name_sep} = shift if @_;
341     return $self->{name_sep};
342 }
343
344 } # End of BEGIN block
345
346 =head1 NAME
347
348 DBIx::Class::Storage::DBI - DBI storage handler
349
350 =head1 SYNOPSIS
351
352   my $schema = MySchema->connect('dbi:SQLite:my.db');
353
354   $schema->storage->debug(1);
355   $schema->dbh_do("DROP TABLE authors");
356
357   $schema->resultset('Book')->search({
358      written_on => $schema->storage->datetime_parser(DateTime->now)
359   });
360
361 =head1 DESCRIPTION
362
363 This class represents the connection to an RDBMS via L<DBI>.  See
364 L<DBIx::Class::Storage> for general information.  This pod only
365 documents DBI-specific methods and behaviors.
366
367 =head1 METHODS
368
369 =cut
370
371 sub new {
372   my $new = shift->next::method(@_);
373
374   $new->transaction_depth(0);
375   $new->_sql_maker_opts({});
376   $new->{savepoints} = [];
377   $new->{_in_dbh_do} = 0;
378   $new->{_dbh_gen} = 0;
379
380   $new;
381 }
382
383 =head2 connect_info
384
385 This method is normally called by L<DBIx::Class::Schema/connection>, which
386 encapsulates its argument list in an arrayref before passing them here.
387
388 The argument list may contain:
389
390 =over
391
392 =item *
393
394 The same 4-element argument set one would normally pass to
395 L<DBI/connect>, optionally followed by
396 L<extra attributes|/DBIx::Class specific connection attributes>
397 recognized by DBIx::Class:
398
399   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
400
401 =item *
402
403 A single code reference which returns a connected 
404 L<DBI database handle|DBI/connect> optionally followed by 
405 L<extra attributes|/DBIx::Class specific connection attributes> recognized
406 by DBIx::Class:
407
408   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
409
410 =item *
411
412 A single hashref with all the attributes and the dsn/user/password
413 mixed together:
414
415   $connect_info_args = [{
416     dsn => $dsn,
417     user => $user,
418     password => $pass,
419     %dbi_attributes,
420     %extra_attributes,
421   }];
422
423 This is particularly useful for L<Catalyst> based applications, allowing the 
424 following config (L<Config::General> style):
425
426   <Model::DB>
427     schema_class   App::DB
428     <connect_info>
429       dsn          dbi:mysql:database=test
430       user         testuser
431       password     TestPass
432       AutoCommit   1
433     </connect_info>
434   </Model::DB>
435
436 =back
437
438 Please note that the L<DBI> docs recommend that you always explicitly
439 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
440 recommends that it be set to I<1>, and that you perform transactions
441 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
442 to I<1> if you do not do explicitly set it to zero.  This is the default 
443 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
444
445 =head3 DBIx::Class specific connection attributes
446
447 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
448 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
449 the following connection options. These options can be mixed in with your other
450 L<DBI> connection attributes, or placed in a seperate hashref
451 (C<\%extra_attributes>) as shown above.
452
453 Every time C<connect_info> is invoked, any previous settings for
454 these options will be cleared before setting the new ones, regardless of
455 whether any options are specified in the new C<connect_info>.
456
457
458 =over
459
460 =item on_connect_do
461
462 Specifies things to do immediately after connecting or re-connecting to
463 the database.  Its value may contain:
464
465 =over
466
467 =item an array reference
468
469 This contains SQL statements to execute in order.  Each element contains
470 a string or a code reference that returns a string.
471
472 =item a code reference
473
474 This contains some code to execute.  Unlike code references within an
475 array reference, its return value is ignored.
476
477 =back
478
479 =item on_disconnect_do
480
481 Takes arguments in the same form as L</on_connect_do> and executes them
482 immediately before disconnecting from the database.
483
484 Note, this only runs if you explicitly call L</disconnect> on the
485 storage object.
486
487 =item disable_sth_caching
488
489 If set to a true value, this option will disable the caching of
490 statement handles via L<DBI/prepare_cached>.
491
492 =item limit_dialect 
493
494 Sets the limit dialect. This is useful for JDBC-bridge among others
495 where the remote SQL-dialect cannot be determined by the name of the
496 driver alone. See also L<SQL::Abstract::Limit>.
497
498 =item quote_char
499
500 Specifies what characters to use to quote table and column names. If 
501 you use this you will want to specify L</name_sep> as well.
502
503 C<quote_char> expects either a single character, in which case is it
504 is placed on either side of the table/column name, or an arrayref of length
505 2 in which case the table/column name is placed between the elements.
506
507 For example under MySQL you should use C<< quote_char => '`' >>, and for
508 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
509
510 =item name_sep
511
512 This only needs to be used in conjunction with C<quote_char>, and is used to 
513 specify the charecter that seperates elements (schemas, tables, columns) from 
514 each other. In most cases this is simply a C<.>.
515
516 The consequences of not supplying this value is that L<SQL::Abstract>
517 will assume DBIx::Class' uses of aliases to be complete column
518 names. The output will look like I<"me.name"> when it should actually
519 be I<"me"."name">.
520
521 =item unsafe
522
523 This Storage driver normally installs its own C<HandleError>, sets
524 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
525 all database handles, including those supplied by a coderef.  It does this
526 so that it can have consistent and useful error behavior.
527
528 If you set this option to a true value, Storage will not do its usual
529 modifications to the database handle's attributes, and instead relies on
530 the settings in your connect_info DBI options (or the values you set in
531 your connection coderef, in the case that you are connecting via coderef).
532
533 Note that your custom settings can cause Storage to malfunction,
534 especially if you set a C<HandleError> handler that suppresses exceptions
535 and/or disable C<RaiseError>.
536
537 =item auto_savepoint
538
539 If this option is true, L<DBIx::Class> will use savepoints when nesting
540 transactions, making it possible to recover from failure in the inner
541 transaction without having to abort all outer transactions.
542
543 =item cursor_class
544
545 Use this argument to supply a cursor class other than the default
546 L<DBIx::Class::Storage::DBI::Cursor>.
547
548 =back
549
550 Some real-life examples of arguments to L</connect_info> and
551 L<DBIx::Class::Schema/connect>
552
553   # Simple SQLite connection
554   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
555
556   # Connect via subref
557   ->connect_info([ sub { DBI->connect(...) } ]);
558
559   # A bit more complicated
560   ->connect_info(
561     [
562       'dbi:Pg:dbname=foo',
563       'postgres',
564       'my_pg_password',
565       { AutoCommit => 1 },
566       { quote_char => q{"}, name_sep => q{.} },
567     ]
568   );
569
570   # Equivalent to the previous example
571   ->connect_info(
572     [
573       'dbi:Pg:dbname=foo',
574       'postgres',
575       'my_pg_password',
576       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
577     ]
578   );
579
580   # Same, but with hashref as argument
581   # See parse_connect_info for explanation
582   ->connect_info(
583     [{
584       dsn         => 'dbi:Pg:dbname=foo',
585       user        => 'postgres',
586       password    => 'my_pg_password',
587       AutoCommit  => 1,
588       quote_char  => q{"},
589       name_sep    => q{.},
590     }]
591   );
592
593   # Subref + DBIx::Class-specific connection options
594   ->connect_info(
595     [
596       sub { DBI->connect(...) },
597       {
598           quote_char => q{`},
599           name_sep => q{@},
600           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
601           disable_sth_caching => 1,
602       },
603     ]
604   );
605
606
607
608 =cut
609
610 sub connect_info {
611   my ($self, $info_arg) = @_;
612
613   return $self->_connect_info if !$info_arg;
614
615   my @args = @$info_arg;  # take a shallow copy for further mutilation
616   $self->_connect_info([@args]); # copy for _connect_info
617
618
619   # combine/pre-parse arguments depending on invocation style
620
621   my %attrs;
622   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
623     %attrs = %{ $args[1] || {} };
624     @args = $args[0];
625   }
626   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
627     %attrs = %{$args[0]};
628     @args = ();
629     for (qw/password user dsn/) {
630       unshift @args, delete $attrs{$_};
631     }
632   }
633   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
634     %attrs = (
635       % { $args[3] || {} },
636       % { $args[4] || {} },
637     );
638     @args = @args[0,1,2];
639   }
640
641   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
642   #  the new set of options
643   $self->_sql_maker(undef);
644   $self->_sql_maker_opts({});
645
646   if(keys %attrs) {
647     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
648       if(my $value = delete $attrs{$storage_opt}) {
649         $self->$storage_opt($value);
650       }
651     }
652     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
653       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
654         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
655       }
656     }
657   }
658
659   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
660
661   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
662   $self->_connect_info;
663 }
664
665 =head2 on_connect_do
666
667 This method is deprecated in favour of setting via L</connect_info>.
668
669
670 =head2 dbh_do
671
672 Arguments: ($subref | $method_name), @extra_coderef_args?
673
674 Execute the given $subref or $method_name using the new exception-based
675 connection management.
676
677 The first two arguments will be the storage object that C<dbh_do> was called
678 on and a database handle to use.  Any additional arguments will be passed
679 verbatim to the called subref as arguments 2 and onwards.
680
681 Using this (instead of $self->_dbh or $self->dbh) ensures correct
682 exception handling and reconnection (or failover in future subclasses).
683
684 Your subref should have no side-effects outside of the database, as
685 there is the potential for your subref to be partially double-executed
686 if the database connection was stale/dysfunctional.
687
688 Example:
689
690   my @stuff = $schema->storage->dbh_do(
691     sub {
692       my ($storage, $dbh, @cols) = @_;
693       my $cols = join(q{, }, @cols);
694       $dbh->selectrow_array("SELECT $cols FROM foo");
695     },
696     @column_list
697   );
698
699 =cut
700
701 sub dbh_do {
702   my $self = shift;
703   my $code = shift;
704
705   my $dbh = $self->_dbh;
706
707   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
708       || $self->{transaction_depth};
709
710   local $self->{_in_dbh_do} = 1;
711
712   my @result;
713   my $want_array = wantarray;
714
715   eval {
716     $self->_verify_pid if $dbh;
717     if(!$self->_dbh) {
718         $self->_populate_dbh;
719         $dbh = $self->_dbh;
720     }
721
722     if($want_array) {
723         @result = $self->$code($dbh, @_);
724     }
725     elsif(defined $want_array) {
726         $result[0] = $self->$code($dbh, @_);
727     }
728     else {
729         $self->$code($dbh, @_);
730     }
731   };
732
733   my $exception = $@;
734   if(!$exception) { return $want_array ? @result : $result[0] }
735
736   $self->throw_exception($exception) if $self->connected;
737
738   # We were not connected - reconnect and retry, but let any
739   #  exception fall right through this time
740   $self->_populate_dbh;
741   $self->$code($self->_dbh, @_);
742 }
743
744 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
745 # It also informs dbh_do to bypass itself while under the direction of txn_do,
746 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
747 sub txn_do {
748   my $self = shift;
749   my $coderef = shift;
750
751   ref $coderef eq 'CODE' or $self->throw_exception
752     ('$coderef must be a CODE reference');
753
754   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
755
756   local $self->{_in_dbh_do} = 1;
757
758   my @result;
759   my $want_array = wantarray;
760
761   my $tried = 0;
762   while(1) {
763     eval {
764       $self->_verify_pid if $self->_dbh;
765       $self->_populate_dbh if !$self->_dbh;
766
767       $self->txn_begin;
768       if($want_array) {
769           @result = $coderef->(@_);
770       }
771       elsif(defined $want_array) {
772           $result[0] = $coderef->(@_);
773       }
774       else {
775           $coderef->(@_);
776       }
777       $self->txn_commit;
778     };
779
780     my $exception = $@;
781     if(!$exception) { return $want_array ? @result : $result[0] }
782
783     if($tried++ > 0 || $self->connected) {
784       eval { $self->txn_rollback };
785       my $rollback_exception = $@;
786       if($rollback_exception) {
787         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
788         $self->throw_exception($exception)  # propagate nested rollback
789           if $rollback_exception =~ /$exception_class/;
790
791         $self->throw_exception(
792           "Transaction aborted: ${exception}. "
793           . "Rollback failed: ${rollback_exception}"
794         );
795       }
796       $self->throw_exception($exception)
797     }
798
799     # We were not connected, and was first try - reconnect and retry
800     # via the while loop
801     $self->_populate_dbh;
802   }
803 }
804
805 =head2 disconnect
806
807 Our C<disconnect> method also performs a rollback first if the
808 database is not in C<AutoCommit> mode.
809
810 =cut
811
812 sub disconnect {
813   my ($self) = @_;
814
815   if( $self->connected ) {
816     my $connection_do = $self->on_disconnect_do;
817     $self->_do_connection_actions($connection_do) if ref($connection_do);
818
819     $self->_dbh->rollback unless $self->_dbh_autocommit;
820     $self->_dbh->disconnect;
821     $self->_dbh(undef);
822     $self->{_dbh_gen}++;
823   }
824 }
825
826 =head2 with_deferred_fk_checks
827
828 =over 4
829
830 =item Arguments: C<$coderef>
831
832 =item Return Value: The return value of $coderef
833
834 =back
835
836 Storage specific method to run the code ref with FK checks deferred or
837 in MySQL's case disabled entirely.
838
839 =cut
840
841 # Storage subclasses should override this
842 sub with_deferred_fk_checks {
843   my ($self, $sub) = @_;
844
845   $sub->();
846 }
847
848 sub connected {
849   my ($self) = @_;
850
851   if(my $dbh = $self->_dbh) {
852       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
853           $self->_dbh(undef);
854           $self->{_dbh_gen}++;
855           return;
856       }
857       else {
858           $self->_verify_pid;
859           return 0 if !$self->_dbh;
860       }
861       return ($dbh->FETCH('Active') && $dbh->ping);
862   }
863
864   return 0;
865 }
866
867 # handle pid changes correctly
868 #  NOTE: assumes $self->_dbh is a valid $dbh
869 sub _verify_pid {
870   my ($self) = @_;
871
872   return if defined $self->_conn_pid && $self->_conn_pid == $$;
873
874   $self->_dbh->{InactiveDestroy} = 1;
875   $self->_dbh(undef);
876   $self->{_dbh_gen}++;
877
878   return;
879 }
880
881 sub ensure_connected {
882   my ($self) = @_;
883
884   unless ($self->connected) {
885     $self->_populate_dbh;
886   }
887 }
888
889 =head2 dbh
890
891 Returns the dbh - a data base handle of class L<DBI>.
892
893 =cut
894
895 sub dbh {
896   my ($self) = @_;
897
898   $self->ensure_connected;
899   return $self->_dbh;
900 }
901
902 sub _sql_maker_args {
903     my ($self) = @_;
904     
905     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
906 }
907
908 sub sql_maker {
909   my ($self) = @_;
910   unless ($self->_sql_maker) {
911     my $sql_maker_class = $self->sql_maker_class;
912     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
913   }
914   return $self->_sql_maker;
915 }
916
917 sub _rebless {}
918
919 sub _populate_dbh {
920   my ($self) = @_;
921   my @info = @{$self->_dbi_connect_info || []};
922   $self->_dbh($self->_connect(@info));
923
924   # Always set the transaction depth on connect, since
925   #  there is no transaction in progress by definition
926   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
927
928   if(ref $self eq 'DBIx::Class::Storage::DBI') {
929     my $driver = $self->_dbh->{Driver}->{Name};
930     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
931       bless $self, "DBIx::Class::Storage::DBI::${driver}";
932       $self->_rebless();
933     }
934   }
935
936   $self->_conn_pid($$);
937   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
938
939   my $connection_do = $self->on_connect_do;
940   $self->_do_connection_actions($connection_do) if ref($connection_do);
941 }
942
943 sub _do_connection_actions {
944   my $self = shift;
945   my $connection_do = shift;
946
947   if (ref $connection_do eq 'ARRAY') {
948     $self->_do_query($_) foreach @$connection_do;
949   }
950   elsif (ref $connection_do eq 'CODE') {
951     $connection_do->($self);
952   }
953
954   return $self;
955 }
956
957 sub _do_query {
958   my ($self, $action) = @_;
959
960   if (ref $action eq 'CODE') {
961     $action = $action->($self);
962     $self->_do_query($_) foreach @$action;
963   }
964   else {
965     # Most debuggers expect ($sql, @bind), so we need to exclude
966     # the attribute hash which is the second argument to $dbh->do
967     # furthermore the bind values are usually to be presented
968     # as named arrayref pairs, so wrap those here too
969     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
970     my $sql = shift @do_args;
971     my $attrs = shift @do_args;
972     my @bind = map { [ undef, $_ ] } @do_args;
973
974     $self->_query_start($sql, @bind);
975     $self->_dbh->do($sql, $attrs, @do_args);
976     $self->_query_end($sql, @bind);
977   }
978
979   return $self;
980 }
981
982 sub _connect {
983   my ($self, @info) = @_;
984
985   $self->throw_exception("You failed to provide any connection info")
986     if !@info;
987
988   my ($old_connect_via, $dbh);
989
990   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
991     $old_connect_via = $DBI::connect_via;
992     $DBI::connect_via = 'connect';
993   }
994
995   eval {
996     if(ref $info[0] eq 'CODE') {
997        $dbh = &{$info[0]}
998     }
999     else {
1000        $dbh = DBI->connect(@info);
1001     }
1002
1003     if($dbh && !$self->unsafe) {
1004       my $weak_self = $self;
1005       weaken($weak_self);
1006       $dbh->{HandleError} = sub {
1007           if ($weak_self) {
1008             $weak_self->throw_exception("DBI Exception: $_[0]");
1009           }
1010           else {
1011             croak ("DBI Exception: $_[0]");
1012           }
1013       };
1014       $dbh->{ShowErrorStatement} = 1;
1015       $dbh->{RaiseError} = 1;
1016       $dbh->{PrintError} = 0;
1017     }
1018   };
1019
1020   $DBI::connect_via = $old_connect_via if $old_connect_via;
1021
1022   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1023     if !$dbh || $@;
1024
1025   $self->_dbh_autocommit($dbh->{AutoCommit});
1026
1027   $dbh;
1028 }
1029
1030 sub svp_begin {
1031   my ($self, $name) = @_;
1032
1033   $name = $self->_svp_generate_name
1034     unless defined $name;
1035
1036   $self->throw_exception ("You can't use savepoints outside a transaction")
1037     if $self->{transaction_depth} == 0;
1038
1039   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1040     unless $self->can('_svp_begin');
1041   
1042   push @{ $self->{savepoints} }, $name;
1043
1044   $self->debugobj->svp_begin($name) if $self->debug;
1045   
1046   return $self->_svp_begin($name);
1047 }
1048
1049 sub svp_release {
1050   my ($self, $name) = @_;
1051
1052   $self->throw_exception ("You can't use savepoints outside a transaction")
1053     if $self->{transaction_depth} == 0;
1054
1055   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1056     unless $self->can('_svp_release');
1057
1058   if (defined $name) {
1059     $self->throw_exception ("Savepoint '$name' does not exist")
1060       unless grep { $_ eq $name } @{ $self->{savepoints} };
1061
1062     # Dig through the stack until we find the one we are releasing.  This keeps
1063     # the stack up to date.
1064     my $svp;
1065
1066     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1067   } else {
1068     $name = pop @{ $self->{savepoints} };
1069   }
1070
1071   $self->debugobj->svp_release($name) if $self->debug;
1072
1073   return $self->_svp_release($name);
1074 }
1075
1076 sub svp_rollback {
1077   my ($self, $name) = @_;
1078
1079   $self->throw_exception ("You can't use savepoints outside a transaction")
1080     if $self->{transaction_depth} == 0;
1081
1082   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1083     unless $self->can('_svp_rollback');
1084
1085   if (defined $name) {
1086       # If they passed us a name, verify that it exists in the stack
1087       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1088           $self->throw_exception("Savepoint '$name' does not exist!");
1089       }
1090
1091       # Dig through the stack until we find the one we are releasing.  This keeps
1092       # the stack up to date.
1093       while(my $s = pop(@{ $self->{savepoints} })) {
1094           last if($s eq $name);
1095       }
1096       # Add the savepoint back to the stack, as a rollback doesn't remove the
1097       # named savepoint, only everything after it.
1098       push(@{ $self->{savepoints} }, $name);
1099   } else {
1100       # We'll assume they want to rollback to the last savepoint
1101       $name = $self->{savepoints}->[-1];
1102   }
1103
1104   $self->debugobj->svp_rollback($name) if $self->debug;
1105   
1106   return $self->_svp_rollback($name);
1107 }
1108
1109 sub _svp_generate_name {
1110     my ($self) = @_;
1111
1112     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1113 }
1114
1115 sub txn_begin {
1116   my $self = shift;
1117   $self->ensure_connected();
1118   if($self->{transaction_depth} == 0) {
1119     $self->debugobj->txn_begin()
1120       if $self->debug;
1121     # this isn't ->_dbh-> because
1122     #  we should reconnect on begin_work
1123     #  for AutoCommit users
1124     $self->dbh->begin_work;
1125   } elsif ($self->auto_savepoint) {
1126     $self->svp_begin;
1127   }
1128   $self->{transaction_depth}++;
1129 }
1130
1131 sub txn_commit {
1132   my $self = shift;
1133   if ($self->{transaction_depth} == 1) {
1134     my $dbh = $self->_dbh;
1135     $self->debugobj->txn_commit()
1136       if ($self->debug);
1137     $dbh->commit;
1138     $self->{transaction_depth} = 0
1139       if $self->_dbh_autocommit;
1140   }
1141   elsif($self->{transaction_depth} > 1) {
1142     $self->{transaction_depth}--;
1143     $self->svp_release
1144       if $self->auto_savepoint;
1145   }
1146 }
1147
1148 sub txn_rollback {
1149   my $self = shift;
1150   my $dbh = $self->_dbh;
1151   eval {
1152     if ($self->{transaction_depth} == 1) {
1153       $self->debugobj->txn_rollback()
1154         if ($self->debug);
1155       $self->{transaction_depth} = 0
1156         if $self->_dbh_autocommit;
1157       $dbh->rollback;
1158     }
1159     elsif($self->{transaction_depth} > 1) {
1160       $self->{transaction_depth}--;
1161       if ($self->auto_savepoint) {
1162         $self->svp_rollback;
1163         $self->svp_release;
1164       }
1165     }
1166     else {
1167       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1168     }
1169   };
1170   if ($@) {
1171     my $error = $@;
1172     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1173     $error =~ /$exception_class/ and $self->throw_exception($error);
1174     # ensure that a failed rollback resets the transaction depth
1175     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1176     $self->throw_exception($error);
1177   }
1178 }
1179
1180 # This used to be the top-half of _execute.  It was split out to make it
1181 #  easier to override in NoBindVars without duping the rest.  It takes up
1182 #  all of _execute's args, and emits $sql, @bind.
1183 sub _prep_for_execute {
1184   my ($self, $op, $extra_bind, $ident, $args) = @_;
1185
1186   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1187     $ident = $ident->from();
1188   }
1189
1190   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1191
1192   unshift(@bind,
1193     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1194       if $extra_bind;
1195   return ($sql, \@bind);
1196 }
1197
1198 sub _fix_bind_params {
1199     my ($self, @bind) = @_;
1200
1201     ### Turn @bind from something like this:
1202     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1203     ### to this:
1204     ###   ( "'1'", "'1'", "'3'" )
1205     return
1206         map {
1207             if ( defined( $_ && $_->[1] ) ) {
1208                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1209             }
1210             else { q{'NULL'}; }
1211         } @bind;
1212 }
1213
1214 sub _query_start {
1215     my ( $self, $sql, @bind ) = @_;
1216
1217     if ( $self->debug ) {
1218         @bind = $self->_fix_bind_params(@bind);
1219         
1220         $self->debugobj->query_start( $sql, @bind );
1221     }
1222 }
1223
1224 sub _query_end {
1225     my ( $self, $sql, @bind ) = @_;
1226
1227     if ( $self->debug ) {
1228         @bind = $self->_fix_bind_params(@bind);
1229         $self->debugobj->query_end( $sql, @bind );
1230     }
1231 }
1232
1233 sub _dbh_execute {
1234   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1235   
1236   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1237
1238   $self->_query_start( $sql, @$bind );
1239
1240   my $sth = $self->sth($sql,$op);
1241
1242   my $placeholder_index = 1; 
1243
1244   foreach my $bound (@$bind) {
1245     my $attributes = {};
1246     my($column_name, @data) = @$bound;
1247
1248     if ($bind_attributes) {
1249       $attributes = $bind_attributes->{$column_name}
1250       if defined $bind_attributes->{$column_name};
1251     }
1252
1253     foreach my $data (@data) {
1254       my $ref = ref $data;
1255       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1256
1257       $sth->bind_param($placeholder_index, $data, $attributes);
1258       $placeholder_index++;
1259     }
1260   }
1261
1262   # Can this fail without throwing an exception anyways???
1263   my $rv = $sth->execute();
1264   $self->throw_exception($sth->errstr) if !$rv;
1265
1266   $self->_query_end( $sql, @$bind );
1267
1268   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1269 }
1270
1271 sub _execute {
1272     my $self = shift;
1273     $self->dbh_do('_dbh_execute', @_)
1274 }
1275
1276 sub insert {
1277   my ($self, $source, $to_insert) = @_;
1278   
1279   my $ident = $source->from; 
1280   my $bind_attributes = $self->source_bind_attributes($source);
1281
1282   $self->ensure_connected;
1283   foreach my $col ( $source->columns ) {
1284     if ( !defined $to_insert->{$col} ) {
1285       my $col_info = $source->column_info($col);
1286
1287       if ( $col_info->{auto_nextval} ) {
1288         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1289       }
1290     }
1291   }
1292
1293   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1294
1295   return $to_insert;
1296 }
1297
1298 ## Still not quite perfect, and EXPERIMENTAL
1299 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1300 ## scalar refs, or at least, all the same type as the first set, the statement is
1301 ## only prepped once.
1302 sub insert_bulk {
1303   my ($self, $source, $cols, $data) = @_;
1304   my %colvalues;
1305   my $table = $source->from;
1306   @colvalues{@$cols} = (0..$#$cols);
1307   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1308   
1309   $self->_query_start( $sql, @bind );
1310   my $sth = $self->sth($sql);
1311
1312 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1313
1314   ## This must be an arrayref, else nothing works!
1315   
1316   my $tuple_status = [];
1317   
1318   ##use Data::Dumper;
1319   ##print STDERR Dumper( $data, $sql, [@bind] );
1320
1321   my $time = time();
1322
1323   ## Get the bind_attributes, if any exist
1324   my $bind_attributes = $self->source_bind_attributes($source);
1325
1326   ## Bind the values and execute
1327   my $placeholder_index = 1; 
1328
1329   foreach my $bound (@bind) {
1330
1331     my $attributes = {};
1332     my ($column_name, $data_index) = @$bound;
1333
1334     if( $bind_attributes ) {
1335       $attributes = $bind_attributes->{$column_name}
1336       if defined $bind_attributes->{$column_name};
1337     }
1338
1339     my @data = map { $_->[$data_index] } @$data;
1340
1341     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1342     $placeholder_index++;
1343   }
1344   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1345   $self->throw_exception($sth->errstr) if !$rv;
1346
1347   $self->_query_end( $sql, @bind );
1348   return (wantarray ? ($rv, $sth, @bind) : $rv);
1349 }
1350
1351 sub update {
1352   my $self = shift @_;
1353   my $source = shift @_;
1354   my $bind_attributes = $self->source_bind_attributes($source);
1355   
1356   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1357 }
1358
1359
1360 sub delete {
1361   my $self = shift @_;
1362   my $source = shift @_;
1363   
1364   my $bind_attrs = {}; ## If ever it's needed...
1365   
1366   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1367 }
1368
1369 sub _select {
1370   my $self = shift;
1371   my $sql_maker = $self->sql_maker;
1372   local $sql_maker->{for};
1373   return $self->_execute($self->_select_args(@_));
1374 }
1375
1376 sub _select_args {
1377   my ($self, $ident, $select, $condition, $attrs) = @_;
1378   my $order = $attrs->{order_by};
1379
1380   if (ref $condition eq 'SCALAR') {
1381     my $unwrap = ${$condition};
1382     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1383       $order = $1;
1384       $condition = \$unwrap;
1385     }
1386   }
1387
1388   my $for = delete $attrs->{for};
1389   my $sql_maker = $self->sql_maker;
1390   $sql_maker->{for} = $for;
1391
1392   if (exists $attrs->{group_by} || $attrs->{having}) {
1393     $order = {
1394       group_by => $attrs->{group_by},
1395       having => $attrs->{having},
1396       ($order ? (order_by => $order) : ())
1397     };
1398   }
1399   my $bind_attrs = {}; ## Future support
1400   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1401   if ($attrs->{software_limit} ||
1402       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1403         $attrs->{software_limit} = 1;
1404   } else {
1405     $self->throw_exception("rows attribute must be positive if present")
1406       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1407
1408     # MySQL actually recommends this approach.  I cringe.
1409     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1410     push @args, $attrs->{rows}, $attrs->{offset};
1411   }
1412
1413   return @args;
1414 }
1415
1416 sub source_bind_attributes {
1417   my ($self, $source) = @_;
1418   
1419   my $bind_attributes;
1420   foreach my $column ($source->columns) {
1421   
1422     my $data_type = $source->column_info($column)->{data_type} || '';
1423     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1424      if $data_type;
1425   }
1426
1427   return $bind_attributes;
1428 }
1429
1430 =head2 select
1431
1432 =over 4
1433
1434 =item Arguments: $ident, $select, $condition, $attrs
1435
1436 =back
1437
1438 Handle a SQL select statement.
1439
1440 =cut
1441
1442 sub select {
1443   my $self = shift;
1444   my ($ident, $select, $condition, $attrs) = @_;
1445   return $self->cursor_class->new($self, \@_, $attrs);
1446 }
1447
1448 sub select_single {
1449   my $self = shift;
1450   my ($rv, $sth, @bind) = $self->_select(@_);
1451   my @row = $sth->fetchrow_array;
1452   my @nextrow = $sth->fetchrow_array if @row;
1453   if(@row && @nextrow) {
1454     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1455   }
1456   # Need to call finish() to work round broken DBDs
1457   $sth->finish();
1458   return @row;
1459 }
1460
1461 =head2 sth
1462
1463 =over 4
1464
1465 =item Arguments: $sql
1466
1467 =back
1468
1469 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1470
1471 =cut
1472
1473 sub _dbh_sth {
1474   my ($self, $dbh, $sql) = @_;
1475
1476   # 3 is the if_active parameter which avoids active sth re-use
1477   my $sth = $self->disable_sth_caching
1478     ? $dbh->prepare($sql)
1479     : $dbh->prepare_cached($sql, {}, 3);
1480
1481   # XXX You would think RaiseError would make this impossible,
1482   #  but apparently that's not true :(
1483   $self->throw_exception($dbh->errstr) if !$sth;
1484
1485   $sth;
1486 }
1487
1488 sub sth {
1489   my ($self, $sql) = @_;
1490   $self->dbh_do('_dbh_sth', $sql);
1491 }
1492
1493 sub _dbh_columns_info_for {
1494   my ($self, $dbh, $table) = @_;
1495
1496   if ($dbh->can('column_info')) {
1497     my %result;
1498     eval {
1499       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1500       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1501       $sth->execute();
1502       while ( my $info = $sth->fetchrow_hashref() ){
1503         my %column_info;
1504         $column_info{data_type}   = $info->{TYPE_NAME};
1505         $column_info{size}      = $info->{COLUMN_SIZE};
1506         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1507         $column_info{default_value} = $info->{COLUMN_DEF};
1508         my $col_name = $info->{COLUMN_NAME};
1509         $col_name =~ s/^\"(.*)\"$/$1/;
1510
1511         $result{$col_name} = \%column_info;
1512       }
1513     };
1514     return \%result if !$@ && scalar keys %result;
1515   }
1516
1517   my %result;
1518   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1519   $sth->execute;
1520   my @columns = @{$sth->{NAME_lc}};
1521   for my $i ( 0 .. $#columns ){
1522     my %column_info;
1523     $column_info{data_type} = $sth->{TYPE}->[$i];
1524     $column_info{size} = $sth->{PRECISION}->[$i];
1525     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1526
1527     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1528       $column_info{data_type} = $1;
1529       $column_info{size}    = $2;
1530     }
1531
1532     $result{$columns[$i]} = \%column_info;
1533   }
1534   $sth->finish;
1535
1536   foreach my $col (keys %result) {
1537     my $colinfo = $result{$col};
1538     my $type_num = $colinfo->{data_type};
1539     my $type_name;
1540     if(defined $type_num && $dbh->can('type_info')) {
1541       my $type_info = $dbh->type_info($type_num);
1542       $type_name = $type_info->{TYPE_NAME} if $type_info;
1543       $colinfo->{data_type} = $type_name if $type_name;
1544     }
1545   }
1546
1547   return \%result;
1548 }
1549
1550 sub columns_info_for {
1551   my ($self, $table) = @_;
1552   $self->dbh_do('_dbh_columns_info_for', $table);
1553 }
1554
1555 =head2 last_insert_id
1556
1557 Return the row id of the last insert.
1558
1559 =cut
1560
1561 sub _dbh_last_insert_id {
1562     my ($self, $dbh, $source, $col) = @_;
1563     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1564     $dbh->func('last_insert_rowid');
1565 }
1566
1567 sub last_insert_id {
1568   my $self = shift;
1569   $self->dbh_do('_dbh_last_insert_id', @_);
1570 }
1571
1572 =head2 sqlt_type
1573
1574 Returns the database driver name.
1575
1576 =cut
1577
1578 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1579
1580 =head2 bind_attribute_by_data_type
1581
1582 Given a datatype from column info, returns a database specific bind
1583 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1584 let the database planner just handle it.
1585
1586 Generally only needed for special case column types, like bytea in postgres.
1587
1588 =cut
1589
1590 sub bind_attribute_by_data_type {
1591     return;
1592 }
1593
1594 =head2 create_ddl_dir
1595
1596 =over 4
1597
1598 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1599
1600 =back
1601
1602 Creates a SQL file based on the Schema, for each of the specified
1603 database types, in the given directory.
1604
1605 By default, C<\%sqlt_args> will have
1606
1607  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1608
1609 merged with the hash passed in. To disable any of those features, pass in a 
1610 hashref like the following
1611
1612  { ignore_constraint_names => 0, # ... other options }
1613
1614 =cut
1615
1616 sub create_ddl_dir {
1617   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1618
1619   if(!$dir || !-d $dir) {
1620     warn "No directory given, using ./\n";
1621     $dir = "./";
1622   }
1623   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1624   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1625
1626   my $schema_version = $schema->schema_version || '1.x';
1627   $version ||= $schema_version;
1628
1629   $sqltargs = {
1630     add_drop_table => 1, 
1631     ignore_constraint_names => 1,
1632     ignore_index_names => 1,
1633     %{$sqltargs || {}}
1634   };
1635
1636   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1637       . $self->_check_sqlt_message . q{'})
1638           if !$self->_check_sqlt_version;
1639
1640   my $sqlt = SQL::Translator->new( $sqltargs );
1641
1642   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1643   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1644
1645   foreach my $db (@$databases) {
1646     $sqlt->reset();
1647     $sqlt = $self->configure_sqlt($sqlt, $db);
1648     $sqlt->{schema} = $sqlt_schema;
1649     $sqlt->producer($db);
1650
1651     my $file;
1652     my $filename = $schema->ddl_filename($db, $version, $dir);
1653     if (-e $filename && ($version eq $schema_version )) {
1654       # if we are dumping the current version, overwrite the DDL
1655       warn "Overwriting existing DDL file - $filename";
1656       unlink($filename);
1657     }
1658
1659     my $output = $sqlt->translate;
1660     if(!$output) {
1661       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1662       next;
1663     }
1664     if(!open($file, ">$filename")) {
1665       $self->throw_exception("Can't open $filename for writing ($!)");
1666       next;
1667     }
1668     print $file $output;
1669     close($file);
1670   
1671     next unless ($preversion);
1672
1673     require SQL::Translator::Diff;
1674
1675     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1676     if(!-e $prefilename) {
1677       warn("No previous schema file found ($prefilename)");
1678       next;
1679     }
1680
1681     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1682     if(-e $difffile) {
1683       warn("Overwriting existing diff file - $difffile");
1684       unlink($difffile);
1685     }
1686     
1687     my $source_schema;
1688     {
1689       my $t = SQL::Translator->new($sqltargs);
1690       $t->debug( 0 );
1691       $t->trace( 0 );
1692       $t->parser( $db )                       or die $t->error;
1693       $t = $self->configure_sqlt($t, $db);
1694       my $out = $t->translate( $prefilename ) or die $t->error;
1695       $source_schema = $t->schema;
1696       unless ( $source_schema->name ) {
1697         $source_schema->name( $prefilename );
1698       }
1699     }
1700
1701     # The "new" style of producers have sane normalization and can support 
1702     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1703     # And we have to diff parsed SQL against parsed SQL.
1704     my $dest_schema = $sqlt_schema;
1705     
1706     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1707       my $t = SQL::Translator->new($sqltargs);
1708       $t->debug( 0 );
1709       $t->trace( 0 );
1710       $t->parser( $db )                    or die $t->error;
1711       $t = $self->configure_sqlt($t, $db);
1712       my $out = $t->translate( $filename ) or die $t->error;
1713       $dest_schema = $t->schema;
1714       $dest_schema->name( $filename )
1715         unless $dest_schema->name;
1716     }
1717     
1718     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1719                                                   $dest_schema,   $db,
1720                                                   $sqltargs
1721                                                  );
1722     if(!open $file, ">$difffile") { 
1723       $self->throw_exception("Can't write to $difffile ($!)");
1724       next;
1725     }
1726     print $file $diff;
1727     close($file);
1728   }
1729 }
1730
1731 sub configure_sqlt() {
1732   my $self = shift;
1733   my $tr = shift;
1734   my $db = shift || $self->sqlt_type;
1735   if ($db eq 'PostgreSQL') {
1736     $tr->quote_table_names(0);
1737     $tr->quote_field_names(0);
1738   }
1739   return $tr;
1740 }
1741
1742 =head2 deployment_statements
1743
1744 =over 4
1745
1746 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1747
1748 =back
1749
1750 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1751 The database driver name is given by C<$type>, though the value from
1752 L</sqlt_type> is used if it is not specified.
1753
1754 C<$directory> is used to return statements from files in a previously created
1755 L</create_ddl_dir> directory and is optional. The filenames are constructed
1756 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1757
1758 If no C<$directory> is specified then the statements are constructed on the
1759 fly using L<SQL::Translator> and C<$version> is ignored.
1760
1761 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1762
1763 =cut
1764
1765 sub deployment_statements {
1766   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1767   # Need to be connected to get the correct sqlt_type
1768   $self->ensure_connected() unless $type;
1769   $type ||= $self->sqlt_type;
1770   $version ||= $schema->schema_version || '1.x';
1771   $dir ||= './';
1772   my $filename = $schema->ddl_filename($type, $dir, $version);
1773   if(-f $filename)
1774   {
1775       my $file;
1776       open($file, "<$filename") 
1777         or $self->throw_exception("Can't open $filename ($!)");
1778       my @rows = <$file>;
1779       close($file);
1780       return join('', @rows);
1781   }
1782
1783   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1784       . $self->_check_sqlt_message . q{'})
1785           if !$self->_check_sqlt_version;
1786
1787   require SQL::Translator::Parser::DBIx::Class;
1788   eval qq{use SQL::Translator::Producer::${type}};
1789   $self->throw_exception($@) if $@;
1790
1791   # sources needs to be a parser arg, but for simplicty allow at top level 
1792   # coming in
1793   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1794       if exists $sqltargs->{sources};
1795
1796   my $tr = SQL::Translator->new(%$sqltargs);
1797   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1798   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1799 }
1800
1801 sub deploy {
1802   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1803   my $deploy = sub {
1804     my $line = shift;
1805     return if($line =~ /^--/);
1806     return if(!$line);
1807     # next if($line =~ /^DROP/m);
1808     return if($line =~ /^BEGIN TRANSACTION/m);
1809     return if($line =~ /^COMMIT/m);
1810     return if $line =~ /^\s+$/; # skip whitespace only
1811     $self->_query_start($line);
1812     eval {
1813       $self->dbh->do($line); # shouldn't be using ->dbh ?
1814     };
1815     if ($@) {
1816       warn qq{$@ (running "${line}")};
1817     }
1818     $self->_query_end($line);
1819   };
1820   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1821   if (@statements > 1) {
1822     foreach my $statement (@statements) {
1823       $deploy->( $statement );
1824     }
1825   }
1826   elsif (@statements == 1) {
1827     foreach my $line ( split(";\n", $statements[0])) {
1828       $deploy->( $line );
1829     }
1830   }
1831 }
1832
1833 =head2 datetime_parser
1834
1835 Returns the datetime parser class
1836
1837 =cut
1838
1839 sub datetime_parser {
1840   my $self = shift;
1841   return $self->{datetime_parser} ||= do {
1842     $self->ensure_connected;
1843     $self->build_datetime_parser(@_);
1844   };
1845 }
1846
1847 =head2 datetime_parser_type
1848
1849 Defines (returns) the datetime parser class - currently hardwired to
1850 L<DateTime::Format::MySQL>
1851
1852 =cut
1853
1854 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1855
1856 =head2 build_datetime_parser
1857
1858 See L</datetime_parser>
1859
1860 =cut
1861
1862 sub build_datetime_parser {
1863   my $self = shift;
1864   my $type = $self->datetime_parser_type(@_);
1865   eval "use ${type}";
1866   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1867   return $type;
1868 }
1869
1870 {
1871     my $_check_sqlt_version; # private
1872     my $_check_sqlt_message; # private
1873     sub _check_sqlt_version {
1874         return $_check_sqlt_version if defined $_check_sqlt_version;
1875         eval 'use SQL::Translator "0.09003"';
1876         $_check_sqlt_message = $@ || '';
1877         $_check_sqlt_version = !$@;
1878     }
1879
1880     sub _check_sqlt_message {
1881         _check_sqlt_version if !defined $_check_sqlt_message;
1882         $_check_sqlt_message;
1883     }
1884 }
1885
1886 =head2 is_replicating
1887
1888 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1889 replicate from a master database.  Default is undef, which is the result
1890 returned by databases that don't support replication.
1891
1892 =cut
1893
1894 sub is_replicating {
1895     return;
1896     
1897 }
1898
1899 =head2 lag_behind_master
1900
1901 Returns a number that represents a certain amount of lag behind a master db
1902 when a given storage is replicating.  The number is database dependent, but
1903 starts at zero and increases with the amount of lag. Default in undef
1904
1905 =cut
1906
1907 sub lag_behind_master {
1908     return;
1909 }
1910
1911 sub DESTROY {
1912   my $self = shift;
1913   return if !$self->_dbh;
1914   $self->_verify_pid;
1915   $self->_dbh(undef);
1916 }
1917
1918 1;
1919
1920 =head1 USAGE NOTES
1921
1922 =head2 DBIx::Class and AutoCommit
1923
1924 DBIx::Class can do some wonderful magic with handling exceptions,
1925 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1926 combined with C<txn_do> for transaction support.
1927
1928 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1929 in an assumed transaction between commits, and you're telling us you'd
1930 like to manage that manually.  A lot of the magic protections offered by
1931 this module will go away.  We can't protect you from exceptions due to database
1932 disconnects because we don't know anything about how to restart your
1933 transactions.  You're on your own for handling all sorts of exceptional
1934 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1935 be with raw DBI.
1936
1937
1938 =head1 SQL METHODS
1939
1940 The module defines a set of methods within the DBIC::SQL::Abstract
1941 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1942 SQL query functions.
1943
1944 The following methods are extended:-
1945
1946 =over 4
1947
1948 =item delete
1949
1950 =item insert
1951
1952 =item select
1953
1954 =item update
1955
1956 =item limit_dialect
1957
1958 See L</connect_info> for details.
1959
1960 =item quote_char
1961
1962 See L</connect_info> for details.
1963
1964 =item name_sep
1965
1966 See L</connect_info> for details.
1967
1968 =back
1969
1970 =head1 AUTHORS
1971
1972 Matt S. Trout <mst@shadowcatsystems.co.uk>
1973
1974 Andy Grundman <andy@hybridized.org>
1975
1976 =head1 LICENSE
1977
1978 You may distribute this code under the same terms as Perl itself.
1979
1980 =cut