5b2cfaa8f1ad720c912f0124fa4a24edd2d68fcc
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        disable_sth_caching cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item disable_sth_caching
340
341 If set to a true value, this option will disable the caching of
342 statement handles via L<DBI/prepare_cached>.
343
344 =item limit_dialect 
345
346 Sets the limit dialect. This is useful for JDBC-bridge among others
347 where the remote SQL-dialect cannot be determined by the name of the
348 driver alone.
349
350 =item quote_char
351
352 Specifies what characters to use to quote table and column names. If 
353 you use this you will want to specify L<name_sep> as well.
354
355 quote_char expects either a single character, in which case is it is placed
356 on either side of the table/column, or an arrayref of length 2 in which case the
357 table/column name is placed between the elements.
358
359 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
360 use C<quote_char =E<gt> [qw/[ ]/]>.
361
362 =item name_sep
363
364 This only needs to be used in conjunction with L<quote_char>, and is used to 
365 specify the charecter that seperates elements (schemas, tables, columns) from 
366 each other. In most cases this is simply a C<.>.
367
368 =back
369
370 These options can be mixed in with your other L<DBI> connection attributes,
371 or placed in a seperate hashref after all other normal L<DBI> connection
372 arguments.
373
374 Every time C<connect_info> is invoked, any previous settings for
375 these options will be cleared before setting the new ones, regardless of
376 whether any options are specified in the new C<connect_info>.
377
378 Important note:  DBIC expects the returned database handle provided by 
379 a subref argument to have RaiseError set on it.  If it doesn't, things
380 might not work very well, YMMV.  If you don't use a subref, DBIC will
381 force this setting for you anyways.  Setting HandleError to anything
382 other than simple exception object wrapper might cause problems too.
383
384 Examples:
385
386   # Simple SQLite connection
387   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
388
389   # Connect via subref
390   ->connect_info([ sub { DBI->connect(...) } ]);
391
392   # A bit more complicated
393   ->connect_info(
394     [
395       'dbi:Pg:dbname=foo',
396       'postgres',
397       'my_pg_password',
398       { AutoCommit => 0 },
399       { quote_char => q{"}, name_sep => q{.} },
400     ]
401   );
402
403   # Equivalent to the previous example
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Subref + DBIC-specific connection options
414   ->connect_info(
415     [
416       sub { DBI->connect(...) },
417       {
418           quote_char => q{`},
419           name_sep => q{@},
420           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
421           disable_sth_caching => 1,
422       },
423     ]
424   );
425
426 =cut
427
428 sub connect_info {
429   my ($self, $info_arg) = @_;
430
431   return $self->_connect_info if !$info_arg;
432
433   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
434   #  the new set of options
435   $self->_sql_maker(undef);
436   $self->_sql_maker_opts({});
437
438   my $info = [ @$info_arg ]; # copy because we can alter it
439   my $last_info = $info->[-1];
440   if(ref $last_info eq 'HASH') {
441     for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
442       if(my $value = delete $last_info->{$storage_opt}) {
443         $self->$storage_opt($value);
444       }
445     }
446     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
447       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
448         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
449       }
450     }
451
452     # Get rid of any trailing empty hashref
453     pop(@$info) if !keys %$last_info;
454   }
455
456   $self->_connect_info($info);
457 }
458
459 =head2 on_connect_do
460
461 This method is deprecated in favor of setting via L</connect_info>.
462
463 =head2 dbh_do
464
465 Arguments: $subref, @extra_coderef_args?
466
467 Execute the given subref using the new exception-based connection management.
468
469 The first two arguments will be the storage object that C<dbh_do> was called
470 on and a database handle to use.  Any additional arguments will be passed
471 verbatim to the called subref as arguments 2 and onwards.
472
473 Using this (instead of $self->_dbh or $self->dbh) ensures correct
474 exception handling and reconnection (or failover in future subclasses).
475
476 Your subref should have no side-effects outside of the database, as
477 there is the potential for your subref to be partially double-executed
478 if the database connection was stale/dysfunctional.
479
480 Example:
481
482   my @stuff = $schema->storage->dbh_do(
483     sub {
484       my ($storage, $dbh, @cols) = @_;
485       my $cols = join(q{, }, @cols);
486       $dbh->selectrow_array("SELECT $cols FROM foo");
487     },
488     @column_list
489   );
490
491 =cut
492
493 sub dbh_do {
494   my $self = shift;
495   my $coderef = shift;
496
497   ref $coderef eq 'CODE' or $self->throw_exception
498     ('$coderef must be a CODE reference');
499
500   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
501   local $self->{_in_dbh_do} = 1;
502
503   my @result;
504   my $want_array = wantarray;
505
506   eval {
507     $self->_verify_pid if $self->_dbh;
508     $self->_populate_dbh if !$self->_dbh;
509     if($want_array) {
510         @result = $coderef->($self, $self->_dbh, @_);
511     }
512     elsif(defined $want_array) {
513         $result[0] = $coderef->($self, $self->_dbh, @_);
514     }
515     else {
516         $coderef->($self, $self->_dbh, @_);
517     }
518   };
519
520   my $exception = $@;
521   if(!$exception) { return $want_array ? @result : $result[0] }
522
523   $self->throw_exception($exception) if $self->connected;
524
525   # We were not connected - reconnect and retry, but let any
526   #  exception fall right through this time
527   $self->_populate_dbh;
528   $coderef->($self, $self->_dbh, @_);
529 }
530
531 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
532 # It also informs dbh_do to bypass itself while under the direction of txn_do,
533 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
534 sub txn_do {
535   my $self = shift;
536   my $coderef = shift;
537
538   ref $coderef eq 'CODE' or $self->throw_exception
539     ('$coderef must be a CODE reference');
540
541   local $self->{_in_dbh_do} = 1;
542
543   my @result;
544   my $want_array = wantarray;
545
546   my $tried = 0;
547   while(1) {
548     eval {
549       $self->_verify_pid if $self->_dbh;
550       $self->_populate_dbh if !$self->_dbh;
551
552       $self->txn_begin;
553       if($want_array) {
554           @result = $coderef->(@_);
555       }
556       elsif(defined $want_array) {
557           $result[0] = $coderef->(@_);
558       }
559       else {
560           $coderef->(@_);
561       }
562       $self->txn_commit;
563     };
564
565     my $exception = $@;
566     if(!$exception) { return $want_array ? @result : $result[0] }
567
568     if($tried++ > 0 || $self->connected) {
569       eval { $self->txn_rollback };
570       my $rollback_exception = $@;
571       if($rollback_exception) {
572         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
573         $self->throw_exception($exception)  # propagate nested rollback
574           if $rollback_exception =~ /$exception_class/;
575
576         $self->throw_exception(
577           "Transaction aborted: ${exception}. "
578           . "Rollback failed: ${rollback_exception}"
579         );
580       }
581       $self->throw_exception($exception)
582     }
583
584     # We were not connected, and was first try - reconnect and retry
585     # via the while loop
586     $self->_populate_dbh;
587   }
588 }
589
590 =head2 disconnect
591
592 Our C<disconnect> method also performs a rollback first if the
593 database is not in C<AutoCommit> mode.
594
595 =cut
596
597 sub disconnect {
598   my ($self) = @_;
599
600   if( $self->connected ) {
601     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
602     $self->_dbh->disconnect;
603     $self->_dbh(undef);
604     $self->{_dbh_gen}++;
605   }
606 }
607
608 sub connected {
609   my ($self) = @_;
610
611   if(my $dbh = $self->_dbh) {
612       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
613           $self->_dbh(undef);
614           $self->{_dbh_gen}++;
615           return;
616       }
617       else {
618           $self->_verify_pid;
619       }
620       return ($dbh->FETCH('Active') && $dbh->ping);
621   }
622
623   return 0;
624 }
625
626 # handle pid changes correctly
627 #  NOTE: assumes $self->_dbh is a valid $dbh
628 sub _verify_pid {
629   my ($self) = @_;
630
631   return if $self->_conn_pid == $$;
632
633   $self->_dbh->{InactiveDestroy} = 1;
634   $self->_dbh(undef);
635   $self->{_dbh_gen}++;
636
637   return;
638 }
639
640 sub ensure_connected {
641   my ($self) = @_;
642
643   unless ($self->connected) {
644     $self->_populate_dbh;
645   }
646 }
647
648 =head2 dbh
649
650 Returns the dbh - a data base handle of class L<DBI>.
651
652 =cut
653
654 sub dbh {
655   my ($self) = @_;
656
657   $self->ensure_connected;
658   return $self->_dbh;
659 }
660
661 sub _sql_maker_args {
662     my ($self) = @_;
663     
664     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
665 }
666
667 sub sql_maker {
668   my ($self) = @_;
669   unless ($self->_sql_maker) {
670     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
671   }
672   return $self->_sql_maker;
673 }
674
675 sub _populate_dbh {
676   my ($self) = @_;
677   my @info = @{$self->_connect_info || []};
678   $self->_dbh($self->_connect(@info));
679
680   if(ref $self eq 'DBIx::Class::Storage::DBI') {
681     my $driver = $self->_dbh->{Driver}->{Name};
682     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
683       bless $self, "DBIx::Class::Storage::DBI::${driver}";
684       $self->_rebless() if $self->can('_rebless');
685     }
686   }
687
688   # if on-connect sql statements are given execute them
689   foreach my $sql_statement (@{$self->on_connect_do || []}) {
690     $self->debugobj->query_start($sql_statement) if $self->debug();
691     $self->_dbh->do($sql_statement);
692     $self->debugobj->query_end($sql_statement) if $self->debug();
693   }
694
695   $self->_conn_pid($$);
696   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
697 }
698
699 sub _connect {
700   my ($self, @info) = @_;
701
702   $self->throw_exception("You failed to provide any connection info")
703       if !@info;
704
705   my ($old_connect_via, $dbh);
706
707   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
708       $old_connect_via = $DBI::connect_via;
709       $DBI::connect_via = 'connect';
710   }
711
712   eval {
713     if(ref $info[0] eq 'CODE') {
714        $dbh = &{$info[0]}
715     }
716     else {
717        $dbh = DBI->connect(@info);
718        $dbh->{RaiseError} = 1;
719        $dbh->{PrintError} = 0;
720        $dbh->{PrintWarn} = 0;
721     }
722   };
723
724   $DBI::connect_via = $old_connect_via if $old_connect_via;
725
726   if (!$dbh || $@) {
727     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
728   }
729
730   $dbh;
731 }
732
733 sub _dbh_txn_begin {
734   my ($self, $dbh) = @_;
735   if ($dbh->{AutoCommit}) {
736     $self->debugobj->txn_begin()
737       if ($self->debug);
738     $dbh->begin_work;
739   }
740 }
741
742 sub txn_begin {
743   my $self = shift;
744   $self->dbh_do($self->can('_dbh_txn_begin'))
745     if $self->{transaction_depth}++ == 0;
746 }
747
748 sub _dbh_txn_commit {
749   my ($self, $dbh) = @_;
750   if ($self->{transaction_depth} == 0) {
751     unless ($dbh->{AutoCommit}) {
752       $self->debugobj->txn_commit()
753         if ($self->debug);
754       $dbh->commit;
755     }
756   }
757   else {
758     if (--$self->{transaction_depth} == 0) {
759       $self->debugobj->txn_commit()
760         if ($self->debug);
761       $dbh->commit;
762     }
763   }
764 }
765
766 sub txn_commit {
767   my $self = shift;
768   $self->dbh_do($self->can('_dbh_txn_commit'));
769 }
770
771 sub _dbh_txn_rollback {
772   my ($self, $dbh) = @_;
773   if ($self->{transaction_depth} == 0) {
774     unless ($dbh->{AutoCommit}) {
775       $self->debugobj->txn_rollback()
776         if ($self->debug);
777       $dbh->rollback;
778     }
779   }
780   else {
781     if (--$self->{transaction_depth} == 0) {
782       $self->debugobj->txn_rollback()
783         if ($self->debug);
784       $dbh->rollback;
785     }
786     else {
787       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
788     }
789   }
790 }
791
792 sub txn_rollback {
793   my $self = shift;
794
795   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
796   if ($@) {
797     my $error = $@;
798     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
799     $error =~ /$exception_class/ and $self->throw_exception($error);
800     $self->{transaction_depth} = 0;          # ensure that a failed rollback
801     $self->throw_exception($error);          # resets the transaction depth
802   }
803 }
804
805 # This used to be the top-half of _execute.  It was split out to make it
806 #  easier to override in NoBindVars without duping the rest.  It takes up
807 #  all of _execute's args, and emits $sql, @bind.
808 sub _prep_for_execute {
809   my ($self, $op, $extra_bind, $ident, @args) = @_;
810
811   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
812   unshift(@bind, @$extra_bind) if $extra_bind;
813   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
814
815   return ($sql, @bind);
816 }
817
818 sub _execute {
819   my ($self, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
820   
821   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
822   unshift(@bind, @$extra_bind) if $extra_bind;
823   if ($self->debug) {
824       my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind;
825       $self->debugobj->query_start($sql, @debug_bind);
826   }
827   my $sth = eval { $self->sth($sql,$op) };
828
829   if (!$sth || $@) {
830     $self->throw_exception(
831       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
832     );
833   }
834
835   my $rv;
836   if ($sth) {
837     my $time = time();
838         
839     $rv = eval {
840         
841       my $placeholder_index = 1; 
842
843       foreach my $bound (@bind) {
844
845         my $attributes = {};
846         my($column_name, $data) = @$bound;
847
848         if( $bind_attributes ) {
849           $attributes = $bind_attributes->{$column_name}
850           if defined $bind_attributes->{$column_name};
851         }                       
852
853         $data = ref $data ? ''.$data : $data; # stringify args
854
855         $sth->bind_param($placeholder_index, $data, $attributes);
856         $placeholder_index++;
857       }
858       $sth->execute;
859     };
860   
861     if ($@ || !$rv) {
862       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
863     }
864   } else {
865     $self->throw_exception("'$sql' did not generate a statement.");
866   }
867   if ($self->debug) {
868      my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind; 
869       $self->debugobj->query_end($sql, @debug_bind);
870   }
871   return (wantarray ? ($rv, $sth, @bind) : $rv);
872 }
873
874 sub insert {
875   my ($self, $source, $to_insert) = @_;
876   
877   my $ident = $source->from; 
878   my $bind_attributes;
879   foreach my $column ($source->columns) {
880   
881     my $data_type = $source->column_info($column)->{data_type} || '';
882     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
883          if $data_type;
884   } 
885   
886   $self->throw_exception(
887     "Couldn't insert ".join(', ',
888       map "$_ => $to_insert->{$_}", keys %$to_insert
889     )." into ${ident}"
890   ) unless ($self->_execute('insert' => [], $ident, $bind_attributes, $to_insert));
891   return $to_insert;
892 }
893
894 ## Still not quite perfect, and EXPERIMENTAL
895 ## Currently it is assumed that all values passed will be "normal", i.e. not 
896 ## scalar refs, or at least, all the same type as the first set, the statement is
897 ## only prepped once.
898 sub insert_bulk {
899   my ($self, $source, $cols, $data) = @_;
900   my %colvalues;
901   my $table = $source->from;
902   @colvalues{@$cols} = (0..$#$cols);
903   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
904   
905   if ($self->debug) {
906       my @debug_bind = map { defined $_->[1] ? qq{$_->[1]} : q{'NULL'} } @bind;
907       $self->debugobj->query_start($sql, @debug_bind);
908   }
909   my $sth = $self->sth($sql);
910
911 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
912
913   my $rv;
914   
915   ## This must be an arrayref, else nothing works!
916   
917   my $tuple_status = [];
918   
919   ##use Data::Dumper;
920   ##print STDERR Dumper( $data, $sql, [@bind] );
921         
922   if ($sth) {
923   
924     my $time = time();
925         
926     #$rv = eval {
927         #
928         #  $sth->execute_array({
929
930         #    ArrayTupleFetch => sub {
931
932         #      my $values = shift @$data;  
933     #      return if !$values; 
934     #      return [ @{$values}[@bind] ];
935         #    },
936           
937         #    ArrayTupleStatus => $tuple_status,
938         #  })
939     #};
940         
941         ## Get the bind_attributes, if any exist
942         
943     my $bind_attributes;
944     foreach my $column ($source->columns) {
945   
946       my $data_type = $source->column_info($column)->{data_type} || '';
947           
948       $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
949            if $data_type;
950     } 
951         
952         ## Bind the values and execute
953         
954         $rv = eval {
955         
956      my $placeholder_index = 1; 
957
958         foreach my $bound (@bind) {
959
960           my $attributes = {};
961           my ($column_name, $data_index) = @$bound;
962
963           if( $bind_attributes ) {
964             $attributes = $bind_attributes->{$column_name}
965             if defined $bind_attributes->{$column_name};
966           }
967                   
968                   my @data = map { $_->[$data_index] } @$data;
969
970           $sth->bind_param_array( $placeholder_index, [@data], $attributes );
971           $placeholder_index++;
972       }
973           $sth->execute_array( {ArrayTupleStatus => $tuple_status} );
974
975         };
976    
977 #print STDERR Dumper($tuple_status);
978 #print STDERR "RV: $rv\n";
979
980     if ($@ || !defined $rv) {
981       my $errors = '';
982       foreach my $tuple (@$tuple_status)
983       {
984           $errors .= "\n" . $tuple->[1] if(ref $tuple);
985       }
986       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
987     }
988   } else {
989     $self->throw_exception("'$sql' did not generate a statement.");
990   }
991   if ($self->debug) {
992       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
993       $self->debugobj->query_end($sql, @debug_bind);
994   }
995   return (wantarray ? ($rv, $sth, @bind) : $rv);
996 }
997
998 sub update {
999   my $self = shift @_;
1000   my $source = shift @_;
1001   
1002   my $bind_attributes;
1003   foreach my $column ($source->columns) {
1004   
1005     my $data_type = $source->column_info($column)->{data_type} || '';
1006     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1007          if $data_type;
1008   }
1009
1010   my $ident = $source->from;
1011   return $self->_execute('update' => [], $ident, $bind_attributes, @_);
1012 }
1013
1014
1015 sub delete {
1016   my $self = shift @_;
1017   my $source = shift @_;
1018   
1019   my $bind_attrs = {}; ## If ever it's needed...
1020   my $ident = $source->from;
1021   
1022   return $self->_execute('delete' => [], $ident, $bind_attrs, @_);
1023 }
1024
1025 sub _select {
1026   my ($self, $ident, $select, $condition, $attrs) = @_;
1027   my $order = $attrs->{order_by};
1028   if (ref $condition eq 'SCALAR') {
1029     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
1030   }
1031   if (exists $attrs->{group_by} || $attrs->{having}) {
1032     $order = {
1033       group_by => $attrs->{group_by},
1034       having => $attrs->{having},
1035       ($order ? (order_by => $order) : ())
1036     };
1037   }
1038   my $bind_attrs = {}; ## Future support
1039   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1040   if ($attrs->{software_limit} ||
1041       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1042         $attrs->{software_limit} = 1;
1043   } else {
1044     $self->throw_exception("rows attribute must be positive if present")
1045       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1046     push @args, $attrs->{rows}, $attrs->{offset};
1047   }
1048   return $self->_execute(@args);
1049 }
1050
1051 =head2 select
1052
1053 =over 4
1054
1055 =item Arguments: $ident, $select, $condition, $attrs
1056
1057 =back
1058
1059 Handle a SQL select statement.
1060
1061 =cut
1062
1063 sub select {
1064   my $self = shift;
1065   my ($ident, $select, $condition, $attrs) = @_;
1066   return $self->cursor->new($self, \@_, $attrs);
1067 }
1068
1069 sub select_single {
1070   my $self = shift;
1071   my ($rv, $sth, @bind) = $self->_select(@_);
1072   my @row = $sth->fetchrow_array;
1073   # Need to call finish() to work round broken DBDs
1074   $sth->finish();
1075   return @row;
1076 }
1077
1078 =head2 sth
1079
1080 =over 4
1081
1082 =item Arguments: $sql
1083
1084 =back
1085
1086 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1087
1088 =cut
1089
1090 sub _dbh_sth {
1091   my ($self, $dbh, $sql) = @_;
1092
1093   # 3 is the if_active parameter which avoids active sth re-use
1094   my $sth = $self->disable_sth_caching
1095     ? $dbh->prepare($sql)
1096     : $dbh->prepare_cached($sql, {}, 3);
1097
1098   $self->throw_exception(
1099     'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
1100   ) if !$sth;
1101
1102   $sth;
1103 }
1104
1105 sub sth {
1106   my ($self, $sql) = @_;
1107   $self->dbh_do($self->can('_dbh_sth'), $sql);
1108 }
1109
1110 sub _dbh_columns_info_for {
1111   my ($self, $dbh, $table) = @_;
1112
1113   if ($dbh->can('column_info')) {
1114     my %result;
1115     eval {
1116       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1117       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1118       $sth->execute();
1119       while ( my $info = $sth->fetchrow_hashref() ){
1120         my %column_info;
1121         $column_info{data_type}   = $info->{TYPE_NAME};
1122         $column_info{size}      = $info->{COLUMN_SIZE};
1123         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1124         $column_info{default_value} = $info->{COLUMN_DEF};
1125         my $col_name = $info->{COLUMN_NAME};
1126         $col_name =~ s/^\"(.*)\"$/$1/;
1127
1128         $result{$col_name} = \%column_info;
1129       }
1130     };
1131     return \%result if !$@ && scalar keys %result;
1132   }
1133
1134   my %result;
1135   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1136   $sth->execute;
1137   my @columns = @{$sth->{NAME_lc}};
1138   for my $i ( 0 .. $#columns ){
1139     my %column_info;
1140     my $type_num = $sth->{TYPE}->[$i];
1141     my $type_name;
1142     if(defined $type_num && $dbh->can('type_info')) {
1143       my $type_info = $dbh->type_info($type_num);
1144       $type_name = $type_info->{TYPE_NAME} if $type_info;
1145     }
1146     $column_info{data_type} = $type_name ? $type_name : $type_num;
1147     $column_info{size} = $sth->{PRECISION}->[$i];
1148     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1149
1150     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1151       $column_info{data_type} = $1;
1152       $column_info{size}    = $2;
1153     }
1154
1155     $result{$columns[$i]} = \%column_info;
1156   }
1157
1158   return \%result;
1159 }
1160
1161 sub columns_info_for {
1162   my ($self, $table) = @_;
1163   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1164 }
1165
1166 =head2 last_insert_id
1167
1168 Return the row id of the last insert.
1169
1170 =cut
1171
1172 sub _dbh_last_insert_id {
1173     my ($self, $dbh, $source, $col) = @_;
1174     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1175     $dbh->func('last_insert_rowid');
1176 }
1177
1178 sub last_insert_id {
1179   my $self = shift;
1180   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1181 }
1182
1183 =head2 sqlt_type
1184
1185 Returns the database driver name.
1186
1187 =cut
1188
1189 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1190
1191 =head2 bind_attribute_by_data_type
1192
1193 Given a datatype from column info, returns a database specific bind attribute for
1194 $dbh->bind_param($val,$attribute) or nothing if we will let the database planner
1195 just handle it.
1196
1197 Generally only needed for special case column types, like bytea in postgres.
1198
1199 =cut
1200
1201 sub bind_attribute_by_data_type {
1202     return;
1203 }
1204
1205 =head2 create_ddl_dir (EXPERIMENTAL)
1206
1207 =over 4
1208
1209 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1210
1211 =back
1212
1213 Creates a SQL file based on the Schema, for each of the specified
1214 database types, in the given directory.
1215
1216 Note that this feature is currently EXPERIMENTAL and may not work correctly
1217 across all databases, or fully handle complex relationships.
1218
1219 =cut
1220
1221 sub create_ddl_dir
1222 {
1223   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1224
1225   if(!$dir || !-d $dir)
1226   {
1227     warn "No directory given, using ./\n";
1228     $dir = "./";
1229   }
1230   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1231   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1232   $version ||= $schema->VERSION || '1.x';
1233   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1234
1235   eval "use SQL::Translator";
1236   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1237
1238   my $sqlt = SQL::Translator->new($sqltargs);
1239   foreach my $db (@$databases)
1240   {
1241     $sqlt->reset();
1242     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1243 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1244     $sqlt->data($schema);
1245     $sqlt->producer($db);
1246
1247     my $file;
1248     my $filename = $schema->ddl_filename($db, $dir, $version);
1249     if(-e $filename)
1250     {
1251       $self->throw_exception("$filename already exists, skipping $db");
1252       next;
1253     }
1254     open($file, ">$filename") 
1255       or $self->throw_exception("Can't open $filename for writing ($!)");
1256     my $output = $sqlt->translate;
1257 #use Data::Dumper;
1258 #    print join(":", keys %{$schema->source_registrations});
1259 #    print Dumper($sqlt->schema);
1260     if(!$output)
1261     {
1262       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1263       next;
1264     }
1265     print $file $output;
1266     close($file);
1267   }
1268
1269 }
1270
1271 =head2 deployment_statements
1272
1273 =over 4
1274
1275 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1276
1277 =back
1278
1279 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1280 The database driver name is given by C<$type>, though the value from
1281 L</sqlt_type> is used if it is not specified.
1282
1283 C<$directory> is used to return statements from files in a previously created
1284 L</create_ddl_dir> directory and is optional. The filenames are constructed
1285 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1286
1287 If no C<$directory> is specified then the statements are constructed on the
1288 fly using L<SQL::Translator> and C<$version> is ignored.
1289
1290 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1291
1292 =cut
1293
1294 sub deployment_statements {
1295   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1296   # Need to be connected to get the correct sqlt_type
1297   $self->ensure_connected() unless $type;
1298   $type ||= $self->sqlt_type;
1299   $version ||= $schema->VERSION || '1.x';
1300   $dir ||= './';
1301   eval "use SQL::Translator";
1302   if(!$@)
1303   {
1304     eval "use SQL::Translator::Parser::DBIx::Class;";
1305     $self->throw_exception($@) if $@;
1306     eval "use SQL::Translator::Producer::${type};";
1307     $self->throw_exception($@) if $@;
1308     my $tr = SQL::Translator->new(%$sqltargs);
1309     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1310     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1311   }
1312
1313   my $filename = $schema->ddl_filename($type, $dir, $version);
1314   if(!-f $filename)
1315   {
1316 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1317       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1318       return;
1319   }
1320   my $file;
1321   open($file, "<$filename") 
1322       or $self->throw_exception("Can't open $filename ($!)");
1323   my @rows = <$file>;
1324   close($file);
1325
1326   return join('', @rows);
1327   
1328 }
1329
1330 sub deploy {
1331   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1332   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1333     for ( split(";\n", $statement)) {
1334       next if($_ =~ /^--/);
1335       next if(!$_);
1336 #      next if($_ =~ /^DROP/m);
1337       next if($_ =~ /^BEGIN TRANSACTION/m);
1338       next if($_ =~ /^COMMIT/m);
1339       next if $_ =~ /^\s+$/; # skip whitespace only
1340       $self->debugobj->query_start($_) if $self->debug;
1341       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1342       $self->debugobj->query_end($_) if $self->debug;
1343     }
1344   }
1345 }
1346
1347 =head2 datetime_parser
1348
1349 Returns the datetime parser class
1350
1351 =cut
1352
1353 sub datetime_parser {
1354   my $self = shift;
1355   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1356 }
1357
1358 =head2 datetime_parser_type
1359
1360 Defines (returns) the datetime parser class - currently hardwired to
1361 L<DateTime::Format::MySQL>
1362
1363 =cut
1364
1365 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1366
1367 =head2 build_datetime_parser
1368
1369 See L</datetime_parser>
1370
1371 =cut
1372
1373 sub build_datetime_parser {
1374   my $self = shift;
1375   my $type = $self->datetime_parser_type(@_);
1376   eval "use ${type}";
1377   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1378   return $type;
1379 }
1380
1381 sub DESTROY {
1382   my $self = shift;
1383   return if !$self->_dbh;
1384   $self->_verify_pid;
1385   $self->_dbh(undef);
1386 }
1387
1388 1;
1389
1390 =head1 SQL METHODS
1391
1392 The module defines a set of methods within the DBIC::SQL::Abstract
1393 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1394 SQL query functions.
1395
1396 The following methods are extended:-
1397
1398 =over 4
1399
1400 =item delete
1401
1402 =item insert
1403
1404 =item select
1405
1406 =item update
1407
1408 =item limit_dialect
1409
1410 See L</connect_info> for details.
1411 For setting, this method is deprecated in favor of L</connect_info>.
1412
1413 =item quote_char
1414
1415 See L</connect_info> for details.
1416 For setting, this method is deprecated in favor of L</connect_info>.
1417
1418 =item name_sep
1419
1420 See L</connect_info> for details.
1421 For setting, this method is deprecated in favor of L</connect_info>.
1422
1423 =back
1424
1425 =head1 AUTHORS
1426
1427 Matt S. Trout <mst@shadowcatsystems.co.uk>
1428
1429 Andy Grundman <andy@hybridized.org>
1430
1431 =head1 LICENSE
1432
1433 You may distribute this code under the same terms as Perl itself.
1434
1435 =cut