Include the DBI error message if a deploy statement fails.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 sub _RowNumberOver {
33   my ($self, $sql, $order, $rows, $offset ) = @_;
34
35   $offset += 1;
36   my $last = $rows + $offset;
37   my ( $order_by ) = $self->_order_by( $order );
38
39   $sql = <<"";
40 SELECT * FROM
41 (
42    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
43       $sql
44       $order_by
45    ) Q1
46 ) Q2
47 WHERE ROW_NUM BETWEEN $offset AND $last
48
49   return $sql;
50 }
51
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 use Scalar::Util 'blessed';
56 sub _find_syntax {
57   my ($self, $syntax) = @_;
58   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
59 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
60   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
61     return 'RowNumberOver';
62   }
63
64   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
65 }
66
67 sub select {
68   my ($self, $table, $fields, $where, $order, @rest) = @_;
69   $table = $self->_quote($table) unless ref($table);
70   local $self->{rownum_hack_count} = 1
71     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
72   @rest = (-1) unless defined $rest[0];
73   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
74     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
75   local $self->{having_bind} = [];
76   my ($sql, @ret) = $self->SUPER::select(
77     $table, $self->_recurse_fields($fields), $where, $order, @rest
78   );
79   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
80 }
81
82 sub insert {
83   my $self = shift;
84   my $table = shift;
85   $table = $self->_quote($table) unless ref($table);
86   $self->SUPER::insert($table, @_);
87 }
88
89 sub update {
90   my $self = shift;
91   my $table = shift;
92   $table = $self->_quote($table) unless ref($table);
93   $self->SUPER::update($table, @_);
94 }
95
96 sub delete {
97   my $self = shift;
98   my $table = shift;
99   $table = $self->_quote($table) unless ref($table);
100   $self->SUPER::delete($table, @_);
101 }
102
103 sub _emulate_limit {
104   my $self = shift;
105   if ($_[3] == -1) {
106     return $_[1].$self->_order_by($_[2]);
107   } else {
108     return $self->SUPER::_emulate_limit(@_);
109   }
110 }
111
112 sub _recurse_fields {
113   my ($self, $fields) = @_;
114   my $ref = ref $fields;
115   return $self->_quote($fields) unless $ref;
116   return $$fields if $ref eq 'SCALAR';
117
118   if ($ref eq 'ARRAY') {
119     return join(', ', map {
120       $self->_recurse_fields($_)
121       .(exists $self->{rownum_hack_count}
122          ? ' AS col'.$self->{rownum_hack_count}++
123          : '')
124      } @$fields);
125   } elsif ($ref eq 'HASH') {
126     foreach my $func (keys %$fields) {
127       return $self->_sqlcase($func)
128         .'( '.$self->_recurse_fields($fields->{$func}).' )';
129     }
130   }
131 }
132
133 sub _order_by {
134   my $self = shift;
135   my $ret = '';
136   my @extra;
137   if (ref $_[0] eq 'HASH') {
138     if (defined $_[0]->{group_by}) {
139       $ret = $self->_sqlcase(' group by ')
140                .$self->_recurse_fields($_[0]->{group_by});
141     }
142     if (defined $_[0]->{having}) {
143       my $frag;
144       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
145       push(@{$self->{having_bind}}, @extra);
146       $ret .= $self->_sqlcase(' having ').$frag;
147     }
148     if (defined $_[0]->{order_by}) {
149       $ret .= $self->_order_by($_[0]->{order_by});
150     }
151   } elsif (ref $_[0] eq 'SCALAR') {
152     $ret = $self->_sqlcase(' order by ').${ $_[0] };
153   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
154     my @order = @{+shift};
155     $ret = $self->_sqlcase(' order by ')
156           .join(', ', map {
157                         my $r = $self->_order_by($_, @_);
158                         $r =~ s/^ ?ORDER BY //i;
159                         $r;
160                       } @order);
161   } else {
162     $ret = $self->SUPER::_order_by(@_);
163   }
164   return $ret;
165 }
166
167 sub _order_directions {
168   my ($self, $order) = @_;
169   $order = $order->{order_by} if ref $order eq 'HASH';
170   return $self->SUPER::_order_directions($order);
171 }
172
173 sub _table {
174   my ($self, $from) = @_;
175   if (ref $from eq 'ARRAY') {
176     return $self->_recurse_from(@$from);
177   } elsif (ref $from eq 'HASH') {
178     return $self->_make_as($from);
179   } else {
180     return $from; # would love to quote here but _table ends up getting called
181                   # twice during an ->select without a limit clause due to
182                   # the way S::A::Limit->select works. should maybe consider
183                   # bypassing this and doing S::A::select($self, ...) in
184                   # our select method above. meantime, quoting shims have
185                   # been added to select/insert/update/delete here
186   }
187 }
188
189 sub _recurse_from {
190   my ($self, $from, @join) = @_;
191   my @sqlf;
192   push(@sqlf, $self->_make_as($from));
193   foreach my $j (@join) {
194     my ($to, $on) = @$j;
195
196     # check whether a join type exists
197     my $join_clause = '';
198     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
199     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
200       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
201     } else {
202       $join_clause = ' JOIN ';
203     }
204     push(@sqlf, $join_clause);
205
206     if (ref $to eq 'ARRAY') {
207       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
208     } else {
209       push(@sqlf, $self->_make_as($to));
210     }
211     push(@sqlf, ' ON ', $self->_join_condition($on));
212   }
213   return join('', @sqlf);
214 }
215
216 sub _make_as {
217   my ($self, $from) = @_;
218   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
219                      reverse each %{$self->_skip_options($from)});
220 }
221
222 sub _skip_options {
223   my ($self, $hash) = @_;
224   my $clean_hash = {};
225   $clean_hash->{$_} = $hash->{$_}
226     for grep {!/^-/} keys %$hash;
227   return $clean_hash;
228 }
229
230 sub _join_condition {
231   my ($self, $cond) = @_;
232   if (ref $cond eq 'HASH') {
233     my %j;
234     for (keys %$cond) {
235       my $v = $cond->{$_};
236       if (ref $v) {
237         # XXX no throw_exception() in this package and croak() fails with strange results
238         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
239             if ref($v) ne 'SCALAR';
240         $j{$_} = $v;
241       }
242       else {
243         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
244       }
245     };
246     return scalar($self->_recurse_where(\%j));
247   } elsif (ref $cond eq 'ARRAY') {
248     return join(' OR ', map { $self->_join_condition($_) } @$cond);
249   } else {
250     die "Can't handle this yet!";
251   }
252 }
253
254 sub _quote {
255   my ($self, $label) = @_;
256   return '' unless defined $label;
257   return "*" if $label eq '*';
258   return $label unless $self->{quote_char};
259   if(ref $self->{quote_char} eq "ARRAY"){
260     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
261       if !defined $self->{name_sep};
262     my $sep = $self->{name_sep};
263     return join($self->{name_sep},
264         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
265        split(/\Q$sep\E/,$label));
266   }
267   return $self->SUPER::_quote($label);
268 }
269
270 sub limit_dialect {
271     my $self = shift;
272     $self->{limit_dialect} = shift if @_;
273     return $self->{limit_dialect};
274 }
275
276 sub quote_char {
277     my $self = shift;
278     $self->{quote_char} = shift if @_;
279     return $self->{quote_char};
280 }
281
282 sub name_sep {
283     my $self = shift;
284     $self->{name_sep} = shift if @_;
285     return $self->{name_sep};
286 }
287
288 } # End of BEGIN block
289
290 use base qw/DBIx::Class/;
291
292 __PACKAGE__->load_components(qw/AccessorGroup/);
293
294 __PACKAGE__->mk_group_accessors('simple' =>
295   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
296      _conn_pid _conn_tid debug debugobj cursor on_connect_do
297      transaction_depth/);
298
299 =head1 NAME
300
301 DBIx::Class::Storage::DBI - DBI storage handler
302
303 =head1 SYNOPSIS
304
305 =head1 DESCRIPTION
306
307 This class represents the connection to the database
308
309 =head1 METHODS
310
311 =head2 new
312
313 =cut
314
315 sub new {
316   my $new = {};
317   bless $new, (ref $_[0] || $_[0]);
318
319   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
320   $new->transaction_depth(0);
321
322   $new->debugobj(new DBIx::Class::Storage::Statistics());
323
324   my $fh;
325
326   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
327                   || $ENV{DBIC_TRACE};
328
329   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
330     $fh = IO::File->new($1, 'w')
331       or $new->throw_exception("Cannot open trace file $1");
332   } else {
333     $fh = IO::File->new('>&STDERR');
334   }
335   $new->debugfh($fh);
336   $new->debug(1) if $debug_env;
337   $new->_sql_maker_opts({});
338   return $new;
339 }
340
341 =head2 throw_exception
342
343 Throws an exception - croaks.
344
345 =cut
346
347 sub throw_exception {
348   my ($self, $msg) = @_;
349   croak($msg);
350 }
351
352 =head2 connect_info
353
354 The arguments of C<connect_info> are always a single array reference.
355
356 This is normally accessed via L<DBIx::Class::Schema/connection>, which
357 encapsulates its argument list in an arrayref before calling
358 C<connect_info> here.
359
360 The arrayref can either contain the same set of arguments one would
361 normally pass to L<DBI/connect>, or a lone code reference which returns
362 a connected database handle.
363
364 In either case, if the final argument in your connect_info happens
365 to be a hashref, C<connect_info> will look there for several
366 connection-specific options:
367
368 =over 4
369
370 =item on_connect_do
371
372 This can be set to an arrayref of literal sql statements, which will
373 be executed immediately after making the connection to the database
374 every time we [re-]connect.
375
376 =item limit_dialect 
377
378 Sets the limit dialect. This is useful for JDBC-bridge among others
379 where the remote SQL-dialect cannot be determined by the name of the
380 driver alone.
381
382 =item quote_char
383
384 Specifies what characters to use to quote table and column names. If 
385 you use this you will want to specify L<name_sep> as well.
386
387 quote_char expects either a single character, in which case is it is placed
388 on either side of the table/column, or an arrayref of length 2 in which case the
389 table/column name is placed between the elements.
390
391 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
392 use C<quote_char =E<gt> [qw/[ ]/]>.
393
394 =item name_sep
395
396 This only needs to be used in conjunction with L<quote_char>, and is used to 
397 specify the charecter that seperates elements (schemas, tables, columns) from 
398 each other. In most cases this is simply a C<.>.
399
400 =back
401
402 These options can be mixed in with your other L<DBI> connection attributes,
403 or placed in a seperate hashref after all other normal L<DBI> connection
404 arguments.
405
406 Every time C<connect_info> is invoked, any previous settings for
407 these options will be cleared before setting the new ones, regardless of
408 whether any options are specified in the new C<connect_info>.
409
410 Examples:
411
412   # Simple SQLite connection
413   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
414
415   # Connect via subref
416   ->connect_info([ sub { DBI->connect(...) } ]);
417
418   # A bit more complicated
419   ->connect_info(
420     [
421       'dbi:Pg:dbname=foo',
422       'postgres',
423       'my_pg_password',
424       { AutoCommit => 0 },
425       { quote_char => q{"}, name_sep => q{.} },
426     ]
427   );
428
429   # Equivalent to the previous example
430   ->connect_info(
431     [
432       'dbi:Pg:dbname=foo',
433       'postgres',
434       'my_pg_password',
435       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
436     ]
437   );
438
439   # Subref + DBIC-specific connection options
440   ->connect_info(
441     [
442       sub { DBI->connect(...) },
443       {
444           quote_char => q{`},
445           name_sep => q{@},
446           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
447       },
448     ]
449   );
450
451 =head2 on_connect_do
452
453 This method is deprecated in favor of setting via L</connect_info>.
454
455 =head2 debug
456
457 Causes SQL trace information to be emitted on the C<debugobj> object.
458 (or C<STDERR> if C<debugobj> has not specifically been set).
459
460 This is the equivalent to setting L</DBIC_TRACE> in your
461 shell environment.
462
463 =head2 debugfh
464
465 Set or retrieve the filehandle used for trace/debug output.  This should be
466 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
467 set to be STDERR - although see information on the
468 L<DBIC_TRACE> environment variable.
469
470 =cut
471
472 sub debugfh {
473     my $self = shift;
474
475     if ($self->debugobj->can('debugfh')) {
476         return $self->debugobj->debugfh(@_);
477     }
478 }
479
480 =head2 debugobj
481
482 Sets or retrieves the object used for metric collection. Defaults to an instance
483 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
484 method of using a coderef as a callback.  See the aforementioned Statistics
485 class for more information.
486
487 =head2 debugcb
488
489 Sets a callback to be executed each time a statement is run; takes a sub
490 reference.  Callback is executed as $sub->($op, $info) where $op is
491 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
492
493 See L<debugobj> for a better way.
494
495 =cut
496
497 sub debugcb {
498     my $self = shift;
499
500     if ($self->debugobj->can('callback')) {
501         return $self->debugobj->callback(@_);
502     }
503 }
504
505 =head2 disconnect
506
507 Disconnect the L<DBI> handle, performing a rollback first if the
508 database is not in C<AutoCommit> mode.
509
510 =cut
511
512 sub disconnect {
513   my ($self) = @_;
514
515   if( $self->connected ) {
516     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
517     $self->_dbh->disconnect;
518     $self->_dbh(undef);
519   }
520 }
521
522 =head2 connected
523
524 Check if the L<DBI> handle is connected.  Returns true if the handle
525 is connected.
526
527 =cut
528
529 sub connected { my ($self) = @_;
530
531   if(my $dbh = $self->_dbh) {
532       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
533           return $self->_dbh(undef);
534       }
535       elsif($self->_conn_pid != $$) {
536           $self->_dbh->{InactiveDestroy} = 1;
537           return $self->_dbh(undef);
538       }
539       return ($dbh->FETCH('Active') && $dbh->ping);
540   }
541
542   return 0;
543 }
544
545 =head2 ensure_connected
546
547 Check whether the database handle is connected - if not then make a
548 connection.
549
550 =cut
551
552 sub ensure_connected {
553   my ($self) = @_;
554
555   unless ($self->connected) {
556     $self->_populate_dbh;
557   }
558 }
559
560 =head2 dbh
561
562 Returns the dbh - a data base handle of class L<DBI>.
563
564 =cut
565
566 sub dbh {
567   my ($self) = @_;
568
569   $self->ensure_connected;
570   return $self->_dbh;
571 }
572
573 sub _sql_maker_args {
574     my ($self) = @_;
575     
576     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
577 }
578
579 =head2 sql_maker
580
581 Returns a C<sql_maker> object - normally an object of class
582 C<DBIC::SQL::Abstract>.
583
584 =cut
585
586 sub sql_maker {
587   my ($self) = @_;
588   unless ($self->_sql_maker) {
589     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
590   }
591   return $self->_sql_maker;
592 }
593
594 sub connect_info {
595   my ($self, $info_arg) = @_;
596
597   if($info_arg) {
598     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
599     #  the new set of options
600     $self->_sql_maker(undef);
601     $self->_sql_maker_opts({});
602     $self->_connect_info($info_arg);
603
604     my $dbi_info = [@$info_arg]; # copy for DBI
605     my $last_info = $dbi_info->[-1];
606     if(ref $last_info eq 'HASH') {
607       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
608         $self->on_connect_do($on_connect_do);
609       }
610       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
611         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
612           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
613         }
614       }
615
616       # Get rid of any trailing empty hashref
617       pop(@$dbi_info) if !keys %$last_info;
618     }
619
620     $self->_dbi_connect_info($dbi_info);
621   }
622
623   $self->_connect_info;
624 }
625
626 sub _populate_dbh {
627   my ($self) = @_;
628   my @info = @{$self->_dbi_connect_info || []};
629   $self->_dbh($self->_connect(@info));
630
631   if(ref $self eq 'DBIx::Class::Storage::DBI') {
632     my $driver = $self->_dbh->{Driver}->{Name};
633     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
634       bless $self, "DBIx::Class::Storage::DBI::${driver}";
635       $self->_rebless() if $self->can('_rebless');
636     }
637   }
638
639   # if on-connect sql statements are given execute them
640   foreach my $sql_statement (@{$self->on_connect_do || []}) {
641     $self->debugobj->query_start($sql_statement) if $self->debug();
642     $self->_dbh->do($sql_statement);
643     $self->debugobj->query_end($sql_statement) if $self->debug();
644   }
645
646   $self->_conn_pid($$);
647   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
648 }
649
650 sub _connect {
651   my ($self, @info) = @_;
652
653   $self->throw_exception("You failed to provide any connection info")
654       if !@info;
655
656   my ($old_connect_via, $dbh);
657
658   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
659       $old_connect_via = $DBI::connect_via;
660       $DBI::connect_via = 'connect';
661   }
662
663   eval {
664     $dbh = ref $info[0] eq 'CODE'
665          ? &{$info[0]}
666          : DBI->connect(@info);
667   };
668
669   $DBI::connect_via = $old_connect_via if $old_connect_via;
670
671   if (!$dbh || $@) {
672     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
673   }
674
675   $dbh;
676 }
677
678 =head2 txn_begin
679
680 Calls begin_work on the current dbh.
681
682 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
683 an entire code block to be executed transactionally.
684
685 =cut
686
687 sub txn_begin {
688   my $self = shift;
689   if ($self->{transaction_depth}++ == 0) {
690     my $dbh = $self->dbh;
691     if ($dbh->{AutoCommit}) {
692       $self->debugobj->txn_begin()
693         if ($self->debug);
694       $dbh->begin_work;
695     }
696   }
697 }
698
699 =head2 txn_commit
700
701 Issues a commit against the current dbh.
702
703 =cut
704
705 sub txn_commit {
706   my $self = shift;
707   my $dbh = $self->dbh;
708   if ($self->{transaction_depth} == 0) {
709     unless ($dbh->{AutoCommit}) {
710       $self->debugobj->txn_commit()
711         if ($self->debug);
712       $dbh->commit;
713     }
714   }
715   else {
716     if (--$self->{transaction_depth} == 0) {
717       $self->debugobj->txn_commit()
718         if ($self->debug);
719       $dbh->commit;
720     }
721   }
722 }
723
724 =head2 txn_rollback
725
726 Issues a rollback against the current dbh. A nested rollback will
727 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
728 which allows the rollback to propagate to the outermost transaction.
729
730 =cut
731
732 sub txn_rollback {
733   my $self = shift;
734
735   eval {
736     my $dbh = $self->dbh;
737     if ($self->{transaction_depth} == 0) {
738       unless ($dbh->{AutoCommit}) {
739         $self->debugobj->txn_rollback()
740           if ($self->debug);
741         $dbh->rollback;
742       }
743     }
744     else {
745       if (--$self->{transaction_depth} == 0) {
746         $self->debugobj->txn_rollback()
747           if ($self->debug);
748         $dbh->rollback;
749       }
750       else {
751         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
752       }
753     }
754   };
755
756   if ($@) {
757     my $error = $@;
758     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
759     $error =~ /$exception_class/ and $self->throw_exception($error);
760     $self->{transaction_depth} = 0;          # ensure that a failed rollback
761     $self->throw_exception($error);          # resets the transaction depth
762   }
763 }
764
765 sub _execute {
766   my ($self, $op, $extra_bind, $ident, @args) = @_;
767   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
768   unshift(@bind, @$extra_bind) if $extra_bind;
769   if ($self->debug) {
770       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
771       $self->debugobj->query_start($sql, @debug_bind);
772   }
773   my $sth = eval { $self->sth($sql,$op) };
774
775   if (!$sth || $@) {
776     $self->throw_exception(
777       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
778     );
779   }
780   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
781   my $rv;
782   if ($sth) {
783     my $time = time();
784     $rv = eval { $sth->execute(@bind) };
785
786     if ($@ || !$rv) {
787       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
788     }
789   } else {
790     $self->throw_exception("'$sql' did not generate a statement.");
791   }
792   if ($self->debug) {
793       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
794       $self->debugobj->query_end($sql, @debug_bind);
795   }
796   return (wantarray ? ($rv, $sth, @bind) : $rv);
797 }
798
799 sub insert {
800   my ($self, $ident, $to_insert) = @_;
801   $self->throw_exception(
802     "Couldn't insert ".join(', ',
803       map "$_ => $to_insert->{$_}", keys %$to_insert
804     )." into ${ident}"
805   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
806   return $to_insert;
807 }
808
809 sub update {
810   return shift->_execute('update' => [], @_);
811 }
812
813 sub delete {
814   return shift->_execute('delete' => [], @_);
815 }
816
817 sub _select {
818   my ($self, $ident, $select, $condition, $attrs) = @_;
819   my $order = $attrs->{order_by};
820   if (ref $condition eq 'SCALAR') {
821     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
822   }
823   if (exists $attrs->{group_by} || $attrs->{having}) {
824     $order = {
825       group_by => $attrs->{group_by},
826       having => $attrs->{having},
827       ($order ? (order_by => $order) : ())
828     };
829   }
830   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
831   if ($attrs->{software_limit} ||
832       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
833         $attrs->{software_limit} = 1;
834   } else {
835     $self->throw_exception("rows attribute must be positive if present")
836       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
837     push @args, $attrs->{rows}, $attrs->{offset};
838   }
839   return $self->_execute(@args);
840 }
841
842 =head2 select
843
844 =over 4
845
846 =item Arguments: $ident, $select, $condition, $attrs
847
848 =back
849
850 Handle a SQL select statement.
851
852 =cut
853
854 sub select {
855   my $self = shift;
856   my ($ident, $select, $condition, $attrs) = @_;
857   return $self->cursor->new($self, \@_, $attrs);
858 }
859
860 =head2 select_single
861
862 Performs a select, fetch and return of data - handles a single row
863 only.
864
865 =cut
866
867 # Need to call finish() to work round broken DBDs
868
869 sub select_single {
870   my $self = shift;
871   my ($rv, $sth, @bind) = $self->_select(@_);
872   my @row = $sth->fetchrow_array;
873   $sth->finish();
874   return @row;
875 }
876
877 =head2 sth
878
879 =over 4
880
881 =item Arguments: $sql
882
883 =back
884
885 Returns a L<DBI> sth (statement handle) for the supplied SQL.
886
887 =cut
888
889 sub sth {
890   my ($self, $sql) = @_;
891   # 3 is the if_active parameter which avoids active sth re-use
892   return $self->dbh->prepare_cached($sql, {}, 3);
893 }
894
895 =head2 columns_info_for
896
897 Returns database type info for a given table column.
898
899 =cut
900
901 sub columns_info_for {
902   my ($self, $table) = @_;
903
904   my $dbh = $self->dbh;
905
906   if ($dbh->can('column_info')) {
907     my %result;
908     local $dbh->{RaiseError} = 1;
909     local $dbh->{PrintError} = 0;
910     eval {
911       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
912       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
913       $sth->execute();
914
915       while ( my $info = $sth->fetchrow_hashref() ){
916         my %column_info;
917         $column_info{data_type}   = $info->{TYPE_NAME};
918         $column_info{size}      = $info->{COLUMN_SIZE};
919         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
920         $column_info{default_value} = $info->{COLUMN_DEF};
921         my $col_name = $info->{COLUMN_NAME};
922         $col_name =~ s/^\"(.*)\"$/$1/;
923
924         $result{$col_name} = \%column_info;
925       }
926     };
927     return \%result if !$@ && scalar keys %result;
928   }
929
930   my %result;
931   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
932   $sth->execute;
933   my @columns = @{$sth->{NAME_lc}};
934   for my $i ( 0 .. $#columns ){
935     my %column_info;
936     $column_info{data_type} = $sth->{TYPE}->[$i];
937     $column_info{size} = $sth->{PRECISION}->[$i];
938     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
939
940     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
941       $column_info{data_type} = $1;
942       $column_info{size}    = $2;
943     }
944
945     $result{$columns[$i]} = \%column_info;
946   }
947   $sth->finish;
948
949   foreach my $col (keys %result) {
950     my $colinfo = $result{$col};
951     my $type_num = $colinfo->{data_type};
952     my $type_name;
953     if(defined $type_num && $dbh->can('type_info')) {
954       my $type_info = $dbh->type_info($type_num);
955       $type_name = $type_info->{TYPE_NAME} if $type_info;
956       $colinfo->{data_type} = $type_name if $type_name;
957     }
958   }
959
960   return \%result;
961 }
962
963 =head2 last_insert_id
964
965 Return the row id of the last insert.
966
967 =cut
968
969 sub last_insert_id {
970   my ($self, $row) = @_;
971     
972   return $self->dbh->func('last_insert_rowid');
973
974 }
975
976 =head2 sqlt_type
977
978 Returns the database driver name.
979
980 =cut
981
982 sub sqlt_type { shift->dbh->{Driver}->{Name} }
983
984 =head2 create_ddl_dir (EXPERIMENTAL)
985
986 =over 4
987
988 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
989
990 =back
991
992 Creates a SQL file based on the Schema, for each of the specified
993 database types, in the given directory.
994
995 Note that this feature is currently EXPERIMENTAL and may not work correctly
996 across all databases, or fully handle complex relationships.
997
998 =cut
999
1000 sub create_ddl_dir
1001 {
1002   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1003
1004   if(!$dir || !-d $dir)
1005   {
1006     warn "No directory given, using ./\n";
1007     $dir = "./";
1008   }
1009   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1010   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1011   $version ||= $schema->VERSION || '1.x';
1012   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1013
1014   eval "use SQL::Translator";
1015   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1016
1017   my $sqlt = SQL::Translator->new($sqltargs);
1018   foreach my $db (@$databases)
1019   {
1020     $sqlt->reset();
1021     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1022 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1023     $sqlt->data($schema);
1024     $sqlt->producer($db);
1025
1026     my $file;
1027     my $filename = $schema->ddl_filename($db, $dir, $version);
1028     if(-e $filename)
1029     {
1030       $self->throw_exception("$filename already exists, skipping $db");
1031       next;
1032     }
1033     open($file, ">$filename") 
1034       or $self->throw_exception("Can't open $filename for writing ($!)");
1035     my $output = $sqlt->translate;
1036 #use Data::Dumper;
1037 #    print join(":", keys %{$schema->source_registrations});
1038 #    print Dumper($sqlt->schema);
1039     if(!$output)
1040     {
1041       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1042       next;
1043     }
1044     print $file $output;
1045     close($file);
1046   }
1047
1048 }
1049
1050 =head2 deployment_statements
1051
1052 =over 4
1053
1054 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1055
1056 =back
1057
1058 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1059 The database driver name is given by C<$type>, though the value from
1060 L</sqlt_type> is used if it is not specified.
1061
1062 C<$directory> is used to return statements from files in a previously created
1063 L</create_ddl_dir> directory and is optional. The filenames are constructed
1064 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1065
1066 If no C<$directory> is specified then the statements are constructed on the
1067 fly using L<SQL::Translator> and C<$version> is ignored.
1068
1069 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1070
1071 =cut
1072
1073 sub deployment_statements {
1074   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1075   # Need to be connected to get the correct sqlt_type
1076   $self->ensure_connected() unless $type;
1077   $type ||= $self->sqlt_type;
1078   $version ||= $schema->VERSION || '1.x';
1079   $dir ||= './';
1080   eval "use SQL::Translator";
1081   if(!$@)
1082   {
1083     eval "use SQL::Translator::Parser::DBIx::Class;";
1084     $self->throw_exception($@) if $@;
1085     eval "use SQL::Translator::Producer::${type};";
1086     $self->throw_exception($@) if $@;
1087     my $tr = SQL::Translator->new(%$sqltargs);
1088     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1089     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1090   }
1091
1092   my $filename = $schema->ddl_filename($type, $dir, $version);
1093   if(!-f $filename)
1094   {
1095 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1096       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1097       return;
1098   }
1099   my $file;
1100   open($file, "<$filename") 
1101       or $self->throw_exception("Can't open $filename ($!)");
1102   my @rows = <$file>;
1103   close($file);
1104
1105   return join('', @rows);
1106   
1107 }
1108
1109 =head2 deploy
1110
1111 Sends the appropriate statements to create or modify tables to the
1112 db. This would normally be called through
1113 L<DBIx::Class::Schema/deploy>.
1114
1115 =cut
1116
1117 sub deploy {
1118   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1119   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1120     for ( split(";\n", $statement)) {
1121       next if($_ =~ /^--/);
1122       next if(!$_);
1123 #      next if($_ =~ /^DROP/m);
1124       next if($_ =~ /^BEGIN TRANSACTION/m);
1125       next if($_ =~ /^COMMIT/m);
1126       next if $_ =~ /^\s+$/; # skip whitespace only
1127       $self->debugobj->query_start($_) if $self->debug;
1128       $self->dbh->do($_) or warn $self->dbh->errstr, "\nSQL was:\n $_";
1129       $self->debugobj->query_end($_) if $self->debug;
1130     }
1131   }
1132 }
1133
1134 =head2 datetime_parser
1135
1136 Returns the datetime parser class
1137
1138 =cut
1139
1140 sub datetime_parser {
1141   my $self = shift;
1142   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1143 }
1144
1145 =head2 datetime_parser_type
1146
1147 Defines (returns) the datetime parser class - currently hardwired to
1148 L<DateTime::Format::MySQL>
1149
1150 =cut
1151
1152 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1153
1154 =head2 build_datetime_parser
1155
1156 See L</datetime_parser>
1157
1158 =cut
1159
1160 sub build_datetime_parser {
1161   my $self = shift;
1162   my $type = $self->datetime_parser_type(@_);
1163   eval "use ${type}";
1164   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1165   return $type;
1166 }
1167
1168 sub DESTROY {
1169   # NOTE: if there's a merge conflict here when -current is pushed
1170   #  back to trunk, take -current's version and ignore this trunk one :)
1171   my $self = shift;
1172
1173   if($self->_dbh && $self->_conn_pid != $$) {
1174     $self->_dbh->{InactiveDestroy} = 1;
1175   }
1176
1177   $self->_dbh(undef);
1178 }
1179
1180 1;
1181
1182 =head1 SQL METHODS
1183
1184 The module defines a set of methods within the DBIC::SQL::Abstract
1185 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1186 SQL query functions.
1187
1188 The following methods are extended:-
1189
1190 =over 4
1191
1192 =item delete
1193
1194 =item insert
1195
1196 =item select
1197
1198 =item update
1199
1200 =item limit_dialect
1201
1202 See L</connect_info> for details.
1203 For setting, this method is deprecated in favor of L</connect_info>.
1204
1205 =item quote_char
1206
1207 See L</connect_info> for details.
1208 For setting, this method is deprecated in favor of L</connect_info>.
1209
1210 =item name_sep
1211
1212 See L</connect_info> for details.
1213 For setting, this method is deprecated in favor of L</connect_info>.
1214
1215 =back
1216
1217 =head1 ENVIRONMENT VARIABLES
1218
1219 =head2 DBIC_TRACE
1220
1221 If C<DBIC_TRACE> is set then SQL trace information
1222 is produced (as when the L<debug> method is set).
1223
1224 If the value is of the form C<1=/path/name> then the trace output is
1225 written to the file C</path/name>.
1226
1227 This environment variable is checked when the storage object is first
1228 created (when you call connect on your schema).  So, run-time changes 
1229 to this environment variable will not take effect unless you also 
1230 re-connect on your schema.
1231
1232 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1233
1234 Old name for DBIC_TRACE
1235
1236 =head1 AUTHORS
1237
1238 Matt S. Trout <mst@shadowcatsystems.co.uk>
1239
1240 Andy Grundman <andy@hybridized.org>
1241
1242 =head1 LICENSE
1243
1244 You may distribute this code under the same terms as Perl itself.
1245
1246 =cut