a0a34a8b29836ce55eae9a4443356f780ff21430
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        disable_sth_caching cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item disable_sth_caching
340
341 If set to a true value, this option will disable the caching of
342 statement handles via L<DBI/prepare_cached>.
343
344 =item limit_dialect 
345
346 Sets the limit dialect. This is useful for JDBC-bridge among others
347 where the remote SQL-dialect cannot be determined by the name of the
348 driver alone.
349
350 =item quote_char
351
352 Specifies what characters to use to quote table and column names. If 
353 you use this you will want to specify L<name_sep> as well.
354
355 quote_char expects either a single character, in which case is it is placed
356 on either side of the table/column, or an arrayref of length 2 in which case the
357 table/column name is placed between the elements.
358
359 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
360 use C<quote_char =E<gt> [qw/[ ]/]>.
361
362 =item name_sep
363
364 This only needs to be used in conjunction with L<quote_char>, and is used to 
365 specify the charecter that seperates elements (schemas, tables, columns) from 
366 each other. In most cases this is simply a C<.>.
367
368 =back
369
370 These options can be mixed in with your other L<DBI> connection attributes,
371 or placed in a seperate hashref after all other normal L<DBI> connection
372 arguments.
373
374 Every time C<connect_info> is invoked, any previous settings for
375 these options will be cleared before setting the new ones, regardless of
376 whether any options are specified in the new C<connect_info>.
377
378 Important note:  DBIC expects the returned database handle provided by 
379 a subref argument to have RaiseError set on it.  If it doesn't, things
380 might not work very well, YMMV.  If you don't use a subref, DBIC will
381 force this setting for you anyways.  Setting HandleError to anything
382 other than simple exception object wrapper might cause problems too.
383
384 Examples:
385
386   # Simple SQLite connection
387   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
388
389   # Connect via subref
390   ->connect_info([ sub { DBI->connect(...) } ]);
391
392   # A bit more complicated
393   ->connect_info(
394     [
395       'dbi:Pg:dbname=foo',
396       'postgres',
397       'my_pg_password',
398       { AutoCommit => 0 },
399       { quote_char => q{"}, name_sep => q{.} },
400     ]
401   );
402
403   # Equivalent to the previous example
404   ->connect_info(
405     [
406       'dbi:Pg:dbname=foo',
407       'postgres',
408       'my_pg_password',
409       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Subref + DBIC-specific connection options
414   ->connect_info(
415     [
416       sub { DBI->connect(...) },
417       {
418           quote_char => q{`},
419           name_sep => q{@},
420           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
421           disable_sth_caching => 1,
422       },
423     ]
424   );
425
426 =cut
427
428 sub connect_info {
429   my ($self, $info_arg) = @_;
430
431   return $self->_connect_info if !$info_arg;
432
433   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
434   #  the new set of options
435   $self->_sql_maker(undef);
436   $self->_sql_maker_opts({});
437
438   my $info = [ @$info_arg ]; # copy because we can alter it
439   my $last_info = $info->[-1];
440   if(ref $last_info eq 'HASH') {
441     for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
442       if(my $value = delete $last_info->{$storage_opt}) {
443         $self->$storage_opt($value);
444       }
445     }
446     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
447       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
448         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
449       }
450     }
451
452     # Get rid of any trailing empty hashref
453     pop(@$info) if !keys %$last_info;
454   }
455
456   $self->_connect_info($info);
457 }
458
459 =head2 on_connect_do
460
461 This method is deprecated in favor of setting via L</connect_info>.
462
463 =head2 dbh_do
464
465 Arguments: $subref, @extra_coderef_args?
466
467 Execute the given subref using the new exception-based connection management.
468
469 The first two arguments will be the storage object that C<dbh_do> was called
470 on and a database handle to use.  Any additional arguments will be passed
471 verbatim to the called subref as arguments 2 and onwards.
472
473 Using this (instead of $self->_dbh or $self->dbh) ensures correct
474 exception handling and reconnection (or failover in future subclasses).
475
476 Your subref should have no side-effects outside of the database, as
477 there is the potential for your subref to be partially double-executed
478 if the database connection was stale/dysfunctional.
479
480 Example:
481
482   my @stuff = $schema->storage->dbh_do(
483     sub {
484       my ($storage, $dbh, @cols) = @_;
485       my $cols = join(q{, }, @cols);
486       $dbh->selectrow_array("SELECT $cols FROM foo");
487     },
488     @column_list
489   );
490
491 =cut
492
493 sub dbh_do {
494   my $self = shift;
495   my $coderef = shift;
496
497   ref $coderef eq 'CODE' or $self->throw_exception
498     ('$coderef must be a CODE reference');
499
500   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
501   local $self->{_in_dbh_do} = 1;
502
503   my @result;
504   my $want_array = wantarray;
505
506   eval {
507     $self->_verify_pid if $self->_dbh;
508     $self->_populate_dbh if !$self->_dbh;
509     if($want_array) {
510         @result = $coderef->($self, $self->_dbh, @_);
511     }
512     elsif(defined $want_array) {
513         $result[0] = $coderef->($self, $self->_dbh, @_);
514     }
515     else {
516         $coderef->($self, $self->_dbh, @_);
517     }
518   };
519
520   my $exception = $@;
521   if(!$exception) { return $want_array ? @result : $result[0] }
522
523   $self->throw_exception($exception) if $self->connected;
524
525   # We were not connected - reconnect and retry, but let any
526   #  exception fall right through this time
527   $self->_populate_dbh;
528   $coderef->($self, $self->_dbh, @_);
529 }
530
531 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
532 # It also informs dbh_do to bypass itself while under the direction of txn_do,
533 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
534 sub txn_do {
535   my $self = shift;
536   my $coderef = shift;
537
538   ref $coderef eq 'CODE' or $self->throw_exception
539     ('$coderef must be a CODE reference');
540
541   local $self->{_in_dbh_do} = 1;
542
543   my @result;
544   my $want_array = wantarray;
545
546   my $tried = 0;
547   while(1) {
548     eval {
549       $self->_verify_pid if $self->_dbh;
550       $self->_populate_dbh if !$self->_dbh;
551
552       $self->txn_begin;
553       if($want_array) {
554           @result = $coderef->(@_);
555       }
556       elsif(defined $want_array) {
557           $result[0] = $coderef->(@_);
558       }
559       else {
560           $coderef->(@_);
561       }
562       $self->txn_commit;
563     };
564
565     my $exception = $@;
566     if(!$exception) { return $want_array ? @result : $result[0] }
567
568     if($tried++ > 0 || $self->connected) {
569       eval { $self->txn_rollback };
570       my $rollback_exception = $@;
571       if($rollback_exception) {
572         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
573         $self->throw_exception($exception)  # propagate nested rollback
574           if $rollback_exception =~ /$exception_class/;
575
576         $self->throw_exception(
577           "Transaction aborted: ${exception}. "
578           . "Rollback failed: ${rollback_exception}"
579         );
580       }
581       $self->throw_exception($exception)
582     }
583
584     # We were not connected, and was first try - reconnect and retry
585     # via the while loop
586     $self->_populate_dbh;
587   }
588 }
589
590 =head2 disconnect
591
592 Our C<disconnect> method also performs a rollback first if the
593 database is not in C<AutoCommit> mode.
594
595 =cut
596
597 sub disconnect {
598   my ($self) = @_;
599
600   if( $self->connected ) {
601     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
602     $self->_dbh->disconnect;
603     $self->_dbh(undef);
604     $self->{_dbh_gen}++;
605   }
606 }
607
608 sub connected {
609   my ($self) = @_;
610
611   if(my $dbh = $self->_dbh) {
612       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
613           $self->_dbh(undef);
614           $self->{_dbh_gen}++;
615           return;
616       }
617       else {
618           $self->_verify_pid;
619       }
620       return ($dbh->FETCH('Active') && $dbh->ping);
621   }
622
623   return 0;
624 }
625
626 # handle pid changes correctly
627 #  NOTE: assumes $self->_dbh is a valid $dbh
628 sub _verify_pid {
629   my ($self) = @_;
630
631   return if $self->_conn_pid == $$;
632
633   $self->_dbh->{InactiveDestroy} = 1;
634   $self->_dbh(undef);
635   $self->{_dbh_gen}++;
636
637   return;
638 }
639
640 sub ensure_connected {
641   my ($self) = @_;
642
643   unless ($self->connected) {
644     $self->_populate_dbh;
645   }
646 }
647
648 =head2 dbh
649
650 Returns the dbh - a data base handle of class L<DBI>.
651
652 =cut
653
654 sub dbh {
655   my ($self) = @_;
656
657   $self->ensure_connected;
658   return $self->_dbh;
659 }
660
661 sub _sql_maker_args {
662     my ($self) = @_;
663     
664     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
665 }
666
667 sub sql_maker {
668   my ($self) = @_;
669   unless ($self->_sql_maker) {
670     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
671   }
672   return $self->_sql_maker;
673 }
674
675 sub _populate_dbh {
676   my ($self) = @_;
677   my @info = @{$self->_connect_info || []};
678   $self->_dbh($self->_connect(@info));
679
680   if(ref $self eq 'DBIx::Class::Storage::DBI') {
681     my $driver = $self->_dbh->{Driver}->{Name};
682     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
683       bless $self, "DBIx::Class::Storage::DBI::${driver}";
684       $self->_rebless() if $self->can('_rebless');
685     }
686   }
687
688   # if on-connect sql statements are given execute them
689   foreach my $sql_statement (@{$self->on_connect_do || []}) {
690     $self->debugobj->query_start($sql_statement) if $self->debug();
691     $self->_dbh->do($sql_statement);
692     $self->debugobj->query_end($sql_statement) if $self->debug();
693   }
694
695   $self->_conn_pid($$);
696   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
697 }
698
699 sub _connect {
700   my ($self, @info) = @_;
701
702   $self->throw_exception("You failed to provide any connection info")
703       if !@info;
704
705   my ($old_connect_via, $dbh);
706
707   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
708       $old_connect_via = $DBI::connect_via;
709       $DBI::connect_via = 'connect';
710   }
711
712   eval {
713     if(ref $info[0] eq 'CODE') {
714        $dbh = &{$info[0]}
715     }
716     else {
717        $dbh = DBI->connect(@info);
718        $dbh->{RaiseError} = 1;
719        $dbh->{PrintError} = 0;
720        $dbh->{PrintWarn} = 0;
721     }
722   };
723
724   $DBI::connect_via = $old_connect_via if $old_connect_via;
725
726   if (!$dbh || $@) {
727     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
728   }
729
730   $dbh;
731 }
732
733 sub _dbh_txn_begin {
734   my ($self, $dbh) = @_;
735   if ($dbh->{AutoCommit}) {
736     $self->debugobj->txn_begin()
737       if ($self->debug);
738     $dbh->begin_work;
739   }
740 }
741
742 sub txn_begin {
743   my $self = shift;
744   $self->dbh_do($self->can('_dbh_txn_begin'))
745     if $self->{transaction_depth}++ == 0;
746 }
747
748 sub _dbh_txn_commit {
749   my ($self, $dbh) = @_;
750   if ($self->{transaction_depth} == 0) {
751     unless ($dbh->{AutoCommit}) {
752       $self->debugobj->txn_commit()
753         if ($self->debug);
754       $dbh->commit;
755     }
756   }
757   else {
758     if (--$self->{transaction_depth} == 0) {
759       $self->debugobj->txn_commit()
760         if ($self->debug);
761       $dbh->commit;
762     }
763   }
764 }
765
766 sub txn_commit {
767   my $self = shift;
768   $self->dbh_do($self->can('_dbh_txn_commit'));
769 }
770
771 sub _dbh_txn_rollback {
772   my ($self, $dbh) = @_;
773   if ($self->{transaction_depth} == 0) {
774     unless ($dbh->{AutoCommit}) {
775       $self->debugobj->txn_rollback()
776         if ($self->debug);
777       $dbh->rollback;
778     }
779   }
780   else {
781     if (--$self->{transaction_depth} == 0) {
782       $self->debugobj->txn_rollback()
783         if ($self->debug);
784       $dbh->rollback;
785     }
786     else {
787       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
788     }
789   }
790 }
791
792 sub txn_rollback {
793   my $self = shift;
794
795   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
796   if ($@) {
797     my $error = $@;
798     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
799     $error =~ /$exception_class/ and $self->throw_exception($error);
800     $self->{transaction_depth} = 0;          # ensure that a failed rollback
801     $self->throw_exception($error);          # resets the transaction depth
802   }
803 }
804
805 # This used to be the top-half of _execute.  It was split out to make it
806 #  easier to override in NoBindVars without duping the rest.  It takes up
807 #  all of _execute's args, and emits $sql, @bind.
808 sub _prep_for_execute {
809   my ($self, $op, $extra_bind, $ident, @args) = @_;
810
811   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
812   unshift(@bind, @$extra_bind) if $extra_bind;
813   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
814
815   return ($sql, @bind);
816 }
817
818 sub _execute {
819   my $self = shift;
820
821   my ($sql, @bind) = $self->_prep_for_execute(@_);
822
823   if ($self->debug) {
824       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
825       $self->debugobj->query_start($sql, @debug_bind);
826   }
827
828   my $sth = $self->sth($sql);
829
830   my $rv;
831   if ($sth) {
832     my $time = time();
833     $rv = eval { $sth->execute(@bind) };
834
835     if ($@ || !$rv) {
836       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
837     }
838   } else {
839     $self->throw_exception("'$sql' did not generate a statement.");
840   }
841   if ($self->debug) {
842       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
843       $self->debugobj->query_end($sql, @debug_bind);
844   }
845   return (wantarray ? ($rv, $sth, @bind) : $rv);
846 }
847
848 sub insert {
849   my ($self, $ident, $to_insert) = @_;
850   $self->throw_exception(
851     "Couldn't insert ".join(', ',
852       map "$_ => $to_insert->{$_}", keys %$to_insert
853     )." into ${ident}"
854   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
855   return $to_insert;
856 }
857
858 ## Still not quite perfect, and EXPERIMENTAL
859 ## Currently it is assumed that all values passed will be "normal", i.e. not 
860 ## scalar refs, or at least, all the same type as the first set, the statement is
861 ## only prepped once.
862 sub insert_bulk {
863   my ($self, $table, $cols, $data) = @_;
864   my %colvalues;
865   @colvalues{@$cols} = (0..$#$cols);
866   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
867 # print STDERR "BIND".Dumper(\@bind);
868
869   if ($self->debug) {
870       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
871       $self->debugobj->query_start($sql, @debug_bind);
872   }
873   my $sth = $self->sth($sql);
874
875 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
876
877   my $rv;
878   ## This must be an arrayref, else nothing works!
879   my $tuple_status = [];
880 #  use Data::Dumper;
881 #  print STDERR Dumper($data);
882   if ($sth) {
883     my $time = time();
884     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
885                                        ArrayTupleStatus => $tuple_status }) };
886 # print STDERR Dumper($tuple_status);
887 # print STDERR "RV: $rv\n";
888     if ($@ || !defined $rv) {
889       my $errors = '';
890       foreach my $tuple (@$tuple_status)
891       {
892           $errors .= "\n" . $tuple->[1] if(ref $tuple);
893       }
894       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
895     }
896   } else {
897     $self->throw_exception("'$sql' did not generate a statement.");
898   }
899   if ($self->debug) {
900       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
901       $self->debugobj->query_end($sql, @debug_bind);
902   }
903   return (wantarray ? ($rv, $sth, @bind) : $rv);
904 }
905
906 sub update {
907   return shift->_execute('update' => [], @_);
908 }
909
910 sub delete {
911   return shift->_execute('delete' => [], @_);
912 }
913
914 sub _select {
915   my ($self, $ident, $select, $condition, $attrs) = @_;
916   my $order = $attrs->{order_by};
917   if (ref $condition eq 'SCALAR') {
918     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
919   }
920   if (exists $attrs->{group_by} || $attrs->{having}) {
921     $order = {
922       group_by => $attrs->{group_by},
923       having => $attrs->{having},
924       ($order ? (order_by => $order) : ())
925     };
926   }
927   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
928   if ($attrs->{software_limit} ||
929       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
930         $attrs->{software_limit} = 1;
931   } else {
932     $self->throw_exception("rows attribute must be positive if present")
933       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
934     push @args, $attrs->{rows}, $attrs->{offset};
935   }
936   return $self->_execute(@args);
937 }
938
939 =head2 select
940
941 =over 4
942
943 =item Arguments: $ident, $select, $condition, $attrs
944
945 =back
946
947 Handle a SQL select statement.
948
949 =cut
950
951 sub select {
952   my $self = shift;
953   my ($ident, $select, $condition, $attrs) = @_;
954   return $self->cursor->new($self, \@_, $attrs);
955 }
956
957 sub select_single {
958   my $self = shift;
959   my ($rv, $sth, @bind) = $self->_select(@_);
960   my @row = $sth->fetchrow_array;
961   # Need to call finish() to work round broken DBDs
962   $sth->finish();
963   return @row;
964 }
965
966 =head2 sth
967
968 =over 4
969
970 =item Arguments: $sql
971
972 =back
973
974 Returns a L<DBI> sth (statement handle) for the supplied SQL.
975
976 =cut
977
978 sub _dbh_sth {
979   my ($self, $dbh, $sql) = @_;
980
981   # 3 is the if_active parameter which avoids active sth re-use
982   my $sth = $self->disable_sth_caching
983     ? $dbh->prepare($sql)
984     : $dbh->prepare_cached($sql, {}, 3);
985
986   $self->throw_exception(
987     'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
988   ) if !$sth;
989
990   $sth;
991 }
992
993 sub sth {
994   my ($self, $sql) = @_;
995   $self->dbh_do($self->can('_dbh_sth'), $sql);
996 }
997
998 sub _dbh_columns_info_for {
999   my ($self, $dbh, $table) = @_;
1000
1001   if ($dbh->can('column_info')) {
1002     my %result;
1003     eval {
1004       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1005       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1006       $sth->execute();
1007       while ( my $info = $sth->fetchrow_hashref() ){
1008         my %column_info;
1009         $column_info{data_type}   = $info->{TYPE_NAME};
1010         $column_info{size}      = $info->{COLUMN_SIZE};
1011         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1012         $column_info{default_value} = $info->{COLUMN_DEF};
1013         my $col_name = $info->{COLUMN_NAME};
1014         $col_name =~ s/^\"(.*)\"$/$1/;
1015
1016         $result{$col_name} = \%column_info;
1017       }
1018     };
1019     return \%result if !$@ && scalar keys %result;
1020   }
1021
1022   my %result;
1023   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1024   $sth->execute;
1025   my @columns = @{$sth->{NAME_lc}};
1026   for my $i ( 0 .. $#columns ){
1027     my %column_info;
1028     my $type_num = $sth->{TYPE}->[$i];
1029     my $type_name;
1030     if(defined $type_num && $dbh->can('type_info')) {
1031       my $type_info = $dbh->type_info($type_num);
1032       $type_name = $type_info->{TYPE_NAME} if $type_info;
1033     }
1034     $column_info{data_type} = $type_name ? $type_name : $type_num;
1035     $column_info{size} = $sth->{PRECISION}->[$i];
1036     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1037
1038     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1039       $column_info{data_type} = $1;
1040       $column_info{size}    = $2;
1041     }
1042
1043     $result{$columns[$i]} = \%column_info;
1044   }
1045
1046   return \%result;
1047 }
1048
1049 sub columns_info_for {
1050   my ($self, $table) = @_;
1051   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1052 }
1053
1054 =head2 last_insert_id
1055
1056 Return the row id of the last insert.
1057
1058 =cut
1059
1060 sub _dbh_last_insert_id {
1061     my ($self, $dbh, $source, $col) = @_;
1062     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1063     $dbh->func('last_insert_rowid');
1064 }
1065
1066 sub last_insert_id {
1067   my $self = shift;
1068   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1069 }
1070
1071 =head2 sqlt_type
1072
1073 Returns the database driver name.
1074
1075 =cut
1076
1077 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1078
1079 =head2 create_ddl_dir (EXPERIMENTAL)
1080
1081 =over 4
1082
1083 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1084
1085 =back
1086
1087 Creates a SQL file based on the Schema, for each of the specified
1088 database types, in the given directory.
1089
1090 Note that this feature is currently EXPERIMENTAL and may not work correctly
1091 across all databases, or fully handle complex relationships.
1092
1093 =cut
1094
1095 sub create_ddl_dir
1096 {
1097   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1098
1099   if(!$dir || !-d $dir)
1100   {
1101     warn "No directory given, using ./\n";
1102     $dir = "./";
1103   }
1104   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1105   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1106   $version ||= $schema->VERSION || '1.x';
1107   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1108
1109   eval "use SQL::Translator";
1110   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1111
1112   my $sqlt = SQL::Translator->new($sqltargs);
1113   foreach my $db (@$databases)
1114   {
1115     $sqlt->reset();
1116     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1117 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1118     $sqlt->data($schema);
1119     $sqlt->producer($db);
1120
1121     my $file;
1122     my $filename = $schema->ddl_filename($db, $dir, $version);
1123     if(-e $filename)
1124     {
1125       $self->throw_exception("$filename already exists, skipping $db");
1126       next;
1127     }
1128     open($file, ">$filename") 
1129       or $self->throw_exception("Can't open $filename for writing ($!)");
1130     my $output = $sqlt->translate;
1131 #use Data::Dumper;
1132 #    print join(":", keys %{$schema->source_registrations});
1133 #    print Dumper($sqlt->schema);
1134     if(!$output)
1135     {
1136       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1137       next;
1138     }
1139     print $file $output;
1140     close($file);
1141   }
1142
1143 }
1144
1145 =head2 deployment_statements
1146
1147 =over 4
1148
1149 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1150
1151 =back
1152
1153 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1154 The database driver name is given by C<$type>, though the value from
1155 L</sqlt_type> is used if it is not specified.
1156
1157 C<$directory> is used to return statements from files in a previously created
1158 L</create_ddl_dir> directory and is optional. The filenames are constructed
1159 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1160
1161 If no C<$directory> is specified then the statements are constructed on the
1162 fly using L<SQL::Translator> and C<$version> is ignored.
1163
1164 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1165
1166 =cut
1167
1168 sub deployment_statements {
1169   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1170   # Need to be connected to get the correct sqlt_type
1171   $self->ensure_connected() unless $type;
1172   $type ||= $self->sqlt_type;
1173   $version ||= $schema->VERSION || '1.x';
1174   $dir ||= './';
1175   eval "use SQL::Translator";
1176   if(!$@)
1177   {
1178     eval "use SQL::Translator::Parser::DBIx::Class;";
1179     $self->throw_exception($@) if $@;
1180     eval "use SQL::Translator::Producer::${type};";
1181     $self->throw_exception($@) if $@;
1182     my $tr = SQL::Translator->new(%$sqltargs);
1183     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1184     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1185   }
1186
1187   my $filename = $schema->ddl_filename($type, $dir, $version);
1188   if(!-f $filename)
1189   {
1190 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1191       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1192       return;
1193   }
1194   my $file;
1195   open($file, "<$filename") 
1196       or $self->throw_exception("Can't open $filename ($!)");
1197   my @rows = <$file>;
1198   close($file);
1199
1200   return join('', @rows);
1201   
1202 }
1203
1204 sub deploy {
1205   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1206   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1207     for ( split(";\n", $statement)) {
1208       next if($_ =~ /^--/);
1209       next if(!$_);
1210 #      next if($_ =~ /^DROP/m);
1211       next if($_ =~ /^BEGIN TRANSACTION/m);
1212       next if($_ =~ /^COMMIT/m);
1213       next if $_ =~ /^\s+$/; # skip whitespace only
1214       $self->debugobj->query_start($_) if $self->debug;
1215       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1216       $self->debugobj->query_end($_) if $self->debug;
1217     }
1218   }
1219 }
1220
1221 =head2 datetime_parser
1222
1223 Returns the datetime parser class
1224
1225 =cut
1226
1227 sub datetime_parser {
1228   my $self = shift;
1229   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1230 }
1231
1232 =head2 datetime_parser_type
1233
1234 Defines (returns) the datetime parser class - currently hardwired to
1235 L<DateTime::Format::MySQL>
1236
1237 =cut
1238
1239 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1240
1241 =head2 build_datetime_parser
1242
1243 See L</datetime_parser>
1244
1245 =cut
1246
1247 sub build_datetime_parser {
1248   my $self = shift;
1249   my $type = $self->datetime_parser_type(@_);
1250   eval "use ${type}";
1251   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1252   return $type;
1253 }
1254
1255 sub DESTROY {
1256   my $self = shift;
1257   return if !$self->_dbh;
1258   $self->_verify_pid;
1259   $self->_dbh(undef);
1260 }
1261
1262 1;
1263
1264 =head1 SQL METHODS
1265
1266 The module defines a set of methods within the DBIC::SQL::Abstract
1267 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1268 SQL query functions.
1269
1270 The following methods are extended:-
1271
1272 =over 4
1273
1274 =item delete
1275
1276 =item insert
1277
1278 =item select
1279
1280 =item update
1281
1282 =item limit_dialect
1283
1284 See L</connect_info> for details.
1285 For setting, this method is deprecated in favor of L</connect_info>.
1286
1287 =item quote_char
1288
1289 See L</connect_info> for details.
1290 For setting, this method is deprecated in favor of L</connect_info>.
1291
1292 =item name_sep
1293
1294 See L</connect_info> for details.
1295 For setting, this method is deprecated in favor of L</connect_info>.
1296
1297 =back
1298
1299 =head1 AUTHORS
1300
1301 Matt S. Trout <mst@shadowcatsystems.co.uk>
1302
1303 Andy Grundman <andy@hybridized.org>
1304
1305 =head1 LICENSE
1306
1307 You may distribute this code under the same terms as Perl itself.
1308
1309 =cut