(no commit message)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        disable_sth_caching cursor on_connect_do transaction_depth bind_attributes/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item disable_sth_caching
340
341 If set to a true value, this option will disable the caching of
342 statement handles via L<DBI/prepare_cached>.
343
344 =item limit_dialect 
345
346 Sets the limit dialect. This is useful for JDBC-bridge among others
347 where the remote SQL-dialect cannot be determined by the name of the
348 driver alone.
349
350 =item quote_char
351
352 Specifies what characters to use to quote table and column names. If 
353 you use this you will want to specify L<name_sep> as well.
354
355 quote_char expects either a single character, in which case is it is placed
356 on either side of the table/column, or an arrayref of length 2 in which case the
357 table/column name is placed between the elements.
358
359 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
360 use C<quote_char =E<gt> [qw/[ ]/]>.
361
362 =item name_sep
363
364 This only needs to be used in conjunction with L<quote_char>, and is used to 
365 specify the charecter that seperates elements (schemas, tables, columns) from 
366 each other. In most cases this is simply a C<.>.
367
368 =back
369
370 These options can be mixed in with your other L<DBI> connection attributes,
371 or placed in a seperate hashref after all other normal L<DBI> connection
372 arguments.
373
374 Every time C<connect_info> is invoked, any previous settings for
375 these options will be cleared before setting the new ones, regardless of
376 whether any options are specified in the new C<connect_info>.
377
378 Important note:  DBIC expects the returned database handle provided by 
379 a subref argument to have RaiseError set on it.  If it doesn't, things
380 might not work very well, YMMV.  If you don't use a subref, DBIC will
381 force this setting for you anyways.  Setting HandleError to anything
382 other than simple exception object wrapper might cause problems too.
383
384 Examples:
385
386   # Simple SQLite connection
387   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
388
389   # Connect via subref
390   ->connect_info([ sub { DBI->connect(...) } ]);
391
392   # A bit more complicated
393   ->connect_info(
394     [
395       'dbi:Pg:dbname=foo',
396       'postgres',
397       'my_pg_password',
398       { AutoCommit => 0 },
399       { quote_char => q{"}, name_sep => q{.} },
400     ]
401   );
402
403   # Equivalent to the previous example
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Subref + DBIC-specific connection options
414   ->connect_info(
415     [
416       sub { DBI->connect(...) },
417       {
418           quote_char => q{`},
419           name_sep => q{@},
420           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
421           disable_sth_caching => 1,
422       },
423     ]
424   );
425
426 =cut
427
428 sub connect_info {
429   my ($self, $info_arg) = @_;
430
431   return $self->_connect_info if !$info_arg;
432
433   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
434   #  the new set of options
435   $self->_sql_maker(undef);
436   $self->_sql_maker_opts({});
437
438   my $info = [ @$info_arg ]; # copy because we can alter it
439   my $last_info = $info->[-1];
440   if(ref $last_info eq 'HASH') {
441     for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
442       if(my $value = delete $last_info->{$storage_opt}) {
443         $self->$storage_opt($value);
444       }
445     }
446     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
447       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
448         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
449       }
450     }
451
452     # Get rid of any trailing empty hashref
453     pop(@$info) if !keys %$last_info;
454   }
455
456   $self->_connect_info($info);
457 }
458
459 =head2 on_connect_do
460
461 This method is deprecated in favor of setting via L</connect_info>.
462
463 =head2 dbh_do
464
465 Arguments: $subref, @extra_coderef_args?
466
467 Execute the given subref using the new exception-based connection management.
468
469 The first two arguments will be the storage object that C<dbh_do> was called
470 on and a database handle to use.  Any additional arguments will be passed
471 verbatim to the called subref as arguments 2 and onwards.
472
473 Using this (instead of $self->_dbh or $self->dbh) ensures correct
474 exception handling and reconnection (or failover in future subclasses).
475
476 Your subref should have no side-effects outside of the database, as
477 there is the potential for your subref to be partially double-executed
478 if the database connection was stale/dysfunctional.
479
480 Example:
481
482   my @stuff = $schema->storage->dbh_do(
483     sub {
484       my ($storage, $dbh, @cols) = @_;
485       my $cols = join(q{, }, @cols);
486       $dbh->selectrow_array("SELECT $cols FROM foo");
487     },
488     @column_list
489   );
490
491 =cut
492
493 sub dbh_do {
494   my $self = shift;
495   my $coderef = shift;
496
497   ref $coderef eq 'CODE' or $self->throw_exception
498     ('$coderef must be a CODE reference');
499
500   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
501   local $self->{_in_dbh_do} = 1;
502
503   my @result;
504   my $want_array = wantarray;
505
506   eval {
507     $self->_verify_pid if $self->_dbh;
508     $self->_populate_dbh if !$self->_dbh;
509     if($want_array) {
510         @result = $coderef->($self, $self->_dbh, @_);
511     }
512     elsif(defined $want_array) {
513         $result[0] = $coderef->($self, $self->_dbh, @_);
514     }
515     else {
516         $coderef->($self, $self->_dbh, @_);
517     }
518   };
519
520   my $exception = $@;
521   if(!$exception) { return $want_array ? @result : $result[0] }
522
523   $self->throw_exception($exception) if $self->connected;
524
525   # We were not connected - reconnect and retry, but let any
526   #  exception fall right through this time
527   $self->_populate_dbh;
528   $coderef->($self, $self->_dbh, @_);
529 }
530
531 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
532 # It also informs dbh_do to bypass itself while under the direction of txn_do,
533 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
534 sub txn_do {
535   my $self = shift;
536   my $coderef = shift;
537
538   ref $coderef eq 'CODE' or $self->throw_exception
539     ('$coderef must be a CODE reference');
540
541   local $self->{_in_dbh_do} = 1;
542
543   my @result;
544   my $want_array = wantarray;
545
546   my $tried = 0;
547   while(1) {
548     eval {
549       $self->_verify_pid if $self->_dbh;
550       $self->_populate_dbh if !$self->_dbh;
551
552       $self->txn_begin;
553       if($want_array) {
554           @result = $coderef->(@_);
555       }
556       elsif(defined $want_array) {
557           $result[0] = $coderef->(@_);
558       }
559       else {
560           $coderef->(@_);
561       }
562       $self->txn_commit;
563     };
564
565     my $exception = $@;
566     if(!$exception) { return $want_array ? @result : $result[0] }
567
568     if($tried++ > 0 || $self->connected) {
569       eval { $self->txn_rollback };
570       my $rollback_exception = $@;
571       if($rollback_exception) {
572         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
573         $self->throw_exception($exception)  # propagate nested rollback
574           if $rollback_exception =~ /$exception_class/;
575
576         $self->throw_exception(
577           "Transaction aborted: ${exception}. "
578           . "Rollback failed: ${rollback_exception}"
579         );
580       }
581       $self->throw_exception($exception)
582     }
583
584     # We were not connected, and was first try - reconnect and retry
585     # via the while loop
586     $self->_populate_dbh;
587   }
588 }
589
590 =head2 disconnect
591
592 Our C<disconnect> method also performs a rollback first if the
593 database is not in C<AutoCommit> mode.
594
595 =cut
596
597 sub disconnect {
598   my ($self) = @_;
599
600   if( $self->connected ) {
601     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
602     $self->_dbh->disconnect;
603     $self->_dbh(undef);
604     $self->{_dbh_gen}++;
605   }
606 }
607
608 sub connected {
609   my ($self) = @_;
610
611   if(my $dbh = $self->_dbh) {
612       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
613           $self->_dbh(undef);
614           $self->{_dbh_gen}++;
615           return;
616       }
617       else {
618           $self->_verify_pid;
619       }
620       return ($dbh->FETCH('Active') && $dbh->ping);
621   }
622
623   return 0;
624 }
625
626 # handle pid changes correctly
627 #  NOTE: assumes $self->_dbh is a valid $dbh
628 sub _verify_pid {
629   my ($self) = @_;
630
631   return if $self->_conn_pid == $$;
632
633   $self->_dbh->{InactiveDestroy} = 1;
634   $self->_dbh(undef);
635   $self->{_dbh_gen}++;
636
637   return;
638 }
639
640 sub ensure_connected {
641   my ($self) = @_;
642
643   unless ($self->connected) {
644     $self->_populate_dbh;
645   }
646 }
647
648 =head2 dbh
649
650 Returns the dbh - a data base handle of class L<DBI>.
651
652 =cut
653
654 sub dbh {
655   my ($self) = @_;
656
657   $self->ensure_connected;
658   return $self->_dbh;
659 }
660
661 sub _sql_maker_args {
662     my ($self) = @_;
663     
664     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
665 }
666
667 sub sql_maker {
668   my ($self) = @_;
669   unless ($self->_sql_maker) {
670     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
671   }
672   return $self->_sql_maker;
673 }
674
675 sub _populate_dbh {
676   my ($self) = @_;
677   my @info = @{$self->_connect_info || []};
678   $self->_dbh($self->_connect(@info));
679
680   if(ref $self eq 'DBIx::Class::Storage::DBI') {
681     my $driver = $self->_dbh->{Driver}->{Name};
682     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
683       bless $self, "DBIx::Class::Storage::DBI::${driver}";
684       $self->_rebless() if $self->can('_rebless');
685     }
686   }
687
688   # if on-connect sql statements are given execute them
689   foreach my $sql_statement (@{$self->on_connect_do || []}) {
690     $self->debugobj->query_start($sql_statement) if $self->debug();
691     $self->_dbh->do($sql_statement);
692     $self->debugobj->query_end($sql_statement) if $self->debug();
693   }
694
695   $self->_conn_pid($$);
696   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
697 }
698
699 sub _connect {
700   my ($self, @info) = @_;
701
702   $self->throw_exception("You failed to provide any connection info")
703       if !@info;
704
705   my ($old_connect_via, $dbh);
706
707   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
708       $old_connect_via = $DBI::connect_via;
709       $DBI::connect_via = 'connect';
710   }
711
712   eval {
713     if(ref $info[0] eq 'CODE') {
714        $dbh = &{$info[0]}
715     }
716     else {
717        $dbh = DBI->connect(@info);
718        $dbh->{RaiseError} = 1;
719        $dbh->{PrintError} = 0;
720        $dbh->{PrintWarn} = 0;
721     }
722   };
723
724   $DBI::connect_via = $old_connect_via if $old_connect_via;
725
726   if (!$dbh || $@) {
727     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
728   }
729
730   $dbh;
731 }
732
733 sub _dbh_txn_begin {
734   my ($self, $dbh) = @_;
735   if ($dbh->{AutoCommit}) {
736     $self->debugobj->txn_begin()
737       if ($self->debug);
738     $dbh->begin_work;
739   }
740 }
741
742 sub txn_begin {
743   my $self = shift;
744   $self->dbh_do($self->can('_dbh_txn_begin'))
745     if $self->{transaction_depth}++ == 0;
746 }
747
748 sub _dbh_txn_commit {
749   my ($self, $dbh) = @_;
750   if ($self->{transaction_depth} == 0) {
751     unless ($dbh->{AutoCommit}) {
752       $self->debugobj->txn_commit()
753         if ($self->debug);
754       $dbh->commit;
755     }
756   }
757   else {
758     if (--$self->{transaction_depth} == 0) {
759       $self->debugobj->txn_commit()
760         if ($self->debug);
761       $dbh->commit;
762     }
763   }
764 }
765
766 sub txn_commit {
767   my $self = shift;
768   $self->dbh_do($self->can('_dbh_txn_commit'));
769 }
770
771 sub _dbh_txn_rollback {
772   my ($self, $dbh) = @_;
773   if ($self->{transaction_depth} == 0) {
774     unless ($dbh->{AutoCommit}) {
775       $self->debugobj->txn_rollback()
776         if ($self->debug);
777       $dbh->rollback;
778     }
779   }
780   else {
781     if (--$self->{transaction_depth} == 0) {
782       $self->debugobj->txn_rollback()
783         if ($self->debug);
784       $dbh->rollback;
785     }
786     else {
787       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
788     }
789   }
790 }
791
792 sub txn_rollback {
793   my $self = shift;
794
795   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
796   if ($@) {
797     my $error = $@;
798     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
799     $error =~ /$exception_class/ and $self->throw_exception($error);
800     $self->{transaction_depth} = 0;          # ensure that a failed rollback
801     $self->throw_exception($error);          # resets the transaction depth
802   }
803 }
804
805 # This used to be the top-half of _execute.  It was split out to make it
806 #  easier to override in NoBindVars without duping the rest.  It takes up
807 #  all of _execute's args, and emits $sql, @bind.
808 sub _prep_for_execute {
809   my ($self, $op, $extra_bind, $ident, @args) = @_;
810
811   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
812   unshift(@bind, @$extra_bind) if $extra_bind;
813   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
814
815   return ($sql, @bind);
816 }
817
818 sub _execute {
819   my ($self, $op, $extra_bind, $ident, @args) = @_;
820   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
821   unshift(@bind, @$extra_bind) if $extra_bind;
822   if ($self->debug) {
823       my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind;
824       $self->debugobj->query_start($sql, @debug_bind);
825   }
826   my $sth = eval { $self->sth($sql,$op) };
827
828   if (!$sth || $@) {
829     $self->throw_exception(
830       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
831     );
832   }
833
834   my $rv;
835   if ($sth) {
836     my $time = time();
837         
838     $rv = eval {
839         
840       my $placeholder_index = 1; 
841       my $bind_attributes = $self->bind_attributes;
842
843       foreach my $bound (@bind) {
844
845         my $attributes = {};
846         my($column_name, $data) = @$bound;
847
848         if( $bind_attributes ) {
849           $attributes = $bind_attributes->{$column_name}
850           if defined $bind_attributes->{$column_name};
851         }                       
852
853         $data = ref $data ? ''.$data : $data; # stringify args
854
855         $sth->bind_param($placeholder_index, $data, $attributes);
856         $placeholder_index++;
857       }
858       $sth->execute;
859     };
860   $self->bind_attributes({});
861   
862     if ($@ || !$rv) {
863       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
864     }
865   } else {
866     $self->throw_exception("'$sql' did not generate a statement.");
867   }
868   if ($self->debug) {
869      my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind; 
870       $self->debugobj->query_end($sql, @debug_bind);
871   }
872   return (wantarray ? ($rv, $sth, @bind) : $rv);
873 }
874
875 sub insert {
876   my ($self, $ident, $to_insert) = @_;
877   $self->throw_exception(
878     "Couldn't insert ".join(', ',
879       map "$_ => $to_insert->{$_}", keys %$to_insert
880     )." into ${ident}"
881   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
882   return $to_insert;
883 }
884
885 ## Still not quite perfect, and EXPERIMENTAL
886 ## Currently it is assumed that all values passed will be "normal", i.e. not 
887 ## scalar refs, or at least, all the same type as the first set, the statement is
888 ## only prepped once.
889 sub insert_bulk {
890   my ($self, $table, $cols, $data) = @_;
891   my %colvalues;
892   @colvalues{@$cols} = (0..$#$cols);
893   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
894 # print STDERR "BIND".Dumper(\@bind);
895
896   if ($self->debug) {
897       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
898       $self->debugobj->query_start($sql, @debug_bind);
899   }
900   my $sth = $self->sth($sql);
901
902 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
903
904   my $rv;
905   ## This must be an arrayref, else nothing works!
906   my $tuple_status = [];
907 #  use Data::Dumper;
908 #  print STDERR Dumper($data);
909   if ($sth) {
910     my $time = time();
911     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
912                                        ArrayTupleStatus => $tuple_status }) };
913 # print STDERR Dumper($tuple_status);
914 # print STDERR "RV: $rv\n";
915     if ($@ || !defined $rv) {
916       my $errors = '';
917       foreach my $tuple (@$tuple_status)
918       {
919           $errors .= "\n" . $tuple->[1] if(ref $tuple);
920       }
921       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
922     }
923   } else {
924     $self->throw_exception("'$sql' did not generate a statement.");
925   }
926   if ($self->debug) {
927       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
928       $self->debugobj->query_end($sql, @debug_bind);
929   }
930   return (wantarray ? ($rv, $sth, @bind) : $rv);
931 }
932
933 sub update {
934   return shift->_execute('update' => [], @_);
935 }
936
937 sub delete {
938   return shift->_execute('delete' => [], @_);
939 }
940
941 sub _select {
942   my ($self, $ident, $select, $condition, $attrs) = @_;
943   my $order = $attrs->{order_by};
944   if (ref $condition eq 'SCALAR') {
945     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
946   }
947   if (exists $attrs->{group_by} || $attrs->{having}) {
948     $order = {
949       group_by => $attrs->{group_by},
950       having => $attrs->{having},
951       ($order ? (order_by => $order) : ())
952     };
953   }
954   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
955   if ($attrs->{software_limit} ||
956       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
957         $attrs->{software_limit} = 1;
958   } else {
959     $self->throw_exception("rows attribute must be positive if present")
960       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
961     push @args, $attrs->{rows}, $attrs->{offset};
962   }
963   return $self->_execute(@args);
964 }
965
966 =head2 select
967
968 =over 4
969
970 =item Arguments: $ident, $select, $condition, $attrs
971
972 =back
973
974 Handle a SQL select statement.
975
976 =cut
977
978 sub select {
979   my $self = shift;
980   my ($ident, $select, $condition, $attrs) = @_;
981   return $self->cursor->new($self, \@_, $attrs);
982 }
983
984 sub select_single {
985   my $self = shift;
986   my ($rv, $sth, @bind) = $self->_select(@_);
987   my @row = $sth->fetchrow_array;
988   # Need to call finish() to work round broken DBDs
989   $sth->finish();
990   return @row;
991 }
992
993 =head2 sth
994
995 =over 4
996
997 =item Arguments: $sql
998
999 =back
1000
1001 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1002
1003 =cut
1004
1005 sub _dbh_sth {
1006   my ($self, $dbh, $sql) = @_;
1007
1008   # 3 is the if_active parameter which avoids active sth re-use
1009   my $sth = $self->disable_sth_caching
1010     ? $dbh->prepare($sql)
1011     : $dbh->prepare_cached($sql, {}, 3);
1012
1013   $self->throw_exception(
1014     'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
1015   ) if !$sth;
1016
1017   $sth;
1018 }
1019
1020 sub sth {
1021   my ($self, $sql) = @_;
1022   $self->dbh_do($self->can('_dbh_sth'), $sql);
1023 }
1024
1025 sub _dbh_columns_info_for {
1026   my ($self, $dbh, $table) = @_;
1027
1028   if ($dbh->can('column_info')) {
1029     my %result;
1030     eval {
1031       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1032       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1033       $sth->execute();
1034       while ( my $info = $sth->fetchrow_hashref() ){
1035         my %column_info;
1036         $column_info{data_type}   = $info->{TYPE_NAME};
1037         $column_info{size}      = $info->{COLUMN_SIZE};
1038         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1039         $column_info{default_value} = $info->{COLUMN_DEF};
1040         my $col_name = $info->{COLUMN_NAME};
1041         $col_name =~ s/^\"(.*)\"$/$1/;
1042
1043         $result{$col_name} = \%column_info;
1044       }
1045     };
1046     return \%result if !$@ && scalar keys %result;
1047   }
1048
1049   my %result;
1050   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1051   $sth->execute;
1052   my @columns = @{$sth->{NAME_lc}};
1053   for my $i ( 0 .. $#columns ){
1054     my %column_info;
1055     my $type_num = $sth->{TYPE}->[$i];
1056     my $type_name;
1057     if(defined $type_num && $dbh->can('type_info')) {
1058       my $type_info = $dbh->type_info($type_num);
1059       $type_name = $type_info->{TYPE_NAME} if $type_info;
1060     }
1061     $column_info{data_type} = $type_name ? $type_name : $type_num;
1062     $column_info{size} = $sth->{PRECISION}->[$i];
1063     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1064
1065     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1066       $column_info{data_type} = $1;
1067       $column_info{size}    = $2;
1068     }
1069
1070     $result{$columns[$i]} = \%column_info;
1071   }
1072
1073   return \%result;
1074 }
1075
1076 sub columns_info_for {
1077   my ($self, $table) = @_;
1078   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1079 }
1080
1081 =head2 last_insert_id
1082
1083 Return the row id of the last insert.
1084
1085 =cut
1086
1087 sub _dbh_last_insert_id {
1088     my ($self, $dbh, $source, $col) = @_;
1089     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1090     $dbh->func('last_insert_rowid');
1091 }
1092
1093 sub last_insert_id {
1094   my $self = shift;
1095   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1096 }
1097
1098 =head2 sqlt_type
1099
1100 Returns the database driver name.
1101
1102 =cut
1103
1104 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1105
1106 =head2 create_ddl_dir (EXPERIMENTAL)
1107
1108 =over 4
1109
1110 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1111
1112 =back
1113
1114 Creates a SQL file based on the Schema, for each of the specified
1115 database types, in the given directory.
1116
1117 Note that this feature is currently EXPERIMENTAL and may not work correctly
1118 across all databases, or fully handle complex relationships.
1119
1120 =cut
1121
1122 sub create_ddl_dir
1123 {
1124   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1125
1126   if(!$dir || !-d $dir)
1127   {
1128     warn "No directory given, using ./\n";
1129     $dir = "./";
1130   }
1131   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1132   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1133   $version ||= $schema->VERSION || '1.x';
1134   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1135
1136   eval "use SQL::Translator";
1137   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1138
1139   my $sqlt = SQL::Translator->new($sqltargs);
1140   foreach my $db (@$databases)
1141   {
1142     $sqlt->reset();
1143     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1144 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1145     $sqlt->data($schema);
1146     $sqlt->producer($db);
1147
1148     my $file;
1149     my $filename = $schema->ddl_filename($db, $dir, $version);
1150     if(-e $filename)
1151     {
1152       $self->throw_exception("$filename already exists, skipping $db");
1153       next;
1154     }
1155     open($file, ">$filename") 
1156       or $self->throw_exception("Can't open $filename for writing ($!)");
1157     my $output = $sqlt->translate;
1158 #use Data::Dumper;
1159 #    print join(":", keys %{$schema->source_registrations});
1160 #    print Dumper($sqlt->schema);
1161     if(!$output)
1162     {
1163       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1164       next;
1165     }
1166     print $file $output;
1167     close($file);
1168   }
1169
1170 }
1171
1172 =head2 deployment_statements
1173
1174 =over 4
1175
1176 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1177
1178 =back
1179
1180 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1181 The database driver name is given by C<$type>, though the value from
1182 L</sqlt_type> is used if it is not specified.
1183
1184 C<$directory> is used to return statements from files in a previously created
1185 L</create_ddl_dir> directory and is optional. The filenames are constructed
1186 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1187
1188 If no C<$directory> is specified then the statements are constructed on the
1189 fly using L<SQL::Translator> and C<$version> is ignored.
1190
1191 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1192
1193 =cut
1194
1195 sub deployment_statements {
1196   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1197   # Need to be connected to get the correct sqlt_type
1198   $self->ensure_connected() unless $type;
1199   $type ||= $self->sqlt_type;
1200   $version ||= $schema->VERSION || '1.x';
1201   $dir ||= './';
1202   eval "use SQL::Translator";
1203   if(!$@)
1204   {
1205     eval "use SQL::Translator::Parser::DBIx::Class;";
1206     $self->throw_exception($@) if $@;
1207     eval "use SQL::Translator::Producer::${type};";
1208     $self->throw_exception($@) if $@;
1209     my $tr = SQL::Translator->new(%$sqltargs);
1210     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1211     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1212   }
1213
1214   my $filename = $schema->ddl_filename($type, $dir, $version);
1215   if(!-f $filename)
1216   {
1217 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1218       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1219       return;
1220   }
1221   my $file;
1222   open($file, "<$filename") 
1223       or $self->throw_exception("Can't open $filename ($!)");
1224   my @rows = <$file>;
1225   close($file);
1226
1227   return join('', @rows);
1228   
1229 }
1230
1231 sub deploy {
1232   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1233   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1234     for ( split(";\n", $statement)) {
1235       next if($_ =~ /^--/);
1236       next if(!$_);
1237 #      next if($_ =~ /^DROP/m);
1238       next if($_ =~ /^BEGIN TRANSACTION/m);
1239       next if($_ =~ /^COMMIT/m);
1240       next if $_ =~ /^\s+$/; # skip whitespace only
1241       $self->debugobj->query_start($_) if $self->debug;
1242       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1243       $self->debugobj->query_end($_) if $self->debug;
1244     }
1245   }
1246 }
1247
1248 =head2 datetime_parser
1249
1250 Returns the datetime parser class
1251
1252 =cut
1253
1254 sub datetime_parser {
1255   my $self = shift;
1256   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1257 }
1258
1259 =head2 datetime_parser_type
1260
1261 Defines (returns) the datetime parser class - currently hardwired to
1262 L<DateTime::Format::MySQL>
1263
1264 =cut
1265
1266 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1267
1268 =head2 build_datetime_parser
1269
1270 See L</datetime_parser>
1271
1272 =cut
1273
1274 sub build_datetime_parser {
1275   my $self = shift;
1276   my $type = $self->datetime_parser_type(@_);
1277   eval "use ${type}";
1278   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1279   return $type;
1280 }
1281
1282 sub DESTROY {
1283   my $self = shift;
1284   return if !$self->_dbh;
1285   $self->_verify_pid;
1286   $self->_dbh(undef);
1287 }
1288
1289 1;
1290
1291 =head1 SQL METHODS
1292
1293 The module defines a set of methods within the DBIC::SQL::Abstract
1294 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1295 SQL query functions.
1296
1297 The following methods are extended:-
1298
1299 =over 4
1300
1301 =item delete
1302
1303 =item insert
1304
1305 =item select
1306
1307 =item update
1308
1309 =item limit_dialect
1310
1311 See L</connect_info> for details.
1312 For setting, this method is deprecated in favor of L</connect_info>.
1313
1314 =item quote_char
1315
1316 See L</connect_info> for details.
1317 For setting, this method is deprecated in favor of L</connect_info>.
1318
1319 =item name_sep
1320
1321 See L</connect_info> for details.
1322 For setting, this method is deprecated in favor of L</connect_info>.
1323
1324 =back
1325
1326 =head1 AUTHORS
1327
1328 Matt S. Trout <mst@shadowcatsystems.co.uk>
1329
1330 Andy Grundman <andy@hybridized.org>
1331
1332 =head1 LICENSE
1333
1334 You may distribute this code under the same terms as Perl itself.
1335
1336 =cut