a1bb464620a587dc6d9345cf1df40dc9ec32767a
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 sub new {
42   my $self = shift->SUPER::new(@_);
43
44   # This prevents the caching of $dbh in S::A::L, I believe
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53
54
55 # Some databases (sqlite) do not handle multiple parenthesis
56 # around in/between arguments. A tentative x IN ( ( 1, 2 ,3) )
57 # is interpreted as x IN 1 or something similar.
58 #
59 # Since we currently do not have access to the SQLA AST, resort
60 # to barbaric mutilation of any SQL supplied in literal form
61
62 sub _strip_outer_paren {
63   my ($self, $arg) = @_;
64
65 use Data::Dumper;
66
67   return $self->_SWITCH_refkind ($arg, {
68     ARRAYREFREF => sub {
69       $$arg->[0] = __strip_outer_paren ($$arg->[0]);
70       return $arg;
71     },
72     SCALARREF => sub {
73       return \__strip_outer_paren( $$arg );
74     },
75     FALLBACK => sub {
76       return $arg
77     },
78   });
79 }
80
81 sub __strip_outer_paren {
82   my $sql = shift;
83
84   if ($sql and not ref $sql) {
85     while ($sql =~ /^ \s* \( (.*) \) \s* $/x ) {
86       $sql = $1;
87     }
88   }
89
90   return $sql;
91 }
92
93 sub _where_field_IN {
94   my ($self, $lhs, $op, $rhs) = @_;
95   $rhs = $self->_strip_outer_paren ($rhs);
96   return $self->SUPER::_where_field_IN ($lhs, $op, $rhs);
97 }
98
99 sub _where_field_BETWEEN {
100   my ($self, $lhs, $op, $rhs) = @_;
101   $rhs = $self->_strip_outer_paren ($rhs);
102   return $self->SUPER::_where_field_BETWEEN ($lhs, $op, $rhs);
103 }
104
105
106
107 # DB2 is the only remaining DB using this. Even though we are not sure if
108 # RowNumberOver is still needed here (should be part of SQLA) leave the 
109 # code in place
110 sub _RowNumberOver {
111   my ($self, $sql, $order, $rows, $offset ) = @_;
112
113   $offset += 1;
114   my $last = $rows + $offset;
115   my ( $order_by ) = $self->_order_by( $order );
116
117   $sql = <<"SQL";
118 SELECT * FROM
119 (
120    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
121       $sql
122       $order_by
123    ) Q1
124 ) Q2
125 WHERE ROW_NUM BETWEEN $offset AND $last
126
127 SQL
128
129   return $sql;
130 }
131
132
133 # While we're at it, this should make LIMIT queries more efficient,
134 #  without digging into things too deeply
135 use Scalar::Util 'blessed';
136 sub _find_syntax {
137   my ($self, $syntax) = @_;
138   
139   # DB2 is the only remaining DB using this. Even though we are not sure if
140   # RowNumberOver is still needed here (should be part of SQLA) leave the 
141   # code in place
142   my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
143   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
144     return 'RowNumberOver';
145   }
146   
147   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
148 }
149
150 sub select {
151   my ($self, $table, $fields, $where, $order, @rest) = @_;
152   local $self->{having_bind} = [];
153   local $self->{from_bind} = [];
154
155   if (ref $table eq 'SCALAR') {
156     $table = $$table;
157   }
158   elsif (not ref $table) {
159     $table = $self->_quote($table);
160   }
161   local $self->{rownum_hack_count} = 1
162     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
163   @rest = (-1) unless defined $rest[0];
164   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
165     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
166   my ($sql, @where_bind) = $self->SUPER::select(
167     $table, $self->_recurse_fields($fields), $where, $order, @rest
168   );
169   $sql .= 
170     $self->{for} ?
171     (
172       $self->{for} eq 'update' ? ' FOR UPDATE' :
173       $self->{for} eq 'shared' ? ' FOR SHARE'  :
174       ''
175     ) :
176     ''
177   ;
178   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}) : $sql;
179 }
180
181 sub insert {
182   my $self = shift;
183   my $table = shift;
184   $table = $self->_quote($table) unless ref($table);
185   $self->SUPER::insert($table, @_);
186 }
187
188 sub update {
189   my $self = shift;
190   my $table = shift;
191   $table = $self->_quote($table) unless ref($table);
192   $self->SUPER::update($table, @_);
193 }
194
195 sub delete {
196   my $self = shift;
197   my $table = shift;
198   $table = $self->_quote($table) unless ref($table);
199   $self->SUPER::delete($table, @_);
200 }
201
202 sub _emulate_limit {
203   my $self = shift;
204   if ($_[3] == -1) {
205     return $_[1].$self->_order_by($_[2]);
206   } else {
207     return $self->SUPER::_emulate_limit(@_);
208   }
209 }
210
211 sub _recurse_fields {
212   my ($self, $fields, $params) = @_;
213   my $ref = ref $fields;
214   return $self->_quote($fields) unless $ref;
215   return $$fields if $ref eq 'SCALAR';
216
217   if ($ref eq 'ARRAY') {
218     return join(', ', map {
219       $self->_recurse_fields($_)
220         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
221           ? ' AS col'.$self->{rownum_hack_count}++
222           : '')
223       } @$fields);
224   } elsif ($ref eq 'HASH') {
225     foreach my $func (keys %$fields) {
226       return $self->_sqlcase($func)
227         .'( '.$self->_recurse_fields($fields->{$func}).' )';
228     }
229   }
230   # Is the second check absolutely necessary?
231   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
232     return $self->_fold_sqlbind( $fields );
233   }
234   else {
235     Carp::croak($ref . qq{ unexpected in _recurse_fields()})
236   }
237 }
238
239 sub _order_by {
240   my $self = shift;
241   my $ret = '';
242   my @extra;
243   if (ref $_[0] eq 'HASH') {
244     if (defined $_[0]->{group_by}) {
245       $ret = $self->_sqlcase(' group by ')
246         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
247     }
248     if (defined $_[0]->{having}) {
249       my $frag;
250       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
251       push(@{$self->{having_bind}}, @extra);
252       $ret .= $self->_sqlcase(' having ').$frag;
253     }
254     if (defined $_[0]->{order_by}) {
255       $ret .= $self->_order_by($_[0]->{order_by});
256     }
257     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
258       return $self->SUPER::_order_by($_[0]);
259     }
260   } elsif (ref $_[0] eq 'SCALAR') {
261     $ret = $self->_sqlcase(' order by ').${ $_[0] };
262   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
263     my @order = @{+shift};
264     $ret = $self->_sqlcase(' order by ')
265           .join(', ', map {
266                         my $r = $self->_order_by($_, @_);
267                         $r =~ s/^ ?ORDER BY //i;
268                         $r;
269                       } @order);
270   } else {
271     $ret = $self->SUPER::_order_by(@_);
272   }
273   return $ret;
274 }
275
276 sub _order_directions {
277   my ($self, $order) = @_;
278   $order = $order->{order_by} if ref $order eq 'HASH';
279   return $self->SUPER::_order_directions($order);
280 }
281
282 sub _table {
283   my ($self, $from) = @_;
284   if (ref $from eq 'ARRAY') {
285     return $self->_recurse_from(@$from);
286   } elsif (ref $from eq 'HASH') {
287     return $self->_make_as($from);
288   } else {
289     return $from; # would love to quote here but _table ends up getting called
290                   # twice during an ->select without a limit clause due to
291                   # the way S::A::Limit->select works. should maybe consider
292                   # bypassing this and doing S::A::select($self, ...) in
293                   # our select method above. meantime, quoting shims have
294                   # been added to select/insert/update/delete here
295   }
296 }
297
298 sub _recurse_from {
299   my ($self, $from, @join) = @_;
300   my @sqlf;
301   push(@sqlf, $self->_make_as($from));
302   foreach my $j (@join) {
303     my ($to, $on) = @$j;
304
305     # check whether a join type exists
306     my $join_clause = '';
307     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
308     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
309       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
310     } else {
311       $join_clause = ' JOIN ';
312     }
313     push(@sqlf, $join_clause);
314
315     if (ref $to eq 'ARRAY') {
316       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
317     } else {
318       push(@sqlf, $self->_make_as($to));
319     }
320     push(@sqlf, ' ON ', $self->_join_condition($on));
321   }
322   return join('', @sqlf);
323 }
324
325 sub _fold_sqlbind {
326   my ($self, $sqlbind) = @_;
327   my $sql = shift @$$sqlbind;
328   push @{$self->{from_bind}}, @$$sqlbind;
329   return $sql;
330 }
331
332 sub _make_as {
333   my ($self, $from) = @_;
334   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
335                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
336                         : $self->_quote($_))
337                        } reverse each %{$self->_skip_options($from)});
338 }
339
340 sub _skip_options {
341   my ($self, $hash) = @_;
342   my $clean_hash = {};
343   $clean_hash->{$_} = $hash->{$_}
344     for grep {!/^-/} keys %$hash;
345   return $clean_hash;
346 }
347
348 sub _join_condition {
349   my ($self, $cond) = @_;
350   if (ref $cond eq 'HASH') {
351     my %j;
352     for (keys %$cond) {
353       my $v = $cond->{$_};
354       if (ref $v) {
355         # XXX no throw_exception() in this package and croak() fails with strange results
356         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
357             if ref($v) ne 'SCALAR';
358         $j{$_} = $v;
359       }
360       else {
361         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
362       }
363     };
364     return scalar($self->_recurse_where(\%j));
365   } elsif (ref $cond eq 'ARRAY') {
366     return join(' OR ', map { $self->_join_condition($_) } @$cond);
367   } else {
368     die "Can't handle this yet!";
369   }
370 }
371
372 sub _quote {
373   my ($self, $label) = @_;
374   return '' unless defined $label;
375   return "*" if $label eq '*';
376   return $label unless $self->{quote_char};
377   if(ref $self->{quote_char} eq "ARRAY"){
378     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
379       if !defined $self->{name_sep};
380     my $sep = $self->{name_sep};
381     return join($self->{name_sep},
382         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
383        split(/\Q$sep\E/,$label));
384   }
385   return $self->SUPER::_quote($label);
386 }
387
388 sub limit_dialect {
389     my $self = shift;
390     $self->{limit_dialect} = shift if @_;
391     return $self->{limit_dialect};
392 }
393
394 sub quote_char {
395     my $self = shift;
396     $self->{quote_char} = shift if @_;
397     return $self->{quote_char};
398 }
399
400 sub name_sep {
401     my $self = shift;
402     $self->{name_sep} = shift if @_;
403     return $self->{name_sep};
404 }
405
406 } # End of BEGIN block
407
408 =head1 NAME
409
410 DBIx::Class::Storage::DBI - DBI storage handler
411
412 =head1 SYNOPSIS
413
414   my $schema = MySchema->connect('dbi:SQLite:my.db');
415
416   $schema->storage->debug(1);
417   $schema->dbh_do("DROP TABLE authors");
418
419   $schema->resultset('Book')->search({
420      written_on => $schema->storage->datetime_parser(DateTime->now)
421   });
422
423 =head1 DESCRIPTION
424
425 This class represents the connection to an RDBMS via L<DBI>.  See
426 L<DBIx::Class::Storage> for general information.  This pod only
427 documents DBI-specific methods and behaviors.
428
429 =head1 METHODS
430
431 =cut
432
433 sub new {
434   my $new = shift->next::method(@_);
435
436   $new->transaction_depth(0);
437   $new->_sql_maker_opts({});
438   $new->{savepoints} = [];
439   $new->{_in_dbh_do} = 0;
440   $new->{_dbh_gen} = 0;
441
442   $new;
443 }
444
445 =head2 connect_info
446
447 This method is normally called by L<DBIx::Class::Schema/connection>, which
448 encapsulates its argument list in an arrayref before passing them here.
449
450 The argument list may contain:
451
452 =over
453
454 =item *
455
456 The same 4-element argument set one would normally pass to
457 L<DBI/connect>, optionally followed by
458 L<extra attributes|/DBIx::Class specific connection attributes>
459 recognized by DBIx::Class:
460
461   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
462
463 =item *
464
465 A single code reference which returns a connected 
466 L<DBI database handle|DBI/connect> optionally followed by 
467 L<extra attributes|/DBIx::Class specific connection attributes> recognized
468 by DBIx::Class:
469
470   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
471
472 =item *
473
474 A single hashref with all the attributes and the dsn/user/password
475 mixed together:
476
477   $connect_info_args = [{
478     dsn => $dsn,
479     user => $user,
480     password => $pass,
481     %dbi_attributes,
482     %extra_attributes,
483   }];
484
485 This is particularly useful for L<Catalyst> based applications, allowing the 
486 following config (L<Config::General> style):
487
488   <Model::DB>
489     schema_class   App::DB
490     <connect_info>
491       dsn          dbi:mysql:database=test
492       user         testuser
493       password     TestPass
494       AutoCommit   1
495     </connect_info>
496   </Model::DB>
497
498 =back
499
500 Please note that the L<DBI> docs recommend that you always explicitly
501 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
502 recommends that it be set to I<1>, and that you perform transactions
503 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
504 to I<1> if you do not do explicitly set it to zero.  This is the default 
505 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
506
507 =head3 DBIx::Class specific connection attributes
508
509 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
510 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
511 the following connection options. These options can be mixed in with your other
512 L<DBI> connection attributes, or placed in a seperate hashref
513 (C<\%extra_attributes>) as shown above.
514
515 Every time C<connect_info> is invoked, any previous settings for
516 these options will be cleared before setting the new ones, regardless of
517 whether any options are specified in the new C<connect_info>.
518
519
520 =over
521
522 =item on_connect_do
523
524 Specifies things to do immediately after connecting or re-connecting to
525 the database.  Its value may contain:
526
527 =over
528
529 =item an array reference
530
531 This contains SQL statements to execute in order.  Each element contains
532 a string or a code reference that returns a string.
533
534 =item a code reference
535
536 This contains some code to execute.  Unlike code references within an
537 array reference, its return value is ignored.
538
539 =back
540
541 =item on_disconnect_do
542
543 Takes arguments in the same form as L</on_connect_do> and executes them
544 immediately before disconnecting from the database.
545
546 Note, this only runs if you explicitly call L</disconnect> on the
547 storage object.
548
549 =item disable_sth_caching
550
551 If set to a true value, this option will disable the caching of
552 statement handles via L<DBI/prepare_cached>.
553
554 =item limit_dialect 
555
556 Sets the limit dialect. This is useful for JDBC-bridge among others
557 where the remote SQL-dialect cannot be determined by the name of the
558 driver alone. See also L<SQL::Abstract::Limit>.
559
560 =item quote_char
561
562 Specifies what characters to use to quote table and column names. If 
563 you use this you will want to specify L</name_sep> as well.
564
565 C<quote_char> expects either a single character, in which case is it
566 is placed on either side of the table/column name, or an arrayref of length
567 2 in which case the table/column name is placed between the elements.
568
569 For example under MySQL you should use C<< quote_char => '`' >>, and for
570 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
571
572 =item name_sep
573
574 This only needs to be used in conjunction with C<quote_char>, and is used to 
575 specify the charecter that seperates elements (schemas, tables, columns) from 
576 each other. In most cases this is simply a C<.>.
577
578 The consequences of not supplying this value is that L<SQL::Abstract>
579 will assume DBIx::Class' uses of aliases to be complete column
580 names. The output will look like I<"me.name"> when it should actually
581 be I<"me"."name">.
582
583 =item unsafe
584
585 This Storage driver normally installs its own C<HandleError>, sets
586 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
587 all database handles, including those supplied by a coderef.  It does this
588 so that it can have consistent and useful error behavior.
589
590 If you set this option to a true value, Storage will not do its usual
591 modifications to the database handle's attributes, and instead relies on
592 the settings in your connect_info DBI options (or the values you set in
593 your connection coderef, in the case that you are connecting via coderef).
594
595 Note that your custom settings can cause Storage to malfunction,
596 especially if you set a C<HandleError> handler that suppresses exceptions
597 and/or disable C<RaiseError>.
598
599 =item auto_savepoint
600
601 If this option is true, L<DBIx::Class> will use savepoints when nesting
602 transactions, making it possible to recover from failure in the inner
603 transaction without having to abort all outer transactions.
604
605 =item cursor_class
606
607 Use this argument to supply a cursor class other than the default
608 L<DBIx::Class::Storage::DBI::Cursor>.
609
610 =back
611
612 Some real-life examples of arguments to L</connect_info> and
613 L<DBIx::Class::Schema/connect>
614
615   # Simple SQLite connection
616   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
617
618   # Connect via subref
619   ->connect_info([ sub { DBI->connect(...) } ]);
620
621   # A bit more complicated
622   ->connect_info(
623     [
624       'dbi:Pg:dbname=foo',
625       'postgres',
626       'my_pg_password',
627       { AutoCommit => 1 },
628       { quote_char => q{"}, name_sep => q{.} },
629     ]
630   );
631
632   # Equivalent to the previous example
633   ->connect_info(
634     [
635       'dbi:Pg:dbname=foo',
636       'postgres',
637       'my_pg_password',
638       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
639     ]
640   );
641
642   # Same, but with hashref as argument
643   # See parse_connect_info for explanation
644   ->connect_info(
645     [{
646       dsn         => 'dbi:Pg:dbname=foo',
647       user        => 'postgres',
648       password    => 'my_pg_password',
649       AutoCommit  => 1,
650       quote_char  => q{"},
651       name_sep    => q{.},
652     }]
653   );
654
655   # Subref + DBIx::Class-specific connection options
656   ->connect_info(
657     [
658       sub { DBI->connect(...) },
659       {
660           quote_char => q{`},
661           name_sep => q{@},
662           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
663           disable_sth_caching => 1,
664       },
665     ]
666   );
667
668
669
670 =cut
671
672 sub connect_info {
673   my ($self, $info_arg) = @_;
674
675   return $self->_connect_info if !$info_arg;
676
677   my @args = @$info_arg;  # take a shallow copy for further mutilation
678   $self->_connect_info([@args]); # copy for _connect_info
679
680
681   # combine/pre-parse arguments depending on invocation style
682
683   my %attrs;
684   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
685     %attrs = %{ $args[1] || {} };
686     @args = $args[0];
687   }
688   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
689     %attrs = %{$args[0]};
690     @args = ();
691     for (qw/password user dsn/) {
692       unshift @args, delete $attrs{$_};
693     }
694   }
695   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
696     %attrs = (
697       % { $args[3] || {} },
698       % { $args[4] || {} },
699     );
700     @args = @args[0,1,2];
701   }
702
703   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
704   #  the new set of options
705   $self->_sql_maker(undef);
706   $self->_sql_maker_opts({});
707
708   if(keys %attrs) {
709     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
710       if(my $value = delete $attrs{$storage_opt}) {
711         $self->$storage_opt($value);
712       }
713     }
714     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
715       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
716         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
717       }
718     }
719   }
720
721   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
722
723   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
724   $self->_connect_info;
725 }
726
727 =head2 on_connect_do
728
729 This method is deprecated in favour of setting via L</connect_info>.
730
731
732 =head2 dbh_do
733
734 Arguments: ($subref | $method_name), @extra_coderef_args?
735
736 Execute the given $subref or $method_name using the new exception-based
737 connection management.
738
739 The first two arguments will be the storage object that C<dbh_do> was called
740 on and a database handle to use.  Any additional arguments will be passed
741 verbatim to the called subref as arguments 2 and onwards.
742
743 Using this (instead of $self->_dbh or $self->dbh) ensures correct
744 exception handling and reconnection (or failover in future subclasses).
745
746 Your subref should have no side-effects outside of the database, as
747 there is the potential for your subref to be partially double-executed
748 if the database connection was stale/dysfunctional.
749
750 Example:
751
752   my @stuff = $schema->storage->dbh_do(
753     sub {
754       my ($storage, $dbh, @cols) = @_;
755       my $cols = join(q{, }, @cols);
756       $dbh->selectrow_array("SELECT $cols FROM foo");
757     },
758     @column_list
759   );
760
761 =cut
762
763 sub dbh_do {
764   my $self = shift;
765   my $code = shift;
766
767   my $dbh = $self->_dbh;
768
769   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
770       || $self->{transaction_depth};
771
772   local $self->{_in_dbh_do} = 1;
773
774   my @result;
775   my $want_array = wantarray;
776
777   eval {
778     $self->_verify_pid if $dbh;
779     if(!$self->_dbh) {
780         $self->_populate_dbh;
781         $dbh = $self->_dbh;
782     }
783
784     if($want_array) {
785         @result = $self->$code($dbh, @_);
786     }
787     elsif(defined $want_array) {
788         $result[0] = $self->$code($dbh, @_);
789     }
790     else {
791         $self->$code($dbh, @_);
792     }
793   };
794
795   my $exception = $@;
796   if(!$exception) { return $want_array ? @result : $result[0] }
797
798   $self->throw_exception($exception) if $self->connected;
799
800   # We were not connected - reconnect and retry, but let any
801   #  exception fall right through this time
802   $self->_populate_dbh;
803   $self->$code($self->_dbh, @_);
804 }
805
806 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
807 # It also informs dbh_do to bypass itself while under the direction of txn_do,
808 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
809 sub txn_do {
810   my $self = shift;
811   my $coderef = shift;
812
813   ref $coderef eq 'CODE' or $self->throw_exception
814     ('$coderef must be a CODE reference');
815
816   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
817
818   local $self->{_in_dbh_do} = 1;
819
820   my @result;
821   my $want_array = wantarray;
822
823   my $tried = 0;
824   while(1) {
825     eval {
826       $self->_verify_pid if $self->_dbh;
827       $self->_populate_dbh if !$self->_dbh;
828
829       $self->txn_begin;
830       if($want_array) {
831           @result = $coderef->(@_);
832       }
833       elsif(defined $want_array) {
834           $result[0] = $coderef->(@_);
835       }
836       else {
837           $coderef->(@_);
838       }
839       $self->txn_commit;
840     };
841
842     my $exception = $@;
843     if(!$exception) { return $want_array ? @result : $result[0] }
844
845     if($tried++ > 0 || $self->connected) {
846       eval { $self->txn_rollback };
847       my $rollback_exception = $@;
848       if($rollback_exception) {
849         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
850         $self->throw_exception($exception)  # propagate nested rollback
851           if $rollback_exception =~ /$exception_class/;
852
853         $self->throw_exception(
854           "Transaction aborted: ${exception}. "
855           . "Rollback failed: ${rollback_exception}"
856         );
857       }
858       $self->throw_exception($exception)
859     }
860
861     # We were not connected, and was first try - reconnect and retry
862     # via the while loop
863     $self->_populate_dbh;
864   }
865 }
866
867 =head2 disconnect
868
869 Our C<disconnect> method also performs a rollback first if the
870 database is not in C<AutoCommit> mode.
871
872 =cut
873
874 sub disconnect {
875   my ($self) = @_;
876
877   if( $self->connected ) {
878     my $connection_do = $self->on_disconnect_do;
879     $self->_do_connection_actions($connection_do) if ref($connection_do);
880
881     $self->_dbh->rollback unless $self->_dbh_autocommit;
882     $self->_dbh->disconnect;
883     $self->_dbh(undef);
884     $self->{_dbh_gen}++;
885   }
886 }
887
888 =head2 with_deferred_fk_checks
889
890 =over 4
891
892 =item Arguments: C<$coderef>
893
894 =item Return Value: The return value of $coderef
895
896 =back
897
898 Storage specific method to run the code ref with FK checks deferred or
899 in MySQL's case disabled entirely.
900
901 =cut
902
903 # Storage subclasses should override this
904 sub with_deferred_fk_checks {
905   my ($self, $sub) = @_;
906
907   $sub->();
908 }
909
910 sub connected {
911   my ($self) = @_;
912
913   if(my $dbh = $self->_dbh) {
914       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
915           $self->_dbh(undef);
916           $self->{_dbh_gen}++;
917           return;
918       }
919       else {
920           $self->_verify_pid;
921           return 0 if !$self->_dbh;
922       }
923       return ($dbh->FETCH('Active') && $dbh->ping);
924   }
925
926   return 0;
927 }
928
929 # handle pid changes correctly
930 #  NOTE: assumes $self->_dbh is a valid $dbh
931 sub _verify_pid {
932   my ($self) = @_;
933
934   return if defined $self->_conn_pid && $self->_conn_pid == $$;
935
936   $self->_dbh->{InactiveDestroy} = 1;
937   $self->_dbh(undef);
938   $self->{_dbh_gen}++;
939
940   return;
941 }
942
943 sub ensure_connected {
944   my ($self) = @_;
945
946   unless ($self->connected) {
947     $self->_populate_dbh;
948   }
949 }
950
951 =head2 dbh
952
953 Returns the dbh - a data base handle of class L<DBI>.
954
955 =cut
956
957 sub dbh {
958   my ($self) = @_;
959
960   $self->ensure_connected;
961   return $self->_dbh;
962 }
963
964 sub _sql_maker_args {
965     my ($self) = @_;
966     
967     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
968 }
969
970 sub sql_maker {
971   my ($self) = @_;
972   unless ($self->_sql_maker) {
973     my $sql_maker_class = $self->sql_maker_class;
974     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
975   }
976   return $self->_sql_maker;
977 }
978
979 sub _rebless {}
980
981 sub _populate_dbh {
982   my ($self) = @_;
983   my @info = @{$self->_dbi_connect_info || []};
984   $self->_dbh($self->_connect(@info));
985
986   # Always set the transaction depth on connect, since
987   #  there is no transaction in progress by definition
988   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
989
990   if(ref $self eq 'DBIx::Class::Storage::DBI') {
991     my $driver = $self->_dbh->{Driver}->{Name};
992     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
993       bless $self, "DBIx::Class::Storage::DBI::${driver}";
994       $self->_rebless();
995     }
996   }
997
998   $self->_conn_pid($$);
999   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1000
1001   my $connection_do = $self->on_connect_do;
1002   $self->_do_connection_actions($connection_do) if ref($connection_do);
1003 }
1004
1005 sub _do_connection_actions {
1006   my $self = shift;
1007   my $connection_do = shift;
1008
1009   if (ref $connection_do eq 'ARRAY') {
1010     $self->_do_query($_) foreach @$connection_do;
1011   }
1012   elsif (ref $connection_do eq 'CODE') {
1013     $connection_do->($self);
1014   }
1015
1016   return $self;
1017 }
1018
1019 sub _do_query {
1020   my ($self, $action) = @_;
1021
1022   if (ref $action eq 'CODE') {
1023     $action = $action->($self);
1024     $self->_do_query($_) foreach @$action;
1025   }
1026   else {
1027     # Most debuggers expect ($sql, @bind), so we need to exclude
1028     # the attribute hash which is the second argument to $dbh->do
1029     # furthermore the bind values are usually to be presented
1030     # as named arrayref pairs, so wrap those here too
1031     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1032     my $sql = shift @do_args;
1033     my $attrs = shift @do_args;
1034     my @bind = map { [ undef, $_ ] } @do_args;
1035
1036     $self->_query_start($sql, @bind);
1037     $self->_dbh->do($sql, $attrs, @do_args);
1038     $self->_query_end($sql, @bind);
1039   }
1040
1041   return $self;
1042 }
1043
1044 sub _connect {
1045   my ($self, @info) = @_;
1046
1047   $self->throw_exception("You failed to provide any connection info")
1048     if !@info;
1049
1050   my ($old_connect_via, $dbh);
1051
1052   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1053     $old_connect_via = $DBI::connect_via;
1054     $DBI::connect_via = 'connect';
1055   }
1056
1057   eval {
1058     if(ref $info[0] eq 'CODE') {
1059        $dbh = &{$info[0]}
1060     }
1061     else {
1062        $dbh = DBI->connect(@info);
1063     }
1064
1065     if($dbh && !$self->unsafe) {
1066       my $weak_self = $self;
1067       weaken($weak_self);
1068       $dbh->{HandleError} = sub {
1069           if ($weak_self) {
1070             $weak_self->throw_exception("DBI Exception: $_[0]");
1071           }
1072           else {
1073             croak ("DBI Exception: $_[0]");
1074           }
1075       };
1076       $dbh->{ShowErrorStatement} = 1;
1077       $dbh->{RaiseError} = 1;
1078       $dbh->{PrintError} = 0;
1079     }
1080   };
1081
1082   $DBI::connect_via = $old_connect_via if $old_connect_via;
1083
1084   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1085     if !$dbh || $@;
1086
1087   $self->_dbh_autocommit($dbh->{AutoCommit});
1088
1089   $dbh;
1090 }
1091
1092 sub svp_begin {
1093   my ($self, $name) = @_;
1094
1095   $name = $self->_svp_generate_name
1096     unless defined $name;
1097
1098   $self->throw_exception ("You can't use savepoints outside a transaction")
1099     if $self->{transaction_depth} == 0;
1100
1101   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1102     unless $self->can('_svp_begin');
1103   
1104   push @{ $self->{savepoints} }, $name;
1105
1106   $self->debugobj->svp_begin($name) if $self->debug;
1107   
1108   return $self->_svp_begin($name);
1109 }
1110
1111 sub svp_release {
1112   my ($self, $name) = @_;
1113
1114   $self->throw_exception ("You can't use savepoints outside a transaction")
1115     if $self->{transaction_depth} == 0;
1116
1117   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1118     unless $self->can('_svp_release');
1119
1120   if (defined $name) {
1121     $self->throw_exception ("Savepoint '$name' does not exist")
1122       unless grep { $_ eq $name } @{ $self->{savepoints} };
1123
1124     # Dig through the stack until we find the one we are releasing.  This keeps
1125     # the stack up to date.
1126     my $svp;
1127
1128     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1129   } else {
1130     $name = pop @{ $self->{savepoints} };
1131   }
1132
1133   $self->debugobj->svp_release($name) if $self->debug;
1134
1135   return $self->_svp_release($name);
1136 }
1137
1138 sub svp_rollback {
1139   my ($self, $name) = @_;
1140
1141   $self->throw_exception ("You can't use savepoints outside a transaction")
1142     if $self->{transaction_depth} == 0;
1143
1144   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1145     unless $self->can('_svp_rollback');
1146
1147   if (defined $name) {
1148       # If they passed us a name, verify that it exists in the stack
1149       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1150           $self->throw_exception("Savepoint '$name' does not exist!");
1151       }
1152
1153       # Dig through the stack until we find the one we are releasing.  This keeps
1154       # the stack up to date.
1155       while(my $s = pop(@{ $self->{savepoints} })) {
1156           last if($s eq $name);
1157       }
1158       # Add the savepoint back to the stack, as a rollback doesn't remove the
1159       # named savepoint, only everything after it.
1160       push(@{ $self->{savepoints} }, $name);
1161   } else {
1162       # We'll assume they want to rollback to the last savepoint
1163       $name = $self->{savepoints}->[-1];
1164   }
1165
1166   $self->debugobj->svp_rollback($name) if $self->debug;
1167   
1168   return $self->_svp_rollback($name);
1169 }
1170
1171 sub _svp_generate_name {
1172     my ($self) = @_;
1173
1174     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1175 }
1176
1177 sub txn_begin {
1178   my $self = shift;
1179   $self->ensure_connected();
1180   if($self->{transaction_depth} == 0) {
1181     $self->debugobj->txn_begin()
1182       if $self->debug;
1183     # this isn't ->_dbh-> because
1184     #  we should reconnect on begin_work
1185     #  for AutoCommit users
1186     $self->dbh->begin_work;
1187   } elsif ($self->auto_savepoint) {
1188     $self->svp_begin;
1189   }
1190   $self->{transaction_depth}++;
1191 }
1192
1193 sub txn_commit {
1194   my $self = shift;
1195   if ($self->{transaction_depth} == 1) {
1196     my $dbh = $self->_dbh;
1197     $self->debugobj->txn_commit()
1198       if ($self->debug);
1199     $dbh->commit;
1200     $self->{transaction_depth} = 0
1201       if $self->_dbh_autocommit;
1202   }
1203   elsif($self->{transaction_depth} > 1) {
1204     $self->{transaction_depth}--;
1205     $self->svp_release
1206       if $self->auto_savepoint;
1207   }
1208 }
1209
1210 sub txn_rollback {
1211   my $self = shift;
1212   my $dbh = $self->_dbh;
1213   eval {
1214     if ($self->{transaction_depth} == 1) {
1215       $self->debugobj->txn_rollback()
1216         if ($self->debug);
1217       $self->{transaction_depth} = 0
1218         if $self->_dbh_autocommit;
1219       $dbh->rollback;
1220     }
1221     elsif($self->{transaction_depth} > 1) {
1222       $self->{transaction_depth}--;
1223       if ($self->auto_savepoint) {
1224         $self->svp_rollback;
1225         $self->svp_release;
1226       }
1227     }
1228     else {
1229       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1230     }
1231   };
1232   if ($@) {
1233     my $error = $@;
1234     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1235     $error =~ /$exception_class/ and $self->throw_exception($error);
1236     # ensure that a failed rollback resets the transaction depth
1237     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1238     $self->throw_exception($error);
1239   }
1240 }
1241
1242 # This used to be the top-half of _execute.  It was split out to make it
1243 #  easier to override in NoBindVars without duping the rest.  It takes up
1244 #  all of _execute's args, and emits $sql, @bind.
1245 sub _prep_for_execute {
1246   my ($self, $op, $extra_bind, $ident, $args) = @_;
1247
1248   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1249     $ident = $ident->from();
1250   }
1251
1252   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1253
1254   unshift(@bind,
1255     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1256       if $extra_bind;
1257   return ($sql, \@bind);
1258 }
1259
1260 sub _fix_bind_params {
1261     my ($self, @bind) = @_;
1262
1263     ### Turn @bind from something like this:
1264     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1265     ### to this:
1266     ###   ( "'1'", "'1'", "'3'" )
1267     return
1268         map {
1269             if ( defined( $_ && $_->[1] ) ) {
1270                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1271             }
1272             else { q{'NULL'}; }
1273         } @bind;
1274 }
1275
1276 sub _query_start {
1277     my ( $self, $sql, @bind ) = @_;
1278
1279     if ( $self->debug ) {
1280         @bind = $self->_fix_bind_params(@bind);
1281         
1282         $self->debugobj->query_start( $sql, @bind );
1283     }
1284 }
1285
1286 sub _query_end {
1287     my ( $self, $sql, @bind ) = @_;
1288
1289     if ( $self->debug ) {
1290         @bind = $self->_fix_bind_params(@bind);
1291         $self->debugobj->query_end( $sql, @bind );
1292     }
1293 }
1294
1295 sub _dbh_execute {
1296   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1297
1298   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1299
1300   $self->_query_start( $sql, @$bind );
1301
1302   my $sth = $self->sth($sql,$op);
1303
1304   my $placeholder_index = 1; 
1305
1306   foreach my $bound (@$bind) {
1307     my $attributes = {};
1308     my($column_name, @data) = @$bound;
1309
1310     if ($bind_attributes) {
1311       $attributes = $bind_attributes->{$column_name}
1312       if defined $bind_attributes->{$column_name};
1313     }
1314
1315     foreach my $data (@data) {
1316       my $ref = ref $data;
1317       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1318
1319       $sth->bind_param($placeholder_index, $data, $attributes);
1320       $placeholder_index++;
1321     }
1322   }
1323
1324   # Can this fail without throwing an exception anyways???
1325   my $rv = $sth->execute();
1326   $self->throw_exception($sth->errstr) if !$rv;
1327
1328   $self->_query_end( $sql, @$bind );
1329
1330   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1331 }
1332
1333 sub _execute {
1334     my $self = shift;
1335     $self->dbh_do('_dbh_execute', @_)
1336 }
1337
1338 sub insert {
1339   my ($self, $source, $to_insert) = @_;
1340   
1341   my $ident = $source->from; 
1342   my $bind_attributes = $self->source_bind_attributes($source);
1343
1344   my $updated_cols = {};
1345
1346   $self->ensure_connected;
1347   foreach my $col ( $source->columns ) {
1348     if ( !defined $to_insert->{$col} ) {
1349       my $col_info = $source->column_info($col);
1350
1351       if ( $col_info->{auto_nextval} ) {
1352         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1353       }
1354     }
1355   }
1356
1357   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1358
1359   return $updated_cols;
1360 }
1361
1362 ## Still not quite perfect, and EXPERIMENTAL
1363 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1364 ## scalar refs, or at least, all the same type as the first set, the statement is
1365 ## only prepped once.
1366 sub insert_bulk {
1367   my ($self, $source, $cols, $data) = @_;
1368   my %colvalues;
1369   my $table = $source->from;
1370   @colvalues{@$cols} = (0..$#$cols);
1371   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1372   
1373   $self->_query_start( $sql, @bind );
1374   my $sth = $self->sth($sql);
1375
1376 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1377
1378   ## This must be an arrayref, else nothing works!
1379   
1380   my $tuple_status = [];
1381   
1382   ##use Data::Dumper;
1383   ##print STDERR Dumper( $data, $sql, [@bind] );
1384
1385   my $time = time();
1386
1387   ## Get the bind_attributes, if any exist
1388   my $bind_attributes = $self->source_bind_attributes($source);
1389
1390   ## Bind the values and execute
1391   my $placeholder_index = 1; 
1392
1393   foreach my $bound (@bind) {
1394
1395     my $attributes = {};
1396     my ($column_name, $data_index) = @$bound;
1397
1398     if( $bind_attributes ) {
1399       $attributes = $bind_attributes->{$column_name}
1400       if defined $bind_attributes->{$column_name};
1401     }
1402
1403     my @data = map { $_->[$data_index] } @$data;
1404
1405     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1406     $placeholder_index++;
1407   }
1408   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1409   $self->throw_exception($sth->errstr) if !$rv;
1410
1411   $self->_query_end( $sql, @bind );
1412   return (wantarray ? ($rv, $sth, @bind) : $rv);
1413 }
1414
1415 sub update {
1416   my $self = shift @_;
1417   my $source = shift @_;
1418   my $bind_attributes = $self->source_bind_attributes($source);
1419   
1420   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1421 }
1422
1423
1424 sub delete {
1425   my $self = shift @_;
1426   my $source = shift @_;
1427   
1428   my $bind_attrs = {}; ## If ever it's needed...
1429   
1430   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1431 }
1432
1433 sub _select {
1434   my $self = shift;
1435   my $sql_maker = $self->sql_maker;
1436   local $sql_maker->{for};
1437   return $self->_execute($self->_select_args(@_));
1438 }
1439
1440 sub _select_args {
1441   my ($self, $ident, $select, $condition, $attrs) = @_;
1442   my $order = $attrs->{order_by};
1443
1444   my $for = delete $attrs->{for};
1445   my $sql_maker = $self->sql_maker;
1446   $sql_maker->{for} = $for;
1447
1448   if (exists $attrs->{group_by} || $attrs->{having}) {
1449     $order = {
1450       group_by => $attrs->{group_by},
1451       having => $attrs->{having},
1452       ($order ? (order_by => $order) : ())
1453     };
1454   }
1455   my $bind_attrs = {}; ## Future support
1456   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1457   if ($attrs->{software_limit} ||
1458       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1459         $attrs->{software_limit} = 1;
1460   } else {
1461     $self->throw_exception("rows attribute must be positive if present")
1462       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1463
1464     # MySQL actually recommends this approach.  I cringe.
1465     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1466     push @args, $attrs->{rows}, $attrs->{offset};
1467   }
1468   return @args;
1469 }
1470
1471 sub source_bind_attributes {
1472   my ($self, $source) = @_;
1473   
1474   my $bind_attributes;
1475   foreach my $column ($source->columns) {
1476   
1477     my $data_type = $source->column_info($column)->{data_type} || '';
1478     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1479      if $data_type;
1480   }
1481
1482   return $bind_attributes;
1483 }
1484
1485 =head2 select
1486
1487 =over 4
1488
1489 =item Arguments: $ident, $select, $condition, $attrs
1490
1491 =back
1492
1493 Handle a SQL select statement.
1494
1495 =cut
1496
1497 sub select {
1498   my $self = shift;
1499   my ($ident, $select, $condition, $attrs) = @_;
1500   return $self->cursor_class->new($self, \@_, $attrs);
1501 }
1502
1503 sub select_single {
1504   my $self = shift;
1505   my ($rv, $sth, @bind) = $self->_select(@_);
1506   my @row = $sth->fetchrow_array;
1507   my @nextrow = $sth->fetchrow_array if @row;
1508   if(@row && @nextrow) {
1509     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1510   }
1511   # Need to call finish() to work round broken DBDs
1512   $sth->finish();
1513   return @row;
1514 }
1515
1516 =head2 sth
1517
1518 =over 4
1519
1520 =item Arguments: $sql
1521
1522 =back
1523
1524 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1525
1526 =cut
1527
1528 sub _dbh_sth {
1529   my ($self, $dbh, $sql) = @_;
1530
1531   # 3 is the if_active parameter which avoids active sth re-use
1532   my $sth = $self->disable_sth_caching
1533     ? $dbh->prepare($sql)
1534     : $dbh->prepare_cached($sql, {}, 3);
1535
1536   # XXX You would think RaiseError would make this impossible,
1537   #  but apparently that's not true :(
1538   $self->throw_exception($dbh->errstr) if !$sth;
1539
1540   $sth;
1541 }
1542
1543 sub sth {
1544   my ($self, $sql) = @_;
1545   $self->dbh_do('_dbh_sth', $sql);
1546 }
1547
1548 sub _dbh_columns_info_for {
1549   my ($self, $dbh, $table) = @_;
1550
1551   if ($dbh->can('column_info')) {
1552     my %result;
1553     eval {
1554       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1555       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1556       $sth->execute();
1557       while ( my $info = $sth->fetchrow_hashref() ){
1558         my %column_info;
1559         $column_info{data_type}   = $info->{TYPE_NAME};
1560         $column_info{size}      = $info->{COLUMN_SIZE};
1561         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1562         $column_info{default_value} = $info->{COLUMN_DEF};
1563         my $col_name = $info->{COLUMN_NAME};
1564         $col_name =~ s/^\"(.*)\"$/$1/;
1565
1566         $result{$col_name} = \%column_info;
1567       }
1568     };
1569     return \%result if !$@ && scalar keys %result;
1570   }
1571
1572   my %result;
1573   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1574   $sth->execute;
1575   my @columns = @{$sth->{NAME_lc}};
1576   for my $i ( 0 .. $#columns ){
1577     my %column_info;
1578     $column_info{data_type} = $sth->{TYPE}->[$i];
1579     $column_info{size} = $sth->{PRECISION}->[$i];
1580     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1581
1582     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1583       $column_info{data_type} = $1;
1584       $column_info{size}    = $2;
1585     }
1586
1587     $result{$columns[$i]} = \%column_info;
1588   }
1589   $sth->finish;
1590
1591   foreach my $col (keys %result) {
1592     my $colinfo = $result{$col};
1593     my $type_num = $colinfo->{data_type};
1594     my $type_name;
1595     if(defined $type_num && $dbh->can('type_info')) {
1596       my $type_info = $dbh->type_info($type_num);
1597       $type_name = $type_info->{TYPE_NAME} if $type_info;
1598       $colinfo->{data_type} = $type_name if $type_name;
1599     }
1600   }
1601
1602   return \%result;
1603 }
1604
1605 sub columns_info_for {
1606   my ($self, $table) = @_;
1607   $self->dbh_do('_dbh_columns_info_for', $table);
1608 }
1609
1610 =head2 last_insert_id
1611
1612 Return the row id of the last insert.
1613
1614 =cut
1615
1616 sub _dbh_last_insert_id {
1617     # All Storage's need to register their own _dbh_last_insert_id
1618     # the old SQLite-based method was highly inappropriate
1619
1620     my $self = shift;
1621     my $class = ref $self;
1622     $self->throw_exception (<<EOE);
1623
1624 No _dbh_last_insert_id() method found in $class.
1625 Since the method of obtaining the autoincrement id of the last insert
1626 operation varies greatly between different databases, this method must be
1627 individually implemented for every storage class.
1628 EOE
1629 }
1630
1631 sub last_insert_id {
1632   my $self = shift;
1633   $self->dbh_do('_dbh_last_insert_id', @_);
1634 }
1635
1636 =head2 sqlt_type
1637
1638 Returns the database driver name.
1639
1640 =cut
1641
1642 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1643
1644 =head2 bind_attribute_by_data_type
1645
1646 Given a datatype from column info, returns a database specific bind
1647 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1648 let the database planner just handle it.
1649
1650 Generally only needed for special case column types, like bytea in postgres.
1651
1652 =cut
1653
1654 sub bind_attribute_by_data_type {
1655     return;
1656 }
1657
1658 =head2 create_ddl_dir
1659
1660 =over 4
1661
1662 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1663
1664 =back
1665
1666 Creates a SQL file based on the Schema, for each of the specified
1667 database types, in the given directory.
1668
1669 By default, C<\%sqlt_args> will have
1670
1671  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1672
1673 merged with the hash passed in. To disable any of those features, pass in a 
1674 hashref like the following
1675
1676  { ignore_constraint_names => 0, # ... other options }
1677
1678 =cut
1679
1680 sub create_ddl_dir {
1681   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1682
1683   if(!$dir || !-d $dir) {
1684     warn "No directory given, using ./\n";
1685     $dir = "./";
1686   }
1687   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1688   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1689
1690   my $schema_version = $schema->schema_version || '1.x';
1691   $version ||= $schema_version;
1692
1693   $sqltargs = {
1694     add_drop_table => 1, 
1695     ignore_constraint_names => 1,
1696     ignore_index_names => 1,
1697     %{$sqltargs || {}}
1698   };
1699
1700   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1701       . $self->_check_sqlt_message . q{'})
1702           if !$self->_check_sqlt_version;
1703
1704   my $sqlt = SQL::Translator->new( $sqltargs );
1705
1706   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1707   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1708
1709   foreach my $db (@$databases) {
1710     $sqlt->reset();
1711     $sqlt->{schema} = $sqlt_schema;
1712     $sqlt->producer($db);
1713
1714     my $file;
1715     my $filename = $schema->ddl_filename($db, $version, $dir);
1716     if (-e $filename && ($version eq $schema_version )) {
1717       # if we are dumping the current version, overwrite the DDL
1718       warn "Overwriting existing DDL file - $filename";
1719       unlink($filename);
1720     }
1721
1722     my $output = $sqlt->translate;
1723     if(!$output) {
1724       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1725       next;
1726     }
1727     if(!open($file, ">$filename")) {
1728       $self->throw_exception("Can't open $filename for writing ($!)");
1729       next;
1730     }
1731     print $file $output;
1732     close($file);
1733   
1734     next unless ($preversion);
1735
1736     require SQL::Translator::Diff;
1737
1738     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1739     if(!-e $prefilename) {
1740       warn("No previous schema file found ($prefilename)");
1741       next;
1742     }
1743
1744     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1745     if(-e $difffile) {
1746       warn("Overwriting existing diff file - $difffile");
1747       unlink($difffile);
1748     }
1749     
1750     my $source_schema;
1751     {
1752       my $t = SQL::Translator->new($sqltargs);
1753       $t->debug( 0 );
1754       $t->trace( 0 );
1755       $t->parser( $db )                       or die $t->error;
1756       my $out = $t->translate( $prefilename ) or die $t->error;
1757       $source_schema = $t->schema;
1758       unless ( $source_schema->name ) {
1759         $source_schema->name( $prefilename );
1760       }
1761     }
1762
1763     # The "new" style of producers have sane normalization and can support 
1764     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1765     # And we have to diff parsed SQL against parsed SQL.
1766     my $dest_schema = $sqlt_schema;
1767     
1768     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1769       my $t = SQL::Translator->new($sqltargs);
1770       $t->debug( 0 );
1771       $t->trace( 0 );
1772       $t->parser( $db )                    or die $t->error;
1773       my $out = $t->translate( $filename ) or die $t->error;
1774       $dest_schema = $t->schema;
1775       $dest_schema->name( $filename )
1776         unless $dest_schema->name;
1777     }
1778     
1779     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1780                                                   $dest_schema,   $db,
1781                                                   $sqltargs
1782                                                  );
1783     if(!open $file, ">$difffile") { 
1784       $self->throw_exception("Can't write to $difffile ($!)");
1785       next;
1786     }
1787     print $file $diff;
1788     close($file);
1789   }
1790 }
1791
1792 =head2 deployment_statements
1793
1794 =over 4
1795
1796 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1797
1798 =back
1799
1800 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1801 The database driver name is given by C<$type>, though the value from
1802 L</sqlt_type> is used if it is not specified.
1803
1804 C<$directory> is used to return statements from files in a previously created
1805 L</create_ddl_dir> directory and is optional. The filenames are constructed
1806 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1807
1808 If no C<$directory> is specified then the statements are constructed on the
1809 fly using L<SQL::Translator> and C<$version> is ignored.
1810
1811 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1812
1813 =cut
1814
1815 sub deployment_statements {
1816   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1817   # Need to be connected to get the correct sqlt_type
1818   $self->ensure_connected() unless $type;
1819   $type ||= $self->sqlt_type;
1820   $version ||= $schema->schema_version || '1.x';
1821   $dir ||= './';
1822   my $filename = $schema->ddl_filename($type, $version, $dir);
1823   if(-f $filename)
1824   {
1825       my $file;
1826       open($file, "<$filename") 
1827         or $self->throw_exception("Can't open $filename ($!)");
1828       my @rows = <$file>;
1829       close($file);
1830       return join('', @rows);
1831   }
1832
1833   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1834       . $self->_check_sqlt_message . q{'})
1835           if !$self->_check_sqlt_version;
1836
1837   require SQL::Translator::Parser::DBIx::Class;
1838   eval qq{use SQL::Translator::Producer::${type}};
1839   $self->throw_exception($@) if $@;
1840
1841   # sources needs to be a parser arg, but for simplicty allow at top level 
1842   # coming in
1843   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1844       if exists $sqltargs->{sources};
1845
1846   my $tr = SQL::Translator->new(%$sqltargs);
1847   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1848   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1849 }
1850
1851 sub deploy {
1852   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1853   my $deploy = sub {
1854     my $line = shift;
1855     return if($line =~ /^--/);
1856     return if(!$line);
1857     # next if($line =~ /^DROP/m);
1858     return if($line =~ /^BEGIN TRANSACTION/m);
1859     return if($line =~ /^COMMIT/m);
1860     return if $line =~ /^\s+$/; # skip whitespace only
1861     $self->_query_start($line);
1862     eval {
1863       $self->dbh->do($line); # shouldn't be using ->dbh ?
1864     };
1865     if ($@) {
1866       warn qq{$@ (running "${line}")};
1867     }
1868     $self->_query_end($line);
1869   };
1870   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } );
1871   if (@statements > 1) {
1872     foreach my $statement (@statements) {
1873       $deploy->( $statement );
1874     }
1875   }
1876   elsif (@statements == 1) {
1877     foreach my $line ( split(";\n", $statements[0])) {
1878       $deploy->( $line );
1879     }
1880   }
1881 }
1882
1883 =head2 datetime_parser
1884
1885 Returns the datetime parser class
1886
1887 =cut
1888
1889 sub datetime_parser {
1890   my $self = shift;
1891   return $self->{datetime_parser} ||= do {
1892     $self->ensure_connected;
1893     $self->build_datetime_parser(@_);
1894   };
1895 }
1896
1897 =head2 datetime_parser_type
1898
1899 Defines (returns) the datetime parser class - currently hardwired to
1900 L<DateTime::Format::MySQL>
1901
1902 =cut
1903
1904 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1905
1906 =head2 build_datetime_parser
1907
1908 See L</datetime_parser>
1909
1910 =cut
1911
1912 sub build_datetime_parser {
1913   my $self = shift;
1914   my $type = $self->datetime_parser_type(@_);
1915   eval "use ${type}";
1916   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1917   return $type;
1918 }
1919
1920 {
1921     my $_check_sqlt_version; # private
1922     my $_check_sqlt_message; # private
1923     sub _check_sqlt_version {
1924         return $_check_sqlt_version if defined $_check_sqlt_version;
1925         eval 'use SQL::Translator "0.09003"';
1926         $_check_sqlt_message = $@ || '';
1927         $_check_sqlt_version = !$@;
1928     }
1929
1930     sub _check_sqlt_message {
1931         _check_sqlt_version if !defined $_check_sqlt_message;
1932         $_check_sqlt_message;
1933     }
1934 }
1935
1936 =head2 is_replicating
1937
1938 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1939 replicate from a master database.  Default is undef, which is the result
1940 returned by databases that don't support replication.
1941
1942 =cut
1943
1944 sub is_replicating {
1945     return;
1946     
1947 }
1948
1949 =head2 lag_behind_master
1950
1951 Returns a number that represents a certain amount of lag behind a master db
1952 when a given storage is replicating.  The number is database dependent, but
1953 starts at zero and increases with the amount of lag. Default in undef
1954
1955 =cut
1956
1957 sub lag_behind_master {
1958     return;
1959 }
1960
1961 sub DESTROY {
1962   my $self = shift;
1963   return if !$self->_dbh;
1964   $self->_verify_pid;
1965   $self->_dbh(undef);
1966 }
1967
1968 1;
1969
1970 =head1 USAGE NOTES
1971
1972 =head2 DBIx::Class and AutoCommit
1973
1974 DBIx::Class can do some wonderful magic with handling exceptions,
1975 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1976 combined with C<txn_do> for transaction support.
1977
1978 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1979 in an assumed transaction between commits, and you're telling us you'd
1980 like to manage that manually.  A lot of the magic protections offered by
1981 this module will go away.  We can't protect you from exceptions due to database
1982 disconnects because we don't know anything about how to restart your
1983 transactions.  You're on your own for handling all sorts of exceptional
1984 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1985 be with raw DBI.
1986
1987
1988 =head1 SQL METHODS
1989
1990 The module defines a set of methods within the DBIC::SQL::Abstract
1991 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1992 SQL query functions.
1993
1994 The following methods are extended:-
1995
1996 =over 4
1997
1998 =item delete
1999
2000 =item insert
2001
2002 =item select
2003
2004 =item update
2005
2006 =item limit_dialect
2007
2008 See L</connect_info> for details.
2009
2010 =item quote_char
2011
2012 See L</connect_info> for details.
2013
2014 =item name_sep
2015
2016 See L</connect_info> for details.
2017
2018 =back
2019
2020 =head1 AUTHORS
2021
2022 Matt S. Trout <mst@shadowcatsystems.co.uk>
2023
2024 Andy Grundman <andy@hybridized.org>
2025
2026 =head1 LICENSE
2027
2028 You may distribute this code under the same terms as Perl itself.
2029
2030 =cut