edeb8d0b55f4d319e793c810436d60d05431d86b
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item limit_dialect 
340
341 Sets the limit dialect. This is useful for JDBC-bridge among others
342 where the remote SQL-dialect cannot be determined by the name of the
343 driver alone.
344
345 =item quote_char
346
347 Specifies what characters to use to quote table and column names. If 
348 you use this you will want to specify L<name_sep> as well.
349
350 quote_char expects either a single character, in which case is it is placed
351 on either side of the table/column, or an arrayref of length 2 in which case the
352 table/column name is placed between the elements.
353
354 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
355 use C<quote_char =E<gt> [qw/[ ]/]>.
356
357 =item name_sep
358
359 This only needs to be used in conjunction with L<quote_char>, and is used to 
360 specify the charecter that seperates elements (schemas, tables, columns) from 
361 each other. In most cases this is simply a C<.>.
362
363 =back
364
365 These options can be mixed in with your other L<DBI> connection attributes,
366 or placed in a seperate hashref after all other normal L<DBI> connection
367 arguments.
368
369 Every time C<connect_info> is invoked, any previous settings for
370 these options will be cleared before setting the new ones, regardless of
371 whether any options are specified in the new C<connect_info>.
372
373 Important note:  DBIC expects the returned database handle provided by 
374 a subref argument to have RaiseError set on it.  If it doesn't, things
375 might not work very well, YMMV.  If you don't use a subref, DBIC will
376 force this setting for you anyways.  Setting HandleError to anything
377 other than simple exception object wrapper might cause problems too.
378
379 Examples:
380
381   # Simple SQLite connection
382   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
383
384   # Connect via subref
385   ->connect_info([ sub { DBI->connect(...) } ]);
386
387   # A bit more complicated
388   ->connect_info(
389     [
390       'dbi:Pg:dbname=foo',
391       'postgres',
392       'my_pg_password',
393       { AutoCommit => 0 },
394       { quote_char => q{"}, name_sep => q{.} },
395     ]
396   );
397
398   # Equivalent to the previous example
399   ->connect_info(
400     [
401       'dbi:Pg:dbname=foo',
402       'postgres',
403       'my_pg_password',
404       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
405     ]
406   );
407
408   # Subref + DBIC-specific connection options
409   ->connect_info(
410     [
411       sub { DBI->connect(...) },
412       {
413           quote_char => q{`},
414           name_sep => q{@},
415           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
416       },
417     ]
418   );
419
420 =cut
421
422 sub connect_info {
423   my ($self, $info_arg) = @_;
424
425   return $self->_connect_info if !$info_arg;
426
427   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
428   #  the new set of options
429   $self->_sql_maker(undef);
430   $self->_sql_maker_opts({});
431
432   my $info = [ @$info_arg ]; # copy because we can alter it
433   my $last_info = $info->[-1];
434   if(ref $last_info eq 'HASH') {
435     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
436       $self->on_connect_do($on_connect_do);
437     }
438     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
439       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
440         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
441       }
442     }
443
444     # Get rid of any trailing empty hashref
445     pop(@$info) if !keys %$last_info;
446   }
447
448   $self->_connect_info($info);
449 }
450
451 =head2 on_connect_do
452
453 This method is deprecated in favor of setting via L</connect_info>.
454
455 =head2 dbh_do
456
457 Arguments: $subref, @extra_coderef_args?
458
459 Execute the given subref using the new exception-based connection management.
460
461 The first two arguments will be the storage object that C<dbh_do> was called
462 on and a database handle to use.  Any additional arguments will be passed
463 verbatim to the called subref as arguments 2 and onwards.
464
465 Using this (instead of $self->_dbh or $self->dbh) ensures correct
466 exception handling and reconnection (or failover in future subclasses).
467
468 Your subref should have no side-effects outside of the database, as
469 there is the potential for your subref to be partially double-executed
470 if the database connection was stale/dysfunctional.
471
472 Example:
473
474   my @stuff = $schema->storage->dbh_do(
475     sub {
476       my ($storage, $dbh, @cols) = @_;
477       my $cols = join(q{, }, @cols);
478       $dbh->selectrow_array("SELECT $cols FROM foo");
479     },
480     @column_list
481   );
482
483 =cut
484
485 sub dbh_do {
486   my $self = shift;
487   my $coderef = shift;
488
489   ref $coderef eq 'CODE' or $self->throw_exception
490     ('$coderef must be a CODE reference');
491
492   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
493   local $self->{_in_dbh_do} = 1;
494
495   my @result;
496   my $want_array = wantarray;
497
498   eval {
499     $self->_verify_pid if $self->_dbh;
500     $self->_populate_dbh if !$self->_dbh;
501     if($want_array) {
502         @result = $coderef->($self, $self->_dbh, @_);
503     }
504     elsif(defined $want_array) {
505         $result[0] = $coderef->($self, $self->_dbh, @_);
506     }
507     else {
508         $coderef->($self, $self->_dbh, @_);
509     }
510   };
511
512   my $exception = $@;
513   if(!$exception) { return $want_array ? @result : $result[0] }
514
515   $self->throw_exception($exception) if $self->connected;
516
517   # We were not connected - reconnect and retry, but let any
518   #  exception fall right through this time
519   $self->_populate_dbh;
520   $coderef->($self, $self->_dbh, @_);
521 }
522
523 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
524 # It also informs dbh_do to bypass itself while under the direction of txn_do,
525 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
526 sub txn_do {
527   my $self = shift;
528   my $coderef = shift;
529
530   ref $coderef eq 'CODE' or $self->throw_exception
531     ('$coderef must be a CODE reference');
532
533   local $self->{_in_dbh_do} = 1;
534
535   my @result;
536   my $want_array = wantarray;
537
538   my $tried = 0;
539   while(1) {
540     eval {
541       $self->_verify_pid if $self->_dbh;
542       $self->_populate_dbh if !$self->_dbh;
543
544       $self->txn_begin;
545       if($want_array) {
546           @result = $coderef->(@_);
547       }
548       elsif(defined $want_array) {
549           $result[0] = $coderef->(@_);
550       }
551       else {
552           $coderef->(@_);
553       }
554       $self->txn_commit;
555     };
556
557     my $exception = $@;
558     if(!$exception) { return $want_array ? @result : $result[0] }
559
560     if($tried++ > 0 || $self->connected) {
561       eval { $self->txn_rollback };
562       my $rollback_exception = $@;
563       if($rollback_exception) {
564         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
565         $self->throw_exception($exception)  # propagate nested rollback
566           if $rollback_exception =~ /$exception_class/;
567
568         $self->throw_exception(
569           "Transaction aborted: ${exception}. "
570           . "Rollback failed: ${rollback_exception}"
571         );
572       }
573       $self->throw_exception($exception)
574     }
575
576     # We were not connected, and was first try - reconnect and retry
577     # via the while loop
578     $self->_populate_dbh;
579   }
580 }
581
582 =head2 disconnect
583
584 Our C<disconnect> method also performs a rollback first if the
585 database is not in C<AutoCommit> mode.
586
587 =cut
588
589 sub disconnect {
590   my ($self) = @_;
591
592   if( $self->connected ) {
593     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
594     $self->_dbh->disconnect;
595     $self->_dbh(undef);
596     $self->{_dbh_gen}++;
597   }
598 }
599
600 sub connected {
601   my ($self) = @_;
602
603   if(my $dbh = $self->_dbh) {
604       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
605           $self->_dbh(undef);
606           $self->{_dbh_gen}++;
607           return;
608       }
609       else {
610           $self->_verify_pid;
611       }
612       return ($dbh->FETCH('Active') && $dbh->ping);
613   }
614
615   return 0;
616 }
617
618 # handle pid changes correctly
619 #  NOTE: assumes $self->_dbh is a valid $dbh
620 sub _verify_pid {
621   my ($self) = @_;
622
623   return if $self->_conn_pid == $$;
624
625   $self->_dbh->{InactiveDestroy} = 1;
626   $self->_dbh(undef);
627   $self->{_dbh_gen}++;
628
629   return;
630 }
631
632 sub ensure_connected {
633   my ($self) = @_;
634
635   unless ($self->connected) {
636     $self->_populate_dbh;
637   }
638 }
639
640 =head2 dbh
641
642 Returns the dbh - a data base handle of class L<DBI>.
643
644 =cut
645
646 sub dbh {
647   my ($self) = @_;
648
649   $self->ensure_connected;
650   return $self->_dbh;
651 }
652
653 sub _sql_maker_args {
654     my ($self) = @_;
655     
656     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
657 }
658
659 sub sql_maker {
660   my ($self) = @_;
661   unless ($self->_sql_maker) {
662     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
663   }
664   return $self->_sql_maker;
665 }
666
667 sub _populate_dbh {
668   my ($self) = @_;
669   my @info = @{$self->_connect_info || []};
670   $self->_dbh($self->_connect(@info));
671
672   if(ref $self eq 'DBIx::Class::Storage::DBI') {
673     my $driver = $self->_dbh->{Driver}->{Name};
674     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
675       bless $self, "DBIx::Class::Storage::DBI::${driver}";
676       $self->_rebless() if $self->can('_rebless');
677     }
678   }
679
680   # if on-connect sql statements are given execute them
681   foreach my $sql_statement (@{$self->on_connect_do || []}) {
682     $self->debugobj->query_start($sql_statement) if $self->debug();
683     $self->_dbh->do($sql_statement);
684     $self->debugobj->query_end($sql_statement) if $self->debug();
685   }
686
687   $self->_conn_pid($$);
688   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
689 }
690
691 sub _connect {
692   my ($self, @info) = @_;
693
694   $self->throw_exception("You failed to provide any connection info")
695       if !@info;
696
697   my ($old_connect_via, $dbh);
698
699   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
700       $old_connect_via = $DBI::connect_via;
701       $DBI::connect_via = 'connect';
702   }
703
704   eval {
705     if(ref $info[0] eq 'CODE') {
706        $dbh = &{$info[0]}
707     }
708     else {
709        $dbh = DBI->connect(@info);
710        $dbh->{RaiseError} = 1;
711        $dbh->{PrintError} = 0;
712        $dbh->{PrintWarn} = 0;
713     }
714   };
715
716   $DBI::connect_via = $old_connect_via if $old_connect_via;
717
718   if (!$dbh || $@) {
719     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
720   }
721
722   $dbh;
723 }
724
725 sub _dbh_txn_begin {
726   my ($self, $dbh) = @_;
727   if ($dbh->{AutoCommit}) {
728     $self->debugobj->txn_begin()
729       if ($self->debug);
730     $dbh->begin_work;
731   }
732 }
733
734 sub txn_begin {
735   my $self = shift;
736   $self->dbh_do($self->can('_dbh_txn_begin'))
737     if $self->{transaction_depth}++ == 0;
738 }
739
740 sub _dbh_txn_commit {
741   my ($self, $dbh) = @_;
742   if ($self->{transaction_depth} == 0) {
743     unless ($dbh->{AutoCommit}) {
744       $self->debugobj->txn_commit()
745         if ($self->debug);
746       $dbh->commit;
747     }
748   }
749   else {
750     if (--$self->{transaction_depth} == 0) {
751       $self->debugobj->txn_commit()
752         if ($self->debug);
753       $dbh->commit;
754     }
755   }
756 }
757
758 sub txn_commit {
759   my $self = shift;
760   $self->dbh_do($self->can('_dbh_txn_commit'));
761 }
762
763 sub _dbh_txn_rollback {
764   my ($self, $dbh) = @_;
765   if ($self->{transaction_depth} == 0) {
766     unless ($dbh->{AutoCommit}) {
767       $self->debugobj->txn_rollback()
768         if ($self->debug);
769       $dbh->rollback;
770     }
771   }
772   else {
773     if (--$self->{transaction_depth} == 0) {
774       $self->debugobj->txn_rollback()
775         if ($self->debug);
776       $dbh->rollback;
777     }
778     else {
779       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
780     }
781   }
782 }
783
784 sub txn_rollback {
785   my $self = shift;
786
787   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
788   if ($@) {
789     my $error = $@;
790     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
791     $error =~ /$exception_class/ and $self->throw_exception($error);
792     $self->{transaction_depth} = 0;          # ensure that a failed rollback
793     $self->throw_exception($error);          # resets the transaction depth
794   }
795 }
796
797 # This used to be the top-half of _execute.  It was split out to make it
798 #  easier to override in NoBindVars without duping the rest.  It takes up
799 #  all of _execute's args, and emits $sql, @bind.
800 sub _prep_for_execute {
801   my ($self, $op, $extra_bind, $ident, @args) = @_;
802
803   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
804   unshift(@bind, @$extra_bind) if $extra_bind;
805   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
806
807   return ($sql, @bind);
808 }
809
810 sub _execute {
811   my $self = shift;
812
813   my ($sql, @bind) = $self->_prep_for_execute(@_);
814
815   if ($self->debug) {
816       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
817       $self->debugobj->query_start($sql, @debug_bind);
818   }
819
820   my $sth = $self->sth($sql);
821
822   my $rv;
823   if ($sth) {
824     my $time = time();
825     $rv = eval { $sth->execute(@bind) };
826
827     if ($@ || !$rv) {
828       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
829     }
830   } else {
831     $self->throw_exception("'$sql' did not generate a statement.");
832   }
833   if ($self->debug) {
834       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
835       $self->debugobj->query_end($sql, @debug_bind);
836   }
837   return (wantarray ? ($rv, $sth, @bind) : $rv);
838 }
839
840 sub insert {
841   my ($self, $ident, $to_insert) = @_;
842   $self->throw_exception(
843     "Couldn't insert ".join(', ',
844       map "$_ => $to_insert->{$_}", keys %$to_insert
845     )." into ${ident}"
846   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
847   return $to_insert;
848 }
849
850 ## Still not quite perfect, and EXPERIMENTAL
851 ## Currently it is assumed that all values passed will be "normal", i.e. not 
852 ## scalar refs, or at least, all the same type as the first set, the statement is
853 ## only prepped once.
854 sub insert_bulk {
855   my ($self, $table, $cols, $data) = @_;
856   my %colvalues;
857   @colvalues{@$cols} = (0..$#$cols);
858   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
859 # print STDERR "BIND".Dumper(\@bind);
860
861   if ($self->debug) {
862       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
863       $self->debugobj->query_start($sql, @debug_bind);
864   }
865   my $sth = $self->sth($sql);
866
867 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
868
869   my $rv;
870   ## This must be an arrayref, else nothing works!
871   my $tuple_status = [];
872 #  use Data::Dumper;
873 #  print STDERR Dumper($data);
874   if ($sth) {
875     my $time = time();
876     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
877                                        ArrayTupleStatus => $tuple_status }) };
878 # print STDERR Dumper($tuple_status);
879 # print STDERR "RV: $rv\n";
880     if ($@ || !defined $rv) {
881       my $errors = '';
882       foreach my $tuple (@$tuple_status)
883       {
884           $errors .= "\n" . $tuple->[1] if(ref $tuple);
885       }
886       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
887     }
888   } else {
889     $self->throw_exception("'$sql' did not generate a statement.");
890   }
891   if ($self->debug) {
892       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
893       $self->debugobj->query_end($sql, @debug_bind);
894   }
895   return (wantarray ? ($rv, $sth, @bind) : $rv);
896 }
897
898 sub update {
899   return shift->_execute('update' => [], @_);
900 }
901
902 sub delete {
903   return shift->_execute('delete' => [], @_);
904 }
905
906 sub _select {
907   my ($self, $ident, $select, $condition, $attrs) = @_;
908   my $order = $attrs->{order_by};
909   if (ref $condition eq 'SCALAR') {
910     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
911   }
912   if (exists $attrs->{group_by} || $attrs->{having}) {
913     $order = {
914       group_by => $attrs->{group_by},
915       having => $attrs->{having},
916       ($order ? (order_by => $order) : ())
917     };
918   }
919   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
920   if ($attrs->{software_limit} ||
921       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
922         $attrs->{software_limit} = 1;
923   } else {
924     $self->throw_exception("rows attribute must be positive if present")
925       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
926     push @args, $attrs->{rows}, $attrs->{offset};
927   }
928   return $self->_execute(@args);
929 }
930
931 =head2 select
932
933 =over 4
934
935 =item Arguments: $ident, $select, $condition, $attrs
936
937 =back
938
939 Handle a SQL select statement.
940
941 =cut
942
943 sub select {
944   my $self = shift;
945   my ($ident, $select, $condition, $attrs) = @_;
946   return $self->cursor->new($self, \@_, $attrs);
947 }
948
949 sub select_single {
950   my $self = shift;
951   my ($rv, $sth, @bind) = $self->_select(@_);
952   my @row = $sth->fetchrow_array;
953   # Need to call finish() to work round broken DBDs
954   $sth->finish();
955   return @row;
956 }
957
958 =head2 sth
959
960 =over 4
961
962 =item Arguments: $sql
963
964 =back
965
966 Returns a L<DBI> sth (statement handle) for the supplied SQL.
967
968 =cut
969
970 sub _dbh_sth {
971   my ($self, $dbh, $sql) = @_;
972   # 3 is the if_active parameter which avoids active sth re-use
973   $dbh->prepare_cached($sql, {}, 3) or
974     $self->throw_exception(
975       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
976     );
977 }
978
979 sub sth {
980   my ($self, $sql) = @_;
981   $self->dbh_do($self->can('_dbh_sth'), $sql);
982 }
983
984 sub _dbh_columns_info_for {
985   my ($self, $dbh, $table) = @_;
986
987   if ($dbh->can('column_info')) {
988     my %result;
989     eval {
990       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
991       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
992       $sth->execute();
993       while ( my $info = $sth->fetchrow_hashref() ){
994         my %column_info;
995         $column_info{data_type}   = $info->{TYPE_NAME};
996         $column_info{size}      = $info->{COLUMN_SIZE};
997         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
998         $column_info{default_value} = $info->{COLUMN_DEF};
999         my $col_name = $info->{COLUMN_NAME};
1000         $col_name =~ s/^\"(.*)\"$/$1/;
1001
1002         $result{$col_name} = \%column_info;
1003       }
1004     };
1005     return \%result if !$@ && scalar keys %result;
1006   }
1007
1008   my %result;
1009   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1010   $sth->execute;
1011   my @columns = @{$sth->{NAME_lc}};
1012   for my $i ( 0 .. $#columns ){
1013     my %column_info;
1014     my $type_num = $sth->{TYPE}->[$i];
1015     my $type_name;
1016     if(defined $type_num && $dbh->can('type_info')) {
1017       my $type_info = $dbh->type_info($type_num);
1018       $type_name = $type_info->{TYPE_NAME} if $type_info;
1019     }
1020     $column_info{data_type} = $type_name ? $type_name : $type_num;
1021     $column_info{size} = $sth->{PRECISION}->[$i];
1022     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1023
1024     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1025       $column_info{data_type} = $1;
1026       $column_info{size}    = $2;
1027     }
1028
1029     $result{$columns[$i]} = \%column_info;
1030   }
1031
1032   return \%result;
1033 }
1034
1035 sub columns_info_for {
1036   my ($self, $table) = @_;
1037   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1038 }
1039
1040 =head2 last_insert_id
1041
1042 Return the row id of the last insert.
1043
1044 =cut
1045
1046 sub _dbh_last_insert_id {
1047     my ($self, $dbh, $source, $col) = @_;
1048     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1049     $dbh->func('last_insert_rowid');
1050 }
1051
1052 sub last_insert_id {
1053   my $self = shift;
1054   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1055 }
1056
1057 =head2 sqlt_type
1058
1059 Returns the database driver name.
1060
1061 =cut
1062
1063 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1064
1065 =head2 create_ddl_dir (EXPERIMENTAL)
1066
1067 =over 4
1068
1069 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1070
1071 =back
1072
1073 Creates a SQL file based on the Schema, for each of the specified
1074 database types, in the given directory.
1075
1076 Note that this feature is currently EXPERIMENTAL and may not work correctly
1077 across all databases, or fully handle complex relationships.
1078
1079 =cut
1080
1081 sub create_ddl_dir
1082 {
1083   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1084
1085   if(!$dir || !-d $dir)
1086   {
1087     warn "No directory given, using ./\n";
1088     $dir = "./";
1089   }
1090   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1091   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1092   $version ||= $schema->VERSION || '1.x';
1093   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1094
1095   eval "use SQL::Translator";
1096   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1097
1098   my $sqlt = SQL::Translator->new($sqltargs);
1099   foreach my $db (@$databases)
1100   {
1101     $sqlt->reset();
1102     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1103 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1104     $sqlt->data($schema);
1105     $sqlt->producer($db);
1106
1107     my $file;
1108     my $filename = $schema->ddl_filename($db, $dir, $version);
1109     if(-e $filename)
1110     {
1111       $self->throw_exception("$filename already exists, skipping $db");
1112       next;
1113     }
1114     open($file, ">$filename") 
1115       or $self->throw_exception("Can't open $filename for writing ($!)");
1116     my $output = $sqlt->translate;
1117 #use Data::Dumper;
1118 #    print join(":", keys %{$schema->source_registrations});
1119 #    print Dumper($sqlt->schema);
1120     if(!$output)
1121     {
1122       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1123       next;
1124     }
1125     print $file $output;
1126     close($file);
1127   }
1128
1129 }
1130
1131 =head2 deployment_statements
1132
1133 =over 4
1134
1135 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1136
1137 =back
1138
1139 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1140 The database driver name is given by C<$type>, though the value from
1141 L</sqlt_type> is used if it is not specified.
1142
1143 C<$directory> is used to return statements from files in a previously created
1144 L</create_ddl_dir> directory and is optional. The filenames are constructed
1145 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1146
1147 If no C<$directory> is specified then the statements are constructed on the
1148 fly using L<SQL::Translator> and C<$version> is ignored.
1149
1150 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1151
1152 =cut
1153
1154 sub deployment_statements {
1155   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1156   # Need to be connected to get the correct sqlt_type
1157   $self->ensure_connected() unless $type;
1158   $type ||= $self->sqlt_type;
1159   $version ||= $schema->VERSION || '1.x';
1160   $dir ||= './';
1161   eval "use SQL::Translator";
1162   if(!$@)
1163   {
1164     eval "use SQL::Translator::Parser::DBIx::Class;";
1165     $self->throw_exception($@) if $@;
1166     eval "use SQL::Translator::Producer::${type};";
1167     $self->throw_exception($@) if $@;
1168     my $tr = SQL::Translator->new(%$sqltargs);
1169     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1170     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1171   }
1172
1173   my $filename = $schema->ddl_filename($type, $dir, $version);
1174   if(!-f $filename)
1175   {
1176 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1177       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1178       return;
1179   }
1180   my $file;
1181   open($file, "<$filename") 
1182       or $self->throw_exception("Can't open $filename ($!)");
1183   my @rows = <$file>;
1184   close($file);
1185
1186   return join('', @rows);
1187   
1188 }
1189
1190 sub deploy {
1191   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1192   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1193     for ( split(";\n", $statement)) {
1194       next if($_ =~ /^--/);
1195       next if(!$_);
1196 #      next if($_ =~ /^DROP/m);
1197       next if($_ =~ /^BEGIN TRANSACTION/m);
1198       next if($_ =~ /^COMMIT/m);
1199       next if $_ =~ /^\s+$/; # skip whitespace only
1200       $self->debugobj->query_start($_) if $self->debug;
1201       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1202       $self->debugobj->query_end($_) if $self->debug;
1203     }
1204   }
1205 }
1206
1207 =head2 datetime_parser
1208
1209 Returns the datetime parser class
1210
1211 =cut
1212
1213 sub datetime_parser {
1214   my $self = shift;
1215   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1216 }
1217
1218 =head2 datetime_parser_type
1219
1220 Defines (returns) the datetime parser class - currently hardwired to
1221 L<DateTime::Format::MySQL>
1222
1223 =cut
1224
1225 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1226
1227 =head2 build_datetime_parser
1228
1229 See L</datetime_parser>
1230
1231 =cut
1232
1233 sub build_datetime_parser {
1234   my $self = shift;
1235   my $type = $self->datetime_parser_type(@_);
1236   eval "use ${type}";
1237   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1238   return $type;
1239 }
1240
1241 sub DESTROY {
1242   my $self = shift;
1243   return if !$self->_dbh;
1244   $self->_verify_pid;
1245   $self->_dbh(undef);
1246 }
1247
1248 1;
1249
1250 =head1 SQL METHODS
1251
1252 The module defines a set of methods within the DBIC::SQL::Abstract
1253 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1254 SQL query functions.
1255
1256 The following methods are extended:-
1257
1258 =over 4
1259
1260 =item delete
1261
1262 =item insert
1263
1264 =item select
1265
1266 =item update
1267
1268 =item limit_dialect
1269
1270 See L</connect_info> for details.
1271 For setting, this method is deprecated in favor of L</connect_info>.
1272
1273 =item quote_char
1274
1275 See L</connect_info> for details.
1276 For setting, this method is deprecated in favor of L</connect_info>.
1277
1278 =item name_sep
1279
1280 See L</connect_info> for details.
1281 For setting, this method is deprecated in favor of L</connect_info>.
1282
1283 =back
1284
1285 =head1 AUTHORS
1286
1287 Matt S. Trout <mst@shadowcatsystems.co.uk>
1288
1289 Andy Grundman <andy@hybridized.org>
1290
1291 =head1 LICENSE
1292
1293 You may distribute this code under the same terms as Perl itself.
1294
1295 =cut