Versioning! With tests! Woo!
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        disable_sth_caching cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
66     return 'RowNumberOver';
67   }
68
69   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
70 }
71
72 sub select {
73   my ($self, $table, $fields, $where, $order, @rest) = @_;
74   $table = $self->_quote($table) unless ref($table);
75   local $self->{rownum_hack_count} = 1
76     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
77   @rest = (-1) unless defined $rest[0];
78   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
79     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
80   local $self->{having_bind} = [];
81   my ($sql, @ret) = $self->SUPER::select(
82     $table, $self->_recurse_fields($fields), $where, $order, @rest
83   );
84   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
85 }
86
87 sub insert {
88   my $self = shift;
89   my $table = shift;
90   $table = $self->_quote($table) unless ref($table);
91   $self->SUPER::insert($table, @_);
92 }
93
94 sub update {
95   my $self = shift;
96   my $table = shift;
97   $table = $self->_quote($table) unless ref($table);
98   $self->SUPER::update($table, @_);
99 }
100
101 sub delete {
102   my $self = shift;
103   my $table = shift;
104   $table = $self->_quote($table) unless ref($table);
105   $self->SUPER::delete($table, @_);
106 }
107
108 sub _emulate_limit {
109   my $self = shift;
110   if ($_[3] == -1) {
111     return $_[1].$self->_order_by($_[2]);
112   } else {
113     return $self->SUPER::_emulate_limit(@_);
114   }
115 }
116
117 sub _recurse_fields {
118   my ($self, $fields) = @_;
119   my $ref = ref $fields;
120   return $self->_quote($fields) unless $ref;
121   return $$fields if $ref eq 'SCALAR';
122
123   if ($ref eq 'ARRAY') {
124     return join(', ', map {
125       $self->_recurse_fields($_)
126       .(exists $self->{rownum_hack_count}
127          ? ' AS col'.$self->{rownum_hack_count}++
128          : '')
129      } @$fields);
130   } elsif ($ref eq 'HASH') {
131     foreach my $func (keys %$fields) {
132       return $self->_sqlcase($func)
133         .'( '.$self->_recurse_fields($fields->{$func}).' )';
134     }
135   }
136 }
137
138 sub _order_by {
139   my $self = shift;
140   my $ret = '';
141   my @extra;
142   if (ref $_[0] eq 'HASH') {
143     if (defined $_[0]->{group_by}) {
144       $ret = $self->_sqlcase(' group by ')
145                .$self->_recurse_fields($_[0]->{group_by});
146     }
147     if (defined $_[0]->{having}) {
148       my $frag;
149       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
150       push(@{$self->{having_bind}}, @extra);
151       $ret .= $self->_sqlcase(' having ').$frag;
152     }
153     if (defined $_[0]->{order_by}) {
154       $ret .= $self->_order_by($_[0]->{order_by});
155     }
156   } elsif (ref $_[0] eq 'SCALAR') {
157     $ret = $self->_sqlcase(' order by ').${ $_[0] };
158   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
159     my @order = @{+shift};
160     $ret = $self->_sqlcase(' order by ')
161           .join(', ', map {
162                         my $r = $self->_order_by($_, @_);
163                         $r =~ s/^ ?ORDER BY //i;
164                         $r;
165                       } @order);
166   } else {
167     $ret = $self->SUPER::_order_by(@_);
168   }
169   return $ret;
170 }
171
172 sub _order_directions {
173   my ($self, $order) = @_;
174   $order = $order->{order_by} if ref $order eq 'HASH';
175   return $self->SUPER::_order_directions($order);
176 }
177
178 sub _table {
179   my ($self, $from) = @_;
180   if (ref $from eq 'ARRAY') {
181     return $self->_recurse_from(@$from);
182   } elsif (ref $from eq 'HASH') {
183     return $self->_make_as($from);
184   } else {
185     return $from; # would love to quote here but _table ends up getting called
186                   # twice during an ->select without a limit clause due to
187                   # the way S::A::Limit->select works. should maybe consider
188                   # bypassing this and doing S::A::select($self, ...) in
189                   # our select method above. meantime, quoting shims have
190                   # been added to select/insert/update/delete here
191   }
192 }
193
194 sub _recurse_from {
195   my ($self, $from, @join) = @_;
196   my @sqlf;
197   push(@sqlf, $self->_make_as($from));
198   foreach my $j (@join) {
199     my ($to, $on) = @$j;
200
201     # check whether a join type exists
202     my $join_clause = '';
203     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
204     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
205       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
206     } else {
207       $join_clause = ' JOIN ';
208     }
209     push(@sqlf, $join_clause);
210
211     if (ref $to eq 'ARRAY') {
212       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
213     } else {
214       push(@sqlf, $self->_make_as($to));
215     }
216     push(@sqlf, ' ON ', $self->_join_condition($on));
217   }
218   return join('', @sqlf);
219 }
220
221 sub _make_as {
222   my ($self, $from) = @_;
223   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
224                      reverse each %{$self->_skip_options($from)});
225 }
226
227 sub _skip_options {
228   my ($self, $hash) = @_;
229   my $clean_hash = {};
230   $clean_hash->{$_} = $hash->{$_}
231     for grep {!/^-/} keys %$hash;
232   return $clean_hash;
233 }
234
235 sub _join_condition {
236   my ($self, $cond) = @_;
237   if (ref $cond eq 'HASH') {
238     my %j;
239     for (keys %$cond) {
240       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
241     };
242     return $self->_recurse_where(\%j);
243   } elsif (ref $cond eq 'ARRAY') {
244     return join(' OR ', map { $self->_join_condition($_) } @$cond);
245   } else {
246     die "Can't handle this yet!";
247   }
248 }
249
250 sub _quote {
251   my ($self, $label) = @_;
252   return '' unless defined $label;
253   return "*" if $label eq '*';
254   return $label unless $self->{quote_char};
255   if(ref $self->{quote_char} eq "ARRAY"){
256     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
257       if !defined $self->{name_sep};
258     my $sep = $self->{name_sep};
259     return join($self->{name_sep},
260         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
261        split(/\Q$sep\E/,$label));
262   }
263   return $self->SUPER::_quote($label);
264 }
265
266 sub limit_dialect {
267     my $self = shift;
268     $self->{limit_dialect} = shift if @_;
269     return $self->{limit_dialect};
270 }
271
272 sub quote_char {
273     my $self = shift;
274     $self->{quote_char} = shift if @_;
275     return $self->{quote_char};
276 }
277
278 sub name_sep {
279     my $self = shift;
280     $self->{name_sep} = shift if @_;
281     return $self->{name_sep};
282 }
283
284 } # End of BEGIN block
285
286 =head1 NAME
287
288 DBIx::Class::Storage::DBI - DBI storage handler
289
290 =head1 SYNOPSIS
291
292 =head1 DESCRIPTION
293
294 This class represents the connection to an RDBMS via L<DBI>.  See
295 L<DBIx::Class::Storage> for general information.  This pod only
296 documents DBI-specific methods and behaviors.
297
298 =head1 METHODS
299
300 =cut
301
302 sub new {
303   my $new = shift->next::method(@_);
304
305   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
306   $new->transaction_depth(0);
307   $new->_sql_maker_opts({});
308   $new->{_in_dbh_do} = 0;
309   $new->{_dbh_gen} = 0;
310
311   $new;
312 }
313
314 =head2 connect_info
315
316 The arguments of C<connect_info> are always a single array reference.
317
318 This is normally accessed via L<DBIx::Class::Schema/connection>, which
319 encapsulates its argument list in an arrayref before calling
320 C<connect_info> here.
321
322 The arrayref can either contain the same set of arguments one would
323 normally pass to L<DBI/connect>, or a lone code reference which returns
324 a connected database handle.
325
326 In either case, if the final argument in your connect_info happens
327 to be a hashref, C<connect_info> will look there for several
328 connection-specific options:
329
330 =over 4
331
332 =item on_connect_do
333
334 This can be set to an arrayref of literal sql statements, which will
335 be executed immediately after making the connection to the database
336 every time we [re-]connect.
337
338 =item disable_sth_caching
339
340 If set to a true value, this option will disable the caching of
341 statement handles via L<DBI/prepare_cached>.
342
343 =item limit_dialect 
344
345 Sets the limit dialect. This is useful for JDBC-bridge among others
346 where the remote SQL-dialect cannot be determined by the name of the
347 driver alone.
348
349 =item quote_char
350
351 Specifies what characters to use to quote table and column names. If 
352 you use this you will want to specify L<name_sep> as well.
353
354 quote_char expects either a single character, in which case is it is placed
355 on either side of the table/column, or an arrayref of length 2 in which case the
356 table/column name is placed between the elements.
357
358 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
359 use C<quote_char =E<gt> [qw/[ ]/]>.
360
361 =item name_sep
362
363 This only needs to be used in conjunction with L<quote_char>, and is used to 
364 specify the charecter that seperates elements (schemas, tables, columns) from 
365 each other. In most cases this is simply a C<.>.
366
367 =back
368
369 These options can be mixed in with your other L<DBI> connection attributes,
370 or placed in a seperate hashref after all other normal L<DBI> connection
371 arguments.
372
373 Every time C<connect_info> is invoked, any previous settings for
374 these options will be cleared before setting the new ones, regardless of
375 whether any options are specified in the new C<connect_info>.
376
377 Important note:  DBIC expects the returned database handle provided by 
378 a subref argument to have RaiseError set on it.  If it doesn't, things
379 might not work very well, YMMV.  If you don't use a subref, DBIC will
380 force this setting for you anyways.  Setting HandleError to anything
381 other than simple exception object wrapper might cause problems too.
382
383 Examples:
384
385   # Simple SQLite connection
386   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
387
388   # Connect via subref
389   ->connect_info([ sub { DBI->connect(...) } ]);
390
391   # A bit more complicated
392   ->connect_info(
393     [
394       'dbi:Pg:dbname=foo',
395       'postgres',
396       'my_pg_password',
397       { AutoCommit => 0 },
398       { quote_char => q{"}, name_sep => q{.} },
399     ]
400   );
401
402   # Equivalent to the previous example
403   ->connect_info(
404     [
405       'dbi:Pg:dbname=foo',
406       'postgres',
407       'my_pg_password',
408       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
409     ]
410   );
411
412   # Subref + DBIC-specific connection options
413   ->connect_info(
414     [
415       sub { DBI->connect(...) },
416       {
417           quote_char => q{`},
418           name_sep => q{@},
419           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
420           disable_sth_caching => 1,
421       },
422     ]
423   );
424
425 =cut
426
427 sub connect_info {
428   my ($self, $info_arg) = @_;
429
430   return $self->_connect_info if !$info_arg;
431
432   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
433   #  the new set of options
434   $self->_sql_maker(undef);
435   $self->_sql_maker_opts({});
436
437   my $info = [ @$info_arg ]; # copy because we can alter it
438   my $last_info = $info->[-1];
439   if(ref $last_info eq 'HASH') {
440     for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
441       if(my $value = delete $last_info->{$storage_opt}) {
442         $self->$storage_opt($value);
443       }
444     }
445     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
446       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
447         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
448       }
449     }
450
451     # Get rid of any trailing empty hashref
452     pop(@$info) if !keys %$last_info;
453   }
454
455   $self->_connect_info($info);
456 }
457
458 =head2 on_connect_do
459
460 This method is deprecated in favor of setting via L</connect_info>.
461
462 =head2 dbh_do
463
464 Arguments: $subref, @extra_coderef_args?
465
466 Execute the given subref using the new exception-based connection management.
467
468 The first two arguments will be the storage object that C<dbh_do> was called
469 on and a database handle to use.  Any additional arguments will be passed
470 verbatim to the called subref as arguments 2 and onwards.
471
472 Using this (instead of $self->_dbh or $self->dbh) ensures correct
473 exception handling and reconnection (or failover in future subclasses).
474
475 Your subref should have no side-effects outside of the database, as
476 there is the potential for your subref to be partially double-executed
477 if the database connection was stale/dysfunctional.
478
479 Example:
480
481   my @stuff = $schema->storage->dbh_do(
482     sub {
483       my ($storage, $dbh, @cols) = @_;
484       my $cols = join(q{, }, @cols);
485       $dbh->selectrow_array("SELECT $cols FROM foo");
486     },
487     @column_list
488   );
489
490 =cut
491
492 sub dbh_do {
493   my $self = shift;
494   my $coderef = shift;
495
496   ref $coderef eq 'CODE' or $self->throw_exception
497     ('$coderef must be a CODE reference');
498
499   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
500   local $self->{_in_dbh_do} = 1;
501
502   my @result;
503   my $want_array = wantarray;
504
505   eval {
506     $self->_verify_pid if $self->_dbh;
507     $self->_populate_dbh if !$self->_dbh;
508     if($want_array) {
509         @result = $coderef->($self, $self->_dbh, @_);
510     }
511     elsif(defined $want_array) {
512         $result[0] = $coderef->($self, $self->_dbh, @_);
513     }
514     else {
515         $coderef->($self, $self->_dbh, @_);
516     }
517   };
518
519   my $exception = $@;
520   if(!$exception) { return $want_array ? @result : $result[0] }
521
522   $self->throw_exception($exception) if $self->connected;
523
524   # We were not connected - reconnect and retry, but let any
525   #  exception fall right through this time
526   $self->_populate_dbh;
527   $coderef->($self, $self->_dbh, @_);
528 }
529
530 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
531 # It also informs dbh_do to bypass itself while under the direction of txn_do,
532 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
533 sub txn_do {
534   my $self = shift;
535   my $coderef = shift;
536
537   ref $coderef eq 'CODE' or $self->throw_exception
538     ('$coderef must be a CODE reference');
539
540   local $self->{_in_dbh_do} = 1;
541
542   my @result;
543   my $want_array = wantarray;
544
545   my $tried = 0;
546   while(1) {
547     eval {
548       $self->_verify_pid if $self->_dbh;
549       $self->_populate_dbh if !$self->_dbh;
550
551       $self->txn_begin;
552       if($want_array) {
553           @result = $coderef->(@_);
554       }
555       elsif(defined $want_array) {
556           $result[0] = $coderef->(@_);
557       }
558       else {
559           $coderef->(@_);
560       }
561       $self->txn_commit;
562     };
563
564     my $exception = $@;
565     if(!$exception) { return $want_array ? @result : $result[0] }
566
567     if($tried++ > 0 || $self->connected) {
568       eval { $self->txn_rollback };
569       my $rollback_exception = $@;
570       if($rollback_exception) {
571         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
572         $self->throw_exception($exception)  # propagate nested rollback
573           if $rollback_exception =~ /$exception_class/;
574
575         $self->throw_exception(
576           "Transaction aborted: ${exception}. "
577           . "Rollback failed: ${rollback_exception}"
578         );
579       }
580       $self->throw_exception($exception)
581     }
582
583     # We were not connected, and was first try - reconnect and retry
584     # via the while loop
585     $self->_populate_dbh;
586   }
587 }
588
589 =head2 disconnect
590
591 Our C<disconnect> method also performs a rollback first if the
592 database is not in C<AutoCommit> mode.
593
594 =cut
595
596 sub disconnect {
597   my ($self) = @_;
598
599   if( $self->connected ) {
600     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
601     $self->_dbh->disconnect;
602     $self->_dbh(undef);
603     $self->{_dbh_gen}++;
604   }
605 }
606
607 sub connected {
608   my ($self) = @_;
609
610   if(my $dbh = $self->_dbh) {
611       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
612           $self->_dbh(undef);
613           $self->{_dbh_gen}++;
614           return;
615       }
616       else {
617           $self->_verify_pid;
618       }
619       return ($dbh->FETCH('Active') && $dbh->ping);
620   }
621
622   return 0;
623 }
624
625 # handle pid changes correctly
626 #  NOTE: assumes $self->_dbh is a valid $dbh
627 sub _verify_pid {
628   my ($self) = @_;
629
630   return if $self->_conn_pid == $$;
631
632   $self->_dbh->{InactiveDestroy} = 1;
633   $self->_dbh(undef);
634   $self->{_dbh_gen}++;
635
636   return;
637 }
638
639 sub ensure_connected {
640   my ($self) = @_;
641
642   unless ($self->connected) {
643     $self->_populate_dbh;
644   }
645 }
646
647 =head2 dbh
648
649 Returns the dbh - a data base handle of class L<DBI>.
650
651 =cut
652
653 sub dbh {
654   my ($self) = @_;
655
656   $self->ensure_connected;
657   return $self->_dbh;
658 }
659
660 sub _sql_maker_args {
661     my ($self) = @_;
662     
663     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
664 }
665
666 sub sql_maker {
667   my ($self) = @_;
668   unless ($self->_sql_maker) {
669     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
670   }
671   return $self->_sql_maker;
672 }
673
674 sub _populate_dbh {
675   my ($self) = @_;
676   my @info = @{$self->_connect_info || []};
677   $self->_dbh($self->_connect(@info));
678
679   if(ref $self eq 'DBIx::Class::Storage::DBI') {
680     my $driver = $self->_dbh->{Driver}->{Name};
681     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
682       bless $self, "DBIx::Class::Storage::DBI::${driver}";
683       $self->_rebless() if $self->can('_rebless');
684     }
685   }
686
687   # if on-connect sql statements are given execute them
688   foreach my $sql_statement (@{$self->on_connect_do || []}) {
689     $self->debugobj->query_start($sql_statement) if $self->debug();
690     $self->_dbh->do($sql_statement);
691     $self->debugobj->query_end($sql_statement) if $self->debug();
692   }
693
694   $self->_conn_pid($$);
695   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
696 }
697
698 sub _connect {
699   my ($self, @info) = @_;
700
701   $self->throw_exception("You failed to provide any connection info")
702       if !@info;
703
704   my ($old_connect_via, $dbh);
705
706   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
707       $old_connect_via = $DBI::connect_via;
708       $DBI::connect_via = 'connect';
709   }
710
711   eval {
712     if(ref $info[0] eq 'CODE') {
713        $dbh = &{$info[0]}
714     }
715     else {
716        $dbh = DBI->connect(@info);
717        $dbh->{RaiseError} = 1;
718        $dbh->{PrintError} = 0;
719        $dbh->{PrintWarn} = 0;
720     }
721   };
722
723   $DBI::connect_via = $old_connect_via if $old_connect_via;
724
725   if (!$dbh || $@) {
726     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
727   }
728
729   $dbh;
730 }
731
732 sub _dbh_txn_begin {
733   my ($self, $dbh) = @_;
734   if ($dbh->{AutoCommit}) {
735     $self->debugobj->txn_begin()
736       if ($self->debug);
737     $dbh->begin_work;
738   }
739 }
740
741 sub txn_begin {
742   my $self = shift;
743   $self->dbh_do($self->can('_dbh_txn_begin'))
744     if $self->{transaction_depth}++ == 0;
745 }
746
747 sub _dbh_txn_commit {
748   my ($self, $dbh) = @_;
749   if ($self->{transaction_depth} == 0) {
750     unless ($dbh->{AutoCommit}) {
751       $self->debugobj->txn_commit()
752         if ($self->debug);
753       $dbh->commit;
754     }
755   }
756   else {
757     if (--$self->{transaction_depth} == 0) {
758       $self->debugobj->txn_commit()
759         if ($self->debug);
760       $dbh->commit;
761     }
762   }
763 }
764
765 sub txn_commit {
766   my $self = shift;
767   $self->dbh_do($self->can('_dbh_txn_commit'));
768 }
769
770 sub _dbh_txn_rollback {
771   my ($self, $dbh) = @_;
772   if ($self->{transaction_depth} == 0) {
773     unless ($dbh->{AutoCommit}) {
774       $self->debugobj->txn_rollback()
775         if ($self->debug);
776       $dbh->rollback;
777     }
778   }
779   else {
780     if (--$self->{transaction_depth} == 0) {
781       $self->debugobj->txn_rollback()
782         if ($self->debug);
783       $dbh->rollback;
784     }
785     else {
786       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
787     }
788   }
789 }
790
791 sub txn_rollback {
792   my $self = shift;
793
794   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
795   if ($@) {
796     my $error = $@;
797     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
798     $error =~ /$exception_class/ and $self->throw_exception($error);
799     $self->{transaction_depth} = 0;          # ensure that a failed rollback
800     $self->throw_exception($error);          # resets the transaction depth
801   }
802 }
803
804 # This used to be the top-half of _execute.  It was split out to make it
805 #  easier to override in NoBindVars without duping the rest.  It takes up
806 #  all of _execute's args, and emits $sql, @bind.
807 sub _prep_for_execute {
808   my ($self, $op, $extra_bind, $ident, @args) = @_;
809
810   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
811   unshift(@bind, @$extra_bind) if $extra_bind;
812   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
813
814   return ($sql, @bind);
815 }
816
817 sub _execute {
818   my $self = shift;
819
820   my ($sql, @bind) = $self->_prep_for_execute(@_);
821
822   if ($self->debug) {
823       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
824       $self->debugobj->query_start($sql, @debug_bind);
825   }
826
827   my $sth = $self->sth($sql);
828
829   my $rv;
830   if ($sth) {
831     my $time = time();
832     $rv = eval { $sth->execute(@bind) };
833
834     if ($@ || !$rv) {
835       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
836     }
837   } else {
838     $self->throw_exception("'$sql' did not generate a statement.");
839   }
840   if ($self->debug) {
841       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
842       $self->debugobj->query_end($sql, @debug_bind);
843   }
844   return (wantarray ? ($rv, $sth, @bind) : $rv);
845 }
846
847 sub insert {
848   my ($self, $ident, $to_insert) = @_;
849   $self->throw_exception(
850     "Couldn't insert ".join(', ',
851       map "$_ => $to_insert->{$_}", keys %$to_insert
852     )." into ${ident}"
853   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
854   return $to_insert;
855 }
856
857 ## Still not quite perfect, and EXPERIMENTAL
858 ## Currently it is assumed that all values passed will be "normal", i.e. not 
859 ## scalar refs, or at least, all the same type as the first set, the statement is
860 ## only prepped once.
861 sub insert_bulk {
862   my ($self, $table, $cols, $data) = @_;
863   my %colvalues;
864   @colvalues{@$cols} = (0..$#$cols);
865   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
866 # print STDERR "BIND".Dumper(\@bind);
867
868   if ($self->debug) {
869       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
870       $self->debugobj->query_start($sql, @debug_bind);
871   }
872   my $sth = $self->sth($sql);
873
874 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
875
876   my $rv;
877   ## This must be an arrayref, else nothing works!
878   my $tuple_status = [];
879 #  use Data::Dumper;
880 #  print STDERR Dumper($data);
881   if ($sth) {
882     my $time = time();
883     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
884                                        ArrayTupleStatus => $tuple_status }) };
885 # print STDERR Dumper($tuple_status);
886 # print STDERR "RV: $rv\n";
887     if ($@ || !defined $rv) {
888       my $errors = '';
889       foreach my $tuple (@$tuple_status)
890       {
891           $errors .= "\n" . $tuple->[1] if(ref $tuple);
892       }
893       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
894     }
895   } else {
896     $self->throw_exception("'$sql' did not generate a statement.");
897   }
898   if ($self->debug) {
899       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
900       $self->debugobj->query_end($sql, @debug_bind);
901   }
902   return (wantarray ? ($rv, $sth, @bind) : $rv);
903 }
904
905 sub update {
906   return shift->_execute('update' => [], @_);
907 }
908
909 sub delete {
910   return shift->_execute('delete' => [], @_);
911 }
912
913 sub _select {
914   my ($self, $ident, $select, $condition, $attrs) = @_;
915   my $order = $attrs->{order_by};
916   if (ref $condition eq 'SCALAR') {
917     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
918   }
919   if (exists $attrs->{group_by} || $attrs->{having}) {
920     $order = {
921       group_by => $attrs->{group_by},
922       having => $attrs->{having},
923       ($order ? (order_by => $order) : ())
924     };
925   }
926   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
927   if ($attrs->{software_limit} ||
928       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
929         $attrs->{software_limit} = 1;
930   } else {
931     $self->throw_exception("rows attribute must be positive if present")
932       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
933     push @args, $attrs->{rows}, $attrs->{offset};
934   }
935   return $self->_execute(@args);
936 }
937
938 =head2 select
939
940 =over 4
941
942 =item Arguments: $ident, $select, $condition, $attrs
943
944 =back
945
946 Handle a SQL select statement.
947
948 =cut
949
950 sub select {
951   my $self = shift;
952   my ($ident, $select, $condition, $attrs) = @_;
953   return $self->cursor->new($self, \@_, $attrs);
954 }
955
956 sub select_single {
957   my $self = shift;
958   my ($rv, $sth, @bind) = $self->_select(@_);
959   my @row = $sth->fetchrow_array;
960   # Need to call finish() to work round broken DBDs
961   $sth->finish();
962   return @row;
963 }
964
965 =head2 sth
966
967 =over 4
968
969 =item Arguments: $sql
970
971 =back
972
973 Returns a L<DBI> sth (statement handle) for the supplied SQL.
974
975 =cut
976
977 sub _dbh_sth {
978   my ($self, $dbh, $sql) = @_;
979
980   # 3 is the if_active parameter which avoids active sth re-use
981   my $sth = $self->disable_sth_caching
982     ? $dbh->prepare($sql)
983     : $dbh->prepare_cached($sql, {}, 3);
984
985   $self->throw_exception(
986     'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
987   ) if !$sth;
988
989   $sth;
990 }
991
992 sub sth {
993   my ($self, $sql) = @_;
994   $self->dbh_do($self->can('_dbh_sth'), $sql);
995 }
996
997 sub _dbh_columns_info_for {
998   my ($self, $dbh, $table) = @_;
999
1000   if ($dbh->can('column_info')) {
1001     my %result;
1002     eval {
1003       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1004       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1005       $sth->execute();
1006       while ( my $info = $sth->fetchrow_hashref() ){
1007         my %column_info;
1008         $column_info{data_type}   = $info->{TYPE_NAME};
1009         $column_info{size}      = $info->{COLUMN_SIZE};
1010         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1011         $column_info{default_value} = $info->{COLUMN_DEF};
1012         my $col_name = $info->{COLUMN_NAME};
1013         $col_name =~ s/^\"(.*)\"$/$1/;
1014
1015         $result{$col_name} = \%column_info;
1016       }
1017     };
1018     return \%result if !$@ && scalar keys %result;
1019   }
1020
1021   my %result;
1022   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1023   $sth->execute;
1024   my @columns = @{$sth->{NAME_lc}};
1025   for my $i ( 0 .. $#columns ){
1026     my %column_info;
1027     my $type_num = $sth->{TYPE}->[$i];
1028     my $type_name;
1029     if(defined $type_num && $dbh->can('type_info')) {
1030       my $type_info = $dbh->type_info($type_num);
1031       $type_name = $type_info->{TYPE_NAME} if $type_info;
1032     }
1033     $column_info{data_type} = $type_name ? $type_name : $type_num;
1034     $column_info{size} = $sth->{PRECISION}->[$i];
1035     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1036
1037     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1038       $column_info{data_type} = $1;
1039       $column_info{size}    = $2;
1040     }
1041
1042     $result{$columns[$i]} = \%column_info;
1043   }
1044
1045   return \%result;
1046 }
1047
1048 sub columns_info_for {
1049   my ($self, $table) = @_;
1050   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1051 }
1052
1053 =head2 last_insert_id
1054
1055 Return the row id of the last insert.
1056
1057 =cut
1058
1059 sub _dbh_last_insert_id {
1060     my ($self, $dbh, $source, $col) = @_;
1061     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1062     $dbh->func('last_insert_rowid');
1063 }
1064
1065 sub last_insert_id {
1066   my $self = shift;
1067   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1068 }
1069
1070 =head2 sqlt_type
1071
1072 Returns the database driver name.
1073
1074 =cut
1075
1076 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1077
1078 =head2 create_ddl_dir (EXPERIMENTAL)
1079
1080 =over 4
1081
1082 =item Arguments: $schema \@databases, $version, $directory, $preversion, $sqlt_args
1083
1084 =back
1085
1086 Creates a SQL file based on the Schema, for each of the specified
1087 database types, in the given directory.
1088
1089 Note that this feature is currently EXPERIMENTAL and may not work correctly
1090 across all databases, or fully handle complex relationships.
1091
1092 =cut
1093
1094 sub create_ddl_dir
1095 {
1096   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1097
1098   if(!$dir || !-d $dir)
1099   {
1100     warn "No directory given, using ./\n";
1101     $dir = "./";
1102   }
1103   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1104   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1105   $version ||= $schema->VERSION || '1.x';
1106   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1107
1108   eval "use SQL::Translator";
1109   $self->throw_exception("Can't create a ddl file without SQL::Translator: $@") if $@;
1110
1111   my $sqlt = SQL::Translator->new({
1112 #      debug => 1,
1113       add_drop_table => 1,
1114   });
1115   foreach my $db (@$databases)
1116   {
1117     $sqlt->reset();
1118     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1119 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1120     $sqlt = $self->configure_sqlt($sqlt, $db);
1121     $sqlt->data($schema);
1122     $sqlt->producer($db);
1123
1124     my $file;
1125     my $filename = $schema->ddl_filename($db, $dir, $version);
1126     if(-e $filename)
1127     {
1128       warn("$filename already exists, skipping $db");
1129       next;
1130     }
1131
1132     my $output = $sqlt->translate;
1133     if(!$output)
1134     {
1135       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1136       next;
1137     }
1138     if(!open($file, ">$filename"))
1139     {
1140         $self->throw_exception("Can't open $filename for writing ($!)");
1141         next;
1142     }
1143     print $file $output;
1144     close($file);
1145
1146     if($preversion)
1147     {
1148       eval "use SQL::Translator::Diff";
1149       if($@)
1150       {
1151         warn("Can't diff versions without SQL::Translator::Diff: $@");
1152         next;
1153       }
1154
1155       my $prefilename = $schema->ddl_filename($db, $dir, $preversion);
1156       print "Previous version $prefilename\n";
1157       if(!-e $prefilename)
1158       {
1159         warn("No previous schema file found ($prefilename)");
1160         next;
1161       }
1162       #### We need to reparse the SQLite file we just wrote, so that 
1163       ##   Diff doesnt get all confoosed, and Diff is *very* confused.
1164       ##   FIXME: rip Diff to pieces!
1165 #      my $target_schema = $sqlt->schema;
1166 #      unless ( $target_schema->name ) {
1167 #        $target_schema->name( $filename );
1168 #      }
1169       my @input;
1170       push @input, {file => $prefilename, parser => $db};
1171       push @input, {file => $filename, parser => $db};
1172       my ( $source_schema, $source_db, $target_schema, $target_db ) = map {
1173         my $file   = $_->{'file'};
1174         my $parser = $_->{'parser'};
1175
1176         my $t = SQL::Translator->new;
1177         $t->debug( 0 );
1178         $t->trace( 0 );
1179         $t->parser( $parser )            or die $t->error;
1180         my $out = $t->translate( $file ) or die $t->error;
1181         my $schema = $t->schema;
1182         unless ( $schema->name ) {
1183           $schema->name( $file );
1184         }
1185         ($schema, $parser);
1186       } @input;
1187
1188       my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1189                                                     $target_schema, $db,
1190                                                     {}
1191                                                    );
1192       my $difffile = $schema->ddl_filename($db, $dir, $version, $preversion);
1193       print STDERR "Diff: $difffile: $db, $dir, $version, $preversion \n";
1194       if(-e $difffile)
1195       {
1196         warn("$difffile already exists, skipping");
1197         next;
1198       }
1199       if(!open $file, ">$difffile")
1200       { 
1201         $self->throw_exception("Can't write to $difffile ($!)");
1202         next;
1203       }
1204       print $file $diff;
1205       close($file);
1206     }
1207   }
1208 }
1209
1210 sub configure_sqlt() {
1211   my $self = shift;
1212   my $tr = shift;
1213   my $db = shift || $self->sqlt_type;
1214   if ($db eq 'PostgreSQL') {
1215     $tr->quote_table_names(0);
1216     $tr->quote_field_names(0);
1217   }
1218   return $tr;
1219 }
1220
1221 =head2 deployment_statements
1222
1223 =over 4
1224
1225 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1226
1227 =back
1228
1229 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1230 The database driver name is given by C<$type>, though the value from
1231 L</sqlt_type> is used if it is not specified.
1232
1233 C<$directory> is used to return statements from files in a previously created
1234 L</create_ddl_dir> directory and is optional. The filenames are constructed
1235 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1236
1237 If no C<$directory> is specified then the statements are constructed on the
1238 fly using L<SQL::Translator> and C<$version> is ignored.
1239
1240 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1241
1242 =cut
1243
1244 sub deployment_statements {
1245   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1246   # Need to be connected to get the correct sqlt_type
1247   $self->ensure_connected() unless $type;
1248   $type ||= $self->sqlt_type;
1249   $version ||= $schema->VERSION || '1.x';
1250   $dir ||= './';
1251   my $filename = $schema->ddl_filename($type, $dir, $version);
1252   if(-f $filename)
1253   {
1254       my $file;
1255       open($file, "<$filename") 
1256         or $self->throw_exception("Can't open $filename ($!)");
1257       my @rows = <$file>;
1258       close($file);
1259       return join('', @rows);
1260   }
1261
1262   eval "use SQL::Translator";
1263   if(!$@)
1264   {
1265     eval "use SQL::Translator::Parser::DBIx::Class;";
1266     $self->throw_exception($@) if $@;
1267     eval "use SQL::Translator::Producer::${type};";
1268     $self->throw_exception($@) if $@;
1269     my $tr = SQL::Translator->new(%$sqltargs);
1270     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1271     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1272   }
1273
1274   $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1275   return;
1276
1277 }
1278
1279 sub deploy {
1280   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1281   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1282     for ( split(";\n", $statement)) {
1283       next if($_ =~ /^--/);
1284       next if(!$_);
1285 #      next if($_ =~ /^DROP/m);
1286       next if($_ =~ /^BEGIN TRANSACTION/m);
1287       next if($_ =~ /^COMMIT/m);
1288       next if $_ =~ /^\s+$/; # skip whitespace only
1289       $self->debugobj->query_start($_) if $self->debug;
1290       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1291       $self->debugobj->query_end($_) if $self->debug;
1292     }
1293   }
1294 }
1295
1296 =head2 datetime_parser
1297
1298 Returns the datetime parser class
1299
1300 =cut
1301
1302 sub datetime_parser {
1303   my $self = shift;
1304   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1305 }
1306
1307 =head2 datetime_parser_type
1308
1309 Defines (returns) the datetime parser class - currently hardwired to
1310 L<DateTime::Format::MySQL>
1311
1312 =cut
1313
1314 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1315
1316 =head2 build_datetime_parser
1317
1318 See L</datetime_parser>
1319
1320 =cut
1321
1322 sub build_datetime_parser {
1323   my $self = shift;
1324   my $type = $self->datetime_parser_type(@_);
1325   eval "use ${type}";
1326   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1327   return $type;
1328 }
1329
1330 sub DESTROY {
1331   my $self = shift;
1332   return if !$self->_dbh;
1333   $self->_verify_pid;
1334   $self->_dbh(undef);
1335 }
1336
1337 1;
1338
1339 =head1 SQL METHODS
1340
1341 The module defines a set of methods within the DBIC::SQL::Abstract
1342 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1343 SQL query functions.
1344
1345 The following methods are extended:-
1346
1347 =over 4
1348
1349 =item delete
1350
1351 =item insert
1352
1353 =item select
1354
1355 =item update
1356
1357 =item limit_dialect
1358
1359 See L</connect_info> for details.
1360 For setting, this method is deprecated in favor of L</connect_info>.
1361
1362 =item quote_char
1363
1364 See L</connect_info> for details.
1365 For setting, this method is deprecated in favor of L</connect_info>.
1366
1367 =item name_sep
1368
1369 See L</connect_info> for details.
1370 For setting, this method is deprecated in favor of L</connect_info>.
1371
1372 =back
1373
1374 =head1 AUTHORS
1375
1376 Matt S. Trout <mst@shadowcatsystems.co.uk>
1377
1378 Andy Grundman <andy@hybridized.org>
1379
1380 =head1 LICENSE
1381
1382 You may distribute this code under the same terms as Perl itself.
1383
1384 =cut