895de8888541d4d1c1dcb8e61f580133d64431c0
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item limit_dialect 
340
341 Sets the limit dialect. This is useful for JDBC-bridge among others
342 where the remote SQL-dialect cannot be determined by the name of the
343 driver alone.
344
345 =item quote_char
346
347 Specifies what characters to use to quote table and column names. If 
348 you use this you will want to specify L<name_sep> as well.
349
350 quote_char expects either a single character, in which case is it is placed
351 on either side of the table/column, or an arrayref of length 2 in which case the
352 table/column name is placed between the elements.
353
354 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
355 use C<quote_char =E<gt> [qw/[ ]/]>.
356
357 =item name_sep
358
359 This only needs to be used in conjunction with L<quote_char>, and is used to 
360 specify the charecter that seperates elements (schemas, tables, columns) from 
361 each other. In most cases this is simply a C<.>.
362
363 =back
364
365 These options can be mixed in with your other L<DBI> connection attributes,
366 or placed in a seperate hashref after all other normal L<DBI> connection
367 arguments.
368
369 Every time C<connect_info> is invoked, any previous settings for
370 these options will be cleared before setting the new ones, regardless of
371 whether any options are specified in the new C<connect_info>.
372
373 Important note:  DBIC expects the returned database handle provided by 
374 a subref argument to have RaiseError set on it.  If it doesn't, things
375 might not work very well, YMMV.  If you don't use a subref, DBIC will
376 force this setting for you anyways.  Setting HandleError to anything
377 other than simple exception object wrapper might cause problems too.
378
379 Examples:
380
381   # Simple SQLite connection
382   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
383
384   # Connect via subref
385   ->connect_info([ sub { DBI->connect(...) } ]);
386
387   # A bit more complicated
388   ->connect_info(
389     [
390       'dbi:Pg:dbname=foo',
391       'postgres',
392       'my_pg_password',
393       { AutoCommit => 0 },
394       { quote_char => q{"}, name_sep => q{.} },
395     ]
396   );
397
398   # Equivalent to the previous example
399   ->connect_info(
400     [
401       'dbi:Pg:dbname=foo',
402       'postgres',
403       'my_pg_password',
404       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
405     ]
406   );
407
408   # Subref + DBIC-specific connection options
409   ->connect_info(
410     [
411       sub { DBI->connect(...) },
412       {
413           quote_char => q{`},
414           name_sep => q{@},
415           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
416       },
417     ]
418   );
419
420 =cut
421
422 sub connect_info {
423   my ($self, $info_arg) = @_;
424
425   return $self->_connect_info if !$info_arg;
426
427   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
428   #  the new set of options
429   $self->_sql_maker(undef);
430   $self->_sql_maker_opts({});
431
432   my $info = [ @$info_arg ]; # copy because we can alter it
433   my $last_info = $info->[-1];
434   if(ref $last_info eq 'HASH') {
435     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
436       $self->on_connect_do($on_connect_do);
437     }
438     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
439       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
440         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
441       }
442     }
443
444     # Get rid of any trailing empty hashref
445     pop(@$info) if !keys %$last_info;
446   }
447
448   $self->_connect_info($info);
449 }
450
451 =head2 on_connect_do
452
453 This method is deprecated in favor of setting via L</connect_info>.
454
455 =head2 dbh_do
456
457 Arguments: $subref, @extra_coderef_args?
458
459 Execute the given subref using the new exception-based connection management.
460
461 The first two arguments will be the storage object that C<dbh_do> was called
462 on and a database handle to use.  Any additional arguments will be passed
463 verbatim to the called subref as arguments 2 and onwards.
464
465 Using this (instead of $self->_dbh or $self->dbh) ensures correct
466 exception handling and reconnection (or failover in future subclasses).
467
468 Your subref should have no side-effects outside of the database, as
469 there is the potential for your subref to be partially double-executed
470 if the database connection was stale/dysfunctional.
471
472 Example:
473
474   my @stuff = $schema->storage->dbh_do(
475     sub {
476       my ($storage, $dbh, @cols) = @_;
477       my $cols = join(q{, }, @cols);
478       $dbh->selectrow_array("SELECT $cols FROM foo");
479     },
480     @column_list
481   );
482
483 =cut
484
485 sub dbh_do {
486   my $self = shift;
487   my $coderef = shift;
488
489   ref $coderef eq 'CODE' or $self->throw_exception
490     ('$coderef must be a CODE reference');
491
492   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
493   local $self->{_in_dbh_do} = 1;
494
495   my @result;
496   my $want_array = wantarray;
497
498   eval {
499     $self->_verify_pid if $self->_dbh;
500     $self->_populate_dbh if !$self->_dbh;
501     if($want_array) {
502         @result = $coderef->($self, $self->_dbh, @_);
503     }
504     elsif(defined $want_array) {
505         $result[0] = $coderef->($self, $self->_dbh, @_);
506     }
507     else {
508         $coderef->($self, $self->_dbh, @_);
509     }
510   };
511
512   my $exception = $@;
513   if(!$exception) { return $want_array ? @result : $result[0] }
514
515   $self->throw_exception($exception) if $self->connected;
516
517   # We were not connected - reconnect and retry, but let any
518   #  exception fall right through this time
519   $self->_populate_dbh;
520   $coderef->($self, $self->_dbh, @_);
521 }
522
523 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
524 # It also informs dbh_do to bypass itself while under the direction of txn_do,
525 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
526 sub txn_do {
527   my $self = shift;
528   my $coderef = shift;
529
530   ref $coderef eq 'CODE' or $self->throw_exception
531     ('$coderef must be a CODE reference');
532
533   local $self->{_in_dbh_do} = 1;
534
535   my @result;
536   my $want_array = wantarray;
537
538   my $tried = 0;
539   while(1) {
540     eval {
541       $self->_verify_pid if $self->_dbh;
542       $self->_populate_dbh if !$self->_dbh;
543
544       $self->txn_begin;
545       if($want_array) {
546           @result = $coderef->(@_);
547       }
548       elsif(defined $want_array) {
549           $result[0] = $coderef->(@_);
550       }
551       else {
552           $coderef->(@_);
553       }
554       $self->txn_commit;
555     };
556
557     my $exception = $@;
558     if(!$exception) { return $want_array ? @result : $result[0] }
559
560     if($tried++ > 0 || $self->connected) {
561       eval { $self->txn_rollback };
562       my $rollback_exception = $@;
563       if($rollback_exception) {
564         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
565         $self->throw_exception($exception)  # propagate nested rollback
566           if $rollback_exception =~ /$exception_class/;
567
568         $self->throw_exception(
569           "Transaction aborted: ${exception}. "
570           . "Rollback failed: ${rollback_exception}"
571         );
572       }
573       $self->throw_exception($exception)
574     }
575
576     # We were not connected, and was first try - reconnect and retry
577     # via the while loop
578     $self->_populate_dbh;
579   }
580 }
581
582 =head2 disconnect
583
584 Our C<disconnect> method also performs a rollback first if the
585 database is not in C<AutoCommit> mode.
586
587 =cut
588
589 sub disconnect {
590   my ($self) = @_;
591
592   if( $self->connected ) {
593     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
594     $self->_dbh->disconnect;
595     $self->_dbh(undef);
596     $self->{_dbh_gen}++;
597   }
598 }
599
600 sub connected {
601   my ($self) = @_;
602
603   if(my $dbh = $self->_dbh) {
604       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
605           $self->_dbh(undef);
606           $self->{_dbh_gen}++;
607           return;
608       }
609       else {
610           $self->_verify_pid;
611       }
612       return ($dbh->FETCH('Active') && $dbh->ping);
613   }
614
615   return 0;
616 }
617
618 # handle pid changes correctly
619 #  NOTE: assumes $self->_dbh is a valid $dbh
620 sub _verify_pid {
621   my ($self) = @_;
622
623   return if $self->_conn_pid == $$;
624
625   $self->_dbh->{InactiveDestroy} = 1;
626   $self->_dbh(undef);
627   $self->{_dbh_gen}++;
628
629   return;
630 }
631
632 sub ensure_connected {
633   my ($self) = @_;
634
635   unless ($self->connected) {
636     $self->_populate_dbh;
637   }
638 }
639
640 =head2 dbh
641
642 Returns the dbh - a data base handle of class L<DBI>.
643
644 =cut
645
646 sub dbh {
647   my ($self) = @_;
648
649   $self->ensure_connected;
650   return $self->_dbh;
651 }
652
653 sub _sql_maker_args {
654     my ($self) = @_;
655     
656     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
657 }
658
659 sub sql_maker {
660   my ($self) = @_;
661   unless ($self->_sql_maker) {
662     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
663   }
664   return $self->_sql_maker;
665 }
666
667 sub _populate_dbh {
668   my ($self) = @_;
669   my @info = @{$self->_connect_info || []};
670   $self->_dbh($self->_connect(@info));
671
672   if(ref $self eq 'DBIx::Class::Storage::DBI') {
673     my $driver = $self->_dbh->{Driver}->{Name};
674     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
675       bless $self, "DBIx::Class::Storage::DBI::${driver}";
676       $self->_rebless() if $self->can('_rebless');
677     }
678   }
679
680   # if on-connect sql statements are given execute them
681   foreach my $sql_statement (@{$self->on_connect_do || []}) {
682     $self->debugobj->query_start($sql_statement) if $self->debug();
683     $self->_dbh->do($sql_statement);
684     $self->debugobj->query_end($sql_statement) if $self->debug();
685   }
686
687   $self->_conn_pid($$);
688   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
689 }
690
691 sub _connect {
692   my ($self, @info) = @_;
693
694   $self->throw_exception("You failed to provide any connection info")
695       if !@info;
696
697   my ($old_connect_via, $dbh);
698
699   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
700       $old_connect_via = $DBI::connect_via;
701       $DBI::connect_via = 'connect';
702   }
703
704   eval {
705     if(ref $info[0] eq 'CODE') {
706        $dbh = &{$info[0]}
707     }
708     else {
709        $dbh = DBI->connect(@info);
710        $dbh->{RaiseError} = 1;
711        $dbh->{PrintError} = 0;
712        $dbh->{PrintWarn} = 0;
713     }
714   };
715
716   $DBI::connect_via = $old_connect_via if $old_connect_via;
717
718   if (!$dbh || $@) {
719     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
720   }
721
722   $dbh;
723 }
724
725 sub _dbh_txn_begin {
726   my ($self, $dbh) = @_;
727   if ($dbh->{AutoCommit}) {
728     $self->debugobj->txn_begin()
729       if ($self->debug);
730     $dbh->begin_work;
731   }
732 }
733
734 sub txn_begin {
735   my $self = shift;
736   $self->dbh_do($self->can('_dbh_txn_begin'))
737     if $self->{transaction_depth}++ == 0;
738 }
739
740 sub _dbh_txn_commit {
741   my ($self, $dbh) = @_;
742   if ($self->{transaction_depth} == 0) {
743     unless ($dbh->{AutoCommit}) {
744       $self->debugobj->txn_commit()
745         if ($self->debug);
746       $dbh->commit;
747     }
748   }
749   else {
750     if (--$self->{transaction_depth} == 0) {
751       $self->debugobj->txn_commit()
752         if ($self->debug);
753       $dbh->commit;
754     }
755   }
756 }
757
758 sub txn_commit {
759   my $self = shift;
760   $self->dbh_do($self->can('_dbh_txn_commit'));
761 }
762
763 sub _dbh_txn_rollback {
764   my ($self, $dbh) = @_;
765   if ($self->{transaction_depth} == 0) {
766     unless ($dbh->{AutoCommit}) {
767       $self->debugobj->txn_rollback()
768         if ($self->debug);
769       $dbh->rollback;
770     }
771   }
772   else {
773     if (--$self->{transaction_depth} == 0) {
774       $self->debugobj->txn_rollback()
775         if ($self->debug);
776       $dbh->rollback;
777     }
778     else {
779       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
780     }
781   }
782 }
783
784 sub txn_rollback {
785   my $self = shift;
786
787   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
788   if ($@) {
789     my $error = $@;
790     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
791     $error =~ /$exception_class/ and $self->throw_exception($error);
792     $self->{transaction_depth} = 0;          # ensure that a failed rollback
793     $self->throw_exception($error);          # resets the transaction depth
794   }
795 }
796
797 # This used to be the top-half of _execute.  It was split out to make it
798 #  easier to override in NoBindVars without duping the rest.  It takes up
799 #  all of _execute's args, and emits $sql, @bind.
800 sub _prep_for_execute {
801   my ($self, $op, $extra_bind, $ident, @args) = @_;
802
803   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
804   unshift(@bind, @$extra_bind) if $extra_bind;
805   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
806
807   return ($sql, @bind);
808 }
809
810 sub _execute {
811   my $self = shift;
812
813   my ($sql, @bind) = $self->_prep_for_execute(@_);
814
815   if ($self->debug) {
816       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
817       $self->debugobj->query_start($sql, @debug_bind);
818   }
819
820   my $sth = $self->sth($sql);
821
822   my $rv;
823   if ($sth) {
824     my $time = time();
825     $rv = eval { $sth->execute(@bind) };
826
827     if ($@ || !$rv) {
828       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
829     }
830   } else {
831     $self->throw_exception("'$sql' did not generate a statement.");
832   }
833   if ($self->debug) {
834       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
835       $self->debugobj->query_end($sql, @debug_bind);
836   }
837   return (wantarray ? ($rv, $sth, @bind) : $rv);
838 }
839
840 sub insert {
841   my ($self, $ident, $to_insert) = @_;
842   $self->throw_exception(
843     "Couldn't insert ".join(', ',
844       map "$_ => $to_insert->{$_}", keys %$to_insert
845     )." into ${ident}"
846   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
847   return $to_insert;
848 }
849
850 sub update {
851   return shift->_execute('update' => [], @_);
852 }
853
854 sub delete {
855   return shift->_execute('delete' => [], @_);
856 }
857
858 sub _select {
859   my ($self, $ident, $select, $condition, $attrs) = @_;
860   my $order = $attrs->{order_by};
861   if (ref $condition eq 'SCALAR') {
862     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
863   }
864   if (exists $attrs->{group_by} || $attrs->{having}) {
865     $order = {
866       group_by => $attrs->{group_by},
867       having => $attrs->{having},
868       ($order ? (order_by => $order) : ())
869     };
870   }
871   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
872   if ($attrs->{software_limit} ||
873       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
874         $attrs->{software_limit} = 1;
875   } else {
876     $self->throw_exception("rows attribute must be positive if present")
877       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
878     push @args, $attrs->{rows}, $attrs->{offset};
879   }
880   return $self->_execute(@args);
881 }
882
883 =head2 select
884
885 =over 4
886
887 =item Arguments: $ident, $select, $condition, $attrs
888
889 =back
890
891 Handle a SQL select statement.
892
893 =cut
894
895 sub select {
896   my $self = shift;
897   my ($ident, $select, $condition, $attrs) = @_;
898   return $self->cursor->new($self, \@_, $attrs);
899 }
900
901 sub select_single {
902   my $self = shift;
903   my ($rv, $sth, @bind) = $self->_select(@_);
904   my @row = $sth->fetchrow_array;
905   # Need to call finish() to work round broken DBDs
906   $sth->finish();
907   return @row;
908 }
909
910 =head2 sth
911
912 =over 4
913
914 =item Arguments: $sql
915
916 =back
917
918 Returns a L<DBI> sth (statement handle) for the supplied SQL.
919
920 =cut
921
922 sub _dbh_sth {
923   my ($self, $dbh, $sql) = @_;
924   # 3 is the if_active parameter which avoids active sth re-use
925   $dbh->prepare_cached($sql, {}, 3) or
926     $self->throw_exception(
927       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
928     );
929 }
930
931 sub sth {
932   my ($self, $sql) = @_;
933   $self->dbh_do($self->can('_dbh_sth'), $sql);
934 }
935
936 sub _dbh_columns_info_for {
937   my ($self, $dbh, $table) = @_;
938
939   if ($dbh->can('column_info')) {
940     my %result;
941     eval {
942       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
943       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
944       $sth->execute();
945       while ( my $info = $sth->fetchrow_hashref() ){
946         my %column_info;
947         $column_info{data_type}   = $info->{TYPE_NAME};
948         $column_info{size}      = $info->{COLUMN_SIZE};
949         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
950         $column_info{default_value} = $info->{COLUMN_DEF};
951         my $col_name = $info->{COLUMN_NAME};
952         $col_name =~ s/^\"(.*)\"$/$1/;
953
954         $result{$col_name} = \%column_info;
955       }
956     };
957     return \%result if !$@ && scalar keys %result;
958   }
959
960   my %result;
961   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
962   $sth->execute;
963   my @columns = @{$sth->{NAME_lc}};
964   for my $i ( 0 .. $#columns ){
965     my %column_info;
966     my $type_num = $sth->{TYPE}->[$i];
967     my $type_name;
968     if(defined $type_num && $dbh->can('type_info')) {
969       my $type_info = $dbh->type_info($type_num);
970       $type_name = $type_info->{TYPE_NAME} if $type_info;
971     }
972     $column_info{data_type} = $type_name ? $type_name : $type_num;
973     $column_info{size} = $sth->{PRECISION}->[$i];
974     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
975
976     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
977       $column_info{data_type} = $1;
978       $column_info{size}    = $2;
979     }
980
981     $result{$columns[$i]} = \%column_info;
982   }
983
984   return \%result;
985 }
986
987 sub columns_info_for {
988   my ($self, $table) = @_;
989   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
990 }
991
992 =head2 last_insert_id
993
994 Return the row id of the last insert.
995
996 =cut
997
998 sub _dbh_last_insert_id {
999     my ($self, $dbh, $source, $col) = @_;
1000     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1001     $dbh->func('last_insert_rowid');
1002 }
1003
1004 sub last_insert_id {
1005   my $self = shift;
1006   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1007 }
1008
1009 =head2 sqlt_type
1010
1011 Returns the database driver name.
1012
1013 =cut
1014
1015 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1016
1017 =head2 create_ddl_dir (EXPERIMENTAL)
1018
1019 =over 4
1020
1021 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1022
1023 =back
1024
1025 Creates a SQL file based on the Schema, for each of the specified
1026 database types, in the given directory.
1027
1028 Note that this feature is currently EXPERIMENTAL and may not work correctly
1029 across all databases, or fully handle complex relationships.
1030
1031 =cut
1032
1033 sub create_ddl_dir
1034 {
1035   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1036
1037   if(!$dir || !-d $dir)
1038   {
1039     warn "No directory given, using ./\n";
1040     $dir = "./";
1041   }
1042   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1043   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1044   $version ||= $schema->VERSION || '1.x';
1045   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1046
1047   eval "use SQL::Translator";
1048   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1049
1050   my $sqlt = SQL::Translator->new($sqltargs);
1051   foreach my $db (@$databases)
1052   {
1053     $sqlt->reset();
1054     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1055 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1056     $sqlt->data($schema);
1057     $sqlt->producer($db);
1058
1059     my $file;
1060     my $filename = $schema->ddl_filename($db, $dir, $version);
1061     if(-e $filename)
1062     {
1063       $self->throw_exception("$filename already exists, skipping $db");
1064       next;
1065     }
1066     open($file, ">$filename") 
1067       or $self->throw_exception("Can't open $filename for writing ($!)");
1068     my $output = $sqlt->translate;
1069 #use Data::Dumper;
1070 #    print join(":", keys %{$schema->source_registrations});
1071 #    print Dumper($sqlt->schema);
1072     if(!$output)
1073     {
1074       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1075       next;
1076     }
1077     print $file $output;
1078     close($file);
1079   }
1080
1081 }
1082
1083 =head2 deployment_statements
1084
1085 =over 4
1086
1087 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1088
1089 =back
1090
1091 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1092 The database driver name is given by C<$type>, though the value from
1093 L</sqlt_type> is used if it is not specified.
1094
1095 C<$directory> is used to return statements from files in a previously created
1096 L</create_ddl_dir> directory and is optional. The filenames are constructed
1097 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1098
1099 If no C<$directory> is specified then the statements are constructed on the
1100 fly using L<SQL::Translator> and C<$version> is ignored.
1101
1102 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1103
1104 =cut
1105
1106 sub deployment_statements {
1107   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1108   # Need to be connected to get the correct sqlt_type
1109   $self->ensure_connected() unless $type;
1110   $type ||= $self->sqlt_type;
1111   $version ||= $schema->VERSION || '1.x';
1112   $dir ||= './';
1113   eval "use SQL::Translator";
1114   if(!$@)
1115   {
1116     eval "use SQL::Translator::Parser::DBIx::Class;";
1117     $self->throw_exception($@) if $@;
1118     eval "use SQL::Translator::Producer::${type};";
1119     $self->throw_exception($@) if $@;
1120     my $tr = SQL::Translator->new(%$sqltargs);
1121     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1122     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1123   }
1124
1125   my $filename = $schema->ddl_filename($type, $dir, $version);
1126   if(!-f $filename)
1127   {
1128 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1129       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1130       return;
1131   }
1132   my $file;
1133   open($file, "<$filename") 
1134       or $self->throw_exception("Can't open $filename ($!)");
1135   my @rows = <$file>;
1136   close($file);
1137
1138   return join('', @rows);
1139   
1140 }
1141
1142 sub deploy {
1143   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1144   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1145     for ( split(";\n", $statement)) {
1146       next if($_ =~ /^--/);
1147       next if(!$_);
1148 #      next if($_ =~ /^DROP/m);
1149       next if($_ =~ /^BEGIN TRANSACTION/m);
1150       next if($_ =~ /^COMMIT/m);
1151       next if $_ =~ /^\s+$/; # skip whitespace only
1152       $self->debugobj->query_start($_) if $self->debug;
1153       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1154       $self->debugobj->query_end($_) if $self->debug;
1155     }
1156   }
1157 }
1158
1159 =head2 datetime_parser
1160
1161 Returns the datetime parser class
1162
1163 =cut
1164
1165 sub datetime_parser {
1166   my $self = shift;
1167   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1168 }
1169
1170 =head2 datetime_parser_type
1171
1172 Defines (returns) the datetime parser class - currently hardwired to
1173 L<DateTime::Format::MySQL>
1174
1175 =cut
1176
1177 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1178
1179 =head2 build_datetime_parser
1180
1181 See L</datetime_parser>
1182
1183 =cut
1184
1185 sub build_datetime_parser {
1186   my $self = shift;
1187   my $type = $self->datetime_parser_type(@_);
1188   eval "use ${type}";
1189   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1190   return $type;
1191 }
1192
1193 sub DESTROY {
1194   my $self = shift;
1195   return if !$self->_dbh;
1196   $self->_verify_pid;
1197   $self->_dbh(undef);
1198 }
1199
1200 1;
1201
1202 =head1 SQL METHODS
1203
1204 The module defines a set of methods within the DBIC::SQL::Abstract
1205 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1206 SQL query functions.
1207
1208 The following methods are extended:-
1209
1210 =over 4
1211
1212 =item delete
1213
1214 =item insert
1215
1216 =item select
1217
1218 =item update
1219
1220 =item limit_dialect
1221
1222 See L</connect_info> for details.
1223 For setting, this method is deprecated in favor of L</connect_info>.
1224
1225 =item quote_char
1226
1227 See L</connect_info> for details.
1228 For setting, this method is deprecated in favor of L</connect_info>.
1229
1230 =item name_sep
1231
1232 See L</connect_info> for details.
1233 For setting, this method is deprecated in favor of L</connect_info>.
1234
1235 =back
1236
1237 =head1 AUTHORS
1238
1239 Matt S. Trout <mst@shadowcatsystems.co.uk>
1240
1241 Andy Grundman <andy@hybridized.org>
1242
1243 =head1 LICENSE
1244
1245 You may distribute this code under the same terms as Perl itself.
1246
1247 =cut