changes to handling of from
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 # DB2 is the only remaining DB using this. Even though we are not sure if
54 # RowNumberOver is still needed here (should be part of SQLA) leave the 
55 # code in place
56 sub _RowNumberOver {
57   my ($self, $sql, $order, $rows, $offset ) = @_;
58
59   $offset += 1;
60   my $last = $rows + $offset;
61   my ( $order_by ) = $self->_order_by( $order );
62
63   $sql = <<"SQL";
64 SELECT * FROM
65 (
66    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
67       $sql
68       $order_by
69    ) Q1
70 ) Q2
71 WHERE ROW_NUM BETWEEN $offset AND $last
72
73 SQL
74
75   return $sql;
76 }
77
78
79 # While we're at it, this should make LIMIT queries more efficient,
80 #  without digging into things too deeply
81 use Scalar::Util 'blessed';
82 sub _find_syntax {
83   my ($self, $syntax) = @_;
84   
85   # DB2 is the only remaining DB using this. Even though we are not sure if
86   # RowNumberOver is still needed here (should be part of SQLA) leave the 
87   # code in place
88   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
89   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
90     return 'RowNumberOver';
91   }
92   
93   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
94 }
95
96 sub select {
97   my ($self, $table, $fields, $where, $order, @rest) = @_;
98   local $self->{having_bind} = [];
99
100 #  if (ref $table eq 'HASH') {
101 #    my $alias;
102 #    ($alias, $table) = %$table;
103 #  }
104  
105   if (ref $table eq 'SCALAR') {
106     $table = $$table;
107   }
108   elsif (ref $table eq 'REF') {
109     my ($sql, @bind) = @{${$table}};
110     push(@{$self->{having_bind}}, @bind);
111     $table = $sql;
112   }
113   elsif (not ref $table) {
114     $table = $self->_quote($table);
115   }
116   local $self->{rownum_hack_count} = 1
117     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
118   @rest = (-1) unless defined $rest[0];
119   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
120     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
121   my ($sql, @ret) = $self->SUPER::select(
122     $table, $self->_recurse_fields($fields), $where, $order, @rest
123   );
124   $sql .= 
125     $self->{for} ?
126     (
127       $self->{for} eq 'update' ? ' FOR UPDATE' :
128       $self->{for} eq 'shared' ? ' FOR SHARE'  :
129       ''
130     ) :
131     ''
132   ;
133   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
134 }
135
136 sub insert {
137   my $self = shift;
138   my $table = shift;
139   $table = $self->_quote($table) unless ref($table);
140   $self->SUPER::insert($table, @_);
141 }
142
143 sub update {
144   my $self = shift;
145   my $table = shift;
146   $table = $self->_quote($table) unless ref($table);
147   $self->SUPER::update($table, @_);
148 }
149
150 sub delete {
151   my $self = shift;
152   my $table = shift;
153   $table = $self->_quote($table) unless ref($table);
154   $self->SUPER::delete($table, @_);
155 }
156
157 sub _emulate_limit {
158   my $self = shift;
159   if ($_[3] == -1) {
160     return $_[1].$self->_order_by($_[2]);
161   } else {
162     return $self->SUPER::_emulate_limit(@_);
163   }
164 }
165
166 sub _recurse_fields {
167   my ($self, $fields, $params) = @_;
168   my $ref = ref $fields;
169   return $self->_quote($fields) unless $ref;
170   return $$fields if $ref eq 'SCALAR';
171
172   if ($ref eq 'ARRAY') {
173     return join(', ', map {
174       $self->_recurse_fields($_)
175         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
176           ? ' AS col'.$self->{rownum_hack_count}++
177           : '')
178       } @$fields);
179   } elsif ($ref eq 'HASH') {
180     foreach my $func (keys %$fields) {
181       return $self->_sqlcase($func)
182         .'( '.$self->_recurse_fields($fields->{$func}).' )';
183     }
184   }
185   # Is the second check absolutely necessary?
186   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
187     return $self->_bind_to_sql( $fields );
188   }
189   else {
190     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
191   }
192 }
193
194 sub _order_by {
195   my $self = shift;
196   my $ret = '';
197   my @extra;
198   if (ref $_[0] eq 'HASH') {
199     if (defined $_[0]->{group_by}) {
200       $ret = $self->_sqlcase(' group by ')
201         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
202     }
203     if (defined $_[0]->{having}) {
204       my $frag;
205       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
206       push(@{$self->{having_bind}}, @extra);
207       $ret .= $self->_sqlcase(' having ').$frag;
208     }
209     if (defined $_[0]->{order_by}) {
210       $ret .= $self->_order_by($_[0]->{order_by});
211     }
212     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
213       return $self->SUPER::_order_by($_[0]);
214     }
215   } elsif (ref $_[0] eq 'SCALAR') {
216     $ret = $self->_sqlcase(' order by ').${ $_[0] };
217   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
218     my @order = @{+shift};
219     $ret = $self->_sqlcase(' order by ')
220           .join(', ', map {
221                         my $r = $self->_order_by($_, @_);
222                         $r =~ s/^ ?ORDER BY //i;
223                         $r;
224                       } @order);
225   } else {
226     $ret = $self->SUPER::_order_by(@_);
227   }
228   return $ret;
229 }
230
231 sub _order_directions {
232   my ($self, $order) = @_;
233   $order = $order->{order_by} if ref $order eq 'HASH';
234   return $self->SUPER::_order_directions($order);
235 }
236
237 sub _table {
238   my ($self, $from) = @_;
239   if (ref $from eq 'ARRAY') {
240     return $self->_recurse_from(@$from);
241   } elsif (ref $from eq 'HASH') {
242     return $self->_make_as($from);
243   } else {
244     return $from; # would love to quote here but _table ends up getting called
245                   # twice during an ->select without a limit clause due to
246                   # the way S::A::Limit->select works. should maybe consider
247                   # bypassing this and doing S::A::select($self, ...) in
248                   # our select method above. meantime, quoting shims have
249                   # been added to select/insert/update/delete here
250   }
251 }
252
253 sub _recurse_from {
254   my ($self, $from, @join) = @_;
255   my @sqlf;
256   push(@sqlf, $self->_make_as($from));
257   foreach my $j (@join) {
258     my ($to, $on) = @$j;
259
260     # check whether a join type exists
261     my $join_clause = '';
262     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
263     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
264       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
265     } else {
266       $join_clause = ' JOIN ';
267     }
268     push(@sqlf, $join_clause);
269
270     if (ref $to eq 'ARRAY') {
271       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
272     } else {
273       push(@sqlf, $self->_make_as($to));
274     }
275     push(@sqlf, ' ON ', $self->_join_condition($on));
276   }
277   return join('', @sqlf);
278 }
279
280 sub _bind_to_sql {
281   my $self = shift;
282   my $arr  = shift;
283   my $sql = shift @$$arr;
284   $sql =~ s/\?/$self->_quote((shift @$$arr)->[1])/eg;
285   return $sql
286 }
287
288 sub _make_as {
289   my ($self, $from) = @_;
290   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ 
291                         : ref $_ eq 'REF'    ? $self->_bind_to_sql($_) 
292                         : $self->_quote($_)) 
293                        } reverse each %{$self->_skip_options($from)});
294 }
295
296 sub _skip_options {
297   my ($self, $hash) = @_;
298   my $clean_hash = {};
299   $clean_hash->{$_} = $hash->{$_}
300     for grep {!/^-/} keys %$hash;
301   return $clean_hash;
302 }
303
304 sub _join_condition {
305   my ($self, $cond) = @_;
306   if (ref $cond eq 'HASH') {
307     my %j;
308     for (keys %$cond) {
309       my $v = $cond->{$_};
310       if (ref $v) {
311         # XXX no throw_exception() in this package and croak() fails with strange results
312         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
313             if ref($v) ne 'SCALAR';
314         $j{$_} = $v;
315       }
316       else {
317         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
318       }
319     };
320     return scalar($self->_recurse_where(\%j));
321   } elsif (ref $cond eq 'ARRAY') {
322     return join(' OR ', map { $self->_join_condition($_) } @$cond);
323   } else {
324     die "Can't handle this yet!";
325   }
326 }
327
328 sub _quote {
329   my ($self, $label) = @_;
330   return '' unless defined $label;
331   return "*" if $label eq '*';
332   return $label unless $self->{quote_char};
333   if(ref $self->{quote_char} eq "ARRAY"){
334     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
335       if !defined $self->{name_sep};
336     my $sep = $self->{name_sep};
337     return join($self->{name_sep},
338         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
339        split(/\Q$sep\E/,$label));
340   }
341   return $self->SUPER::_quote($label);
342 }
343
344 sub limit_dialect {
345     my $self = shift;
346     $self->{limit_dialect} = shift if @_;
347     return $self->{limit_dialect};
348 }
349
350 sub quote_char {
351     my $self = shift;
352     $self->{quote_char} = shift if @_;
353     return $self->{quote_char};
354 }
355
356 sub name_sep {
357     my $self = shift;
358     $self->{name_sep} = shift if @_;
359     return $self->{name_sep};
360 }
361
362 } # End of BEGIN block
363
364 =head1 NAME
365
366 DBIx::Class::Storage::DBI - DBI storage handler
367
368 =head1 SYNOPSIS
369
370   my $schema = MySchema->connect('dbi:SQLite:my.db');
371
372   $schema->storage->debug(1);
373   $schema->dbh_do("DROP TABLE authors");
374
375   $schema->resultset('Book')->search({
376      written_on => $schema->storage->datetime_parser(DateTime->now)
377   });
378
379 =head1 DESCRIPTION
380
381 This class represents the connection to an RDBMS via L<DBI>.  See
382 L<DBIx::Class::Storage> for general information.  This pod only
383 documents DBI-specific methods and behaviors.
384
385 =head1 METHODS
386
387 =cut
388
389 sub new {
390   my $new = shift->next::method(@_);
391
392   $new->transaction_depth(0);
393   $new->_sql_maker_opts({});
394   $new->{savepoints} = [];
395   $new->{_in_dbh_do} = 0;
396   $new->{_dbh_gen} = 0;
397
398   $new;
399 }
400
401 =head2 connect_info
402
403 This method is normally called by L<DBIx::Class::Schema/connection>, which
404 encapsulates its argument list in an arrayref before passing them here.
405
406 The argument list may contain:
407
408 =over
409
410 =item *
411
412 The same 4-element argument set one would normally pass to
413 L<DBI/connect>, optionally followed by
414 L<extra attributes|/DBIx::Class specific connection attributes>
415 recognized by DBIx::Class:
416
417   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
418
419 =item *
420
421 A single code reference which returns a connected 
422 L<DBI database handle|DBI/connect> optionally followed by 
423 L<extra attributes|/DBIx::Class specific connection attributes> recognized
424 by DBIx::Class:
425
426   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
427
428 =item *
429
430 A single hashref with all the attributes and the dsn/user/password
431 mixed together:
432
433   $connect_info_args = [{
434     dsn => $dsn,
435     user => $user,
436     password => $pass,
437     %dbi_attributes,
438     %extra_attributes,
439   }];
440
441 This is particularly useful for L<Catalyst> based applications, allowing the 
442 following config (L<Config::General> style):
443
444   <Model::DB>
445     schema_class   App::DB
446     <connect_info>
447       dsn          dbi:mysql:database=test
448       user         testuser
449       password     TestPass
450       AutoCommit   1
451     </connect_info>
452   </Model::DB>
453
454 =back
455
456 Please note that the L<DBI> docs recommend that you always explicitly
457 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
458 recommends that it be set to I<1>, and that you perform transactions
459 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
460 to I<1> if you do not do explicitly set it to zero.  This is the default 
461 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
462
463 =head3 DBIx::Class specific connection attributes
464
465 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
466 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
467 the following connection options. These options can be mixed in with your other
468 L<DBI> connection attributes, or placed in a seperate hashref
469 (C<\%extra_attributes>) as shown above.
470
471 Every time C<connect_info> is invoked, any previous settings for
472 these options will be cleared before setting the new ones, regardless of
473 whether any options are specified in the new C<connect_info>.
474
475
476 =over
477
478 =item on_connect_do
479
480 Specifies things to do immediately after connecting or re-connecting to
481 the database.  Its value may contain:
482
483 =over
484
485 =item an array reference
486
487 This contains SQL statements to execute in order.  Each element contains
488 a string or a code reference that returns a string.
489
490 =item a code reference
491
492 This contains some code to execute.  Unlike code references within an
493 array reference, its return value is ignored.
494
495 =back
496
497 =item on_disconnect_do
498
499 Takes arguments in the same form as L</on_connect_do> and executes them
500 immediately before disconnecting from the database.
501
502 Note, this only runs if you explicitly call L</disconnect> on the
503 storage object.
504
505 =item disable_sth_caching
506
507 If set to a true value, this option will disable the caching of
508 statement handles via L<DBI/prepare_cached>.
509
510 =item limit_dialect 
511
512 Sets the limit dialect. This is useful for JDBC-bridge among others
513 where the remote SQL-dialect cannot be determined by the name of the
514 driver alone. See also L<SQL::Abstract::Limit>.
515
516 =item quote_char
517
518 Specifies what characters to use to quote table and column names. If 
519 you use this you will want to specify L</name_sep> as well.
520
521 C<quote_char> expects either a single character, in which case is it
522 is placed on either side of the table/column name, or an arrayref of length
523 2 in which case the table/column name is placed between the elements.
524
525 For example under MySQL you should use C<< quote_char => '`' >>, and for
526 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
527
528 =item name_sep
529
530 This only needs to be used in conjunction with C<quote_char>, and is used to 
531 specify the charecter that seperates elements (schemas, tables, columns) from 
532 each other. In most cases this is simply a C<.>.
533
534 The consequences of not supplying this value is that L<SQL::Abstract>
535 will assume DBIx::Class' uses of aliases to be complete column
536 names. The output will look like I<"me.name"> when it should actually
537 be I<"me"."name">.
538
539 =item unsafe
540
541 This Storage driver normally installs its own C<HandleError>, sets
542 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
543 all database handles, including those supplied by a coderef.  It does this
544 so that it can have consistent and useful error behavior.
545
546 If you set this option to a true value, Storage will not do its usual
547 modifications to the database handle's attributes, and instead relies on
548 the settings in your connect_info DBI options (or the values you set in
549 your connection coderef, in the case that you are connecting via coderef).
550
551 Note that your custom settings can cause Storage to malfunction,
552 especially if you set a C<HandleError> handler that suppresses exceptions
553 and/or disable C<RaiseError>.
554
555 =item auto_savepoint
556
557 If this option is true, L<DBIx::Class> will use savepoints when nesting
558 transactions, making it possible to recover from failure in the inner
559 transaction without having to abort all outer transactions.
560
561 =item cursor_class
562
563 Use this argument to supply a cursor class other than the default
564 L<DBIx::Class::Storage::DBI::Cursor>.
565
566 =back
567
568 Some real-life examples of arguments to L</connect_info> and
569 L<DBIx::Class::Schema/connect>
570
571   # Simple SQLite connection
572   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
573
574   # Connect via subref
575   ->connect_info([ sub { DBI->connect(...) } ]);
576
577   # A bit more complicated
578   ->connect_info(
579     [
580       'dbi:Pg:dbname=foo',
581       'postgres',
582       'my_pg_password',
583       { AutoCommit => 1 },
584       { quote_char => q{"}, name_sep => q{.} },
585     ]
586   );
587
588   # Equivalent to the previous example
589   ->connect_info(
590     [
591       'dbi:Pg:dbname=foo',
592       'postgres',
593       'my_pg_password',
594       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
595     ]
596   );
597
598   # Same, but with hashref as argument
599   # See parse_connect_info for explanation
600   ->connect_info(
601     [{
602       dsn         => 'dbi:Pg:dbname=foo',
603       user        => 'postgres',
604       password    => 'my_pg_password',
605       AutoCommit  => 1,
606       quote_char  => q{"},
607       name_sep    => q{.},
608     }]
609   );
610
611   # Subref + DBIx::Class-specific connection options
612   ->connect_info(
613     [
614       sub { DBI->connect(...) },
615       {
616           quote_char => q{`},
617           name_sep => q{@},
618           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
619           disable_sth_caching => 1,
620       },
621     ]
622   );
623
624
625
626 =cut
627
628 sub connect_info {
629   my ($self, $info_arg) = @_;
630
631   return $self->_connect_info if !$info_arg;
632
633   my @args = @$info_arg;  # take a shallow copy for further mutilation
634   $self->_connect_info([@args]); # copy for _connect_info
635
636
637   # combine/pre-parse arguments depending on invocation style
638
639   my %attrs;
640   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
641     %attrs = %{ $args[1] || {} };
642     @args = $args[0];
643   }
644   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
645     %attrs = %{$args[0]};
646     @args = ();
647     for (qw/password user dsn/) {
648       unshift @args, delete $attrs{$_};
649     }
650   }
651   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
652     %attrs = (
653       % { $args[3] || {} },
654       % { $args[4] || {} },
655     );
656     @args = @args[0,1,2];
657   }
658
659   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
660   #  the new set of options
661   $self->_sql_maker(undef);
662   $self->_sql_maker_opts({});
663
664   if(keys %attrs) {
665     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
666       if(my $value = delete $attrs{$storage_opt}) {
667         $self->$storage_opt($value);
668       }
669     }
670     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
671       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
672         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
673       }
674     }
675   }
676
677   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
678
679   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
680   $self->_connect_info;
681 }
682
683 =head2 on_connect_do
684
685 This method is deprecated in favour of setting via L</connect_info>.
686
687
688 =head2 dbh_do
689
690 Arguments: ($subref | $method_name), @extra_coderef_args?
691
692 Execute the given $subref or $method_name using the new exception-based
693 connection management.
694
695 The first two arguments will be the storage object that C<dbh_do> was called
696 on and a database handle to use.  Any additional arguments will be passed
697 verbatim to the called subref as arguments 2 and onwards.
698
699 Using this (instead of $self->_dbh or $self->dbh) ensures correct
700 exception handling and reconnection (or failover in future subclasses).
701
702 Your subref should have no side-effects outside of the database, as
703 there is the potential for your subref to be partially double-executed
704 if the database connection was stale/dysfunctional.
705
706 Example:
707
708   my @stuff = $schema->storage->dbh_do(
709     sub {
710       my ($storage, $dbh, @cols) = @_;
711       my $cols = join(q{, }, @cols);
712       $dbh->selectrow_array("SELECT $cols FROM foo");
713     },
714     @column_list
715   );
716
717 =cut
718
719 sub dbh_do {
720   my $self = shift;
721   my $code = shift;
722
723   my $dbh = $self->_dbh;
724
725   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
726       || $self->{transaction_depth};
727
728   local $self->{_in_dbh_do} = 1;
729
730   my @result;
731   my $want_array = wantarray;
732
733   eval {
734     $self->_verify_pid if $dbh;
735     if(!$self->_dbh) {
736         $self->_populate_dbh;
737         $dbh = $self->_dbh;
738     }
739
740     if($want_array) {
741         @result = $self->$code($dbh, @_);
742     }
743     elsif(defined $want_array) {
744         $result[0] = $self->$code($dbh, @_);
745     }
746     else {
747         $self->$code($dbh, @_);
748     }
749   };
750
751   my $exception = $@;
752   if(!$exception) { return $want_array ? @result : $result[0] }
753
754   $self->throw_exception($exception) if $self->connected;
755
756   # We were not connected - reconnect and retry, but let any
757   #  exception fall right through this time
758   $self->_populate_dbh;
759   $self->$code($self->_dbh, @_);
760 }
761
762 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
763 # It also informs dbh_do to bypass itself while under the direction of txn_do,
764 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
765 sub txn_do {
766   my $self = shift;
767   my $coderef = shift;
768
769   ref $coderef eq 'CODE' or $self->throw_exception
770     ('$coderef must be a CODE reference');
771
772   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
773
774   local $self->{_in_dbh_do} = 1;
775
776   my @result;
777   my $want_array = wantarray;
778
779   my $tried = 0;
780   while(1) {
781     eval {
782       $self->_verify_pid if $self->_dbh;
783       $self->_populate_dbh if !$self->_dbh;
784
785       $self->txn_begin;
786       if($want_array) {
787           @result = $coderef->(@_);
788       }
789       elsif(defined $want_array) {
790           $result[0] = $coderef->(@_);
791       }
792       else {
793           $coderef->(@_);
794       }
795       $self->txn_commit;
796     };
797
798     my $exception = $@;
799     if(!$exception) { return $want_array ? @result : $result[0] }
800
801     if($tried++ > 0 || $self->connected) {
802       eval { $self->txn_rollback };
803       my $rollback_exception = $@;
804       if($rollback_exception) {
805         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
806         $self->throw_exception($exception)  # propagate nested rollback
807           if $rollback_exception =~ /$exception_class/;
808
809         $self->throw_exception(
810           "Transaction aborted: ${exception}. "
811           . "Rollback failed: ${rollback_exception}"
812         );
813       }
814       $self->throw_exception($exception)
815     }
816
817     # We were not connected, and was first try - reconnect and retry
818     # via the while loop
819     $self->_populate_dbh;
820   }
821 }
822
823 =head2 disconnect
824
825 Our C<disconnect> method also performs a rollback first if the
826 database is not in C<AutoCommit> mode.
827
828 =cut
829
830 sub disconnect {
831   my ($self) = @_;
832
833   if( $self->connected ) {
834     my $connection_do = $self->on_disconnect_do;
835     $self->_do_connection_actions($connection_do) if ref($connection_do);
836
837     $self->_dbh->rollback unless $self->_dbh_autocommit;
838     $self->_dbh->disconnect;
839     $self->_dbh(undef);
840     $self->{_dbh_gen}++;
841   }
842 }
843
844 =head2 with_deferred_fk_checks
845
846 =over 4
847
848 =item Arguments: C<$coderef>
849
850 =item Return Value: The return value of $coderef
851
852 =back
853
854 Storage specific method to run the code ref with FK checks deferred or
855 in MySQL's case disabled entirely.
856
857 =cut
858
859 # Storage subclasses should override this
860 sub with_deferred_fk_checks {
861   my ($self, $sub) = @_;
862
863   $sub->();
864 }
865
866 sub connected {
867   my ($self) = @_;
868
869   if(my $dbh = $self->_dbh) {
870       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
871           $self->_dbh(undef);
872           $self->{_dbh_gen}++;
873           return;
874       }
875       else {
876           $self->_verify_pid;
877           return 0 if !$self->_dbh;
878       }
879       return ($dbh->FETCH('Active') && $dbh->ping);
880   }
881
882   return 0;
883 }
884
885 # handle pid changes correctly
886 #  NOTE: assumes $self->_dbh is a valid $dbh
887 sub _verify_pid {
888   my ($self) = @_;
889
890   return if defined $self->_conn_pid && $self->_conn_pid == $$;
891
892   $self->_dbh->{InactiveDestroy} = 1;
893   $self->_dbh(undef);
894   $self->{_dbh_gen}++;
895
896   return;
897 }
898
899 sub ensure_connected {
900   my ($self) = @_;
901
902   unless ($self->connected) {
903     $self->_populate_dbh;
904   }
905 }
906
907 =head2 dbh
908
909 Returns the dbh - a data base handle of class L<DBI>.
910
911 =cut
912
913 sub dbh {
914   my ($self) = @_;
915
916   $self->ensure_connected;
917   return $self->_dbh;
918 }
919
920 sub _sql_maker_args {
921     my ($self) = @_;
922     
923     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
924 }
925
926 sub sql_maker {
927   my ($self) = @_;
928   unless ($self->_sql_maker) {
929     my $sql_maker_class = $self->sql_maker_class;
930     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
931   }
932   return $self->_sql_maker;
933 }
934
935 sub _rebless {}
936
937 sub _populate_dbh {
938   my ($self) = @_;
939   my @info = @{$self->_dbi_connect_info || []};
940   $self->_dbh($self->_connect(@info));
941
942   # Always set the transaction depth on connect, since
943   #  there is no transaction in progress by definition
944   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
945
946   if(ref $self eq 'DBIx::Class::Storage::DBI') {
947     my $driver = $self->_dbh->{Driver}->{Name};
948     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
949       bless $self, "DBIx::Class::Storage::DBI::${driver}";
950       $self->_rebless();
951     }
952   }
953
954   $self->_conn_pid($$);
955   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
956
957   my $connection_do = $self->on_connect_do;
958   $self->_do_connection_actions($connection_do) if ref($connection_do);
959 }
960
961 sub _do_connection_actions {
962   my $self = shift;
963   my $connection_do = shift;
964
965   if (ref $connection_do eq 'ARRAY') {
966     $self->_do_query($_) foreach @$connection_do;
967   }
968   elsif (ref $connection_do eq 'CODE') {
969     $connection_do->($self);
970   }
971
972   return $self;
973 }
974
975 sub _do_query {
976   my ($self, $action) = @_;
977
978   if (ref $action eq 'CODE') {
979     $action = $action->($self);
980     $self->_do_query($_) foreach @$action;
981   }
982   else {
983     # Most debuggers expect ($sql, @bind), so we need to exclude
984     # the attribute hash which is the second argument to $dbh->do
985     # furthermore the bind values are usually to be presented
986     # as named arrayref pairs, so wrap those here too
987     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
988     my $sql = shift @do_args;
989     my $attrs = shift @do_args;
990     my @bind = map { [ undef, $_ ] } @do_args;
991
992     $self->_query_start($sql, @bind);
993     $self->_dbh->do($sql, $attrs, @do_args);
994     $self->_query_end($sql, @bind);
995   }
996
997   return $self;
998 }
999
1000 sub _connect {
1001   my ($self, @info) = @_;
1002
1003   $self->throw_exception("You failed to provide any connection info")
1004     if !@info;
1005
1006   my ($old_connect_via, $dbh);
1007
1008   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1009     $old_connect_via = $DBI::connect_via;
1010     $DBI::connect_via = 'connect';
1011   }
1012
1013   eval {
1014     if(ref $info[0] eq 'CODE') {
1015        $dbh = &{$info[0]}
1016     }
1017     else {
1018        $dbh = DBI->connect(@info);
1019     }
1020
1021     if($dbh && !$self->unsafe) {
1022       my $weak_self = $self;
1023       weaken($weak_self);
1024       $dbh->{HandleError} = sub {
1025           if ($weak_self) {
1026             $weak_self->throw_exception("DBI Exception: $_[0]");
1027           }
1028           else {
1029             croak ("DBI Exception: $_[0]");
1030           }
1031       };
1032       $dbh->{ShowErrorStatement} = 1;
1033       $dbh->{RaiseError} = 1;
1034       $dbh->{PrintError} = 0;
1035     }
1036   };
1037
1038   $DBI::connect_via = $old_connect_via if $old_connect_via;
1039
1040   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1041     if !$dbh || $@;
1042
1043   $self->_dbh_autocommit($dbh->{AutoCommit});
1044
1045   $dbh;
1046 }
1047
1048 sub svp_begin {
1049   my ($self, $name) = @_;
1050
1051   $name = $self->_svp_generate_name
1052     unless defined $name;
1053
1054   $self->throw_exception ("You can't use savepoints outside a transaction")
1055     if $self->{transaction_depth} == 0;
1056
1057   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1058     unless $self->can('_svp_begin');
1059   
1060   push @{ $self->{savepoints} }, $name;
1061
1062   $self->debugobj->svp_begin($name) if $self->debug;
1063   
1064   return $self->_svp_begin($name);
1065 }
1066
1067 sub svp_release {
1068   my ($self, $name) = @_;
1069
1070   $self->throw_exception ("You can't use savepoints outside a transaction")
1071     if $self->{transaction_depth} == 0;
1072
1073   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1074     unless $self->can('_svp_release');
1075
1076   if (defined $name) {
1077     $self->throw_exception ("Savepoint '$name' does not exist")
1078       unless grep { $_ eq $name } @{ $self->{savepoints} };
1079
1080     # Dig through the stack until we find the one we are releasing.  This keeps
1081     # the stack up to date.
1082     my $svp;
1083
1084     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1085   } else {
1086     $name = pop @{ $self->{savepoints} };
1087   }
1088
1089   $self->debugobj->svp_release($name) if $self->debug;
1090
1091   return $self->_svp_release($name);
1092 }
1093
1094 sub svp_rollback {
1095   my ($self, $name) = @_;
1096
1097   $self->throw_exception ("You can't use savepoints outside a transaction")
1098     if $self->{transaction_depth} == 0;
1099
1100   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1101     unless $self->can('_svp_rollback');
1102
1103   if (defined $name) {
1104       # If they passed us a name, verify that it exists in the stack
1105       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1106           $self->throw_exception("Savepoint '$name' does not exist!");
1107       }
1108
1109       # Dig through the stack until we find the one we are releasing.  This keeps
1110       # the stack up to date.
1111       while(my $s = pop(@{ $self->{savepoints} })) {
1112           last if($s eq $name);
1113       }
1114       # Add the savepoint back to the stack, as a rollback doesn't remove the
1115       # named savepoint, only everything after it.
1116       push(@{ $self->{savepoints} }, $name);
1117   } else {
1118       # We'll assume they want to rollback to the last savepoint
1119       $name = $self->{savepoints}->[-1];
1120   }
1121
1122   $self->debugobj->svp_rollback($name) if $self->debug;
1123   
1124   return $self->_svp_rollback($name);
1125 }
1126
1127 sub _svp_generate_name {
1128     my ($self) = @_;
1129
1130     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1131 }
1132
1133 sub txn_begin {
1134   my $self = shift;
1135   $self->ensure_connected();
1136   if($self->{transaction_depth} == 0) {
1137     $self->debugobj->txn_begin()
1138       if $self->debug;
1139     # this isn't ->_dbh-> because
1140     #  we should reconnect on begin_work
1141     #  for AutoCommit users
1142     $self->dbh->begin_work;
1143   } elsif ($self->auto_savepoint) {
1144     $self->svp_begin;
1145   }
1146   $self->{transaction_depth}++;
1147 }
1148
1149 sub txn_commit {
1150   my $self = shift;
1151   if ($self->{transaction_depth} == 1) {
1152     my $dbh = $self->_dbh;
1153     $self->debugobj->txn_commit()
1154       if ($self->debug);
1155     $dbh->commit;
1156     $self->{transaction_depth} = 0
1157       if $self->_dbh_autocommit;
1158   }
1159   elsif($self->{transaction_depth} > 1) {
1160     $self->{transaction_depth}--;
1161     $self->svp_release
1162       if $self->auto_savepoint;
1163   }
1164 }
1165
1166 sub txn_rollback {
1167   my $self = shift;
1168   my $dbh = $self->_dbh;
1169   eval {
1170     if ($self->{transaction_depth} == 1) {
1171       $self->debugobj->txn_rollback()
1172         if ($self->debug);
1173       $self->{transaction_depth} = 0
1174         if $self->_dbh_autocommit;
1175       $dbh->rollback;
1176     }
1177     elsif($self->{transaction_depth} > 1) {
1178       $self->{transaction_depth}--;
1179       if ($self->auto_savepoint) {
1180         $self->svp_rollback;
1181         $self->svp_release;
1182       }
1183     }
1184     else {
1185       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1186     }
1187   };
1188   if ($@) {
1189     my $error = $@;
1190     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1191     $error =~ /$exception_class/ and $self->throw_exception($error);
1192     # ensure that a failed rollback resets the transaction depth
1193     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1194     $self->throw_exception($error);
1195   }
1196 }
1197
1198 # This used to be the top-half of _execute.  It was split out to make it
1199 #  easier to override in NoBindVars without duping the rest.  It takes up
1200 #  all of _execute's args, and emits $sql, @bind.
1201 sub _prep_for_execute {
1202   my ($self, $op, $extra_bind, $ident, $args) = @_;
1203
1204   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1205     $ident = $ident->from();
1206   }
1207
1208   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1209
1210   unshift(@bind,
1211     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1212       if $extra_bind;
1213   return ($sql, \@bind);
1214 }
1215
1216 sub _fix_bind_params {
1217     my ($self, @bind) = @_;
1218
1219     ### Turn @bind from something like this:
1220     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1221     ### to this:
1222     ###   ( "'1'", "'1'", "'3'" )
1223     return
1224         map {
1225             if ( defined( $_ && $_->[1] ) ) {
1226                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1227             }
1228             else { q{'NULL'}; }
1229         } @bind;
1230 }
1231
1232 sub _query_start {
1233     my ( $self, $sql, @bind ) = @_;
1234
1235     if ( $self->debug ) {
1236         @bind = $self->_fix_bind_params(@bind);
1237         
1238         $self->debugobj->query_start( $sql, @bind );
1239     }
1240 }
1241
1242 sub _query_end {
1243     my ( $self, $sql, @bind ) = @_;
1244
1245     if ( $self->debug ) {
1246         @bind = $self->_fix_bind_params(@bind);
1247         $self->debugobj->query_end( $sql, @bind );
1248     }
1249 }
1250
1251 sub _dbh_execute {
1252   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1253
1254   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1255
1256   $self->_query_start( $sql, @$bind );
1257
1258   my $sth = $self->sth($sql,$op);
1259
1260   my $placeholder_index = 1; 
1261
1262   foreach my $bound (@$bind) {
1263     my $attributes = {};
1264     my($column_name, @data) = @$bound;
1265
1266     if ($bind_attributes) {
1267       $attributes = $bind_attributes->{$column_name}
1268       if defined $bind_attributes->{$column_name};
1269     }
1270
1271     foreach my $data (@data) {
1272       my $ref = ref $data;
1273       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1274
1275       $sth->bind_param($placeholder_index, $data, $attributes);
1276       $placeholder_index++;
1277     }
1278   }
1279
1280   # Can this fail without throwing an exception anyways???
1281   my $rv = $sth->execute();
1282   $self->throw_exception($sth->errstr) if !$rv;
1283
1284   $self->_query_end( $sql, @$bind );
1285
1286   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1287 }
1288
1289 sub _execute {
1290     my $self = shift;
1291     $self->dbh_do('_dbh_execute', @_)
1292 }
1293
1294 sub insert {
1295   my ($self, $source, $to_insert) = @_;
1296   
1297   my $ident = $source->from; 
1298   my $bind_attributes = $self->source_bind_attributes($source);
1299
1300   $self->ensure_connected;
1301   foreach my $col ( $source->columns ) {
1302     if ( !defined $to_insert->{$col} ) {
1303       my $col_info = $source->column_info($col);
1304
1305       if ( $col_info->{auto_nextval} ) {
1306         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1307       }
1308     }
1309   }
1310
1311   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1312
1313   return $to_insert;
1314 }
1315
1316 ## Still not quite perfect, and EXPERIMENTAL
1317 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1318 ## scalar refs, or at least, all the same type as the first set, the statement is
1319 ## only prepped once.
1320 sub insert_bulk {
1321   my ($self, $source, $cols, $data) = @_;
1322   my %colvalues;
1323   my $table = $source->from;
1324   @colvalues{@$cols} = (0..$#$cols);
1325   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1326   
1327   $self->_query_start( $sql, @bind );
1328   my $sth = $self->sth($sql);
1329
1330 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1331
1332   ## This must be an arrayref, else nothing works!
1333   
1334   my $tuple_status = [];
1335   
1336   ##use Data::Dumper;
1337   ##print STDERR Dumper( $data, $sql, [@bind] );
1338
1339   my $time = time();
1340
1341   ## Get the bind_attributes, if any exist
1342   my $bind_attributes = $self->source_bind_attributes($source);
1343
1344   ## Bind the values and execute
1345   my $placeholder_index = 1; 
1346
1347   foreach my $bound (@bind) {
1348
1349     my $attributes = {};
1350     my ($column_name, $data_index) = @$bound;
1351
1352     if( $bind_attributes ) {
1353       $attributes = $bind_attributes->{$column_name}
1354       if defined $bind_attributes->{$column_name};
1355     }
1356
1357     my @data = map { $_->[$data_index] } @$data;
1358
1359     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1360     $placeholder_index++;
1361   }
1362   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1363   $self->throw_exception($sth->errstr) if !$rv;
1364
1365   $self->_query_end( $sql, @bind );
1366   return (wantarray ? ($rv, $sth, @bind) : $rv);
1367 }
1368
1369 sub update {
1370   my $self = shift @_;
1371   my $source = shift @_;
1372   my $bind_attributes = $self->source_bind_attributes($source);
1373   
1374   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1375 }
1376
1377
1378 sub delete {
1379   my $self = shift @_;
1380   my $source = shift @_;
1381   
1382   my $bind_attrs = {}; ## If ever it's needed...
1383   
1384   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1385 }
1386
1387 sub _select {
1388   my $self = shift;
1389   my $sql_maker = $self->sql_maker;
1390   local $sql_maker->{for};
1391   return $self->_execute($self->_select_args(@_));
1392 }
1393
1394 sub _select_args {
1395   my ($self, $ident, $select, $condition, $attrs) = @_;
1396   my $order = $attrs->{order_by};
1397
1398   if (ref $condition eq 'SCALAR') {
1399     my $unwrap = ${$condition};
1400     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1401       $order = $1;
1402       $condition = \$unwrap;
1403     }
1404   }
1405
1406   my $for = delete $attrs->{for};
1407   my $sql_maker = $self->sql_maker;
1408   $sql_maker->{for} = $for;
1409
1410   if (exists $attrs->{group_by} || $attrs->{having}) {
1411     $order = {
1412       group_by => $attrs->{group_by},
1413       having => $attrs->{having},
1414       ($order ? (order_by => $order) : ())
1415     };
1416   }
1417   my $bind_attrs = {}; ## Future support
1418   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1419   if ($attrs->{software_limit} ||
1420       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1421         $attrs->{software_limit} = 1;
1422   } else {
1423     $self->throw_exception("rows attribute must be positive if present")
1424       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1425
1426     # MySQL actually recommends this approach.  I cringe.
1427     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1428     push @args, $attrs->{rows}, $attrs->{offset};
1429   }
1430   return @args;
1431 }
1432
1433 sub source_bind_attributes {
1434   my ($self, $source) = @_;
1435   
1436   my $bind_attributes;
1437   foreach my $column ($source->columns) {
1438   
1439     my $data_type = $source->column_info($column)->{data_type} || '';
1440     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1441      if $data_type;
1442   }
1443
1444   return $bind_attributes;
1445 }
1446
1447 =head2 select
1448
1449 =over 4
1450
1451 =item Arguments: $ident, $select, $condition, $attrs
1452
1453 =back
1454
1455 Handle a SQL select statement.
1456
1457 =cut
1458
1459 sub select {
1460   my $self = shift;
1461   my ($ident, $select, $condition, $attrs) = @_;
1462   return $self->cursor_class->new($self, \@_, $attrs);
1463 }
1464
1465 sub select_single {
1466   my $self = shift;
1467   my ($rv, $sth, @bind) = $self->_select(@_);
1468   my @row = $sth->fetchrow_array;
1469   my @nextrow = $sth->fetchrow_array if @row;
1470   if(@row && @nextrow) {
1471     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1472   }
1473   # Need to call finish() to work round broken DBDs
1474   $sth->finish();
1475   return @row;
1476 }
1477
1478 =head2 sth
1479
1480 =over 4
1481
1482 =item Arguments: $sql
1483
1484 =back
1485
1486 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1487
1488 =cut
1489
1490 sub _dbh_sth {
1491   my ($self, $dbh, $sql) = @_;
1492
1493   # 3 is the if_active parameter which avoids active sth re-use
1494   my $sth = $self->disable_sth_caching
1495     ? $dbh->prepare($sql)
1496     : $dbh->prepare_cached($sql, {}, 3);
1497
1498   # XXX You would think RaiseError would make this impossible,
1499   #  but apparently that's not true :(
1500   $self->throw_exception($dbh->errstr) if !$sth;
1501
1502   $sth;
1503 }
1504
1505 sub sth {
1506   my ($self, $sql) = @_;
1507   $self->dbh_do('_dbh_sth', $sql);
1508 }
1509
1510 sub _dbh_columns_info_for {
1511   my ($self, $dbh, $table) = @_;
1512
1513   if ($dbh->can('column_info')) {
1514     my %result;
1515     eval {
1516       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1517       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1518       $sth->execute();
1519       while ( my $info = $sth->fetchrow_hashref() ){
1520         my %column_info;
1521         $column_info{data_type}   = $info->{TYPE_NAME};
1522         $column_info{size}      = $info->{COLUMN_SIZE};
1523         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1524         $column_info{default_value} = $info->{COLUMN_DEF};
1525         my $col_name = $info->{COLUMN_NAME};
1526         $col_name =~ s/^\"(.*)\"$/$1/;
1527
1528         $result{$col_name} = \%column_info;
1529       }
1530     };
1531     return \%result if !$@ && scalar keys %result;
1532   }
1533
1534   my %result;
1535   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1536   $sth->execute;
1537   my @columns = @{$sth->{NAME_lc}};
1538   for my $i ( 0 .. $#columns ){
1539     my %column_info;
1540     $column_info{data_type} = $sth->{TYPE}->[$i];
1541     $column_info{size} = $sth->{PRECISION}->[$i];
1542     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1543
1544     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1545       $column_info{data_type} = $1;
1546       $column_info{size}    = $2;
1547     }
1548
1549     $result{$columns[$i]} = \%column_info;
1550   }
1551   $sth->finish;
1552
1553   foreach my $col (keys %result) {
1554     my $colinfo = $result{$col};
1555     my $type_num = $colinfo->{data_type};
1556     my $type_name;
1557     if(defined $type_num && $dbh->can('type_info')) {
1558       my $type_info = $dbh->type_info($type_num);
1559       $type_name = $type_info->{TYPE_NAME} if $type_info;
1560       $colinfo->{data_type} = $type_name if $type_name;
1561     }
1562   }
1563
1564   return \%result;
1565 }
1566
1567 sub columns_info_for {
1568   my ($self, $table) = @_;
1569   $self->dbh_do('_dbh_columns_info_for', $table);
1570 }
1571
1572 =head2 last_insert_id
1573
1574 Return the row id of the last insert.
1575
1576 =cut
1577
1578 sub _dbh_last_insert_id {
1579     # All Storage's need to register their own _dbh_last_insert_id
1580     # the old SQLite-based method was highly inappropriate
1581
1582     my $self = shift;
1583     my $class = ref $self;
1584     $self->throw_exception (<<EOE);
1585
1586 No _dbh_last_insert_id() method found in $class.
1587 Since the method of obtaining the autoincrement id of the last insert
1588 operation varies greatly between different databases, this method must be
1589 individually implemented for every storage class.
1590 EOE
1591 }
1592
1593 sub last_insert_id {
1594   my $self = shift;
1595   $self->dbh_do('_dbh_last_insert_id', @_);
1596 }
1597
1598 =head2 sqlt_type
1599
1600 Returns the database driver name.
1601
1602 =cut
1603
1604 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1605
1606 =head2 bind_attribute_by_data_type
1607
1608 Given a datatype from column info, returns a database specific bind
1609 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1610 let the database planner just handle it.
1611
1612 Generally only needed for special case column types, like bytea in postgres.
1613
1614 =cut
1615
1616 sub bind_attribute_by_data_type {
1617     return;
1618 }
1619
1620 =head2 create_ddl_dir
1621
1622 =over 4
1623
1624 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1625
1626 =back
1627
1628 Creates a SQL file based on the Schema, for each of the specified
1629 database types, in the given directory.
1630
1631 By default, C<\%sqlt_args> will have
1632
1633  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1634
1635 merged with the hash passed in. To disable any of those features, pass in a 
1636 hashref like the following
1637
1638  { ignore_constraint_names => 0, # ... other options }
1639
1640 =cut
1641
1642 sub create_ddl_dir {
1643   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1644
1645   if(!$dir || !-d $dir) {
1646     warn "No directory given, using ./\n";
1647     $dir = "./";
1648   }
1649   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1650   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1651
1652   my $schema_version = $schema->schema_version || '1.x';
1653   $version ||= $schema_version;
1654
1655   $sqltargs = {
1656     add_drop_table => 1, 
1657     ignore_constraint_names => 1,
1658     ignore_index_names => 1,
1659     %{$sqltargs || {}}
1660   };
1661
1662   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1663       . $self->_check_sqlt_message . q{'})
1664           if !$self->_check_sqlt_version;
1665
1666   my $sqlt = SQL::Translator->new( $sqltargs );
1667
1668   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1669   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1670
1671   foreach my $db (@$databases) {
1672     $sqlt->reset();
1673     $sqlt->{schema} = $sqlt_schema;
1674     $sqlt->producer($db);
1675
1676     my $file;
1677     my $filename = $schema->ddl_filename($db, $version, $dir);
1678     if (-e $filename && ($version eq $schema_version )) {
1679       # if we are dumping the current version, overwrite the DDL
1680       warn "Overwriting existing DDL file - $filename";
1681       unlink($filename);
1682     }
1683
1684     my $output = $sqlt->translate;
1685     if(!$output) {
1686       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1687       next;
1688     }
1689     if(!open($file, ">$filename")) {
1690       $self->throw_exception("Can't open $filename for writing ($!)");
1691       next;
1692     }
1693     print $file $output;
1694     close($file);
1695   
1696     next unless ($preversion);
1697
1698     require SQL::Translator::Diff;
1699
1700     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1701     if(!-e $prefilename) {
1702       warn("No previous schema file found ($prefilename)");
1703       next;
1704     }
1705
1706     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1707     if(-e $difffile) {
1708       warn("Overwriting existing diff file - $difffile");
1709       unlink($difffile);
1710     }
1711     
1712     my $source_schema;
1713     {
1714       my $t = SQL::Translator->new($sqltargs);
1715       $t->debug( 0 );
1716       $t->trace( 0 );
1717       $t->parser( $db )                       or die $t->error;
1718       my $out = $t->translate( $prefilename ) or die $t->error;
1719       $source_schema = $t->schema;
1720       unless ( $source_schema->name ) {
1721         $source_schema->name( $prefilename );
1722       }
1723     }
1724
1725     # The "new" style of producers have sane normalization and can support 
1726     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1727     # And we have to diff parsed SQL against parsed SQL.
1728     my $dest_schema = $sqlt_schema;
1729     
1730     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1731       my $t = SQL::Translator->new($sqltargs);
1732       $t->debug( 0 );
1733       $t->trace( 0 );
1734       $t->parser( $db )                    or die $t->error;
1735       my $out = $t->translate( $filename ) or die $t->error;
1736       $dest_schema = $t->schema;
1737       $dest_schema->name( $filename )
1738         unless $dest_schema->name;
1739     }
1740     
1741     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1742                                                   $dest_schema,   $db,
1743                                                   $sqltargs
1744                                                  );
1745     if(!open $file, ">$difffile") { 
1746       $self->throw_exception("Can't write to $difffile ($!)");
1747       next;
1748     }
1749     print $file $diff;
1750     close($file);
1751   }
1752 }
1753
1754 =head2 deployment_statements
1755
1756 =over 4
1757
1758 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1759
1760 =back
1761
1762 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1763 The database driver name is given by C<$type>, though the value from
1764 L</sqlt_type> is used if it is not specified.
1765
1766 C<$directory> is used to return statements from files in a previously created
1767 L</create_ddl_dir> directory and is optional. The filenames are constructed
1768 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1769
1770 If no C<$directory> is specified then the statements are constructed on the
1771 fly using L<SQL::Translator> and C<$version> is ignored.
1772
1773 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1774
1775 =cut
1776
1777 sub deployment_statements {
1778   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1779   # Need to be connected to get the correct sqlt_type
1780   $self->ensure_connected() unless $type;
1781   $type ||= $self->sqlt_type;
1782   $version ||= $schema->schema_version || '1.x';
1783   $dir ||= './';
1784   my $filename = $schema->ddl_filename($type, $version, $dir);
1785   if(-f $filename)
1786   {
1787       my $file;
1788       open($file, "<$filename") 
1789         or $self->throw_exception("Can't open $filename ($!)");
1790       my @rows = <$file>;
1791       close($file);
1792       return join('', @rows);
1793   }
1794
1795   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1796       . $self->_check_sqlt_message . q{'})
1797           if !$self->_check_sqlt_version;
1798
1799   require SQL::Translator::Parser::DBIx::Class;
1800   eval qq{use SQL::Translator::Producer::${type}};
1801   $self->throw_exception($@) if $@;
1802
1803   # sources needs to be a parser arg, but for simplicty allow at top level 
1804   # coming in
1805   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1806       if exists $sqltargs->{sources};
1807
1808   my $tr = SQL::Translator->new(%$sqltargs);
1809   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1810   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1811 }
1812
1813 sub deploy {
1814   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1815   my $deploy = sub {
1816     my $line = shift;
1817     return if($line =~ /^--/);
1818     return if(!$line);
1819     # next if($line =~ /^DROP/m);
1820     return if($line =~ /^BEGIN TRANSACTION/m);
1821     return if($line =~ /^COMMIT/m);
1822     return if $line =~ /^\s+$/; # skip whitespace only
1823     $self->_query_start($line);
1824     eval {
1825       $self->dbh->do($line); # shouldn't be using ->dbh ?
1826     };
1827     if ($@) {
1828       warn qq{$@ (running "${line}")};
1829     }
1830     $self->_query_end($line);
1831   };
1832   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1833   if (@statements > 1) {
1834     foreach my $statement (@statements) {
1835       $deploy->( $statement );
1836     }
1837   }
1838   elsif (@statements == 1) {
1839     foreach my $line ( split(";\n", $statements[0])) {
1840       $deploy->( $line );
1841     }
1842   }
1843 }
1844
1845 =head2 datetime_parser
1846
1847 Returns the datetime parser class
1848
1849 =cut
1850
1851 sub datetime_parser {
1852   my $self = shift;
1853   return $self->{datetime_parser} ||= do {
1854     $self->ensure_connected;
1855     $self->build_datetime_parser(@_);
1856   };
1857 }
1858
1859 =head2 datetime_parser_type
1860
1861 Defines (returns) the datetime parser class - currently hardwired to
1862 L<DateTime::Format::MySQL>
1863
1864 =cut
1865
1866 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1867
1868 =head2 build_datetime_parser
1869
1870 See L</datetime_parser>
1871
1872 =cut
1873
1874 sub build_datetime_parser {
1875   my $self = shift;
1876   my $type = $self->datetime_parser_type(@_);
1877   eval "use ${type}";
1878   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1879   return $type;
1880 }
1881
1882 {
1883     my $_check_sqlt_version; # private
1884     my $_check_sqlt_message; # private
1885     sub _check_sqlt_version {
1886         return $_check_sqlt_version if defined $_check_sqlt_version;
1887         eval 'use SQL::Translator "0.09003"';
1888         $_check_sqlt_message = $@ || '';
1889         $_check_sqlt_version = !$@;
1890     }
1891
1892     sub _check_sqlt_message {
1893         _check_sqlt_version if !defined $_check_sqlt_message;
1894         $_check_sqlt_message;
1895     }
1896 }
1897
1898 =head2 is_replicating
1899
1900 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1901 replicate from a master database.  Default is undef, which is the result
1902 returned by databases that don't support replication.
1903
1904 =cut
1905
1906 sub is_replicating {
1907     return;
1908     
1909 }
1910
1911 =head2 lag_behind_master
1912
1913 Returns a number that represents a certain amount of lag behind a master db
1914 when a given storage is replicating.  The number is database dependent, but
1915 starts at zero and increases with the amount of lag. Default in undef
1916
1917 =cut
1918
1919 sub lag_behind_master {
1920     return;
1921 }
1922
1923 sub DESTROY {
1924   my $self = shift;
1925   return if !$self->_dbh;
1926   $self->_verify_pid;
1927   $self->_dbh(undef);
1928 }
1929
1930 1;
1931
1932 =head1 USAGE NOTES
1933
1934 =head2 DBIx::Class and AutoCommit
1935
1936 DBIx::Class can do some wonderful magic with handling exceptions,
1937 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1938 combined with C<txn_do> for transaction support.
1939
1940 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1941 in an assumed transaction between commits, and you're telling us you'd
1942 like to manage that manually.  A lot of the magic protections offered by
1943 this module will go away.  We can't protect you from exceptions due to database
1944 disconnects because we don't know anything about how to restart your
1945 transactions.  You're on your own for handling all sorts of exceptional
1946 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1947 be with raw DBI.
1948
1949
1950 =head1 SQL METHODS
1951
1952 The module defines a set of methods within the DBIC::SQL::Abstract
1953 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1954 SQL query functions.
1955
1956 The following methods are extended:-
1957
1958 =over 4
1959
1960 =item delete
1961
1962 =item insert
1963
1964 =item select
1965
1966 =item update
1967
1968 =item limit_dialect
1969
1970 See L</connect_info> for details.
1971
1972 =item quote_char
1973
1974 See L</connect_info> for details.
1975
1976 =item name_sep
1977
1978 See L</connect_info> for details.
1979
1980 =back
1981
1982 =head1 AUTHORS
1983
1984 Matt S. Trout <mst@shadowcatsystems.co.uk>
1985
1986 Andy Grundman <andy@hybridized.org>
1987
1988 =head1 LICENSE
1989
1990 You may distribute this code under the same terms as Perl itself.
1991
1992 =cut