moved bind attributes to DBI.pm/DBI/Pg.pm
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        disable_sth_caching cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item disable_sth_caching
340
341 If set to a true value, this option will disable the caching of
342 statement handles via L<DBI/prepare_cached>.
343
344 =item limit_dialect 
345
346 Sets the limit dialect. This is useful for JDBC-bridge among others
347 where the remote SQL-dialect cannot be determined by the name of the
348 driver alone.
349
350 =item quote_char
351
352 Specifies what characters to use to quote table and column names. If 
353 you use this you will want to specify L<name_sep> as well.
354
355 quote_char expects either a single character, in which case is it is placed
356 on either side of the table/column, or an arrayref of length 2 in which case the
357 table/column name is placed between the elements.
358
359 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
360 use C<quote_char =E<gt> [qw/[ ]/]>.
361
362 =item name_sep
363
364 This only needs to be used in conjunction with L<quote_char>, and is used to 
365 specify the charecter that seperates elements (schemas, tables, columns) from 
366 each other. In most cases this is simply a C<.>.
367
368 =back
369
370 These options can be mixed in with your other L<DBI> connection attributes,
371 or placed in a seperate hashref after all other normal L<DBI> connection
372 arguments.
373
374 Every time C<connect_info> is invoked, any previous settings for
375 these options will be cleared before setting the new ones, regardless of
376 whether any options are specified in the new C<connect_info>.
377
378 Important note:  DBIC expects the returned database handle provided by 
379 a subref argument to have RaiseError set on it.  If it doesn't, things
380 might not work very well, YMMV.  If you don't use a subref, DBIC will
381 force this setting for you anyways.  Setting HandleError to anything
382 other than simple exception object wrapper might cause problems too.
383
384 Examples:
385
386   # Simple SQLite connection
387   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
388
389   # Connect via subref
390   ->connect_info([ sub { DBI->connect(...) } ]);
391
392   # A bit more complicated
393   ->connect_info(
394     [
395       'dbi:Pg:dbname=foo',
396       'postgres',
397       'my_pg_password',
398       { AutoCommit => 0 },
399       { quote_char => q{"}, name_sep => q{.} },
400     ]
401   );
402
403   # Equivalent to the previous example
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Subref + DBIC-specific connection options
414   ->connect_info(
415     [
416       sub { DBI->connect(...) },
417       {
418           quote_char => q{`},
419           name_sep => q{@},
420           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
421           disable_sth_caching => 1,
422       },
423     ]
424   );
425
426 =cut
427
428 sub connect_info {
429   my ($self, $info_arg) = @_;
430
431   return $self->_connect_info if !$info_arg;
432
433   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
434   #  the new set of options
435   $self->_sql_maker(undef);
436   $self->_sql_maker_opts({});
437
438   my $info = [ @$info_arg ]; # copy because we can alter it
439   my $last_info = $info->[-1];
440   if(ref $last_info eq 'HASH') {
441     for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
442       if(my $value = delete $last_info->{$storage_opt}) {
443         $self->$storage_opt($value);
444       }
445     }
446     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
447       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
448         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
449       }
450     }
451
452     # Get rid of any trailing empty hashref
453     pop(@$info) if !keys %$last_info;
454   }
455
456   $self->_connect_info($info);
457 }
458
459 =head2 on_connect_do
460
461 This method is deprecated in favor of setting via L</connect_info>.
462
463 =head2 dbh_do
464
465 Arguments: $subref, @extra_coderef_args?
466
467 Execute the given subref using the new exception-based connection management.
468
469 The first two arguments will be the storage object that C<dbh_do> was called
470 on and a database handle to use.  Any additional arguments will be passed
471 verbatim to the called subref as arguments 2 and onwards.
472
473 Using this (instead of $self->_dbh or $self->dbh) ensures correct
474 exception handling and reconnection (or failover in future subclasses).
475
476 Your subref should have no side-effects outside of the database, as
477 there is the potential for your subref to be partially double-executed
478 if the database connection was stale/dysfunctional.
479
480 Example:
481
482   my @stuff = $schema->storage->dbh_do(
483     sub {
484       my ($storage, $dbh, @cols) = @_;
485       my $cols = join(q{, }, @cols);
486       $dbh->selectrow_array("SELECT $cols FROM foo");
487     },
488     @column_list
489   );
490
491 =cut
492
493 sub dbh_do {
494   my $self = shift;
495   my $coderef = shift;
496
497   ref $coderef eq 'CODE' or $self->throw_exception
498     ('$coderef must be a CODE reference');
499
500   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
501   local $self->{_in_dbh_do} = 1;
502
503   my @result;
504   my $want_array = wantarray;
505
506   eval {
507     $self->_verify_pid if $self->_dbh;
508     $self->_populate_dbh if !$self->_dbh;
509     if($want_array) {
510         @result = $coderef->($self, $self->_dbh, @_);
511     }
512     elsif(defined $want_array) {
513         $result[0] = $coderef->($self, $self->_dbh, @_);
514     }
515     else {
516         $coderef->($self, $self->_dbh, @_);
517     }
518   };
519
520   my $exception = $@;
521   if(!$exception) { return $want_array ? @result : $result[0] }
522
523   $self->throw_exception($exception) if $self->connected;
524
525   # We were not connected - reconnect and retry, but let any
526   #  exception fall right through this time
527   $self->_populate_dbh;
528   $coderef->($self, $self->_dbh, @_);
529 }
530
531 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
532 # It also informs dbh_do to bypass itself while under the direction of txn_do,
533 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
534 sub txn_do {
535   my $self = shift;
536   my $coderef = shift;
537
538   ref $coderef eq 'CODE' or $self->throw_exception
539     ('$coderef must be a CODE reference');
540
541   local $self->{_in_dbh_do} = 1;
542
543   my @result;
544   my $want_array = wantarray;
545
546   my $tried = 0;
547   while(1) {
548     eval {
549       $self->_verify_pid if $self->_dbh;
550       $self->_populate_dbh if !$self->_dbh;
551
552       $self->txn_begin;
553       if($want_array) {
554           @result = $coderef->(@_);
555       }
556       elsif(defined $want_array) {
557           $result[0] = $coderef->(@_);
558       }
559       else {
560           $coderef->(@_);
561       }
562       $self->txn_commit;
563     };
564
565     my $exception = $@;
566     if(!$exception) { return $want_array ? @result : $result[0] }
567
568     if($tried++ > 0 || $self->connected) {
569       eval { $self->txn_rollback };
570       my $rollback_exception = $@;
571       if($rollback_exception) {
572         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
573         $self->throw_exception($exception)  # propagate nested rollback
574           if $rollback_exception =~ /$exception_class/;
575
576         $self->throw_exception(
577           "Transaction aborted: ${exception}. "
578           . "Rollback failed: ${rollback_exception}"
579         );
580       }
581       $self->throw_exception($exception)
582     }
583
584     # We were not connected, and was first try - reconnect and retry
585     # via the while loop
586     $self->_populate_dbh;
587   }
588 }
589
590 =head2 disconnect
591
592 Our C<disconnect> method also performs a rollback first if the
593 database is not in C<AutoCommit> mode.
594
595 =cut
596
597 sub disconnect {
598   my ($self) = @_;
599
600   if( $self->connected ) {
601     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
602     $self->_dbh->disconnect;
603     $self->_dbh(undef);
604     $self->{_dbh_gen}++;
605   }
606 }
607
608 sub connected {
609   my ($self) = @_;
610
611   if(my $dbh = $self->_dbh) {
612       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
613           $self->_dbh(undef);
614           $self->{_dbh_gen}++;
615           return;
616       }
617       else {
618           $self->_verify_pid;
619       }
620       return ($dbh->FETCH('Active') && $dbh->ping);
621   }
622
623   return 0;
624 }
625
626 # handle pid changes correctly
627 #  NOTE: assumes $self->_dbh is a valid $dbh
628 sub _verify_pid {
629   my ($self) = @_;
630
631   return if $self->_conn_pid == $$;
632
633   $self->_dbh->{InactiveDestroy} = 1;
634   $self->_dbh(undef);
635   $self->{_dbh_gen}++;
636
637   return;
638 }
639
640 sub ensure_connected {
641   my ($self) = @_;
642
643   unless ($self->connected) {
644     $self->_populate_dbh;
645   }
646 }
647
648 =head2 dbh
649
650 Returns the dbh - a data base handle of class L<DBI>.
651
652 =cut
653
654 sub dbh {
655   my ($self) = @_;
656
657   $self->ensure_connected;
658   return $self->_dbh;
659 }
660
661 sub _sql_maker_args {
662     my ($self) = @_;
663     
664     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
665 }
666
667 sub sql_maker {
668   my ($self) = @_;
669   unless ($self->_sql_maker) {
670     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
671   }
672   return $self->_sql_maker;
673 }
674
675 sub _populate_dbh {
676   my ($self) = @_;
677   my @info = @{$self->_connect_info || []};
678   $self->_dbh($self->_connect(@info));
679
680   if(ref $self eq 'DBIx::Class::Storage::DBI') {
681     my $driver = $self->_dbh->{Driver}->{Name};
682     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
683       bless $self, "DBIx::Class::Storage::DBI::${driver}";
684       $self->_rebless() if $self->can('_rebless');
685     }
686   }
687
688   # if on-connect sql statements are given execute them
689   foreach my $sql_statement (@{$self->on_connect_do || []}) {
690     $self->debugobj->query_start($sql_statement) if $self->debug();
691     $self->_dbh->do($sql_statement);
692     $self->debugobj->query_end($sql_statement) if $self->debug();
693   }
694
695   $self->_conn_pid($$);
696   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
697 }
698
699 sub _connect {
700   my ($self, @info) = @_;
701
702   $self->throw_exception("You failed to provide any connection info")
703       if !@info;
704
705   my ($old_connect_via, $dbh);
706
707   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
708       $old_connect_via = $DBI::connect_via;
709       $DBI::connect_via = 'connect';
710   }
711
712   eval {
713     if(ref $info[0] eq 'CODE') {
714        $dbh = &{$info[0]}
715     }
716     else {
717        $dbh = DBI->connect(@info);
718        $dbh->{RaiseError} = 1;
719        $dbh->{PrintError} = 0;
720        $dbh->{PrintWarn} = 0;
721     }
722   };
723
724   $DBI::connect_via = $old_connect_via if $old_connect_via;
725
726   if (!$dbh || $@) {
727     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
728   }
729
730   $dbh;
731 }
732
733 sub _dbh_txn_begin {
734   my ($self, $dbh) = @_;
735   if ($dbh->{AutoCommit}) {
736     $self->debugobj->txn_begin()
737       if ($self->debug);
738     $dbh->begin_work;
739   }
740 }
741
742 sub txn_begin {
743   my $self = shift;
744   $self->dbh_do($self->can('_dbh_txn_begin'))
745     if $self->{transaction_depth}++ == 0;
746 }
747
748 sub _dbh_txn_commit {
749   my ($self, $dbh) = @_;
750   if ($self->{transaction_depth} == 0) {
751     unless ($dbh->{AutoCommit}) {
752       $self->debugobj->txn_commit()
753         if ($self->debug);
754       $dbh->commit;
755     }
756   }
757   else {
758     if (--$self->{transaction_depth} == 0) {
759       $self->debugobj->txn_commit()
760         if ($self->debug);
761       $dbh->commit;
762     }
763   }
764 }
765
766 sub txn_commit {
767   my $self = shift;
768   $self->dbh_do($self->can('_dbh_txn_commit'));
769 }
770
771 sub _dbh_txn_rollback {
772   my ($self, $dbh) = @_;
773   if ($self->{transaction_depth} == 0) {
774     unless ($dbh->{AutoCommit}) {
775       $self->debugobj->txn_rollback()
776         if ($self->debug);
777       $dbh->rollback;
778     }
779   }
780   else {
781     if (--$self->{transaction_depth} == 0) {
782       $self->debugobj->txn_rollback()
783         if ($self->debug);
784       $dbh->rollback;
785     }
786     else {
787       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
788     }
789   }
790 }
791
792 sub txn_rollback {
793   my $self = shift;
794
795   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
796   if ($@) {
797     my $error = $@;
798     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
799     $error =~ /$exception_class/ and $self->throw_exception($error);
800     $self->{transaction_depth} = 0;          # ensure that a failed rollback
801     $self->throw_exception($error);          # resets the transaction depth
802   }
803 }
804
805 # This used to be the top-half of _execute.  It was split out to make it
806 #  easier to override in NoBindVars without duping the rest.  It takes up
807 #  all of _execute's args, and emits $sql, @bind.
808 sub _prep_for_execute {
809   my ($self, $op, $extra_bind, $ident, @args) = @_;
810
811   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
812   unshift(@bind, @$extra_bind) if $extra_bind;
813   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
814
815   return ($sql, @bind);
816 }
817
818 sub _execute {
819   my ($self, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
820   
821   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
822   unshift(@bind, @$extra_bind) if $extra_bind;
823   if ($self->debug) {
824       my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind;
825       $self->debugobj->query_start($sql, @debug_bind);
826   }
827   my $sth = eval { $self->sth($sql,$op) };
828
829   if (!$sth || $@) {
830     $self->throw_exception(
831       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
832     );
833   }
834
835   my $rv;
836   if ($sth) {
837     my $time = time();
838         
839     $rv = eval {
840         
841       my $placeholder_index = 1; 
842
843       foreach my $bound (@bind) {
844
845         my $attributes = {};
846         my($column_name, $data) = @$bound;
847
848         if( $bind_attributes ) {
849           $attributes = $bind_attributes->{$column_name}
850           if defined $bind_attributes->{$column_name};
851         }                       
852
853         $data = ref $data ? ''.$data : $data; # stringify args
854
855         $sth->bind_param($placeholder_index, $data, $attributes);
856         $placeholder_index++;
857       }
858       $sth->execute;
859     };
860   
861     if ($@ || !$rv) {
862       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
863     }
864   } else {
865     $self->throw_exception("'$sql' did not generate a statement.");
866   }
867   if ($self->debug) {
868      my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind; 
869       $self->debugobj->query_end($sql, @debug_bind);
870   }
871   return (wantarray ? ($rv, $sth, @bind) : $rv);
872 }
873
874 sub insert {
875   my ($self, $source, $to_insert) = @_;
876   
877   my $ident = $source->from; 
878   my $bind_attributes;
879   foreach my $column ($source->columns) {
880   
881     my $data_type = $source->column_info($column)->{data_type} || '';
882     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
883          if $data_type;
884   } 
885   
886   $self->throw_exception(
887     "Couldn't insert ".join(', ',
888       map "$_ => $to_insert->{$_}", keys %$to_insert
889     )." into ${ident}"
890   ) unless ($self->_execute('insert' => [], $ident, $bind_attributes, $to_insert));
891   return $to_insert;
892 }
893
894 ## Still not quite perfect, and EXPERIMENTAL
895 ## Currently it is assumed that all values passed will be "normal", i.e. not 
896 ## scalar refs, or at least, all the same type as the first set, the statement is
897 ## only prepped once.
898 sub insert_bulk {
899   my ($self, $table, $cols, $data) = @_;
900   my %colvalues;
901   @colvalues{@$cols} = (0..$#$cols);
902   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
903   
904   ##need this to support using bindtype=>columns for sql abstract
905   @bind = map {$_->[1]} @bind;
906
907   if ($self->debug) {
908       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
909       $self->debugobj->query_start($sql, @debug_bind);
910   }
911   my $sth = $self->sth($sql);
912
913 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
914
915   my $rv;
916   ## This must be an arrayref, else nothing works!
917   my $tuple_status = [];
918 #  use Data::Dumper;
919 #  print STDERR Dumper($data);
920   if ($sth) {
921     my $time = time();
922     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
923                                        ArrayTupleStatus => $tuple_status }) };
924 # print STDERR Dumper($tuple_status);
925 # print STDERR "RV: $rv\n";
926     if ($@ || !defined $rv) {
927       my $errors = '';
928       foreach my $tuple (@$tuple_status)
929       {
930           $errors .= "\n" . $tuple->[1] if(ref $tuple);
931       }
932       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
933     }
934   } else {
935     $self->throw_exception("'$sql' did not generate a statement.");
936   }
937   if ($self->debug) {
938       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
939       $self->debugobj->query_end($sql, @debug_bind);
940   }
941   return (wantarray ? ($rv, $sth, @bind) : $rv);
942 }
943
944 sub update {
945   my $self = shift @_;
946   my $source = shift @_;
947   
948   my $bind_attributes;
949   foreach my $column ($source->columns) {
950   
951     my $data_type = $source->column_info($column)->{data_type} || '';
952     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
953          if $data_type;
954   }
955
956   my $ident = $source->from;
957   return $self->_execute('update' => [], $ident, $bind_attributes, @_);
958 }
959
960
961 sub delete {
962   my $self = shift @_;
963   my $source = shift @_;
964   
965   my $bind_attrs = {}; ## If ever it's needed...
966   my $ident = $source->from;
967   
968   return $self->_execute('delete' => [], $ident, $bind_attrs, @_);
969 }
970
971 sub _select {
972   my ($self, $ident, $select, $condition, $attrs) = @_;
973   my $order = $attrs->{order_by};
974   if (ref $condition eq 'SCALAR') {
975     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
976   }
977   if (exists $attrs->{group_by} || $attrs->{having}) {
978     $order = {
979       group_by => $attrs->{group_by},
980       having => $attrs->{having},
981       ($order ? (order_by => $order) : ())
982     };
983   }
984   my $bind_attrs = {}; ## Future support
985   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
986   if ($attrs->{software_limit} ||
987       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
988         $attrs->{software_limit} = 1;
989   } else {
990     $self->throw_exception("rows attribute must be positive if present")
991       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
992     push @args, $attrs->{rows}, $attrs->{offset};
993   }
994   return $self->_execute(@args);
995 }
996
997 =head2 select
998
999 =over 4
1000
1001 =item Arguments: $ident, $select, $condition, $attrs
1002
1003 =back
1004
1005 Handle a SQL select statement.
1006
1007 =cut
1008
1009 sub select {
1010   my $self = shift;
1011   my ($ident, $select, $condition, $attrs) = @_;
1012   return $self->cursor->new($self, \@_, $attrs);
1013 }
1014
1015 sub select_single {
1016   my $self = shift;
1017   my ($rv, $sth, @bind) = $self->_select(@_);
1018   my @row = $sth->fetchrow_array;
1019   # Need to call finish() to work round broken DBDs
1020   $sth->finish();
1021   return @row;
1022 }
1023
1024 =head2 sth
1025
1026 =over 4
1027
1028 =item Arguments: $sql
1029
1030 =back
1031
1032 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1033
1034 =cut
1035
1036 sub _dbh_sth {
1037   my ($self, $dbh, $sql) = @_;
1038
1039   # 3 is the if_active parameter which avoids active sth re-use
1040   my $sth = $self->disable_sth_caching
1041     ? $dbh->prepare($sql)
1042     : $dbh->prepare_cached($sql, {}, 3);
1043
1044   $self->throw_exception(
1045     'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
1046   ) if !$sth;
1047
1048   $sth;
1049 }
1050
1051 sub sth {
1052   my ($self, $sql) = @_;
1053   $self->dbh_do($self->can('_dbh_sth'), $sql);
1054 }
1055
1056 sub _dbh_columns_info_for {
1057   my ($self, $dbh, $table) = @_;
1058
1059   if ($dbh->can('column_info')) {
1060     my %result;
1061     eval {
1062       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1063       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1064       $sth->execute();
1065       while ( my $info = $sth->fetchrow_hashref() ){
1066         my %column_info;
1067         $column_info{data_type}   = $info->{TYPE_NAME};
1068         $column_info{size}      = $info->{COLUMN_SIZE};
1069         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1070         $column_info{default_value} = $info->{COLUMN_DEF};
1071         my $col_name = $info->{COLUMN_NAME};
1072         $col_name =~ s/^\"(.*)\"$/$1/;
1073
1074         $result{$col_name} = \%column_info;
1075       }
1076     };
1077     return \%result if !$@ && scalar keys %result;
1078   }
1079
1080   my %result;
1081   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1082   $sth->execute;
1083   my @columns = @{$sth->{NAME_lc}};
1084   for my $i ( 0 .. $#columns ){
1085     my %column_info;
1086     my $type_num = $sth->{TYPE}->[$i];
1087     my $type_name;
1088     if(defined $type_num && $dbh->can('type_info')) {
1089       my $type_info = $dbh->type_info($type_num);
1090       $type_name = $type_info->{TYPE_NAME} if $type_info;
1091     }
1092     $column_info{data_type} = $type_name ? $type_name : $type_num;
1093     $column_info{size} = $sth->{PRECISION}->[$i];
1094     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1095
1096     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1097       $column_info{data_type} = $1;
1098       $column_info{size}    = $2;
1099     }
1100
1101     $result{$columns[$i]} = \%column_info;
1102   }
1103
1104   return \%result;
1105 }
1106
1107 sub columns_info_for {
1108   my ($self, $table) = @_;
1109   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1110 }
1111
1112 =head2 last_insert_id
1113
1114 Return the row id of the last insert.
1115
1116 =cut
1117
1118 sub _dbh_last_insert_id {
1119     my ($self, $dbh, $source, $col) = @_;
1120     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1121     $dbh->func('last_insert_rowid');
1122 }
1123
1124 sub last_insert_id {
1125   my $self = shift;
1126   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1127 }
1128
1129 =head2 sqlt_type
1130
1131 Returns the database driver name.
1132
1133 =cut
1134
1135 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1136
1137 =head2 bind_attribute_by_data_type
1138
1139 Given a datatype from column info, returns a database specific bind attribute for
1140 $dbh->bind_param($val,$attribute) or nothing if we will let the database planner
1141 just handle it.
1142
1143 Generally only needed for special case column types, like bytea in postgres.
1144
1145 =cut
1146
1147 sub bind_attribute_by_data_type {
1148     return;
1149 }
1150
1151 =head2 create_ddl_dir (EXPERIMENTAL)
1152
1153 =over 4
1154
1155 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1156
1157 =back
1158
1159 Creates a SQL file based on the Schema, for each of the specified
1160 database types, in the given directory.
1161
1162 Note that this feature is currently EXPERIMENTAL and may not work correctly
1163 across all databases, or fully handle complex relationships.
1164
1165 =cut
1166
1167 sub create_ddl_dir
1168 {
1169   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1170
1171   if(!$dir || !-d $dir)
1172   {
1173     warn "No directory given, using ./\n";
1174     $dir = "./";
1175   }
1176   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1177   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1178   $version ||= $schema->VERSION || '1.x';
1179   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1180
1181   eval "use SQL::Translator";
1182   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1183
1184   my $sqlt = SQL::Translator->new($sqltargs);
1185   foreach my $db (@$databases)
1186   {
1187     $sqlt->reset();
1188     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1189 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1190     $sqlt->data($schema);
1191     $sqlt->producer($db);
1192
1193     my $file;
1194     my $filename = $schema->ddl_filename($db, $dir, $version);
1195     if(-e $filename)
1196     {
1197       $self->throw_exception("$filename already exists, skipping $db");
1198       next;
1199     }
1200     open($file, ">$filename") 
1201       or $self->throw_exception("Can't open $filename for writing ($!)");
1202     my $output = $sqlt->translate;
1203 #use Data::Dumper;
1204 #    print join(":", keys %{$schema->source_registrations});
1205 #    print Dumper($sqlt->schema);
1206     if(!$output)
1207     {
1208       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1209       next;
1210     }
1211     print $file $output;
1212     close($file);
1213   }
1214
1215 }
1216
1217 =head2 deployment_statements
1218
1219 =over 4
1220
1221 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1222
1223 =back
1224
1225 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1226 The database driver name is given by C<$type>, though the value from
1227 L</sqlt_type> is used if it is not specified.
1228
1229 C<$directory> is used to return statements from files in a previously created
1230 L</create_ddl_dir> directory and is optional. The filenames are constructed
1231 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1232
1233 If no C<$directory> is specified then the statements are constructed on the
1234 fly using L<SQL::Translator> and C<$version> is ignored.
1235
1236 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1237
1238 =cut
1239
1240 sub deployment_statements {
1241   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1242   # Need to be connected to get the correct sqlt_type
1243   $self->ensure_connected() unless $type;
1244   $type ||= $self->sqlt_type;
1245   $version ||= $schema->VERSION || '1.x';
1246   $dir ||= './';
1247   eval "use SQL::Translator";
1248   if(!$@)
1249   {
1250     eval "use SQL::Translator::Parser::DBIx::Class;";
1251     $self->throw_exception($@) if $@;
1252     eval "use SQL::Translator::Producer::${type};";
1253     $self->throw_exception($@) if $@;
1254     my $tr = SQL::Translator->new(%$sqltargs);
1255     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1256     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1257   }
1258
1259   my $filename = $schema->ddl_filename($type, $dir, $version);
1260   if(!-f $filename)
1261   {
1262 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1263       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1264       return;
1265   }
1266   my $file;
1267   open($file, "<$filename") 
1268       or $self->throw_exception("Can't open $filename ($!)");
1269   my @rows = <$file>;
1270   close($file);
1271
1272   return join('', @rows);
1273   
1274 }
1275
1276 sub deploy {
1277   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1278   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1279     for ( split(";\n", $statement)) {
1280       next if($_ =~ /^--/);
1281       next if(!$_);
1282 #      next if($_ =~ /^DROP/m);
1283       next if($_ =~ /^BEGIN TRANSACTION/m);
1284       next if($_ =~ /^COMMIT/m);
1285       next if $_ =~ /^\s+$/; # skip whitespace only
1286       $self->debugobj->query_start($_) if $self->debug;
1287       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1288       $self->debugobj->query_end($_) if $self->debug;
1289     }
1290   }
1291 }
1292
1293 =head2 datetime_parser
1294
1295 Returns the datetime parser class
1296
1297 =cut
1298
1299 sub datetime_parser {
1300   my $self = shift;
1301   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1302 }
1303
1304 =head2 datetime_parser_type
1305
1306 Defines (returns) the datetime parser class - currently hardwired to
1307 L<DateTime::Format::MySQL>
1308
1309 =cut
1310
1311 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1312
1313 =head2 build_datetime_parser
1314
1315 See L</datetime_parser>
1316
1317 =cut
1318
1319 sub build_datetime_parser {
1320   my $self = shift;
1321   my $type = $self->datetime_parser_type(@_);
1322   eval "use ${type}";
1323   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1324   return $type;
1325 }
1326
1327 sub DESTROY {
1328   my $self = shift;
1329   return if !$self->_dbh;
1330   $self->_verify_pid;
1331   $self->_dbh(undef);
1332 }
1333
1334 1;
1335
1336 =head1 SQL METHODS
1337
1338 The module defines a set of methods within the DBIC::SQL::Abstract
1339 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1340 SQL query functions.
1341
1342 The following methods are extended:-
1343
1344 =over 4
1345
1346 =item delete
1347
1348 =item insert
1349
1350 =item select
1351
1352 =item update
1353
1354 =item limit_dialect
1355
1356 See L</connect_info> for details.
1357 For setting, this method is deprecated in favor of L</connect_info>.
1358
1359 =item quote_char
1360
1361 See L</connect_info> for details.
1362 For setting, this method is deprecated in favor of L</connect_info>.
1363
1364 =item name_sep
1365
1366 See L</connect_info> for details.
1367 For setting, this method is deprecated in favor of L</connect_info>.
1368
1369 =back
1370
1371 =head1 AUTHORS
1372
1373 Matt S. Trout <mst@shadowcatsystems.co.uk>
1374
1375 Andy Grundman <andy@hybridized.org>
1376
1377 =head1 LICENSE
1378
1379 You may distribute this code under the same terms as Perl itself.
1380
1381 =cut