Merge 'trunk' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Scalar::Util qw/weaken/;
14 use Carp::Clan qw/DBIx::Class/;
15 BEGIN {
16
17 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
18
19 use base qw/SQL::Abstract::Limit/;
20
21 # This prevents the caching of $dbh in S::A::L, I believe
22 sub new {
23   my $self = shift->SUPER::new(@_);
24
25   # If limit_dialect is a ref (like a $dbh), go ahead and replace
26   #   it with what it resolves to:
27   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
28     if ref $self->{limit_dialect};
29
30   $self;
31 }
32
33 # While we're at it, this should make LIMIT queries more efficient,
34 #  without digging into things too deeply
35 sub _find_syntax {
36   my ($self, $syntax) = @_;
37   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
38 }
39
40 sub select {
41   my ($self, $table, $fields, $where, $order, @rest) = @_;
42   $table = $self->_quote($table) unless ref($table);
43   local $self->{rownum_hack_count} = 1
44     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
45   @rest = (-1) unless defined $rest[0];
46   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
47     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
48   local $self->{having_bind} = [];
49   my ($sql, @ret) = $self->SUPER::select(
50     $table, $self->_recurse_fields($fields), $where, $order, @rest
51   );
52   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
53 }
54
55 sub insert {
56   my $self = shift;
57   my $table = shift;
58   $table = $self->_quote($table) unless ref($table);
59   $self->SUPER::insert($table, @_);
60 }
61
62 sub update {
63   my $self = shift;
64   my $table = shift;
65   $table = $self->_quote($table) unless ref($table);
66   $self->SUPER::update($table, @_);
67 }
68
69 sub delete {
70   my $self = shift;
71   my $table = shift;
72   $table = $self->_quote($table) unless ref($table);
73   $self->SUPER::delete($table, @_);
74 }
75
76 sub _emulate_limit {
77   my $self = shift;
78   if ($_[3] == -1) {
79     return $_[1].$self->_order_by($_[2]);
80   } else {
81     return $self->SUPER::_emulate_limit(@_);
82   }
83 }
84
85 sub _recurse_fields {
86   my ($self, $fields) = @_;
87   my $ref = ref $fields;
88   return $self->_quote($fields) unless $ref;
89   return $$fields if $ref eq 'SCALAR';
90
91   if ($ref eq 'ARRAY') {
92     return join(', ', map {
93       $self->_recurse_fields($_)
94       .(exists $self->{rownum_hack_count}
95          ? ' AS col'.$self->{rownum_hack_count}++
96          : '')
97      } @$fields);
98   } elsif ($ref eq 'HASH') {
99     foreach my $func (keys %$fields) {
100       return $self->_sqlcase($func)
101         .'( '.$self->_recurse_fields($fields->{$func}).' )';
102     }
103   }
104 }
105
106 sub _order_by {
107   my $self = shift;
108   my $ret = '';
109   my @extra;
110   if (ref $_[0] eq 'HASH') {
111     if (defined $_[0]->{group_by}) {
112       $ret = $self->_sqlcase(' group by ')
113                .$self->_recurse_fields($_[0]->{group_by});
114     }
115     if (defined $_[0]->{having}) {
116       my $frag;
117       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
118       push(@{$self->{having_bind}}, @extra);
119       $ret .= $self->_sqlcase(' having ').$frag;
120     }
121     if (defined $_[0]->{order_by}) {
122       $ret .= $self->_order_by($_[0]->{order_by});
123     }
124   } elsif (ref $_[0] eq 'SCALAR') {
125     $ret = $self->_sqlcase(' order by ').${ $_[0] };
126   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
127     my @order = @{+shift};
128     $ret = $self->_sqlcase(' order by ')
129           .join(', ', map {
130                         my $r = $self->_order_by($_, @_);
131                         $r =~ s/^ ?ORDER BY //i;
132                         $r;
133                       } @order);
134   } else {
135     $ret = $self->SUPER::_order_by(@_);
136   }
137   return $ret;
138 }
139
140 sub _order_directions {
141   my ($self, $order) = @_;
142   $order = $order->{order_by} if ref $order eq 'HASH';
143   return $self->SUPER::_order_directions($order);
144 }
145
146 sub _table {
147   my ($self, $from) = @_;
148   if (ref $from eq 'ARRAY') {
149     return $self->_recurse_from(@$from);
150   } elsif (ref $from eq 'HASH') {
151     return $self->_make_as($from);
152   } else {
153     return $from; # would love to quote here but _table ends up getting called
154                   # twice during an ->select without a limit clause due to
155                   # the way S::A::Limit->select works. should maybe consider
156                   # bypassing this and doing S::A::select($self, ...) in
157                   # our select method above. meantime, quoting shims have
158                   # been added to select/insert/update/delete here
159   }
160 }
161
162 sub _recurse_from {
163   my ($self, $from, @join) = @_;
164   my @sqlf;
165   push(@sqlf, $self->_make_as($from));
166   foreach my $j (@join) {
167     my ($to, $on) = @$j;
168
169     # check whether a join type exists
170     my $join_clause = '';
171     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
172     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
173       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
174     } else {
175       $join_clause = ' JOIN ';
176     }
177     push(@sqlf, $join_clause);
178
179     if (ref $to eq 'ARRAY') {
180       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
181     } else {
182       push(@sqlf, $self->_make_as($to));
183     }
184     push(@sqlf, ' ON ', $self->_join_condition($on));
185   }
186   return join('', @sqlf);
187 }
188
189 sub _make_as {
190   my ($self, $from) = @_;
191   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
192                      reverse each %{$self->_skip_options($from)});
193 }
194
195 sub _skip_options {
196   my ($self, $hash) = @_;
197   my $clean_hash = {};
198   $clean_hash->{$_} = $hash->{$_}
199     for grep {!/^-/} keys %$hash;
200   return $clean_hash;
201 }
202
203 sub _join_condition {
204   my ($self, $cond) = @_;
205   if (ref $cond eq 'HASH') {
206     my %j;
207     for (keys %$cond) {
208       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
209     };
210     return $self->_recurse_where(\%j);
211   } elsif (ref $cond eq 'ARRAY') {
212     return join(' OR ', map { $self->_join_condition($_) } @$cond);
213   } else {
214     die "Can't handle this yet!";
215   }
216 }
217
218 sub _quote {
219   my ($self, $label) = @_;
220   return '' unless defined $label;
221   return "*" if $label eq '*';
222   return $label unless $self->{quote_char};
223   if(ref $self->{quote_char} eq "ARRAY"){
224     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
225       if !defined $self->{name_sep};
226     my $sep = $self->{name_sep};
227     return join($self->{name_sep},
228         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
229        split(/\Q$sep\E/,$label));
230   }
231   return $self->SUPER::_quote($label);
232 }
233
234 sub limit_dialect {
235     my $self = shift;
236     $self->{limit_dialect} = shift if @_;
237     return $self->{limit_dialect};
238 }
239
240 sub quote_char {
241     my $self = shift;
242     $self->{quote_char} = shift if @_;
243     return $self->{quote_char};
244 }
245
246 sub name_sep {
247     my $self = shift;
248     $self->{name_sep} = shift if @_;
249     return $self->{name_sep};
250 }
251
252 } # End of BEGIN block
253
254 use base qw/DBIx::Class/;
255
256 __PACKAGE__->load_components(qw/AccessorGroup/);
257
258 __PACKAGE__->mk_group_accessors('simple' =>
259   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
260      debug debugobj cursor on_connect_do transaction_depth schema/);
261
262 =head1 NAME
263
264 DBIx::Class::Storage::DBI - DBI storage handler
265
266 =head1 SYNOPSIS
267
268 =head1 DESCRIPTION
269
270 This class represents the connection to the database
271
272 =head1 METHODS
273
274 =head2 new
275
276 Constructor.  Only argument is the schema which instantiated us.
277
278 =cut
279
280 sub new {
281   my ($self, $schema) = @_;
282
283   my $new = {};
284   bless $new, (ref $_[0] || $_[0]);
285   $new->set_schema($schema);
286   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
287   $new->transaction_depth(0);
288
289   $new->debugobj(new DBIx::Class::Storage::Statistics());
290
291   my $fh;
292
293   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
294                   || $ENV{DBIC_TRACE};
295
296   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
297     $fh = IO::File->new($1, 'w')
298       or $new->throw_exception("Cannot open trace file $1");
299   } else {
300     $fh = IO::File->new('>&STDERR');
301   }
302   $new->debugfh($fh);
303   $new->debug(1) if $debug_env;
304   $new->_sql_maker_opts({});
305   return $new;
306 }
307
308 =head2 set_schema
309
310 Used to reset the schema class or object which owns this
311 storage object, such as after a C<clone()>.
312
313 =cut
314
315 sub set_schema {
316   my ($self, $schema) = @_;
317   $self->schema($schema);
318   weaken($self->{schema}) if ref $self->{schema};
319 }
320
321
322 =head2 throw_exception
323
324 Throws an exception - croaks.
325
326 =cut
327
328 sub throw_exception {
329   my $self = shift;
330
331   $self->schema->throw_exception(@_) if $self->schema;
332   croak @_;
333 }
334
335 =head2 connect_info
336
337 The arguments of C<connect_info> are always a single array reference.
338
339 This is normally accessed via L<DBIx::Class::Schema/connection>, which
340 encapsulates its argument list in an arrayref before calling
341 C<connect_info> here.
342
343 The arrayref can either contain the same set of arguments one would
344 normally pass to L<DBI/connect>, or a lone code reference which returns
345 a connected database handle.
346
347 In either case, if the final argument in your connect_info happens
348 to be a hashref, C<connect_info> will look there for several
349 connection-specific options:
350
351 =over 4
352
353 =item on_connect_do
354
355 This can be set to an arrayref of literal sql statements, which will
356 be executed immediately after making the connection to the database
357 every time we [re-]connect.
358
359 =item limit_dialect 
360
361 Sets the limit dialect. This is useful for JDBC-bridge among others
362 where the remote SQL-dialect cannot be determined by the name of the
363 driver alone.
364
365 =item quote_char
366
367 Specifies what characters to use to quote table and column names. If 
368 you use this you will want to specify L<name_sep> as well.
369
370 quote_char expects either a single character, in which case is it is placed
371 on either side of the table/column, or an arrayref of length 2 in which case the
372 table/column name is placed between the elements.
373
374 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
375 use C<quote_char =E<gt> [qw/[ ]/]>.
376
377 =item name_sep
378
379 This only needs to be used in conjunction with L<quote_char>, and is used to 
380 specify the charecter that seperates elements (schemas, tables, columns) from 
381 each other. In most cases this is simply a C<.>.
382
383 =back
384
385 These options can be mixed in with your other L<DBI> connection attributes,
386 or placed in a seperate hashref after all other normal L<DBI> connection
387 arguments.
388
389 Every time C<connect_info> is invoked, any previous settings for
390 these options will be cleared before setting the new ones, regardless of
391 whether any options are specified in the new C<connect_info>.
392
393 Important note:  DBIC expects the returned database handle provided by 
394 a subref argument to have RaiseError set on it.  If it doesn't, things
395 might not work very well, YMMV.  If you don't use a subref, DBIC will
396 force this setting for you anyways.  Setting HandleError to anything
397 other than simple exception object wrapper might cause problems too.
398
399 Examples:
400
401   # Simple SQLite connection
402   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
403
404   # Connect via subref
405   ->connect_info([ sub { DBI->connect(...) } ]);
406
407   # A bit more complicated
408   ->connect_info(
409     [
410       'dbi:Pg:dbname=foo',
411       'postgres',
412       'my_pg_password',
413       { AutoCommit => 0 },
414       { quote_char => q{"}, name_sep => q{.} },
415     ]
416   );
417
418   # Equivalent to the previous example
419   ->connect_info(
420     [
421       'dbi:Pg:dbname=foo',
422       'postgres',
423       'my_pg_password',
424       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
425     ]
426   );
427
428   # Subref + DBIC-specific connection options
429   ->connect_info(
430     [
431       sub { DBI->connect(...) },
432       {
433           quote_char => q{`},
434           name_sep => q{@},
435           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
436       },
437     ]
438   );
439
440 =head2 on_connect_do
441
442 This method is deprecated in favor of setting via L</connect_info>.
443
444 =head2 debug
445
446 Causes SQL trace information to be emitted on the C<debugobj> object.
447 (or C<STDERR> if C<debugobj> has not specifically been set).
448
449 This is the equivalent to setting L</DBIC_TRACE> in your
450 shell environment.
451
452 =head2 debugfh
453
454 Set or retrieve the filehandle used for trace/debug output.  This should be
455 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
456 set to be STDERR - although see information on the
457 L<DBIC_TRACE> environment variable.
458
459 =cut
460
461 sub debugfh {
462     my $self = shift;
463
464     if ($self->debugobj->can('debugfh')) {
465         return $self->debugobj->debugfh(@_);
466     }
467 }
468
469 =head2 debugobj
470
471 Sets or retrieves the object used for metric collection. Defaults to an instance
472 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
473 method of using a coderef as a callback.  See the aforementioned Statistics
474 class for more information.
475
476 =head2 debugcb
477
478 Sets a callback to be executed each time a statement is run; takes a sub
479 reference.  Callback is executed as $sub->($op, $info) where $op is
480 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
481
482 See L<debugobj> for a better way.
483
484 =cut
485
486 sub debugcb {
487     my $self = shift;
488
489     if ($self->debugobj->can('callback')) {
490         return $self->debugobj->callback(@_);
491     }
492 }
493
494 =head2 dbh_do
495
496 Execute the given subref with the underlying database handle as its
497 first argument, using the new exception-based connection management.
498 Example:
499
500   my @stuff = $schema->storage->dbh_do(
501     sub {
502       shift->selectrow_array("SELECT * FROM foo")
503     }
504   );
505
506 =cut
507
508 sub dbh_do {
509   my ($self, $todo) = @_;
510
511   my @result;
512   my $want_array = wantarray;
513
514   eval {
515     $self->_verify_pid if $self->_dbh;
516     $self->_populate_dbh if !$self->_dbh;
517     my $dbh = $self->_dbh;
518     if($want_array) {
519         @result = $todo->($dbh);
520     }
521     elsif(defined $want_array) {
522         $result[0] = $todo->($dbh);
523     }
524     else {
525         $todo->($dbh);
526     }
527   };
528
529   if($@) {
530     my $exception = $@;
531     $self->connected
532       ? $self->throw_exception($exception)
533       : $self->_populate_dbh;
534
535     my $dbh = $self->_dbh;
536     return $todo->($dbh);
537   }
538
539   return $want_array ? @result : $result[0];
540 }
541
542 =head2 disconnect
543
544 Disconnect the L<DBI> handle, performing a rollback first if the
545 database is not in C<AutoCommit> mode.
546
547 =cut
548
549 sub disconnect {
550   my ($self) = @_;
551
552   if( $self->connected ) {
553     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
554     $self->_dbh->disconnect;
555     $self->_dbh(undef);
556   }
557 }
558
559 =head2 connected
560
561 Check if the L<DBI> handle is connected.  Returns true if the handle
562 is connected.
563
564 =cut
565
566 sub connected {
567   my ($self) = @_;
568
569   if(my $dbh = $self->_dbh) {
570       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
571           return $self->_dbh(undef);
572       }
573       else {
574           $self->_verify_pid;
575       }
576       return ($dbh->FETCH('Active') && $dbh->ping);
577   }
578
579   return 0;
580 }
581
582 # handle pid changes correctly
583 #  NOTE: assumes $self->_dbh is a valid $dbh
584 sub _verify_pid {
585   my ($self) = @_;
586
587   return if $self->_conn_pid == $$;
588
589   $self->_dbh->{InactiveDestroy} = 1;
590   $self->_dbh(undef);
591
592   return;
593 }
594
595 =head2 ensure_connected
596
597 Check whether the database handle is connected - if not then make a
598 connection.
599
600 =cut
601
602 sub ensure_connected {
603   my ($self) = @_;
604
605   unless ($self->connected) {
606     $self->_populate_dbh;
607   }
608 }
609
610 =head2 dbh
611
612 Returns the dbh - a data base handle of class L<DBI>.
613
614 =cut
615
616 sub dbh {
617   my ($self) = @_;
618
619   $self->ensure_connected;
620   return $self->_dbh;
621 }
622
623 sub _sql_maker_args {
624     my ($self) = @_;
625     
626     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
627 }
628
629 =head2 sql_maker
630
631 Returns a C<sql_maker> object - normally an object of class
632 C<DBIC::SQL::Abstract>.
633
634 =cut
635
636 sub sql_maker {
637   my ($self) = @_;
638   unless ($self->_sql_maker) {
639     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
640   }
641   return $self->_sql_maker;
642 }
643
644 sub connect_info {
645   my ($self, $info_arg) = @_;
646
647   return $self->_connect_info if !$info_arg;
648
649   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
650   #  the new set of options
651   $self->_sql_maker(undef);
652   $self->_sql_maker_opts({});
653
654   my $info = [ @$info_arg ]; # copy because we can alter it
655   my $last_info = $info->[-1];
656   if(ref $last_info eq 'HASH') {
657     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
658       $self->on_connect_do($on_connect_do);
659     }
660     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
661       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
662         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
663       }
664     }
665
666     # Get rid of any trailing empty hashref
667     pop(@$info) if !keys %$last_info;
668   }
669
670   $self->_connect_info($info);
671 }
672
673 sub _populate_dbh {
674   my ($self) = @_;
675   my @info = @{$self->_connect_info || []};
676   $self->_dbh($self->_connect(@info));
677
678   if(ref $self eq 'DBIx::Class::Storage::DBI') {
679     my $driver = $self->_dbh->{Driver}->{Name};
680     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
681       bless $self, "DBIx::Class::Storage::DBI::${driver}";
682       $self->_rebless() if $self->can('_rebless');
683     }
684   }
685
686   # if on-connect sql statements are given execute them
687   foreach my $sql_statement (@{$self->on_connect_do || []}) {
688     $self->debugobj->query_start($sql_statement) if $self->debug();
689     $self->_dbh->do($sql_statement);
690     $self->debugobj->query_end($sql_statement) if $self->debug();
691   }
692
693   $self->_conn_pid($$);
694   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
695 }
696
697 sub _connect {
698   my ($self, @info) = @_;
699
700   $self->throw_exception("You failed to provide any connection info")
701       if !@info;
702
703   my ($old_connect_via, $dbh);
704
705   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
706       $old_connect_via = $DBI::connect_via;
707       $DBI::connect_via = 'connect';
708   }
709
710   eval {
711     if(ref $info[0] eq 'CODE') {
712        $dbh = &{$info[0]}
713     }
714     else {
715        $dbh = DBI->connect(@info);
716        $dbh->{RaiseError} = 1;
717        $dbh->{PrintError} = 0;
718     }
719   };
720
721   $DBI::connect_via = $old_connect_via if $old_connect_via;
722
723   if (!$dbh || $@) {
724     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
725   }
726
727   $dbh;
728 }
729
730 =head2 txn_begin
731
732 Calls begin_work on the current dbh.
733
734 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
735 an entire code block to be executed transactionally.
736
737 =cut
738
739 sub txn_begin {
740   my $self = shift;
741   if ($self->{transaction_depth}++ == 0) {
742     $self->dbh_do(sub {
743       my $dbh = shift;
744       if ($dbh->{AutoCommit}) {
745         $self->debugobj->txn_begin()
746           if ($self->debug);
747         $dbh->begin_work;
748       }
749     });
750   }
751 }
752
753 =head2 txn_commit
754
755 Issues a commit against the current dbh.
756
757 =cut
758
759 sub txn_commit {
760   my $self = shift;
761   $self->dbh_do(sub {
762     my $dbh = shift;
763     if ($self->{transaction_depth} == 0) {
764       unless ($dbh->{AutoCommit}) {
765         $self->debugobj->txn_commit()
766           if ($self->debug);
767         $dbh->commit;
768       }
769     }
770     else {
771       if (--$self->{transaction_depth} == 0) {
772         $self->debugobj->txn_commit()
773           if ($self->debug);
774         $dbh->commit;
775       }
776     }
777   });
778 }
779
780 =head2 txn_rollback
781
782 Issues a rollback against the current dbh. A nested rollback will
783 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
784 which allows the rollback to propagate to the outermost transaction.
785
786 =cut
787
788 sub txn_rollback {
789   my $self = shift;
790
791   eval {
792     $self->dbh_do(sub {
793       my $dbh = shift;
794       if ($self->{transaction_depth} == 0) {
795         unless ($dbh->{AutoCommit}) {
796           $self->debugobj->txn_rollback()
797             if ($self->debug);
798           $dbh->rollback;
799         }
800       }
801       else {
802         if (--$self->{transaction_depth} == 0) {
803           $self->debugobj->txn_rollback()
804             if ($self->debug);
805           $dbh->rollback;
806         }
807         else {
808           die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
809         }
810       }
811     });
812   };
813
814   if ($@) {
815     my $error = $@;
816     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
817     $error =~ /$exception_class/ and $self->throw_exception($error);
818     $self->{transaction_depth} = 0;          # ensure that a failed rollback
819     $self->throw_exception($error);          # resets the transaction depth
820   }
821 }
822
823 sub _execute {
824   my ($self, $op, $extra_bind, $ident, @args) = @_;
825   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
826   unshift(@bind, @$extra_bind) if $extra_bind;
827   if ($self->debug) {
828       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
829       $self->debugobj->query_start($sql, @debug_bind);
830   }
831   my $sth = eval { $self->sth($sql,$op) };
832
833   if (!$sth || $@) {
834     $self->throw_exception(
835       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
836     );
837   }
838   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
839   my $rv;
840   if ($sth) {
841     my $time = time();
842     $rv = eval { $sth->execute(@bind) };
843
844     if ($@ || !$rv) {
845       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
846     }
847   } else {
848     $self->throw_exception("'$sql' did not generate a statement.");
849   }
850   if ($self->debug) {
851       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
852       $self->debugobj->query_end($sql, @debug_bind);
853   }
854   return (wantarray ? ($rv, $sth, @bind) : $rv);
855 }
856
857 sub insert {
858   my ($self, $ident, $to_insert) = @_;
859   $self->throw_exception(
860     "Couldn't insert ".join(', ',
861       map "$_ => $to_insert->{$_}", keys %$to_insert
862     )." into ${ident}"
863   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
864   return $to_insert;
865 }
866
867 sub update {
868   return shift->_execute('update' => [], @_);
869 }
870
871 sub delete {
872   return shift->_execute('delete' => [], @_);
873 }
874
875 sub _select {
876   my ($self, $ident, $select, $condition, $attrs) = @_;
877   my $order = $attrs->{order_by};
878   if (ref $condition eq 'SCALAR') {
879     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
880   }
881   if (exists $attrs->{group_by} || $attrs->{having}) {
882     $order = {
883       group_by => $attrs->{group_by},
884       having => $attrs->{having},
885       ($order ? (order_by => $order) : ())
886     };
887   }
888   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
889   if ($attrs->{software_limit} ||
890       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
891         $attrs->{software_limit} = 1;
892   } else {
893     $self->throw_exception("rows attribute must be positive if present")
894       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
895     push @args, $attrs->{rows}, $attrs->{offset};
896   }
897   return $self->_execute(@args);
898 }
899
900 =head2 select
901
902 Handle a SQL select statement.
903
904 =cut
905
906 sub select {
907   my $self = shift;
908   my ($ident, $select, $condition, $attrs) = @_;
909   return $self->cursor->new($self, \@_, $attrs);
910 }
911
912 =head2 select_single
913
914 Performs a select, fetch and return of data - handles a single row
915 only.
916
917 =cut
918
919 # Need to call finish() to work round broken DBDs
920
921 sub select_single {
922   my $self = shift;
923   my ($rv, $sth, @bind) = $self->_select(@_);
924   my @row = $sth->fetchrow_array;
925   $sth->finish();
926   return @row;
927 }
928
929 =head2 sth
930
931 Returns a L<DBI> sth (statement handle) for the supplied SQL.
932
933 =cut
934
935 sub sth {
936   my ($self, $sql) = @_;
937   # 3 is the if_active parameter which avoids active sth re-use
938   return $self->dbh_do(sub { shift->prepare_cached($sql, {}, 3) });
939 }
940
941 =head2 columns_info_for
942
943 Returns database type info for a given table columns.
944
945 =cut
946
947 sub columns_info_for {
948   my ($self, $table) = @_;
949
950   $self->dbh_do(sub {
951     my $dbh = shift;
952
953     if ($dbh->can('column_info')) {
954       my %result;
955       eval {
956         my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
957         my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
958         $sth->execute();
959         while ( my $info = $sth->fetchrow_hashref() ){
960           my %column_info;
961           $column_info{data_type}   = $info->{TYPE_NAME};
962           $column_info{size}      = $info->{COLUMN_SIZE};
963           $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
964           $column_info{default_value} = $info->{COLUMN_DEF};
965           my $col_name = $info->{COLUMN_NAME};
966           $col_name =~ s/^\"(.*)\"$/$1/;
967
968           $result{$col_name} = \%column_info;
969         }
970       };
971       return \%result if !$@;
972     }
973
974     my %result;
975     my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
976     $sth->execute;
977     my @columns = @{$sth->{NAME_lc}};
978     for my $i ( 0 .. $#columns ){
979       my %column_info;
980       my $type_num = $sth->{TYPE}->[$i];
981       my $type_name;
982       if(defined $type_num && $dbh->can('type_info')) {
983         my $type_info = $dbh->type_info($type_num);
984         $type_name = $type_info->{TYPE_NAME} if $type_info;
985       }
986       $column_info{data_type} = $type_name ? $type_name : $type_num;
987       $column_info{size} = $sth->{PRECISION}->[$i];
988       $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
989
990       if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
991         $column_info{data_type} = $1;
992         $column_info{size}    = $2;
993       }
994
995       $result{$columns[$i]} = \%column_info;
996     }
997
998     return \%result;
999   });
1000 }
1001
1002 =head2 last_insert_id
1003
1004 Return the row id of the last insert.
1005
1006 =cut
1007
1008 sub last_insert_id {
1009   my ($self, $row) = @_;
1010     
1011   $self->dbh_do(sub { shift->func('last_insert_rowid') });
1012 }
1013
1014 =head2 sqlt_type
1015
1016 Returns the database driver name.
1017
1018 =cut
1019
1020 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
1021
1022 =head2 create_ddl_dir (EXPERIMENTAL)
1023
1024 =over 4
1025
1026 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1027
1028 =back
1029
1030 Creates an SQL file based on the Schema, for each of the specified
1031 database types, in the given directory.
1032
1033 Note that this feature is currently EXPERIMENTAL and may not work correctly
1034 across all databases, or fully handle complex relationships.
1035
1036 =cut
1037
1038 sub create_ddl_dir
1039 {
1040   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1041
1042   if(!$dir || !-d $dir)
1043   {
1044     warn "No directory given, using ./\n";
1045     $dir = "./";
1046   }
1047   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1048   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1049   $version ||= $schema->VERSION || '1.x';
1050   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1051
1052   eval "use SQL::Translator";
1053   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1054
1055   my $sqlt = SQL::Translator->new($sqltargs);
1056   foreach my $db (@$databases)
1057   {
1058     $sqlt->reset();
1059     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1060 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1061     $sqlt->data($schema);
1062     $sqlt->producer($db);
1063
1064     my $file;
1065     my $filename = $schema->ddl_filename($db, $dir, $version);
1066     if(-e $filename)
1067     {
1068       $self->throw_exception("$filename already exists, skipping $db");
1069       next;
1070     }
1071     open($file, ">$filename") 
1072       or $self->throw_exception("Can't open $filename for writing ($!)");
1073     my $output = $sqlt->translate;
1074 #use Data::Dumper;
1075 #    print join(":", keys %{$schema->source_registrations});
1076 #    print Dumper($sqlt->schema);
1077     if(!$output)
1078     {
1079       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1080       next;
1081     }
1082     print $file $output;
1083     close($file);
1084   }
1085
1086 }
1087
1088 =head2 deployment_statements
1089
1090 Create the statements for L</deploy> and
1091 L<DBIx::Class::Schema/deploy>.
1092
1093 =cut
1094
1095 sub deployment_statements {
1096   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1097   # Need to be connected to get the correct sqlt_type
1098   $self->ensure_connected() unless $type;
1099   $type ||= $self->sqlt_type;
1100   $version ||= $schema->VERSION || '1.x';
1101   $dir ||= './';
1102   eval "use SQL::Translator";
1103   if(!$@)
1104   {
1105     eval "use SQL::Translator::Parser::DBIx::Class;";
1106     $self->throw_exception($@) if $@;
1107     eval "use SQL::Translator::Producer::${type};";
1108     $self->throw_exception($@) if $@;
1109     my $tr = SQL::Translator->new(%$sqltargs);
1110     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1111     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1112   }
1113
1114   my $filename = $schema->ddl_filename($type, $dir, $version);
1115   if(!-f $filename)
1116   {
1117 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1118       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1119       return;
1120   }
1121   my $file;
1122   open($file, "<$filename") 
1123       or $self->throw_exception("Can't open $filename ($!)");
1124   my @rows = <$file>;
1125   close($file);
1126
1127   return join('', @rows);
1128   
1129 }
1130
1131 =head2 deploy
1132
1133 Sends the appropriate statements to create or modify tables to the
1134 db. This would normally be called through
1135 L<DBIx::Class::Schema/deploy>.
1136
1137 =cut
1138
1139 sub deploy {
1140   my ($self, $schema, $type, $sqltargs) = @_;
1141   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1142     for ( split(";\n", $statement)) {
1143       next if($_ =~ /^--/);
1144       next if(!$_);
1145 #      next if($_ =~ /^DROP/m);
1146       next if($_ =~ /^BEGIN TRANSACTION/m);
1147       next if($_ =~ /^COMMIT/m);
1148       next if $_ =~ /^\s+$/; # skip whitespace only
1149       $self->debugobj->query_start($_) if $self->debug;
1150       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1151       $self->debugobj->query_end($_) if $self->debug;
1152     }
1153   }
1154 }
1155
1156 =head2 datetime_parser
1157
1158 Returns the datetime parser class
1159
1160 =cut
1161
1162 sub datetime_parser {
1163   my $self = shift;
1164   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1165 }
1166
1167 =head2 datetime_parser_type
1168
1169 Defines (returns) the datetime parser class - currently hardwired to
1170 L<DateTime::Format::MySQL>
1171
1172 =cut
1173
1174 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1175
1176 =head2 build_datetime_parser
1177
1178 See L</datetime_parser>
1179
1180 =cut
1181
1182 sub build_datetime_parser {
1183   my $self = shift;
1184   my $type = $self->datetime_parser_type(@_);
1185   eval "use ${type}";
1186   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1187   return $type;
1188 }
1189
1190 sub DESTROY {
1191   my $self = shift;
1192   return if !$self->_dbh;
1193
1194   $self->_verify_pid;
1195   $self->_dbh(undef);
1196 }
1197
1198 1;
1199
1200 =head1 SQL METHODS
1201
1202 The module defines a set of methods within the DBIC::SQL::Abstract
1203 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1204 SQL query functions.
1205
1206 The following methods are extended:-
1207
1208 =over 4
1209
1210 =item delete
1211
1212 =item insert
1213
1214 =item select
1215
1216 =item update
1217
1218 =item limit_dialect
1219
1220 See L</connect_info> for details.
1221 For setting, this method is deprecated in favor of L</connect_info>.
1222
1223 =item quote_char
1224
1225 See L</connect_info> for details.
1226 For setting, this method is deprecated in favor of L</connect_info>.
1227
1228 =item name_sep
1229
1230 See L</connect_info> for details.
1231 For setting, this method is deprecated in favor of L</connect_info>.
1232
1233 =back
1234
1235 =head1 ENVIRONMENT VARIABLES
1236
1237 =head2 DBIC_TRACE
1238
1239 If C<DBIC_TRACE> is set then SQL trace information
1240 is produced (as when the L<debug> method is set).
1241
1242 If the value is of the form C<1=/path/name> then the trace output is
1243 written to the file C</path/name>.
1244
1245 This environment variable is checked when the storage object is first
1246 created (when you call connect on your schema).  So, run-time changes 
1247 to this environment variable will not take effect unless you also 
1248 re-connect on your schema.
1249
1250 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1251
1252 Old name for DBIC_TRACE
1253
1254 =head1 AUTHORS
1255
1256 Matt S. Trout <mst@shadowcatsystems.co.uk>
1257
1258 Andy Grundman <andy@hybridized.org>
1259
1260 =head1 LICENSE
1261
1262 You may distribute this code under the same terms as Perl itself.
1263
1264 =cut
1265