Storage::DBI::Cursor now makes use of the new Storage::DBI exception stuff instead...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 sub _find_syntax {
62   my ($self, $syntax) = @_;
63   my $dbhname = ref $syntax eq 'HASH' ? $syntax->{Driver}{Name} : '';
64   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
65     return 'RowNumberOver';
66   }
67
68   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
69 }
70
71 sub select {
72   my ($self, $table, $fields, $where, $order, @rest) = @_;
73   $table = $self->_quote($table) unless ref($table);
74   local $self->{rownum_hack_count} = 1
75     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
76   @rest = (-1) unless defined $rest[0];
77   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
78     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
79   local $self->{having_bind} = [];
80   my ($sql, @ret) = $self->SUPER::select(
81     $table, $self->_recurse_fields($fields), $where, $order, @rest
82   );
83   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
84 }
85
86 sub insert {
87   my $self = shift;
88   my $table = shift;
89   $table = $self->_quote($table) unless ref($table);
90   $self->SUPER::insert($table, @_);
91 }
92
93 sub update {
94   my $self = shift;
95   my $table = shift;
96   $table = $self->_quote($table) unless ref($table);
97   $self->SUPER::update($table, @_);
98 }
99
100 sub delete {
101   my $self = shift;
102   my $table = shift;
103   $table = $self->_quote($table) unless ref($table);
104   $self->SUPER::delete($table, @_);
105 }
106
107 sub _emulate_limit {
108   my $self = shift;
109   if ($_[3] == -1) {
110     return $_[1].$self->_order_by($_[2]);
111   } else {
112     return $self->SUPER::_emulate_limit(@_);
113   }
114 }
115
116 sub _recurse_fields {
117   my ($self, $fields) = @_;
118   my $ref = ref $fields;
119   return $self->_quote($fields) unless $ref;
120   return $$fields if $ref eq 'SCALAR';
121
122   if ($ref eq 'ARRAY') {
123     return join(', ', map {
124       $self->_recurse_fields($_)
125       .(exists $self->{rownum_hack_count}
126          ? ' AS col'.$self->{rownum_hack_count}++
127          : '')
128      } @$fields);
129   } elsif ($ref eq 'HASH') {
130     foreach my $func (keys %$fields) {
131       return $self->_sqlcase($func)
132         .'( '.$self->_recurse_fields($fields->{$func}).' )';
133     }
134   }
135 }
136
137 sub _order_by {
138   my $self = shift;
139   my $ret = '';
140   my @extra;
141   if (ref $_[0] eq 'HASH') {
142     if (defined $_[0]->{group_by}) {
143       $ret = $self->_sqlcase(' group by ')
144                .$self->_recurse_fields($_[0]->{group_by});
145     }
146     if (defined $_[0]->{having}) {
147       my $frag;
148       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
149       push(@{$self->{having_bind}}, @extra);
150       $ret .= $self->_sqlcase(' having ').$frag;
151     }
152     if (defined $_[0]->{order_by}) {
153       $ret .= $self->_order_by($_[0]->{order_by});
154     }
155   } elsif (ref $_[0] eq 'SCALAR') {
156     $ret = $self->_sqlcase(' order by ').${ $_[0] };
157   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
158     my @order = @{+shift};
159     $ret = $self->_sqlcase(' order by ')
160           .join(', ', map {
161                         my $r = $self->_order_by($_, @_);
162                         $r =~ s/^ ?ORDER BY //i;
163                         $r;
164                       } @order);
165   } else {
166     $ret = $self->SUPER::_order_by(@_);
167   }
168   return $ret;
169 }
170
171 sub _order_directions {
172   my ($self, $order) = @_;
173   $order = $order->{order_by} if ref $order eq 'HASH';
174   return $self->SUPER::_order_directions($order);
175 }
176
177 sub _table {
178   my ($self, $from) = @_;
179   if (ref $from eq 'ARRAY') {
180     return $self->_recurse_from(@$from);
181   } elsif (ref $from eq 'HASH') {
182     return $self->_make_as($from);
183   } else {
184     return $from; # would love to quote here but _table ends up getting called
185                   # twice during an ->select without a limit clause due to
186                   # the way S::A::Limit->select works. should maybe consider
187                   # bypassing this and doing S::A::select($self, ...) in
188                   # our select method above. meantime, quoting shims have
189                   # been added to select/insert/update/delete here
190   }
191 }
192
193 sub _recurse_from {
194   my ($self, $from, @join) = @_;
195   my @sqlf;
196   push(@sqlf, $self->_make_as($from));
197   foreach my $j (@join) {
198     my ($to, $on) = @$j;
199
200     # check whether a join type exists
201     my $join_clause = '';
202     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
203     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
204       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
205     } else {
206       $join_clause = ' JOIN ';
207     }
208     push(@sqlf, $join_clause);
209
210     if (ref $to eq 'ARRAY') {
211       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
212     } else {
213       push(@sqlf, $self->_make_as($to));
214     }
215     push(@sqlf, ' ON ', $self->_join_condition($on));
216   }
217   return join('', @sqlf);
218 }
219
220 sub _make_as {
221   my ($self, $from) = @_;
222   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
223                      reverse each %{$self->_skip_options($from)});
224 }
225
226 sub _skip_options {
227   my ($self, $hash) = @_;
228   my $clean_hash = {};
229   $clean_hash->{$_} = $hash->{$_}
230     for grep {!/^-/} keys %$hash;
231   return $clean_hash;
232 }
233
234 sub _join_condition {
235   my ($self, $cond) = @_;
236   if (ref $cond eq 'HASH') {
237     my %j;
238     for (keys %$cond) {
239       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
240     };
241     return $self->_recurse_where(\%j);
242   } elsif (ref $cond eq 'ARRAY') {
243     return join(' OR ', map { $self->_join_condition($_) } @$cond);
244   } else {
245     die "Can't handle this yet!";
246   }
247 }
248
249 sub _quote {
250   my ($self, $label) = @_;
251   return '' unless defined $label;
252   return "*" if $label eq '*';
253   return $label unless $self->{quote_char};
254   if(ref $self->{quote_char} eq "ARRAY"){
255     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
256       if !defined $self->{name_sep};
257     my $sep = $self->{name_sep};
258     return join($self->{name_sep},
259         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
260        split(/\Q$sep\E/,$label));
261   }
262   return $self->SUPER::_quote($label);
263 }
264
265 sub limit_dialect {
266     my $self = shift;
267     $self->{limit_dialect} = shift if @_;
268     return $self->{limit_dialect};
269 }
270
271 sub quote_char {
272     my $self = shift;
273     $self->{quote_char} = shift if @_;
274     return $self->{quote_char};
275 }
276
277 sub name_sep {
278     my $self = shift;
279     $self->{name_sep} = shift if @_;
280     return $self->{name_sep};
281 }
282
283 } # End of BEGIN block
284
285 =head1 NAME
286
287 DBIx::Class::Storage::DBI - DBI storage handler
288
289 =head1 SYNOPSIS
290
291 =head1 DESCRIPTION
292
293 This class represents the connection to an RDBMS via L<DBI>.  See
294 L<DBIx::Class::Storage> for general information.  This pod only
295 documents DBI-specific methods and behaviors.
296
297 =head1 METHODS
298
299 =cut
300
301 sub new {
302   my $new = shift->next::method(@_);
303
304   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
305   $new->transaction_depth(0);
306   $new->_sql_maker_opts({});
307   $new->{_in_dbh_do} = 0;
308   $new->{_dbh_gen} = 0;
309
310   $new;
311 }
312
313 =head2 connect_info
314
315 The arguments of C<connect_info> are always a single array reference.
316
317 This is normally accessed via L<DBIx::Class::Schema/connection>, which
318 encapsulates its argument list in an arrayref before calling
319 C<connect_info> here.
320
321 The arrayref can either contain the same set of arguments one would
322 normally pass to L<DBI/connect>, or a lone code reference which returns
323 a connected database handle.
324
325 In either case, if the final argument in your connect_info happens
326 to be a hashref, C<connect_info> will look there for several
327 connection-specific options:
328
329 =over 4
330
331 =item on_connect_do
332
333 This can be set to an arrayref of literal sql statements, which will
334 be executed immediately after making the connection to the database
335 every time we [re-]connect.
336
337 =item limit_dialect 
338
339 Sets the limit dialect. This is useful for JDBC-bridge among others
340 where the remote SQL-dialect cannot be determined by the name of the
341 driver alone.
342
343 =item quote_char
344
345 Specifies what characters to use to quote table and column names. If 
346 you use this you will want to specify L<name_sep> as well.
347
348 quote_char expects either a single character, in which case is it is placed
349 on either side of the table/column, or an arrayref of length 2 in which case the
350 table/column name is placed between the elements.
351
352 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
353 use C<quote_char =E<gt> [qw/[ ]/]>.
354
355 =item name_sep
356
357 This only needs to be used in conjunction with L<quote_char>, and is used to 
358 specify the charecter that seperates elements (schemas, tables, columns) from 
359 each other. In most cases this is simply a C<.>.
360
361 =back
362
363 These options can be mixed in with your other L<DBI> connection attributes,
364 or placed in a seperate hashref after all other normal L<DBI> connection
365 arguments.
366
367 Every time C<connect_info> is invoked, any previous settings for
368 these options will be cleared before setting the new ones, regardless of
369 whether any options are specified in the new C<connect_info>.
370
371 Important note:  DBIC expects the returned database handle provided by 
372 a subref argument to have RaiseError set on it.  If it doesn't, things
373 might not work very well, YMMV.  If you don't use a subref, DBIC will
374 force this setting for you anyways.  Setting HandleError to anything
375 other than simple exception object wrapper might cause problems too.
376
377 Examples:
378
379   # Simple SQLite connection
380   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
381
382   # Connect via subref
383   ->connect_info([ sub { DBI->connect(...) } ]);
384
385   # A bit more complicated
386   ->connect_info(
387     [
388       'dbi:Pg:dbname=foo',
389       'postgres',
390       'my_pg_password',
391       { AutoCommit => 0 },
392       { quote_char => q{"}, name_sep => q{.} },
393     ]
394   );
395
396   # Equivalent to the previous example
397   ->connect_info(
398     [
399       'dbi:Pg:dbname=foo',
400       'postgres',
401       'my_pg_password',
402       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
403     ]
404   );
405
406   # Subref + DBIC-specific connection options
407   ->connect_info(
408     [
409       sub { DBI->connect(...) },
410       {
411           quote_char => q{`},
412           name_sep => q{@},
413           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
414       },
415     ]
416   );
417
418 =cut
419
420 sub connect_info {
421   my ($self, $info_arg) = @_;
422
423   return $self->_connect_info if !$info_arg;
424
425   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
426   #  the new set of options
427   $self->_sql_maker(undef);
428   $self->_sql_maker_opts({});
429
430   my $info = [ @$info_arg ]; # copy because we can alter it
431   my $last_info = $info->[-1];
432   if(ref $last_info eq 'HASH') {
433     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
434       $self->on_connect_do($on_connect_do);
435     }
436     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
437       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
438         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
439       }
440     }
441
442     # Get rid of any trailing empty hashref
443     pop(@$info) if !keys %$last_info;
444   }
445
446   $self->_connect_info($info);
447 }
448
449 =head2 on_connect_do
450
451 This method is deprecated in favor of setting via L</connect_info>.
452
453 =head2 dbh_do
454
455 Arguments: $subref, @extra_coderef_args?
456
457 Execute the given subref using the new exception-based connection management.
458
459 The first two arguments will be the storage object that C<dbh_do> was called
460 on and a database handle to use.  Any additional arguments will be passed
461 verbatim to the called subref as arguments 2 and onwards.
462
463 Using this (instead of $self->_dbh or $self->dbh) ensures correct
464 exception handling and reconnection (or failover in future subclasses).
465
466 Your subref should have no side-effects outside of the database, as
467 there is the potential for your subref to be partially double-executed
468 if the database connection was stale/dysfunctional.
469
470 Example:
471
472   my @stuff = $schema->storage->dbh_do(
473     sub {
474       my ($storage, $dbh, @cols) = @_;
475       my $cols = join(q{, }, @cols);
476       $dbh->selectrow_array("SELECT $cols FROM foo");
477     },
478     @column_list
479   );
480
481 =cut
482
483 sub dbh_do {
484   my $self = shift;
485   my $coderef = shift;
486
487   ref $coderef eq 'CODE' or $self->throw_exception
488     ('$coderef must be a CODE reference');
489
490   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
491   local $self->{_in_dbh_do} = 1;
492
493   my @result;
494   my $want_array = wantarray;
495
496   eval {
497     $self->_verify_pid if $self->_dbh;
498     $self->_populate_dbh if !$self->_dbh;
499     if($want_array) {
500         @result = $coderef->($self, $self->_dbh, @_);
501     }
502     elsif(defined $want_array) {
503         $result[0] = $coderef->($self, $self->_dbh, @_);
504     }
505     else {
506         $coderef->($self, $self->_dbh, @_);
507     }
508   };
509
510   my $exception = $@;
511   if(!$exception) { return $want_array ? @result : $result[0] }
512
513   $self->throw_exception($exception) if $self->connected;
514
515   # We were not connected - reconnect and retry, but let any
516   #  exception fall right through this time
517   $self->_populate_dbh;
518   $coderef->($self, $self->_dbh, @_);
519 }
520
521 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
522 # It also informs dbh_do to bypass itself while under the direction of txn_do,
523 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
524 sub txn_do {
525   my $self = shift;
526   my $coderef = shift;
527
528   ref $coderef eq 'CODE' or $self->throw_exception
529     ('$coderef must be a CODE reference');
530
531   local $self->{_in_dbh_do} = 1;
532
533   my @result;
534   my $want_array = wantarray;
535
536   my $tried = 0;
537   while(1) {
538     eval {
539       $self->_verify_pid if $self->_dbh;
540       $self->_populate_dbh if !$self->_dbh;
541
542       $self->txn_begin;
543       if($want_array) {
544           @result = $coderef->(@_);
545       }
546       elsif(defined $want_array) {
547           $result[0] = $coderef->(@_);
548       }
549       else {
550           $coderef->(@_);
551       }
552       $self->txn_commit;
553     };
554
555     my $exception = $@;
556     if(!$exception) { return $want_array ? @result : $result[0] }
557
558     if($tried++ > 0 || $self->connected) {
559       eval { $self->txn_rollback };
560       my $rollback_exception = $@;
561       if($rollback_exception) {
562         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
563         $self->throw_exception($exception)  # propagate nested rollback
564           if $rollback_exception =~ /$exception_class/;
565
566         $self->throw_exception(
567           "Transaction aborted: ${exception}. "
568           . "Rollback failed: ${rollback_exception}"
569         );
570       }
571       $self->throw_exception($exception)
572     }
573
574     # We were not connected, and was first try - reconnect and retry
575     # via the while loop
576     $self->_populate_dbh;
577   }
578 }
579
580 =head2 disconnect
581
582 Our C<disconnect> method also performs a rollback first if the
583 database is not in C<AutoCommit> mode.
584
585 =cut
586
587 sub disconnect {
588   my ($self) = @_;
589
590   if( $self->connected ) {
591     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
592     $self->_dbh->disconnect;
593     $self->_dbh(undef);
594     $self->{_dbh_gen}++;
595   }
596 }
597
598 sub connected {
599   my ($self) = @_;
600
601   if(my $dbh = $self->_dbh) {
602       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
603           $self->_dbh(undef);
604           $self->{_dbh_gen}++;
605           return;
606       }
607       else {
608           $self->_verify_pid;
609       }
610       return ($dbh->FETCH('Active') && $dbh->ping);
611   }
612
613   return 0;
614 }
615
616 # handle pid changes correctly
617 #  NOTE: assumes $self->_dbh is a valid $dbh
618 sub _verify_pid {
619   my ($self) = @_;
620
621   return if $self->_conn_pid == $$;
622
623   $self->_dbh->{InactiveDestroy} = 1;
624   $self->_dbh(undef);
625   $self->{_dbh_gen}++;
626
627   return;
628 }
629
630 sub ensure_connected {
631   my ($self) = @_;
632
633   unless ($self->connected) {
634     $self->_populate_dbh;
635   }
636 }
637
638 =head2 dbh
639
640 Returns the dbh - a data base handle of class L<DBI>.
641
642 =cut
643
644 sub dbh {
645   my ($self) = @_;
646
647   $self->ensure_connected;
648   return $self->_dbh;
649 }
650
651 sub _sql_maker_args {
652     my ($self) = @_;
653     
654     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
655 }
656
657 sub sql_maker {
658   my ($self) = @_;
659   unless ($self->_sql_maker) {
660     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
661   }
662   return $self->_sql_maker;
663 }
664
665 sub _populate_dbh {
666   my ($self) = @_;
667   my @info = @{$self->_connect_info || []};
668   $self->_dbh($self->_connect(@info));
669
670   if(ref $self eq 'DBIx::Class::Storage::DBI') {
671     my $driver = $self->_dbh->{Driver}->{Name};
672     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
673       bless $self, "DBIx::Class::Storage::DBI::${driver}";
674       $self->_rebless() if $self->can('_rebless');
675     }
676   }
677
678   # if on-connect sql statements are given execute them
679   foreach my $sql_statement (@{$self->on_connect_do || []}) {
680     $self->debugobj->query_start($sql_statement) if $self->debug();
681     $self->_dbh->do($sql_statement);
682     $self->debugobj->query_end($sql_statement) if $self->debug();
683   }
684
685   $self->_conn_pid($$);
686   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
687 }
688
689 sub _connect {
690   my ($self, @info) = @_;
691
692   $self->throw_exception("You failed to provide any connection info")
693       if !@info;
694
695   my ($old_connect_via, $dbh);
696
697   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
698       $old_connect_via = $DBI::connect_via;
699       $DBI::connect_via = 'connect';
700   }
701
702   eval {
703     if(ref $info[0] eq 'CODE') {
704        $dbh = &{$info[0]}
705     }
706     else {
707        $dbh = DBI->connect(@info);
708        $dbh->{RaiseError} = 1;
709        $dbh->{PrintError} = 0;
710        $dbh->{PrintWarn} = 0;
711     }
712   };
713
714   $DBI::connect_via = $old_connect_via if $old_connect_via;
715
716   if (!$dbh || $@) {
717     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
718   }
719
720   $dbh;
721 }
722
723 sub _dbh_txn_begin {
724   my ($self, $dbh) = @_;
725   if ($dbh->{AutoCommit}) {
726     $self->debugobj->txn_begin()
727       if ($self->debug);
728     $dbh->begin_work;
729   }
730 }
731
732 sub txn_begin {
733   my $self = shift;
734   $self->dbh_do($self->can('_dbh_txn_begin'))
735     if $self->{transaction_depth}++ == 0;
736 }
737
738 sub _dbh_txn_commit {
739   my ($self, $dbh) = @_;
740   if ($self->{transaction_depth} == 0) {
741     unless ($dbh->{AutoCommit}) {
742       $self->debugobj->txn_commit()
743         if ($self->debug);
744       $dbh->commit;
745     }
746   }
747   else {
748     if (--$self->{transaction_depth} == 0) {
749       $self->debugobj->txn_commit()
750         if ($self->debug);
751       $dbh->commit;
752     }
753   }
754 }
755
756 sub txn_commit {
757   my $self = shift;
758   $self->dbh_do($self->can('_dbh_txn_commit'));
759 }
760
761 sub _dbh_txn_rollback {
762   my ($self, $dbh) = @_;
763   if ($self->{transaction_depth} == 0) {
764     unless ($dbh->{AutoCommit}) {
765       $self->debugobj->txn_rollback()
766         if ($self->debug);
767       $dbh->rollback;
768     }
769   }
770   else {
771     if (--$self->{transaction_depth} == 0) {
772       $self->debugobj->txn_rollback()
773         if ($self->debug);
774       $dbh->rollback;
775     }
776     else {
777       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
778     }
779   }
780 }
781
782 sub txn_rollback {
783   my $self = shift;
784
785   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
786   if ($@) {
787     my $error = $@;
788     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
789     $error =~ /$exception_class/ and $self->throw_exception($error);
790     $self->{transaction_depth} = 0;          # ensure that a failed rollback
791     $self->throw_exception($error);          # resets the transaction depth
792   }
793 }
794
795 # This used to be the top-half of _execute.  It was split out to make it
796 #  easier to override in NoBindVars without duping the rest.  It takes up
797 #  all of _execute's args, and emits $sql, @bind.
798 sub _prep_for_execute {
799   my ($self, $op, $extra_bind, $ident, @args) = @_;
800
801   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
802   unshift(@bind, @$extra_bind) if $extra_bind;
803   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
804
805   return ($sql, @bind);
806 }
807
808 sub _execute {
809   my $self = shift;
810
811   my ($sql, @bind) = $self->_prep_for_execute(@_);
812
813   if ($self->debug) {
814       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
815       $self->debugobj->query_start($sql, @debug_bind);
816   }
817
818   my $sth = $self->sth($sql);
819
820   my $rv;
821   if ($sth) {
822     my $time = time();
823     $rv = eval { $sth->execute(@bind) };
824
825     if ($@ || !$rv) {
826       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
827     }
828   } else {
829     $self->throw_exception("'$sql' did not generate a statement.");
830   }
831   if ($self->debug) {
832       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
833       $self->debugobj->query_end($sql, @debug_bind);
834   }
835   return (wantarray ? ($rv, $sth, @bind) : $rv);
836 }
837
838 sub insert {
839   my ($self, $ident, $to_insert) = @_;
840   $self->throw_exception(
841     "Couldn't insert ".join(', ',
842       map "$_ => $to_insert->{$_}", keys %$to_insert
843     )." into ${ident}"
844   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
845   return $to_insert;
846 }
847
848 sub update {
849   return shift->_execute('update' => [], @_);
850 }
851
852 sub delete {
853   return shift->_execute('delete' => [], @_);
854 }
855
856 sub _select {
857   my ($self, $ident, $select, $condition, $attrs) = @_;
858   my $order = $attrs->{order_by};
859   if (ref $condition eq 'SCALAR') {
860     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
861   }
862   if (exists $attrs->{group_by} || $attrs->{having}) {
863     $order = {
864       group_by => $attrs->{group_by},
865       having => $attrs->{having},
866       ($order ? (order_by => $order) : ())
867     };
868   }
869   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
870   if ($attrs->{software_limit} ||
871       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
872         $attrs->{software_limit} = 1;
873   } else {
874     $self->throw_exception("rows attribute must be positive if present")
875       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
876     push @args, $attrs->{rows}, $attrs->{offset};
877   }
878   return $self->_execute(@args);
879 }
880
881 =head2 select
882
883 =over 4
884
885 =item Arguments: $ident, $select, $condition, $attrs
886
887 =back
888
889 Handle a SQL select statement.
890
891 =cut
892
893 sub select {
894   my $self = shift;
895   my ($ident, $select, $condition, $attrs) = @_;
896   return $self->cursor->new($self, \@_, $attrs);
897 }
898
899 sub select_single {
900   my $self = shift;
901   my ($rv, $sth, @bind) = $self->_select(@_);
902   my @row = $sth->fetchrow_array;
903   # Need to call finish() to work round broken DBDs
904   $sth->finish();
905   return @row;
906 }
907
908 =head2 sth
909
910 =over 4
911
912 =item Arguments: $sql
913
914 =back
915
916 Returns a L<DBI> sth (statement handle) for the supplied SQL.
917
918 =cut
919
920 sub _dbh_sth {
921   my ($self, $dbh, $sql) = @_;
922   # 3 is the if_active parameter which avoids active sth re-use
923   $dbh->prepare_cached($sql, {}, 3) or
924     $self->throw_exception(
925       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
926     );
927 }
928
929 sub sth {
930   my ($self, $sql) = @_;
931   $self->dbh_do($self->can('_dbh_sth'), $sql);
932 }
933
934 sub _dbh_columns_info_for {
935   my ($self, $dbh, $table) = @_;
936
937   if ($dbh->can('column_info')) {
938     my %result;
939     eval {
940       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
941       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
942       $sth->execute();
943       while ( my $info = $sth->fetchrow_hashref() ){
944         my %column_info;
945         $column_info{data_type}   = $info->{TYPE_NAME};
946         $column_info{size}      = $info->{COLUMN_SIZE};
947         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
948         $column_info{default_value} = $info->{COLUMN_DEF};
949         my $col_name = $info->{COLUMN_NAME};
950         $col_name =~ s/^\"(.*)\"$/$1/;
951
952         $result{$col_name} = \%column_info;
953       }
954     };
955     return \%result if !$@ && scalar keys %result;
956   }
957
958   my %result;
959   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
960   $sth->execute;
961   my @columns = @{$sth->{NAME_lc}};
962   for my $i ( 0 .. $#columns ){
963     my %column_info;
964     my $type_num = $sth->{TYPE}->[$i];
965     my $type_name;
966     if(defined $type_num && $dbh->can('type_info')) {
967       my $type_info = $dbh->type_info($type_num);
968       $type_name = $type_info->{TYPE_NAME} if $type_info;
969     }
970     $column_info{data_type} = $type_name ? $type_name : $type_num;
971     $column_info{size} = $sth->{PRECISION}->[$i];
972     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
973
974     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
975       $column_info{data_type} = $1;
976       $column_info{size}    = $2;
977     }
978
979     $result{$columns[$i]} = \%column_info;
980   }
981
982   return \%result;
983 }
984
985 sub columns_info_for {
986   my ($self, $table) = @_;
987   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
988 }
989
990 =head2 last_insert_id
991
992 Return the row id of the last insert.
993
994 =cut
995
996 sub _dbh_last_insert_id {
997     my ($self, $dbh, $source, $col) = @_;
998     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
999     $dbh->func('last_insert_rowid');
1000 }
1001
1002 sub last_insert_id {
1003   my $self = shift;
1004   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1005 }
1006
1007 =head2 sqlt_type
1008
1009 Returns the database driver name.
1010
1011 =cut
1012
1013 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1014
1015 =head2 create_ddl_dir (EXPERIMENTAL)
1016
1017 =over 4
1018
1019 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1020
1021 =back
1022
1023 Creates a SQL file based on the Schema, for each of the specified
1024 database types, in the given directory.
1025
1026 Note that this feature is currently EXPERIMENTAL and may not work correctly
1027 across all databases, or fully handle complex relationships.
1028
1029 =cut
1030
1031 sub create_ddl_dir
1032 {
1033   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1034
1035   if(!$dir || !-d $dir)
1036   {
1037     warn "No directory given, using ./\n";
1038     $dir = "./";
1039   }
1040   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1041   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1042   $version ||= $schema->VERSION || '1.x';
1043   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1044
1045   eval "use SQL::Translator";
1046   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1047
1048   my $sqlt = SQL::Translator->new($sqltargs);
1049   foreach my $db (@$databases)
1050   {
1051     $sqlt->reset();
1052     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1053 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1054     $sqlt->data($schema);
1055     $sqlt->producer($db);
1056
1057     my $file;
1058     my $filename = $schema->ddl_filename($db, $dir, $version);
1059     if(-e $filename)
1060     {
1061       $self->throw_exception("$filename already exists, skipping $db");
1062       next;
1063     }
1064     open($file, ">$filename") 
1065       or $self->throw_exception("Can't open $filename for writing ($!)");
1066     my $output = $sqlt->translate;
1067 #use Data::Dumper;
1068 #    print join(":", keys %{$schema->source_registrations});
1069 #    print Dumper($sqlt->schema);
1070     if(!$output)
1071     {
1072       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1073       next;
1074     }
1075     print $file $output;
1076     close($file);
1077   }
1078
1079 }
1080
1081 =head2 deployment_statements
1082
1083 =over 4
1084
1085 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1086
1087 =back
1088
1089 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1090 The database driver name is given by C<$type>, though the value from
1091 L</sqlt_type> is used if it is not specified.
1092
1093 C<$directory> is used to return statements from files in a previously created
1094 L</create_ddl_dir> directory and is optional. The filenames are constructed
1095 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1096
1097 If no C<$directory> is specified then the statements are constructed on the
1098 fly using L<SQL::Translator> and C<$version> is ignored.
1099
1100 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1101
1102 =cut
1103
1104 sub deployment_statements {
1105   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1106   # Need to be connected to get the correct sqlt_type
1107   $self->ensure_connected() unless $type;
1108   $type ||= $self->sqlt_type;
1109   $version ||= $schema->VERSION || '1.x';
1110   $dir ||= './';
1111   eval "use SQL::Translator";
1112   if(!$@)
1113   {
1114     eval "use SQL::Translator::Parser::DBIx::Class;";
1115     $self->throw_exception($@) if $@;
1116     eval "use SQL::Translator::Producer::${type};";
1117     $self->throw_exception($@) if $@;
1118     my $tr = SQL::Translator->new(%$sqltargs);
1119     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1120     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1121   }
1122
1123   my $filename = $schema->ddl_filename($type, $dir, $version);
1124   if(!-f $filename)
1125   {
1126 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1127       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1128       return;
1129   }
1130   my $file;
1131   open($file, "<$filename") 
1132       or $self->throw_exception("Can't open $filename ($!)");
1133   my @rows = <$file>;
1134   close($file);
1135
1136   return join('', @rows);
1137   
1138 }
1139
1140 sub deploy {
1141   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1142   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1143     for ( split(";\n", $statement)) {
1144       next if($_ =~ /^--/);
1145       next if(!$_);
1146 #      next if($_ =~ /^DROP/m);
1147       next if($_ =~ /^BEGIN TRANSACTION/m);
1148       next if($_ =~ /^COMMIT/m);
1149       next if $_ =~ /^\s+$/; # skip whitespace only
1150       $self->debugobj->query_start($_) if $self->debug;
1151       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1152       $self->debugobj->query_end($_) if $self->debug;
1153     }
1154   }
1155 }
1156
1157 =head2 datetime_parser
1158
1159 Returns the datetime parser class
1160
1161 =cut
1162
1163 sub datetime_parser {
1164   my $self = shift;
1165   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1166 }
1167
1168 =head2 datetime_parser_type
1169
1170 Defines (returns) the datetime parser class - currently hardwired to
1171 L<DateTime::Format::MySQL>
1172
1173 =cut
1174
1175 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1176
1177 =head2 build_datetime_parser
1178
1179 See L</datetime_parser>
1180
1181 =cut
1182
1183 sub build_datetime_parser {
1184   my $self = shift;
1185   my $type = $self->datetime_parser_type(@_);
1186   eval "use ${type}";
1187   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1188   return $type;
1189 }
1190
1191 sub DESTROY {
1192   my $self = shift;
1193   return if !$self->_dbh;
1194   $self->_verify_pid;
1195   $self->_dbh(undef);
1196 }
1197
1198 1;
1199
1200 =head1 SQL METHODS
1201
1202 The module defines a set of methods within the DBIC::SQL::Abstract
1203 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1204 SQL query functions.
1205
1206 The following methods are extended:-
1207
1208 =over 4
1209
1210 =item delete
1211
1212 =item insert
1213
1214 =item select
1215
1216 =item update
1217
1218 =item limit_dialect
1219
1220 See L</connect_info> for details.
1221 For setting, this method is deprecated in favor of L</connect_info>.
1222
1223 =item quote_char
1224
1225 See L</connect_info> for details.
1226 For setting, this method is deprecated in favor of L</connect_info>.
1227
1228 =item name_sep
1229
1230 See L</connect_info> for details.
1231 For setting, this method is deprecated in favor of L</connect_info>.
1232
1233 =back
1234
1235 =head1 AUTHORS
1236
1237 Matt S. Trout <mst@shadowcatsystems.co.uk>
1238
1239 Andy Grundman <andy@hybridized.org>
1240
1241 =head1 LICENSE
1242
1243 You may distribute this code under the same terms as Perl itself.
1244
1245 =cut