move !$sth exception up a level to fix storage exceptions
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 sub _find_syntax {
62   my ($self, $syntax) = @_;
63   my $dbhname = ref $syntax eq 'HASH' ? $syntax->{Driver}{Name} : '';
64   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
65     return 'RowNumberOver';
66   }
67
68   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
69 }
70
71 sub select {
72   my ($self, $table, $fields, $where, $order, @rest) = @_;
73   $table = $self->_quote($table) unless ref($table);
74   local $self->{rownum_hack_count} = 1
75     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
76   @rest = (-1) unless defined $rest[0];
77   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
78     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
79   local $self->{having_bind} = [];
80   my ($sql, @ret) = $self->SUPER::select(
81     $table, $self->_recurse_fields($fields), $where, $order, @rest
82   );
83   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
84 }
85
86 sub insert {
87   my $self = shift;
88   my $table = shift;
89   $table = $self->_quote($table) unless ref($table);
90   $self->SUPER::insert($table, @_);
91 }
92
93 sub update {
94   my $self = shift;
95   my $table = shift;
96   $table = $self->_quote($table) unless ref($table);
97   $self->SUPER::update($table, @_);
98 }
99
100 sub delete {
101   my $self = shift;
102   my $table = shift;
103   $table = $self->_quote($table) unless ref($table);
104   $self->SUPER::delete($table, @_);
105 }
106
107 sub _emulate_limit {
108   my $self = shift;
109   if ($_[3] == -1) {
110     return $_[1].$self->_order_by($_[2]);
111   } else {
112     return $self->SUPER::_emulate_limit(@_);
113   }
114 }
115
116 sub _recurse_fields {
117   my ($self, $fields) = @_;
118   my $ref = ref $fields;
119   return $self->_quote($fields) unless $ref;
120   return $$fields if $ref eq 'SCALAR';
121
122   if ($ref eq 'ARRAY') {
123     return join(', ', map {
124       $self->_recurse_fields($_)
125       .(exists $self->{rownum_hack_count}
126          ? ' AS col'.$self->{rownum_hack_count}++
127          : '')
128      } @$fields);
129   } elsif ($ref eq 'HASH') {
130     foreach my $func (keys %$fields) {
131       return $self->_sqlcase($func)
132         .'( '.$self->_recurse_fields($fields->{$func}).' )';
133     }
134   }
135 }
136
137 sub _order_by {
138   my $self = shift;
139   my $ret = '';
140   my @extra;
141   if (ref $_[0] eq 'HASH') {
142     if (defined $_[0]->{group_by}) {
143       $ret = $self->_sqlcase(' group by ')
144                .$self->_recurse_fields($_[0]->{group_by});
145     }
146     if (defined $_[0]->{having}) {
147       my $frag;
148       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
149       push(@{$self->{having_bind}}, @extra);
150       $ret .= $self->_sqlcase(' having ').$frag;
151     }
152     if (defined $_[0]->{order_by}) {
153       $ret .= $self->_order_by($_[0]->{order_by});
154     }
155   } elsif (ref $_[0] eq 'SCALAR') {
156     $ret = $self->_sqlcase(' order by ').${ $_[0] };
157   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
158     my @order = @{+shift};
159     $ret = $self->_sqlcase(' order by ')
160           .join(', ', map {
161                         my $r = $self->_order_by($_, @_);
162                         $r =~ s/^ ?ORDER BY //i;
163                         $r;
164                       } @order);
165   } else {
166     $ret = $self->SUPER::_order_by(@_);
167   }
168   return $ret;
169 }
170
171 sub _order_directions {
172   my ($self, $order) = @_;
173   $order = $order->{order_by} if ref $order eq 'HASH';
174   return $self->SUPER::_order_directions($order);
175 }
176
177 sub _table {
178   my ($self, $from) = @_;
179   if (ref $from eq 'ARRAY') {
180     return $self->_recurse_from(@$from);
181   } elsif (ref $from eq 'HASH') {
182     return $self->_make_as($from);
183   } else {
184     return $from; # would love to quote here but _table ends up getting called
185                   # twice during an ->select without a limit clause due to
186                   # the way S::A::Limit->select works. should maybe consider
187                   # bypassing this and doing S::A::select($self, ...) in
188                   # our select method above. meantime, quoting shims have
189                   # been added to select/insert/update/delete here
190   }
191 }
192
193 sub _recurse_from {
194   my ($self, $from, @join) = @_;
195   my @sqlf;
196   push(@sqlf, $self->_make_as($from));
197   foreach my $j (@join) {
198     my ($to, $on) = @$j;
199
200     # check whether a join type exists
201     my $join_clause = '';
202     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
203     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
204       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
205     } else {
206       $join_clause = ' JOIN ';
207     }
208     push(@sqlf, $join_clause);
209
210     if (ref $to eq 'ARRAY') {
211       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
212     } else {
213       push(@sqlf, $self->_make_as($to));
214     }
215     push(@sqlf, ' ON ', $self->_join_condition($on));
216   }
217   return join('', @sqlf);
218 }
219
220 sub _make_as {
221   my ($self, $from) = @_;
222   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
223                      reverse each %{$self->_skip_options($from)});
224 }
225
226 sub _skip_options {
227   my ($self, $hash) = @_;
228   my $clean_hash = {};
229   $clean_hash->{$_} = $hash->{$_}
230     for grep {!/^-/} keys %$hash;
231   return $clean_hash;
232 }
233
234 sub _join_condition {
235   my ($self, $cond) = @_;
236   if (ref $cond eq 'HASH') {
237     my %j;
238     for (keys %$cond) {
239       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
240     };
241     return $self->_recurse_where(\%j);
242   } elsif (ref $cond eq 'ARRAY') {
243     return join(' OR ', map { $self->_join_condition($_) } @$cond);
244   } else {
245     die "Can't handle this yet!";
246   }
247 }
248
249 sub _quote {
250   my ($self, $label) = @_;
251   return '' unless defined $label;
252   return "*" if $label eq '*';
253   return $label unless $self->{quote_char};
254   if(ref $self->{quote_char} eq "ARRAY"){
255     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
256       if !defined $self->{name_sep};
257     my $sep = $self->{name_sep};
258     return join($self->{name_sep},
259         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
260        split(/\Q$sep\E/,$label));
261   }
262   return $self->SUPER::_quote($label);
263 }
264
265 sub limit_dialect {
266     my $self = shift;
267     $self->{limit_dialect} = shift if @_;
268     return $self->{limit_dialect};
269 }
270
271 sub quote_char {
272     my $self = shift;
273     $self->{quote_char} = shift if @_;
274     return $self->{quote_char};
275 }
276
277 sub name_sep {
278     my $self = shift;
279     $self->{name_sep} = shift if @_;
280     return $self->{name_sep};
281 }
282
283 } # End of BEGIN block
284
285 =head1 NAME
286
287 DBIx::Class::Storage::DBI - DBI storage handler
288
289 =head1 SYNOPSIS
290
291 =head1 DESCRIPTION
292
293 This class represents the connection to an RDBMS via L<DBI>.  See
294 L<DBIx::Class::Storage> for general information.  This pod only
295 documents DBI-specific methods and behaviors.
296
297 =head1 METHODS
298
299 =cut
300
301 sub new {
302   my $new = shift->next::method(@_);
303
304   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
305   $new->transaction_depth(0);
306   $new->_sql_maker_opts({});
307
308   $new;
309 }
310
311 =head2 connect_info
312
313 The arguments of C<connect_info> are always a single array reference.
314
315 This is normally accessed via L<DBIx::Class::Schema/connection>, which
316 encapsulates its argument list in an arrayref before calling
317 C<connect_info> here.
318
319 The arrayref can either contain the same set of arguments one would
320 normally pass to L<DBI/connect>, or a lone code reference which returns
321 a connected database handle.
322
323 In either case, if the final argument in your connect_info happens
324 to be a hashref, C<connect_info> will look there for several
325 connection-specific options:
326
327 =over 4
328
329 =item on_connect_do
330
331 This can be set to an arrayref of literal sql statements, which will
332 be executed immediately after making the connection to the database
333 every time we [re-]connect.
334
335 =item limit_dialect 
336
337 Sets the limit dialect. This is useful for JDBC-bridge among others
338 where the remote SQL-dialect cannot be determined by the name of the
339 driver alone.
340
341 =item quote_char
342
343 Specifies what characters to use to quote table and column names. If 
344 you use this you will want to specify L<name_sep> as well.
345
346 quote_char expects either a single character, in which case is it is placed
347 on either side of the table/column, or an arrayref of length 2 in which case the
348 table/column name is placed between the elements.
349
350 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
351 use C<quote_char =E<gt> [qw/[ ]/]>.
352
353 =item name_sep
354
355 This only needs to be used in conjunction with L<quote_char>, and is used to 
356 specify the charecter that seperates elements (schemas, tables, columns) from 
357 each other. In most cases this is simply a C<.>.
358
359 =back
360
361 These options can be mixed in with your other L<DBI> connection attributes,
362 or placed in a seperate hashref after all other normal L<DBI> connection
363 arguments.
364
365 Every time C<connect_info> is invoked, any previous settings for
366 these options will be cleared before setting the new ones, regardless of
367 whether any options are specified in the new C<connect_info>.
368
369 Important note:  DBIC expects the returned database handle provided by 
370 a subref argument to have RaiseError set on it.  If it doesn't, things
371 might not work very well, YMMV.  If you don't use a subref, DBIC will
372 force this setting for you anyways.  Setting HandleError to anything
373 other than simple exception object wrapper might cause problems too.
374
375 Examples:
376
377   # Simple SQLite connection
378   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
379
380   # Connect via subref
381   ->connect_info([ sub { DBI->connect(...) } ]);
382
383   # A bit more complicated
384   ->connect_info(
385     [
386       'dbi:Pg:dbname=foo',
387       'postgres',
388       'my_pg_password',
389       { AutoCommit => 0 },
390       { quote_char => q{"}, name_sep => q{.} },
391     ]
392   );
393
394   # Equivalent to the previous example
395   ->connect_info(
396     [
397       'dbi:Pg:dbname=foo',
398       'postgres',
399       'my_pg_password',
400       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
401     ]
402   );
403
404   # Subref + DBIC-specific connection options
405   ->connect_info(
406     [
407       sub { DBI->connect(...) },
408       {
409           quote_char => q{`},
410           name_sep => q{@},
411           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
412       },
413     ]
414   );
415
416 =cut
417
418 sub connect_info {
419   my ($self, $info_arg) = @_;
420
421   return $self->_connect_info if !$info_arg;
422
423   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
424   #  the new set of options
425   $self->_sql_maker(undef);
426   $self->_sql_maker_opts({});
427
428   my $info = [ @$info_arg ]; # copy because we can alter it
429   my $last_info = $info->[-1];
430   if(ref $last_info eq 'HASH') {
431     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
432       $self->on_connect_do($on_connect_do);
433     }
434     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
435       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
436         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
437       }
438     }
439
440     # Get rid of any trailing empty hashref
441     pop(@$info) if !keys %$last_info;
442   }
443
444   $self->_connect_info($info);
445 }
446
447 =head2 on_connect_do
448
449 This method is deprecated in favor of setting via L</connect_info>.
450
451 =head2 dbh_do
452
453 Arguments: $subref, @extra_coderef_args?
454
455 Execute the given subref using the new exception-based connection management.
456
457 The first two arguments will be the storage object that C<dbh_do> was called
458 on and a database handle to use.  Any additional arguments will be passed
459 verbatim to the called subref as arguments 2 and onwards.
460
461 Using this (instead of $self->_dbh or $self->dbh) ensures correct
462 exception handling and reconnection (or failover in future subclasses).
463
464 Your subref should have no side-effects outside of the database, as
465 there is the potential for your subref to be partially double-executed
466 if the database connection was stale/dysfunctional.
467
468 Example:
469
470   my @stuff = $schema->storage->dbh_do(
471     sub {
472       my ($storage, $dbh, @cols) = @_;
473       my $cols = join(q{, }, @cols);
474       $dbh->selectrow_array("SELECT $cols FROM foo");
475     },
476     @column_list
477   );
478
479 =cut
480
481 sub dbh_do {
482   my $self = shift;
483   my $coderef = shift;
484
485   return $coderef->($self, $self->_dbh, @_) if $self->{_in_txn_do};
486
487   ref $coderef eq 'CODE' or $self->throw_exception
488     ('$coderef must be a CODE reference');
489
490   my @result;
491   my $want_array = wantarray;
492
493   eval {
494     $self->_verify_pid if $self->_dbh;
495     $self->_populate_dbh if !$self->_dbh;
496     if($want_array) {
497         @result = $coderef->($self, $self->_dbh, @_);
498     }
499     elsif(defined $want_array) {
500         $result[0] = $coderef->($self, $self->_dbh, @_);
501     }
502     else {
503         $coderef->($self, $self->_dbh, @_);
504     }
505   };
506
507   my $exception = $@;
508   if(!$exception) { return $want_array ? @result : $result[0] }
509
510   $self->throw_exception($exception) if $self->connected;
511
512   # We were not connected - reconnect and retry, but let any
513   #  exception fall right through this time
514   $self->_populate_dbh;
515   $coderef->($self, $self->_dbh, @_);
516 }
517
518 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
519 # It also informs dbh_do to bypass itself while under the direction of txn_do,
520 #  via $self->{_in_txn_do} (this saves some redundant eval and errorcheck, etc)
521 sub txn_do {
522   my $self = shift;
523   my $coderef = shift;
524
525   ref $coderef eq 'CODE' or $self->throw_exception
526     ('$coderef must be a CODE reference');
527
528   local $self->{_in_txn_do} = 1;
529
530   my @result;
531   my $want_array = wantarray;
532
533   my $tried = 0;
534   while(1) {
535     eval {
536       $self->_verify_pid if $self->_dbh;
537       $self->_populate_dbh if !$self->_dbh;
538
539       $self->txn_begin;
540       if($want_array) {
541           @result = $coderef->(@_);
542       }
543       elsif(defined $want_array) {
544           $result[0] = $coderef->(@_);
545       }
546       else {
547           $coderef->(@_);
548       }
549       $self->txn_commit;
550     };
551
552     my $exception = $@;
553     if(!$exception) { return $want_array ? @result : $result[0] }
554
555     if($tried++ > 0 || $self->connected) {
556       eval { $self->txn_rollback };
557       my $rollback_exception = $@;
558       if($rollback_exception) {
559         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
560         $self->throw_exception($exception)  # propagate nested rollback
561           if $rollback_exception =~ /$exception_class/;
562
563         $self->throw_exception(
564           "Transaction aborted: ${exception}. "
565           . "Rollback failed: ${rollback_exception}"
566         );
567       }
568       $self->throw_exception($exception)
569     }
570
571     # We were not connected, and was first try - reconnect and retry
572     # via the while loop
573     $self->_populate_dbh;
574   }
575 }
576
577 =head2 disconnect
578
579 Our C<disconnect> method also performs a rollback first if the
580 database is not in C<AutoCommit> mode.
581
582 =cut
583
584 sub disconnect {
585   my ($self) = @_;
586
587   if( $self->connected ) {
588     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
589     $self->_dbh->disconnect;
590     $self->_dbh(undef);
591   }
592 }
593
594 sub connected {
595   my ($self) = @_;
596
597   if(my $dbh = $self->_dbh) {
598       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
599           return $self->_dbh(undef);
600       }
601       else {
602           $self->_verify_pid;
603       }
604       return ($dbh->FETCH('Active') && $dbh->ping);
605   }
606
607   return 0;
608 }
609
610 # handle pid changes correctly
611 #  NOTE: assumes $self->_dbh is a valid $dbh
612 sub _verify_pid {
613   my ($self) = @_;
614
615   return if $self->_conn_pid == $$;
616
617   $self->_dbh->{InactiveDestroy} = 1;
618   $self->_dbh(undef);
619
620   return;
621 }
622
623 sub ensure_connected {
624   my ($self) = @_;
625
626   unless ($self->connected) {
627     $self->_populate_dbh;
628   }
629 }
630
631 =head2 dbh
632
633 Returns the dbh - a data base handle of class L<DBI>.
634
635 =cut
636
637 sub dbh {
638   my ($self) = @_;
639
640   $self->ensure_connected;
641   return $self->_dbh;
642 }
643
644 sub _sql_maker_args {
645     my ($self) = @_;
646     
647     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
648 }
649
650 sub sql_maker {
651   my ($self) = @_;
652   unless ($self->_sql_maker) {
653     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
654   }
655   return $self->_sql_maker;
656 }
657
658 sub _populate_dbh {
659   my ($self) = @_;
660   my @info = @{$self->_connect_info || []};
661   $self->_dbh($self->_connect(@info));
662
663   if(ref $self eq 'DBIx::Class::Storage::DBI') {
664     my $driver = $self->_dbh->{Driver}->{Name};
665     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
666       bless $self, "DBIx::Class::Storage::DBI::${driver}";
667       $self->_rebless() if $self->can('_rebless');
668     }
669   }
670
671   # if on-connect sql statements are given execute them
672   foreach my $sql_statement (@{$self->on_connect_do || []}) {
673     $self->debugobj->query_start($sql_statement) if $self->debug();
674     $self->_dbh->do($sql_statement);
675     $self->debugobj->query_end($sql_statement) if $self->debug();
676   }
677
678   $self->_conn_pid($$);
679   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
680 }
681
682 sub _connect {
683   my ($self, @info) = @_;
684
685   $self->throw_exception("You failed to provide any connection info")
686       if !@info;
687
688   my ($old_connect_via, $dbh);
689
690   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
691       $old_connect_via = $DBI::connect_via;
692       $DBI::connect_via = 'connect';
693   }
694
695   eval {
696     if(ref $info[0] eq 'CODE') {
697        $dbh = &{$info[0]}
698     }
699     else {
700        $dbh = DBI->connect(@info);
701        $dbh->{RaiseError} = 1;
702        $dbh->{PrintError} = 0;
703        $dbh->{PrintWarn} = 0;
704     }
705   };
706
707   $DBI::connect_via = $old_connect_via if $old_connect_via;
708
709   if (!$dbh || $@) {
710     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
711   }
712
713   $dbh;
714 }
715
716 sub _dbh_txn_begin {
717   my ($self, $dbh) = @_;
718   if ($dbh->{AutoCommit}) {
719     $self->debugobj->txn_begin()
720       if ($self->debug);
721     $dbh->begin_work;
722   }
723 }
724
725 sub txn_begin {
726   my $self = shift;
727   $self->dbh_do($self->can('_dbh_txn_begin'))
728     if $self->{transaction_depth}++ == 0;
729 }
730
731 sub _dbh_txn_commit {
732   my ($self, $dbh) = @_;
733   if ($self->{transaction_depth} == 0) {
734     unless ($dbh->{AutoCommit}) {
735       $self->debugobj->txn_commit()
736         if ($self->debug);
737       $dbh->commit;
738     }
739   }
740   else {
741     if (--$self->{transaction_depth} == 0) {
742       $self->debugobj->txn_commit()
743         if ($self->debug);
744       $dbh->commit;
745     }
746   }
747 }
748
749 sub txn_commit {
750   my $self = shift;
751   $self->dbh_do($self->can('_dbh_txn_commit'));
752 }
753
754 sub _dbh_txn_rollback {
755   my ($self, $dbh) = @_;
756   if ($self->{transaction_depth} == 0) {
757     unless ($dbh->{AutoCommit}) {
758       $self->debugobj->txn_rollback()
759         if ($self->debug);
760       $dbh->rollback;
761     }
762   }
763   else {
764     if (--$self->{transaction_depth} == 0) {
765       $self->debugobj->txn_rollback()
766         if ($self->debug);
767       $dbh->rollback;
768     }
769     else {
770       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
771     }
772   }
773 }
774
775 sub txn_rollback {
776   my $self = shift;
777
778   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
779   if ($@) {
780     my $error = $@;
781     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
782     $error =~ /$exception_class/ and $self->throw_exception($error);
783     $self->{transaction_depth} = 0;          # ensure that a failed rollback
784     $self->throw_exception($error);          # resets the transaction depth
785   }
786 }
787
788 # This used to be the top-half of _execute.  It was split out to make it
789 #  easier to override in NoBindVars without duping the rest.  It takes up
790 #  all of _execute's args, and emits $sql, @bind.
791 sub _prep_for_execute {
792   my ($self, $op, $extra_bind, $ident, @args) = @_;
793
794   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
795   unshift(@bind, @$extra_bind) if $extra_bind;
796   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
797
798   return ($sql, @bind);
799 }
800
801 sub _execute {
802   my $self = shift;
803
804   my ($sql, @bind) = $self->_prep_for_execute(@_);
805
806   if ($self->debug) {
807       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
808       $self->debugobj->query_start($sql, @debug_bind);
809   }
810
811   my $sth = $self->sth($sql);
812
813   my $rv;
814   if ($sth) {
815     my $time = time();
816     $rv = eval { $sth->execute(@bind) };
817
818     if ($@ || !$rv) {
819       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
820     }
821   } else {
822     $self->throw_exception("'$sql' did not generate a statement.");
823   }
824   if ($self->debug) {
825       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
826       $self->debugobj->query_end($sql, @debug_bind);
827   }
828   return (wantarray ? ($rv, $sth, @bind) : $rv);
829 }
830
831 sub insert {
832   my ($self, $ident, $to_insert) = @_;
833   $self->throw_exception(
834     "Couldn't insert ".join(', ',
835       map "$_ => $to_insert->{$_}", keys %$to_insert
836     )." into ${ident}"
837   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
838   return $to_insert;
839 }
840
841 sub update {
842   return shift->_execute('update' => [], @_);
843 }
844
845 sub delete {
846   return shift->_execute('delete' => [], @_);
847 }
848
849 sub _select {
850   my ($self, $ident, $select, $condition, $attrs) = @_;
851   my $order = $attrs->{order_by};
852   if (ref $condition eq 'SCALAR') {
853     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
854   }
855   if (exists $attrs->{group_by} || $attrs->{having}) {
856     $order = {
857       group_by => $attrs->{group_by},
858       having => $attrs->{having},
859       ($order ? (order_by => $order) : ())
860     };
861   }
862   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
863   if ($attrs->{software_limit} ||
864       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
865         $attrs->{software_limit} = 1;
866   } else {
867     $self->throw_exception("rows attribute must be positive if present")
868       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
869     push @args, $attrs->{rows}, $attrs->{offset};
870   }
871   return $self->_execute(@args);
872 }
873
874 =head2 select
875
876 =over 4
877
878 =item Arguments: $ident, $select, $condition, $attrs
879
880 =back
881
882 Handle a SQL select statement.
883
884 =cut
885
886 sub select {
887   my $self = shift;
888   my ($ident, $select, $condition, $attrs) = @_;
889   return $self->cursor->new($self, \@_, $attrs);
890 }
891
892 sub select_single {
893   my $self = shift;
894   my ($rv, $sth, @bind) = $self->_select(@_);
895   my @row = $sth->fetchrow_array;
896   # Need to call finish() to work round broken DBDs
897   $sth->finish();
898   return @row;
899 }
900
901 =head2 sth
902
903 =over 4
904
905 =item Arguments: $sql
906
907 =back
908
909 Returns a L<DBI> sth (statement handle) for the supplied SQL.
910
911 =cut
912
913 sub _dbh_sth {
914   my ($self, $dbh, $sql) = @_;
915   # 3 is the if_active parameter which avoids active sth re-use
916   $dbh->prepare_cached($sql, {}, 3) or
917     $self->throw_exception(
918       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
919     );
920 }
921
922 sub sth {
923   my ($self, $sql) = @_;
924   $self->dbh_do($self->can('_dbh_sth'), $sql);
925 }
926
927 sub _dbh_columns_info_for {
928   my ($self, $dbh, $table) = @_;
929
930   if ($dbh->can('column_info')) {
931     my %result;
932     eval {
933       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
934       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
935       $sth->execute();
936       while ( my $info = $sth->fetchrow_hashref() ){
937         my %column_info;
938         $column_info{data_type}   = $info->{TYPE_NAME};
939         $column_info{size}      = $info->{COLUMN_SIZE};
940         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
941         $column_info{default_value} = $info->{COLUMN_DEF};
942         my $col_name = $info->{COLUMN_NAME};
943         $col_name =~ s/^\"(.*)\"$/$1/;
944
945         $result{$col_name} = \%column_info;
946       }
947     };
948     return \%result if !$@ && scalar keys %result;
949   }
950
951   my %result;
952   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
953   $sth->execute;
954   my @columns = @{$sth->{NAME_lc}};
955   for my $i ( 0 .. $#columns ){
956     my %column_info;
957     my $type_num = $sth->{TYPE}->[$i];
958     my $type_name;
959     if(defined $type_num && $dbh->can('type_info')) {
960       my $type_info = $dbh->type_info($type_num);
961       $type_name = $type_info->{TYPE_NAME} if $type_info;
962     }
963     $column_info{data_type} = $type_name ? $type_name : $type_num;
964     $column_info{size} = $sth->{PRECISION}->[$i];
965     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
966
967     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
968       $column_info{data_type} = $1;
969       $column_info{size}    = $2;
970     }
971
972     $result{$columns[$i]} = \%column_info;
973   }
974
975   return \%result;
976 }
977
978 sub columns_info_for {
979   my ($self, $table) = @_;
980   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
981 }
982
983 =head2 last_insert_id
984
985 Return the row id of the last insert.
986
987 =cut
988
989 sub _dbh_last_insert_id {
990     my ($self, $dbh, $source, $col) = @_;
991     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
992     $dbh->func('last_insert_rowid');
993 }
994
995 sub last_insert_id {
996   my $self = shift;
997   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
998 }
999
1000 =head2 sqlt_type
1001
1002 Returns the database driver name.
1003
1004 =cut
1005
1006 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1007
1008 =head2 create_ddl_dir (EXPERIMENTAL)
1009
1010 =over 4
1011
1012 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1013
1014 =back
1015
1016 Creates a SQL file based on the Schema, for each of the specified
1017 database types, in the given directory.
1018
1019 Note that this feature is currently EXPERIMENTAL and may not work correctly
1020 across all databases, or fully handle complex relationships.
1021
1022 =cut
1023
1024 sub create_ddl_dir
1025 {
1026   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1027
1028   if(!$dir || !-d $dir)
1029   {
1030     warn "No directory given, using ./\n";
1031     $dir = "./";
1032   }
1033   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1034   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1035   $version ||= $schema->VERSION || '1.x';
1036   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1037
1038   eval "use SQL::Translator";
1039   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1040
1041   my $sqlt = SQL::Translator->new($sqltargs);
1042   foreach my $db (@$databases)
1043   {
1044     $sqlt->reset();
1045     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1046 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1047     $sqlt->data($schema);
1048     $sqlt->producer($db);
1049
1050     my $file;
1051     my $filename = $schema->ddl_filename($db, $dir, $version);
1052     if(-e $filename)
1053     {
1054       $self->throw_exception("$filename already exists, skipping $db");
1055       next;
1056     }
1057     open($file, ">$filename") 
1058       or $self->throw_exception("Can't open $filename for writing ($!)");
1059     my $output = $sqlt->translate;
1060 #use Data::Dumper;
1061 #    print join(":", keys %{$schema->source_registrations});
1062 #    print Dumper($sqlt->schema);
1063     if(!$output)
1064     {
1065       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1066       next;
1067     }
1068     print $file $output;
1069     close($file);
1070   }
1071
1072 }
1073
1074 =head2 deployment_statements
1075
1076 =over 4
1077
1078 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1079
1080 =back
1081
1082 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1083 The database driver name is given by C<$type>, though the value from
1084 L</sqlt_type> is used if it is not specified.
1085
1086 C<$directory> is used to return statements from files in a previously created
1087 L</create_ddl_dir> directory and is optional. The filenames are constructed
1088 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1089
1090 If no C<$directory> is specified then the statements are constructed on the
1091 fly using L<SQL::Translator> and C<$version> is ignored.
1092
1093 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1094
1095 =cut
1096
1097 sub deployment_statements {
1098   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1099   # Need to be connected to get the correct sqlt_type
1100   $self->ensure_connected() unless $type;
1101   $type ||= $self->sqlt_type;
1102   $version ||= $schema->VERSION || '1.x';
1103   $dir ||= './';
1104   eval "use SQL::Translator";
1105   if(!$@)
1106   {
1107     eval "use SQL::Translator::Parser::DBIx::Class;";
1108     $self->throw_exception($@) if $@;
1109     eval "use SQL::Translator::Producer::${type};";
1110     $self->throw_exception($@) if $@;
1111     my $tr = SQL::Translator->new(%$sqltargs);
1112     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1113     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1114   }
1115
1116   my $filename = $schema->ddl_filename($type, $dir, $version);
1117   if(!-f $filename)
1118   {
1119 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1120       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1121       return;
1122   }
1123   my $file;
1124   open($file, "<$filename") 
1125       or $self->throw_exception("Can't open $filename ($!)");
1126   my @rows = <$file>;
1127   close($file);
1128
1129   return join('', @rows);
1130   
1131 }
1132
1133 sub deploy {
1134   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1135   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1136     for ( split(";\n", $statement)) {
1137       next if($_ =~ /^--/);
1138       next if(!$_);
1139 #      next if($_ =~ /^DROP/m);
1140       next if($_ =~ /^BEGIN TRANSACTION/m);
1141       next if($_ =~ /^COMMIT/m);
1142       next if $_ =~ /^\s+$/; # skip whitespace only
1143       $self->debugobj->query_start($_) if $self->debug;
1144       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1145       $self->debugobj->query_end($_) if $self->debug;
1146     }
1147   }
1148 }
1149
1150 =head2 datetime_parser
1151
1152 Returns the datetime parser class
1153
1154 =cut
1155
1156 sub datetime_parser {
1157   my $self = shift;
1158   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1159 }
1160
1161 =head2 datetime_parser_type
1162
1163 Defines (returns) the datetime parser class - currently hardwired to
1164 L<DateTime::Format::MySQL>
1165
1166 =cut
1167
1168 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1169
1170 =head2 build_datetime_parser
1171
1172 See L</datetime_parser>
1173
1174 =cut
1175
1176 sub build_datetime_parser {
1177   my $self = shift;
1178   my $type = $self->datetime_parser_type(@_);
1179   eval "use ${type}";
1180   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1181   return $type;
1182 }
1183
1184 sub DESTROY {
1185   my $self = shift;
1186   return if !$self->_dbh;
1187   $self->_verify_pid;
1188   $self->_dbh(undef);
1189 }
1190
1191 1;
1192
1193 =head1 SQL METHODS
1194
1195 The module defines a set of methods within the DBIC::SQL::Abstract
1196 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1197 SQL query functions.
1198
1199 The following methods are extended:-
1200
1201 =over 4
1202
1203 =item delete
1204
1205 =item insert
1206
1207 =item select
1208
1209 =item update
1210
1211 =item limit_dialect
1212
1213 See L</connect_info> for details.
1214 For setting, this method is deprecated in favor of L</connect_info>.
1215
1216 =item quote_char
1217
1218 See L</connect_info> for details.
1219 For setting, this method is deprecated in favor of L</connect_info>.
1220
1221 =item name_sep
1222
1223 See L</connect_info> for details.
1224 For setting, this method is deprecated in favor of L</connect_info>.
1225
1226 =back
1227
1228 =head1 AUTHORS
1229
1230 Matt S. Trout <mst@shadowcatsystems.co.uk>
1231
1232 Andy Grundman <andy@hybridized.org>
1233
1234 =head1 LICENSE
1235
1236 You may distribute this code under the same terms as Perl itself.
1237
1238 =cut