changed storage->insert|update|delete to accept the source object directly and to...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        disable_sth_caching cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item disable_sth_caching
340
341 If set to a true value, this option will disable the caching of
342 statement handles via L<DBI/prepare_cached>.
343
344 =item limit_dialect 
345
346 Sets the limit dialect. This is useful for JDBC-bridge among others
347 where the remote SQL-dialect cannot be determined by the name of the
348 driver alone.
349
350 =item quote_char
351
352 Specifies what characters to use to quote table and column names. If 
353 you use this you will want to specify L<name_sep> as well.
354
355 quote_char expects either a single character, in which case is it is placed
356 on either side of the table/column, or an arrayref of length 2 in which case the
357 table/column name is placed between the elements.
358
359 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
360 use C<quote_char =E<gt> [qw/[ ]/]>.
361
362 =item name_sep
363
364 This only needs to be used in conjunction with L<quote_char>, and is used to 
365 specify the charecter that seperates elements (schemas, tables, columns) from 
366 each other. In most cases this is simply a C<.>.
367
368 =back
369
370 These options can be mixed in with your other L<DBI> connection attributes,
371 or placed in a seperate hashref after all other normal L<DBI> connection
372 arguments.
373
374 Every time C<connect_info> is invoked, any previous settings for
375 these options will be cleared before setting the new ones, regardless of
376 whether any options are specified in the new C<connect_info>.
377
378 Important note:  DBIC expects the returned database handle provided by 
379 a subref argument to have RaiseError set on it.  If it doesn't, things
380 might not work very well, YMMV.  If you don't use a subref, DBIC will
381 force this setting for you anyways.  Setting HandleError to anything
382 other than simple exception object wrapper might cause problems too.
383
384 Examples:
385
386   # Simple SQLite connection
387   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
388
389   # Connect via subref
390   ->connect_info([ sub { DBI->connect(...) } ]);
391
392   # A bit more complicated
393   ->connect_info(
394     [
395       'dbi:Pg:dbname=foo',
396       'postgres',
397       'my_pg_password',
398       { AutoCommit => 0 },
399       { quote_char => q{"}, name_sep => q{.} },
400     ]
401   );
402
403   # Equivalent to the previous example
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Subref + DBIC-specific connection options
414   ->connect_info(
415     [
416       sub { DBI->connect(...) },
417       {
418           quote_char => q{`},
419           name_sep => q{@},
420           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
421           disable_sth_caching => 1,
422       },
423     ]
424   );
425
426 =cut
427
428 sub connect_info {
429   my ($self, $info_arg) = @_;
430
431   return $self->_connect_info if !$info_arg;
432
433   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
434   #  the new set of options
435   $self->_sql_maker(undef);
436   $self->_sql_maker_opts({});
437
438   my $info = [ @$info_arg ]; # copy because we can alter it
439   my $last_info = $info->[-1];
440   if(ref $last_info eq 'HASH') {
441     for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
442       if(my $value = delete $last_info->{$storage_opt}) {
443         $self->$storage_opt($value);
444       }
445     }
446     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
447       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
448         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
449       }
450     }
451
452     # Get rid of any trailing empty hashref
453     pop(@$info) if !keys %$last_info;
454   }
455
456   $self->_connect_info($info);
457 }
458
459 =head2 on_connect_do
460
461 This method is deprecated in favor of setting via L</connect_info>.
462
463 =head2 dbh_do
464
465 Arguments: $subref, @extra_coderef_args?
466
467 Execute the given subref using the new exception-based connection management.
468
469 The first two arguments will be the storage object that C<dbh_do> was called
470 on and a database handle to use.  Any additional arguments will be passed
471 verbatim to the called subref as arguments 2 and onwards.
472
473 Using this (instead of $self->_dbh or $self->dbh) ensures correct
474 exception handling and reconnection (or failover in future subclasses).
475
476 Your subref should have no side-effects outside of the database, as
477 there is the potential for your subref to be partially double-executed
478 if the database connection was stale/dysfunctional.
479
480 Example:
481
482   my @stuff = $schema->storage->dbh_do(
483     sub {
484       my ($storage, $dbh, @cols) = @_;
485       my $cols = join(q{, }, @cols);
486       $dbh->selectrow_array("SELECT $cols FROM foo");
487     },
488     @column_list
489   );
490
491 =cut
492
493 sub dbh_do {
494   my $self = shift;
495   my $coderef = shift;
496
497   ref $coderef eq 'CODE' or $self->throw_exception
498     ('$coderef must be a CODE reference');
499
500   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
501   local $self->{_in_dbh_do} = 1;
502
503   my @result;
504   my $want_array = wantarray;
505
506   eval {
507     $self->_verify_pid if $self->_dbh;
508     $self->_populate_dbh if !$self->_dbh;
509     if($want_array) {
510         @result = $coderef->($self, $self->_dbh, @_);
511     }
512     elsif(defined $want_array) {
513         $result[0] = $coderef->($self, $self->_dbh, @_);
514     }
515     else {
516         $coderef->($self, $self->_dbh, @_);
517     }
518   };
519
520   my $exception = $@;
521   if(!$exception) { return $want_array ? @result : $result[0] }
522
523   $self->throw_exception($exception) if $self->connected;
524
525   # We were not connected - reconnect and retry, but let any
526   #  exception fall right through this time
527   $self->_populate_dbh;
528   $coderef->($self, $self->_dbh, @_);
529 }
530
531 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
532 # It also informs dbh_do to bypass itself while under the direction of txn_do,
533 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
534 sub txn_do {
535   my $self = shift;
536   my $coderef = shift;
537
538   ref $coderef eq 'CODE' or $self->throw_exception
539     ('$coderef must be a CODE reference');
540
541   local $self->{_in_dbh_do} = 1;
542
543   my @result;
544   my $want_array = wantarray;
545
546   my $tried = 0;
547   while(1) {
548     eval {
549       $self->_verify_pid if $self->_dbh;
550       $self->_populate_dbh if !$self->_dbh;
551
552       $self->txn_begin;
553       if($want_array) {
554           @result = $coderef->(@_);
555       }
556       elsif(defined $want_array) {
557           $result[0] = $coderef->(@_);
558       }
559       else {
560           $coderef->(@_);
561       }
562       $self->txn_commit;
563     };
564
565     my $exception = $@;
566     if(!$exception) { return $want_array ? @result : $result[0] }
567
568     if($tried++ > 0 || $self->connected) {
569       eval { $self->txn_rollback };
570       my $rollback_exception = $@;
571       if($rollback_exception) {
572         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
573         $self->throw_exception($exception)  # propagate nested rollback
574           if $rollback_exception =~ /$exception_class/;
575
576         $self->throw_exception(
577           "Transaction aborted: ${exception}. "
578           . "Rollback failed: ${rollback_exception}"
579         );
580       }
581       $self->throw_exception($exception)
582     }
583
584     # We were not connected, and was first try - reconnect and retry
585     # via the while loop
586     $self->_populate_dbh;
587   }
588 }
589
590 =head2 disconnect
591
592 Our C<disconnect> method also performs a rollback first if the
593 database is not in C<AutoCommit> mode.
594
595 =cut
596
597 sub disconnect {
598   my ($self) = @_;
599
600   if( $self->connected ) {
601     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
602     $self->_dbh->disconnect;
603     $self->_dbh(undef);
604     $self->{_dbh_gen}++;
605   }
606 }
607
608 sub connected {
609   my ($self) = @_;
610
611   if(my $dbh = $self->_dbh) {
612       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
613           $self->_dbh(undef);
614           $self->{_dbh_gen}++;
615           return;
616       }
617       else {
618           $self->_verify_pid;
619       }
620       return ($dbh->FETCH('Active') && $dbh->ping);
621   }
622
623   return 0;
624 }
625
626 # handle pid changes correctly
627 #  NOTE: assumes $self->_dbh is a valid $dbh
628 sub _verify_pid {
629   my ($self) = @_;
630
631   return if $self->_conn_pid == $$;
632
633   $self->_dbh->{InactiveDestroy} = 1;
634   $self->_dbh(undef);
635   $self->{_dbh_gen}++;
636
637   return;
638 }
639
640 sub ensure_connected {
641   my ($self) = @_;
642
643   unless ($self->connected) {
644     $self->_populate_dbh;
645   }
646 }
647
648 =head2 dbh
649
650 Returns the dbh - a data base handle of class L<DBI>.
651
652 =cut
653
654 sub dbh {
655   my ($self) = @_;
656
657   $self->ensure_connected;
658   return $self->_dbh;
659 }
660
661 sub _sql_maker_args {
662     my ($self) = @_;
663     
664     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
665 }
666
667 sub sql_maker {
668   my ($self) = @_;
669   unless ($self->_sql_maker) {
670     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
671   }
672   return $self->_sql_maker;
673 }
674
675 sub _populate_dbh {
676   my ($self) = @_;
677   my @info = @{$self->_connect_info || []};
678   $self->_dbh($self->_connect(@info));
679
680   if(ref $self eq 'DBIx::Class::Storage::DBI') {
681     my $driver = $self->_dbh->{Driver}->{Name};
682     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
683       bless $self, "DBIx::Class::Storage::DBI::${driver}";
684       $self->_rebless() if $self->can('_rebless');
685     }
686   }
687
688   # if on-connect sql statements are given execute them
689   foreach my $sql_statement (@{$self->on_connect_do || []}) {
690     $self->debugobj->query_start($sql_statement) if $self->debug();
691     $self->_dbh->do($sql_statement);
692     $self->debugobj->query_end($sql_statement) if $self->debug();
693   }
694
695   $self->_conn_pid($$);
696   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
697 }
698
699 sub _connect {
700   my ($self, @info) = @_;
701
702   $self->throw_exception("You failed to provide any connection info")
703       if !@info;
704
705   my ($old_connect_via, $dbh);
706
707   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
708       $old_connect_via = $DBI::connect_via;
709       $DBI::connect_via = 'connect';
710   }
711
712   eval {
713     if(ref $info[0] eq 'CODE') {
714        $dbh = &{$info[0]}
715     }
716     else {
717        $dbh = DBI->connect(@info);
718        $dbh->{RaiseError} = 1;
719        $dbh->{PrintError} = 0;
720        $dbh->{PrintWarn} = 0;
721     }
722   };
723
724   $DBI::connect_via = $old_connect_via if $old_connect_via;
725
726   if (!$dbh || $@) {
727     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
728   }
729
730   $dbh;
731 }
732
733 sub _dbh_txn_begin {
734   my ($self, $dbh) = @_;
735   if ($dbh->{AutoCommit}) {
736     $self->debugobj->txn_begin()
737       if ($self->debug);
738     $dbh->begin_work;
739   }
740 }
741
742 sub txn_begin {
743   my $self = shift;
744   $self->dbh_do($self->can('_dbh_txn_begin'))
745     if $self->{transaction_depth}++ == 0;
746 }
747
748 sub _dbh_txn_commit {
749   my ($self, $dbh) = @_;
750   if ($self->{transaction_depth} == 0) {
751     unless ($dbh->{AutoCommit}) {
752       $self->debugobj->txn_commit()
753         if ($self->debug);
754       $dbh->commit;
755     }
756   }
757   else {
758     if (--$self->{transaction_depth} == 0) {
759       $self->debugobj->txn_commit()
760         if ($self->debug);
761       $dbh->commit;
762     }
763   }
764 }
765
766 sub txn_commit {
767   my $self = shift;
768   $self->dbh_do($self->can('_dbh_txn_commit'));
769 }
770
771 sub _dbh_txn_rollback {
772   my ($self, $dbh) = @_;
773   if ($self->{transaction_depth} == 0) {
774     unless ($dbh->{AutoCommit}) {
775       $self->debugobj->txn_rollback()
776         if ($self->debug);
777       $dbh->rollback;
778     }
779   }
780   else {
781     if (--$self->{transaction_depth} == 0) {
782       $self->debugobj->txn_rollback()
783         if ($self->debug);
784       $dbh->rollback;
785     }
786     else {
787       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
788     }
789   }
790 }
791
792 sub txn_rollback {
793   my $self = shift;
794
795   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
796   if ($@) {
797     my $error = $@;
798     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
799     $error =~ /$exception_class/ and $self->throw_exception($error);
800     $self->{transaction_depth} = 0;          # ensure that a failed rollback
801     $self->throw_exception($error);          # resets the transaction depth
802   }
803 }
804
805 # This used to be the top-half of _execute.  It was split out to make it
806 #  easier to override in NoBindVars without duping the rest.  It takes up
807 #  all of _execute's args, and emits $sql, @bind.
808 sub _prep_for_execute {
809   my ($self, $op, $extra_bind, $ident, @args) = @_;
810
811   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
812   unshift(@bind, @$extra_bind) if $extra_bind;
813   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
814
815   return ($sql, @bind);
816 }
817
818 sub _execute {
819   my ($self, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
820   
821   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
822   unshift(@bind, @$extra_bind) if $extra_bind;
823   if ($self->debug) {
824       my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind;
825       $self->debugobj->query_start($sql, @debug_bind);
826   }
827   my $sth = eval { $self->sth($sql,$op) };
828
829   if (!$sth || $@) {
830     $self->throw_exception(
831       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
832     );
833   }
834
835   my $rv;
836   if ($sth) {
837     my $time = time();
838         
839     $rv = eval {
840         
841       my $placeholder_index = 1; 
842
843       foreach my $bound (@bind) {
844
845         my $attributes = {};
846         my($column_name, $data) = @$bound;
847
848         if( $bind_attributes ) {
849           $attributes = $bind_attributes->{$column_name}
850           if defined $bind_attributes->{$column_name};
851         }                       
852
853         $data = ref $data ? ''.$data : $data; # stringify args
854
855         $sth->bind_param($placeholder_index, $data, $attributes);
856         $placeholder_index++;
857       }
858       $sth->execute;
859     };
860   
861     if ($@ || !$rv) {
862       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
863     }
864   } else {
865     $self->throw_exception("'$sql' did not generate a statement.");
866   }
867   if ($self->debug) {
868      my @debug_bind = map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind; 
869       $self->debugobj->query_end($sql, @debug_bind);
870   }
871   return (wantarray ? ($rv, $sth, @bind) : $rv);
872 }
873
874 sub insert {
875   my ($self, $source, $to_insert) = @_;
876   
877   my $ident = $source->from; 
878   my $bind_attributes;
879   foreach my $column ($source->columns) {
880   
881     $bind_attributes->{$column} = $source->column_info($column)->{bind_attributes}
882      if defined $source->column_info($column)->{bind_attributes};
883   } 
884   
885   $self->throw_exception(
886     "Couldn't insert ".join(', ',
887       map "$_ => $to_insert->{$_}", keys %$to_insert
888     )." into ${ident}"
889   ) unless ($self->_execute('insert' => [], $ident, $bind_attributes, $to_insert));
890   return $to_insert;
891 }
892
893 ## Still not quite perfect, and EXPERIMENTAL
894 ## Currently it is assumed that all values passed will be "normal", i.e. not 
895 ## scalar refs, or at least, all the same type as the first set, the statement is
896 ## only prepped once.
897 sub insert_bulk {
898   my ($self, $table, $cols, $data) = @_;
899   my %colvalues;
900   @colvalues{@$cols} = (0..$#$cols);
901   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
902   
903   ##need this to support using bindtype=>columns for sql abstract
904   @bind = map {$_->[1]} @bind;
905
906   if ($self->debug) {
907       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
908       $self->debugobj->query_start($sql, @debug_bind);
909   }
910   my $sth = $self->sth($sql);
911
912 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
913
914   my $rv;
915   ## This must be an arrayref, else nothing works!
916   my $tuple_status = [];
917 #  use Data::Dumper;
918 #  print STDERR Dumper($data);
919   if ($sth) {
920     my $time = time();
921     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
922                                        ArrayTupleStatus => $tuple_status }) };
923 # print STDERR Dumper($tuple_status);
924 # print STDERR "RV: $rv\n";
925     if ($@ || !defined $rv) {
926       my $errors = '';
927       foreach my $tuple (@$tuple_status)
928       {
929           $errors .= "\n" . $tuple->[1] if(ref $tuple);
930       }
931       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
932     }
933   } else {
934     $self->throw_exception("'$sql' did not generate a statement.");
935   }
936   if ($self->debug) {
937       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
938       $self->debugobj->query_end($sql, @debug_bind);
939   }
940   return (wantarray ? ($rv, $sth, @bind) : $rv);
941 }
942
943 sub update {
944   my $self = shift @_;
945   my $source = shift @_;
946   
947   my $bind_attributes;
948   foreach my $column ($source->columns) {
949   
950     $bind_attributes->{$column} = $source->column_info($column)->{bind_attributes}
951      if defined $source->column_info($column)->{bind_attributes};
952   }
953
954   my $ident = $source->from;
955   return $self->_execute('update' => [], $ident, $bind_attributes, @_);
956 }
957
958
959 sub delete {
960   my $self = shift @_;
961   my $source = shift @_;
962   
963   my $bind_attrs = {}; ## If ever it's needed...
964   my $ident = $source->from;
965   
966   return $self->_execute('delete' => [], $ident, $bind_attrs, @_);
967 }
968
969 sub _select {
970   my ($self, $ident, $select, $condition, $attrs) = @_;
971   my $order = $attrs->{order_by};
972   if (ref $condition eq 'SCALAR') {
973     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
974   }
975   if (exists $attrs->{group_by} || $attrs->{having}) {
976     $order = {
977       group_by => $attrs->{group_by},
978       having => $attrs->{having},
979       ($order ? (order_by => $order) : ())
980     };
981   }
982   my $bind_attrs = {}; ## Future support
983   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
984   if ($attrs->{software_limit} ||
985       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
986         $attrs->{software_limit} = 1;
987   } else {
988     $self->throw_exception("rows attribute must be positive if present")
989       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
990     push @args, $attrs->{rows}, $attrs->{offset};
991   }
992   return $self->_execute(@args);
993 }
994
995 =head2 select
996
997 =over 4
998
999 =item Arguments: $ident, $select, $condition, $attrs
1000
1001 =back
1002
1003 Handle a SQL select statement.
1004
1005 =cut
1006
1007 sub select {
1008   my $self = shift;
1009   my ($ident, $select, $condition, $attrs) = @_;
1010   return $self->cursor->new($self, \@_, $attrs);
1011 }
1012
1013 sub select_single {
1014   my $self = shift;
1015   my ($rv, $sth, @bind) = $self->_select(@_);
1016   my @row = $sth->fetchrow_array;
1017   # Need to call finish() to work round broken DBDs
1018   $sth->finish();
1019   return @row;
1020 }
1021
1022 =head2 sth
1023
1024 =over 4
1025
1026 =item Arguments: $sql
1027
1028 =back
1029
1030 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1031
1032 =cut
1033
1034 sub _dbh_sth {
1035   my ($self, $dbh, $sql) = @_;
1036
1037   # 3 is the if_active parameter which avoids active sth re-use
1038   my $sth = $self->disable_sth_caching
1039     ? $dbh->prepare($sql)
1040     : $dbh->prepare_cached($sql, {}, 3);
1041
1042   $self->throw_exception(
1043     'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
1044   ) if !$sth;
1045
1046   $sth;
1047 }
1048
1049 sub sth {
1050   my ($self, $sql) = @_;
1051   $self->dbh_do($self->can('_dbh_sth'), $sql);
1052 }
1053
1054 sub _dbh_columns_info_for {
1055   my ($self, $dbh, $table) = @_;
1056
1057   if ($dbh->can('column_info')) {
1058     my %result;
1059     eval {
1060       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1061       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1062       $sth->execute();
1063       while ( my $info = $sth->fetchrow_hashref() ){
1064         my %column_info;
1065         $column_info{data_type}   = $info->{TYPE_NAME};
1066         $column_info{size}      = $info->{COLUMN_SIZE};
1067         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1068         $column_info{default_value} = $info->{COLUMN_DEF};
1069         my $col_name = $info->{COLUMN_NAME};
1070         $col_name =~ s/^\"(.*)\"$/$1/;
1071
1072         $result{$col_name} = \%column_info;
1073       }
1074     };
1075     return \%result if !$@ && scalar keys %result;
1076   }
1077
1078   my %result;
1079   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1080   $sth->execute;
1081   my @columns = @{$sth->{NAME_lc}};
1082   for my $i ( 0 .. $#columns ){
1083     my %column_info;
1084     my $type_num = $sth->{TYPE}->[$i];
1085     my $type_name;
1086     if(defined $type_num && $dbh->can('type_info')) {
1087       my $type_info = $dbh->type_info($type_num);
1088       $type_name = $type_info->{TYPE_NAME} if $type_info;
1089     }
1090     $column_info{data_type} = $type_name ? $type_name : $type_num;
1091     $column_info{size} = $sth->{PRECISION}->[$i];
1092     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1093
1094     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1095       $column_info{data_type} = $1;
1096       $column_info{size}    = $2;
1097     }
1098
1099     $result{$columns[$i]} = \%column_info;
1100   }
1101
1102   return \%result;
1103 }
1104
1105 sub columns_info_for {
1106   my ($self, $table) = @_;
1107   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1108 }
1109
1110 =head2 last_insert_id
1111
1112 Return the row id of the last insert.
1113
1114 =cut
1115
1116 sub _dbh_last_insert_id {
1117     my ($self, $dbh, $source, $col) = @_;
1118     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1119     $dbh->func('last_insert_rowid');
1120 }
1121
1122 sub last_insert_id {
1123   my $self = shift;
1124   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1125 }
1126
1127 =head2 sqlt_type
1128
1129 Returns the database driver name.
1130
1131 =cut
1132
1133 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1134
1135 =head2 create_ddl_dir (EXPERIMENTAL)
1136
1137 =over 4
1138
1139 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1140
1141 =back
1142
1143 Creates a SQL file based on the Schema, for each of the specified
1144 database types, in the given directory.
1145
1146 Note that this feature is currently EXPERIMENTAL and may not work correctly
1147 across all databases, or fully handle complex relationships.
1148
1149 =cut
1150
1151 sub create_ddl_dir
1152 {
1153   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1154
1155   if(!$dir || !-d $dir)
1156   {
1157     warn "No directory given, using ./\n";
1158     $dir = "./";
1159   }
1160   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1161   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1162   $version ||= $schema->VERSION || '1.x';
1163   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1164
1165   eval "use SQL::Translator";
1166   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1167
1168   my $sqlt = SQL::Translator->new($sqltargs);
1169   foreach my $db (@$databases)
1170   {
1171     $sqlt->reset();
1172     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1173 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1174     $sqlt->data($schema);
1175     $sqlt->producer($db);
1176
1177     my $file;
1178     my $filename = $schema->ddl_filename($db, $dir, $version);
1179     if(-e $filename)
1180     {
1181       $self->throw_exception("$filename already exists, skipping $db");
1182       next;
1183     }
1184     open($file, ">$filename") 
1185       or $self->throw_exception("Can't open $filename for writing ($!)");
1186     my $output = $sqlt->translate;
1187 #use Data::Dumper;
1188 #    print join(":", keys %{$schema->source_registrations});
1189 #    print Dumper($sqlt->schema);
1190     if(!$output)
1191     {
1192       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1193       next;
1194     }
1195     print $file $output;
1196     close($file);
1197   }
1198
1199 }
1200
1201 =head2 deployment_statements
1202
1203 =over 4
1204
1205 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1206
1207 =back
1208
1209 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1210 The database driver name is given by C<$type>, though the value from
1211 L</sqlt_type> is used if it is not specified.
1212
1213 C<$directory> is used to return statements from files in a previously created
1214 L</create_ddl_dir> directory and is optional. The filenames are constructed
1215 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1216
1217 If no C<$directory> is specified then the statements are constructed on the
1218 fly using L<SQL::Translator> and C<$version> is ignored.
1219
1220 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1221
1222 =cut
1223
1224 sub deployment_statements {
1225   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1226   # Need to be connected to get the correct sqlt_type
1227   $self->ensure_connected() unless $type;
1228   $type ||= $self->sqlt_type;
1229   $version ||= $schema->VERSION || '1.x';
1230   $dir ||= './';
1231   eval "use SQL::Translator";
1232   if(!$@)
1233   {
1234     eval "use SQL::Translator::Parser::DBIx::Class;";
1235     $self->throw_exception($@) if $@;
1236     eval "use SQL::Translator::Producer::${type};";
1237     $self->throw_exception($@) if $@;
1238     my $tr = SQL::Translator->new(%$sqltargs);
1239     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1240     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1241   }
1242
1243   my $filename = $schema->ddl_filename($type, $dir, $version);
1244   if(!-f $filename)
1245   {
1246 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1247       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1248       return;
1249   }
1250   my $file;
1251   open($file, "<$filename") 
1252       or $self->throw_exception("Can't open $filename ($!)");
1253   my @rows = <$file>;
1254   close($file);
1255
1256   return join('', @rows);
1257   
1258 }
1259
1260 sub deploy {
1261   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1262   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1263     for ( split(";\n", $statement)) {
1264       next if($_ =~ /^--/);
1265       next if(!$_);
1266 #      next if($_ =~ /^DROP/m);
1267       next if($_ =~ /^BEGIN TRANSACTION/m);
1268       next if($_ =~ /^COMMIT/m);
1269       next if $_ =~ /^\s+$/; # skip whitespace only
1270       $self->debugobj->query_start($_) if $self->debug;
1271       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1272       $self->debugobj->query_end($_) if $self->debug;
1273     }
1274   }
1275 }
1276
1277 =head2 datetime_parser
1278
1279 Returns the datetime parser class
1280
1281 =cut
1282
1283 sub datetime_parser {
1284   my $self = shift;
1285   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1286 }
1287
1288 =head2 datetime_parser_type
1289
1290 Defines (returns) the datetime parser class - currently hardwired to
1291 L<DateTime::Format::MySQL>
1292
1293 =cut
1294
1295 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1296
1297 =head2 build_datetime_parser
1298
1299 See L</datetime_parser>
1300
1301 =cut
1302
1303 sub build_datetime_parser {
1304   my $self = shift;
1305   my $type = $self->datetime_parser_type(@_);
1306   eval "use ${type}";
1307   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1308   return $type;
1309 }
1310
1311 sub DESTROY {
1312   my $self = shift;
1313   return if !$self->_dbh;
1314   $self->_verify_pid;
1315   $self->_dbh(undef);
1316 }
1317
1318 1;
1319
1320 =head1 SQL METHODS
1321
1322 The module defines a set of methods within the DBIC::SQL::Abstract
1323 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1324 SQL query functions.
1325
1326 The following methods are extended:-
1327
1328 =over 4
1329
1330 =item delete
1331
1332 =item insert
1333
1334 =item select
1335
1336 =item update
1337
1338 =item limit_dialect
1339
1340 See L</connect_info> for details.
1341 For setting, this method is deprecated in favor of L</connect_info>.
1342
1343 =item quote_char
1344
1345 See L</connect_info> for details.
1346 For setting, this method is deprecated in favor of L</connect_info>.
1347
1348 =item name_sep
1349
1350 See L</connect_info> for details.
1351 For setting, this method is deprecated in favor of L</connect_info>.
1352
1353 =back
1354
1355 =head1 AUTHORS
1356
1357 Matt S. Trout <mst@shadowcatsystems.co.uk>
1358
1359 Andy Grundman <andy@hybridized.org>
1360
1361 =head1 LICENSE
1362
1363 You may distribute this code under the same terms as Perl itself.
1364
1365 =cut