backport Ash's quoting fix from Loader to columns_info_for
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 sub _RowNumberOver {
33   my ($self, $sql, $order, $rows, $offset ) = @_;
34
35   $offset += 1;
36   my $last = $rows + $offset;
37   my ( $order_by ) = $self->_order_by( $order );
38
39   $sql = <<"";
40 SELECT * FROM
41 (
42    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
43       $sql
44       $order_by
45    ) Q1
46 ) Q2
47 WHERE ROW_NUM BETWEEN $offset AND $last
48
49   return $sql;
50 }
51
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 use Scalar::Util 'blessed';
56 sub _find_syntax {
57   my ($self, $syntax) = @_;
58   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
59 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
60   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
61     return 'RowNumberOver';
62   }
63
64   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
65 }
66
67 sub select {
68   my ($self, $table, $fields, $where, $order, @rest) = @_;
69   $table = $self->_quote($table) unless ref($table);
70   local $self->{rownum_hack_count} = 1
71     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
72   @rest = (-1) unless defined $rest[0];
73   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
74     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
75   local $self->{having_bind} = [];
76   my ($sql, @ret) = $self->SUPER::select(
77     $table, $self->_recurse_fields($fields), $where, $order, @rest
78   );
79   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
80 }
81
82 sub insert {
83   my $self = shift;
84   my $table = shift;
85   $table = $self->_quote($table) unless ref($table);
86   $self->SUPER::insert($table, @_);
87 }
88
89 sub update {
90   my $self = shift;
91   my $table = shift;
92   $table = $self->_quote($table) unless ref($table);
93   $self->SUPER::update($table, @_);
94 }
95
96 sub delete {
97   my $self = shift;
98   my $table = shift;
99   $table = $self->_quote($table) unless ref($table);
100   $self->SUPER::delete($table, @_);
101 }
102
103 sub _emulate_limit {
104   my $self = shift;
105   if ($_[3] == -1) {
106     return $_[1].$self->_order_by($_[2]);
107   } else {
108     return $self->SUPER::_emulate_limit(@_);
109   }
110 }
111
112 sub _recurse_fields {
113   my ($self, $fields) = @_;
114   my $ref = ref $fields;
115   return $self->_quote($fields) unless $ref;
116   return $$fields if $ref eq 'SCALAR';
117
118   if ($ref eq 'ARRAY') {
119     return join(', ', map {
120       $self->_recurse_fields($_)
121       .(exists $self->{rownum_hack_count}
122          ? ' AS col'.$self->{rownum_hack_count}++
123          : '')
124      } @$fields);
125   } elsif ($ref eq 'HASH') {
126     foreach my $func (keys %$fields) {
127       return $self->_sqlcase($func)
128         .'( '.$self->_recurse_fields($fields->{$func}).' )';
129     }
130   }
131 }
132
133 sub _order_by {
134   my $self = shift;
135   my $ret = '';
136   my @extra;
137   if (ref $_[0] eq 'HASH') {
138     if (defined $_[0]->{group_by}) {
139       $ret = $self->_sqlcase(' group by ')
140                .$self->_recurse_fields($_[0]->{group_by});
141     }
142     if (defined $_[0]->{having}) {
143       my $frag;
144       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
145       push(@{$self->{having_bind}}, @extra);
146       $ret .= $self->_sqlcase(' having ').$frag;
147     }
148     if (defined $_[0]->{order_by}) {
149       $ret .= $self->_order_by($_[0]->{order_by});
150     }
151   } elsif (ref $_[0] eq 'SCALAR') {
152     $ret = $self->_sqlcase(' order by ').${ $_[0] };
153   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
154     my @order = @{+shift};
155     $ret = $self->_sqlcase(' order by ')
156           .join(', ', map {
157                         my $r = $self->_order_by($_, @_);
158                         $r =~ s/^ ?ORDER BY //i;
159                         $r;
160                       } @order);
161   } else {
162     $ret = $self->SUPER::_order_by(@_);
163   }
164   return $ret;
165 }
166
167 sub _order_directions {
168   my ($self, $order) = @_;
169   $order = $order->{order_by} if ref $order eq 'HASH';
170   return $self->SUPER::_order_directions($order);
171 }
172
173 sub _table {
174   my ($self, $from) = @_;
175   if (ref $from eq 'ARRAY') {
176     return $self->_recurse_from(@$from);
177   } elsif (ref $from eq 'HASH') {
178     return $self->_make_as($from);
179   } else {
180     return $from; # would love to quote here but _table ends up getting called
181                   # twice during an ->select without a limit clause due to
182                   # the way S::A::Limit->select works. should maybe consider
183                   # bypassing this and doing S::A::select($self, ...) in
184                   # our select method above. meantime, quoting shims have
185                   # been added to select/insert/update/delete here
186   }
187 }
188
189 sub _recurse_from {
190   my ($self, $from, @join) = @_;
191   my @sqlf;
192   push(@sqlf, $self->_make_as($from));
193   foreach my $j (@join) {
194     my ($to, $on) = @$j;
195
196     # check whether a join type exists
197     my $join_clause = '';
198     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
199     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
200       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
201     } else {
202       $join_clause = ' JOIN ';
203     }
204     push(@sqlf, $join_clause);
205
206     if (ref $to eq 'ARRAY') {
207       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
208     } else {
209       push(@sqlf, $self->_make_as($to));
210     }
211     push(@sqlf, ' ON ', $self->_join_condition($on));
212   }
213   return join('', @sqlf);
214 }
215
216 sub _make_as {
217   my ($self, $from) = @_;
218   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
219                      reverse each %{$self->_skip_options($from)});
220 }
221
222 sub _skip_options {
223   my ($self, $hash) = @_;
224   my $clean_hash = {};
225   $clean_hash->{$_} = $hash->{$_}
226     for grep {!/^-/} keys %$hash;
227   return $clean_hash;
228 }
229
230 sub _join_condition {
231   my ($self, $cond) = @_;
232   if (ref $cond eq 'HASH') {
233     my %j;
234     for (keys %$cond) {
235       my $v = $cond->{$_};
236       if (ref $v) {
237         # XXX no throw_exception() in this package and croak() fails with strange results
238         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
239             if ref($v) ne 'SCALAR';
240         $j{$_} = $v;
241       }
242       else {
243         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
244       }
245     };
246     return scalar($self->_recurse_where(\%j));
247   } elsif (ref $cond eq 'ARRAY') {
248     return join(' OR ', map { $self->_join_condition($_) } @$cond);
249   } else {
250     die "Can't handle this yet!";
251   }
252 }
253
254 sub _quote {
255   my ($self, $label) = @_;
256   return '' unless defined $label;
257   return "*" if $label eq '*';
258   return $label unless $self->{quote_char};
259   if(ref $self->{quote_char} eq "ARRAY"){
260     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
261       if !defined $self->{name_sep};
262     my $sep = $self->{name_sep};
263     return join($self->{name_sep},
264         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
265        split(/\Q$sep\E/,$label));
266   }
267   return $self->SUPER::_quote($label);
268 }
269
270 sub limit_dialect {
271     my $self = shift;
272     $self->{limit_dialect} = shift if @_;
273     return $self->{limit_dialect};
274 }
275
276 sub quote_char {
277     my $self = shift;
278     $self->{quote_char} = shift if @_;
279     return $self->{quote_char};
280 }
281
282 sub name_sep {
283     my $self = shift;
284     $self->{name_sep} = shift if @_;
285     return $self->{name_sep};
286 }
287
288 } # End of BEGIN block
289
290 use base qw/DBIx::Class/;
291
292 __PACKAGE__->load_components(qw/AccessorGroup/);
293
294 __PACKAGE__->mk_group_accessors('simple' =>
295   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
296      debug debugobj cursor on_connect_do transaction_depth/);
297
298 =head1 NAME
299
300 DBIx::Class::Storage::DBI - DBI storage handler
301
302 =head1 SYNOPSIS
303
304 =head1 DESCRIPTION
305
306 This class represents the connection to the database
307
308 =head1 METHODS
309
310 =head2 new
311
312 =cut
313
314 sub new {
315   my $new = {};
316   bless $new, (ref $_[0] || $_[0]);
317
318   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
319   $new->transaction_depth(0);
320
321   $new->debugobj(new DBIx::Class::Storage::Statistics());
322
323   my $fh;
324
325   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
326                   || $ENV{DBIC_TRACE};
327
328   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
329     $fh = IO::File->new($1, 'w')
330       or $new->throw_exception("Cannot open trace file $1");
331   } else {
332     $fh = IO::File->new('>&STDERR');
333   }
334   $new->debugfh($fh);
335   $new->debug(1) if $debug_env;
336   $new->_sql_maker_opts({});
337   return $new;
338 }
339
340 =head2 throw_exception
341
342 Throws an exception - croaks.
343
344 =cut
345
346 sub throw_exception {
347   my ($self, $msg) = @_;
348   croak($msg);
349 }
350
351 =head2 connect_info
352
353 The arguments of C<connect_info> are always a single array reference.
354
355 This is normally accessed via L<DBIx::Class::Schema/connection>, which
356 encapsulates its argument list in an arrayref before calling
357 C<connect_info> here.
358
359 The arrayref can either contain the same set of arguments one would
360 normally pass to L<DBI/connect>, or a lone code reference which returns
361 a connected database handle.
362
363 In either case, if the final argument in your connect_info happens
364 to be a hashref, C<connect_info> will look there for several
365 connection-specific options:
366
367 =over 4
368
369 =item on_connect_do
370
371 This can be set to an arrayref of literal sql statements, which will
372 be executed immediately after making the connection to the database
373 every time we [re-]connect.
374
375 =item limit_dialect 
376
377 Sets the limit dialect. This is useful for JDBC-bridge among others
378 where the remote SQL-dialect cannot be determined by the name of the
379 driver alone.
380
381 =item quote_char
382
383 Specifies what characters to use to quote table and column names. If 
384 you use this you will want to specify L<name_sep> as well.
385
386 quote_char expects either a single character, in which case is it is placed
387 on either side of the table/column, or an arrayref of length 2 in which case the
388 table/column name is placed between the elements.
389
390 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
391 use C<quote_char =E<gt> [qw/[ ]/]>.
392
393 =item name_sep
394
395 This only needs to be used in conjunction with L<quote_char>, and is used to 
396 specify the charecter that seperates elements (schemas, tables, columns) from 
397 each other. In most cases this is simply a C<.>.
398
399 =back
400
401 These options can be mixed in with your other L<DBI> connection attributes,
402 or placed in a seperate hashref after all other normal L<DBI> connection
403 arguments.
404
405 Every time C<connect_info> is invoked, any previous settings for
406 these options will be cleared before setting the new ones, regardless of
407 whether any options are specified in the new C<connect_info>.
408
409 Examples:
410
411   # Simple SQLite connection
412   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
413
414   # Connect via subref
415   ->connect_info([ sub { DBI->connect(...) } ]);
416
417   # A bit more complicated
418   ->connect_info(
419     [
420       'dbi:Pg:dbname=foo',
421       'postgres',
422       'my_pg_password',
423       { AutoCommit => 0 },
424       { quote_char => q{"}, name_sep => q{.} },
425     ]
426   );
427
428   # Equivalent to the previous example
429   ->connect_info(
430     [
431       'dbi:Pg:dbname=foo',
432       'postgres',
433       'my_pg_password',
434       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
435     ]
436   );
437
438   # Subref + DBIC-specific connection options
439   ->connect_info(
440     [
441       sub { DBI->connect(...) },
442       {
443           quote_char => q{`},
444           name_sep => q{@},
445           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
446       },
447     ]
448   );
449
450 =head2 on_connect_do
451
452 This method is deprecated in favor of setting via L</connect_info>.
453
454 =head2 debug
455
456 Causes SQL trace information to be emitted on the C<debugobj> object.
457 (or C<STDERR> if C<debugobj> has not specifically been set).
458
459 This is the equivalent to setting L</DBIC_TRACE> in your
460 shell environment.
461
462 =head2 debugfh
463
464 Set or retrieve the filehandle used for trace/debug output.  This should be
465 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
466 set to be STDERR - although see information on the
467 L<DBIC_TRACE> environment variable.
468
469 =cut
470
471 sub debugfh {
472     my $self = shift;
473
474     if ($self->debugobj->can('debugfh')) {
475         return $self->debugobj->debugfh(@_);
476     }
477 }
478
479 =head2 debugobj
480
481 Sets or retrieves the object used for metric collection. Defaults to an instance
482 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
483 method of using a coderef as a callback.  See the aforementioned Statistics
484 class for more information.
485
486 =head2 debugcb
487
488 Sets a callback to be executed each time a statement is run; takes a sub
489 reference.  Callback is executed as $sub->($op, $info) where $op is
490 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
491
492 See L<debugobj> for a better way.
493
494 =cut
495
496 sub debugcb {
497     my $self = shift;
498
499     if ($self->debugobj->can('callback')) {
500         return $self->debugobj->callback(@_);
501     }
502 }
503
504 =head2 disconnect
505
506 Disconnect the L<DBI> handle, performing a rollback first if the
507 database is not in C<AutoCommit> mode.
508
509 =cut
510
511 sub disconnect {
512   my ($self) = @_;
513
514   if( $self->connected ) {
515     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
516     $self->_dbh->disconnect;
517     $self->_dbh(undef);
518   }
519 }
520
521 =head2 connected
522
523 Check if the L<DBI> handle is connected.  Returns true if the handle
524 is connected.
525
526 =cut
527
528 sub connected { my ($self) = @_;
529
530   if(my $dbh = $self->_dbh) {
531       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
532           return $self->_dbh(undef);
533       }
534       elsif($self->_conn_pid != $$) {
535           $self->_dbh->{InactiveDestroy} = 1;
536           return $self->_dbh(undef);
537       }
538       return ($dbh->FETCH('Active') && $dbh->ping);
539   }
540
541   return 0;
542 }
543
544 =head2 ensure_connected
545
546 Check whether the database handle is connected - if not then make a
547 connection.
548
549 =cut
550
551 sub ensure_connected {
552   my ($self) = @_;
553
554   unless ($self->connected) {
555     $self->_populate_dbh;
556   }
557 }
558
559 =head2 dbh
560
561 Returns the dbh - a data base handle of class L<DBI>.
562
563 =cut
564
565 sub dbh {
566   my ($self) = @_;
567
568   $self->ensure_connected;
569   return $self->_dbh;
570 }
571
572 sub _sql_maker_args {
573     my ($self) = @_;
574     
575     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
576 }
577
578 =head2 sql_maker
579
580 Returns a C<sql_maker> object - normally an object of class
581 C<DBIC::SQL::Abstract>.
582
583 =cut
584
585 sub sql_maker {
586   my ($self) = @_;
587   unless ($self->_sql_maker) {
588     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
589   }
590   return $self->_sql_maker;
591 }
592
593 sub connect_info {
594   my ($self, $info_arg) = @_;
595
596   if($info_arg) {
597     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
598     #  the new set of options
599     $self->_sql_maker(undef);
600     $self->_sql_maker_opts({});
601
602     my $info = [ @$info_arg ]; # copy because we can alter it
603     my $last_info = $info->[-1];
604     if(ref $last_info eq 'HASH') {
605       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
606         $self->on_connect_do($on_connect_do);
607       }
608       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
609         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
610           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
611         }
612       }
613
614       # Get rid of any trailing empty hashref
615       pop(@$info) if !keys %$last_info;
616     }
617
618     $self->_connect_info($info);
619   }
620
621   $self->_connect_info;
622 }
623
624 sub _populate_dbh {
625   my ($self) = @_;
626   my @info = @{$self->_connect_info || []};
627   $self->_dbh($self->_connect(@info));
628
629   if(ref $self eq 'DBIx::Class::Storage::DBI') {
630     my $driver = $self->_dbh->{Driver}->{Name};
631     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
632       bless $self, "DBIx::Class::Storage::DBI::${driver}";
633       $self->_rebless() if $self->can('_rebless');
634     }
635   }
636
637   # if on-connect sql statements are given execute them
638   foreach my $sql_statement (@{$self->on_connect_do || []}) {
639     $self->debugobj->query_start($sql_statement) if $self->debug();
640     $self->_dbh->do($sql_statement);
641     $self->debugobj->query_end($sql_statement) if $self->debug();
642   }
643
644   $self->_conn_pid($$);
645   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
646 }
647
648 sub _connect {
649   my ($self, @info) = @_;
650
651   $self->throw_exception("You failed to provide any connection info")
652       if !@info;
653
654   my ($old_connect_via, $dbh);
655
656   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
657       $old_connect_via = $DBI::connect_via;
658       $DBI::connect_via = 'connect';
659   }
660
661   eval {
662     $dbh = ref $info[0] eq 'CODE'
663          ? &{$info[0]}
664          : DBI->connect(@info);
665   };
666
667   $DBI::connect_via = $old_connect_via if $old_connect_via;
668
669   if (!$dbh || $@) {
670     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
671   }
672
673   $dbh;
674 }
675
676 =head2 txn_begin
677
678 Calls begin_work on the current dbh.
679
680 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
681 an entire code block to be executed transactionally.
682
683 =cut
684
685 sub txn_begin {
686   my $self = shift;
687   if ($self->{transaction_depth}++ == 0) {
688     my $dbh = $self->dbh;
689     if ($dbh->{AutoCommit}) {
690       $self->debugobj->txn_begin()
691         if ($self->debug);
692       $dbh->begin_work;
693     }
694   }
695 }
696
697 =head2 txn_commit
698
699 Issues a commit against the current dbh.
700
701 =cut
702
703 sub txn_commit {
704   my $self = shift;
705   my $dbh = $self->dbh;
706   if ($self->{transaction_depth} == 0) {
707     unless ($dbh->{AutoCommit}) {
708       $self->debugobj->txn_commit()
709         if ($self->debug);
710       $dbh->commit;
711     }
712   }
713   else {
714     if (--$self->{transaction_depth} == 0) {
715       $self->debugobj->txn_commit()
716         if ($self->debug);
717       $dbh->commit;
718     }
719   }
720 }
721
722 =head2 txn_rollback
723
724 Issues a rollback against the current dbh. A nested rollback will
725 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
726 which allows the rollback to propagate to the outermost transaction.
727
728 =cut
729
730 sub txn_rollback {
731   my $self = shift;
732
733   eval {
734     my $dbh = $self->dbh;
735     if ($self->{transaction_depth} == 0) {
736       unless ($dbh->{AutoCommit}) {
737         $self->debugobj->txn_rollback()
738           if ($self->debug);
739         $dbh->rollback;
740       }
741     }
742     else {
743       if (--$self->{transaction_depth} == 0) {
744         $self->debugobj->txn_rollback()
745           if ($self->debug);
746         $dbh->rollback;
747       }
748       else {
749         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
750       }
751     }
752   };
753
754   if ($@) {
755     my $error = $@;
756     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
757     $error =~ /$exception_class/ and $self->throw_exception($error);
758     $self->{transaction_depth} = 0;          # ensure that a failed rollback
759     $self->throw_exception($error);          # resets the transaction depth
760   }
761 }
762
763 sub _execute {
764   my ($self, $op, $extra_bind, $ident, @args) = @_;
765   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
766   unshift(@bind, @$extra_bind) if $extra_bind;
767   if ($self->debug) {
768       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
769       $self->debugobj->query_start($sql, @debug_bind);
770   }
771   my $sth = eval { $self->sth($sql,$op) };
772
773   if (!$sth || $@) {
774     $self->throw_exception(
775       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
776     );
777   }
778   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
779   my $rv;
780   if ($sth) {
781     my $time = time();
782     $rv = eval { $sth->execute(@bind) };
783
784     if ($@ || !$rv) {
785       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
786     }
787   } else {
788     $self->throw_exception("'$sql' did not generate a statement.");
789   }
790   if ($self->debug) {
791       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
792       $self->debugobj->query_end($sql, @debug_bind);
793   }
794   return (wantarray ? ($rv, $sth, @bind) : $rv);
795 }
796
797 sub insert {
798   my ($self, $ident, $to_insert) = @_;
799   $self->throw_exception(
800     "Couldn't insert ".join(', ',
801       map "$_ => $to_insert->{$_}", keys %$to_insert
802     )." into ${ident}"
803   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
804   return $to_insert;
805 }
806
807 sub update {
808   return shift->_execute('update' => [], @_);
809 }
810
811 sub delete {
812   return shift->_execute('delete' => [], @_);
813 }
814
815 sub _select {
816   my ($self, $ident, $select, $condition, $attrs) = @_;
817   my $order = $attrs->{order_by};
818   if (ref $condition eq 'SCALAR') {
819     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
820   }
821   if (exists $attrs->{group_by} || $attrs->{having}) {
822     $order = {
823       group_by => $attrs->{group_by},
824       having => $attrs->{having},
825       ($order ? (order_by => $order) : ())
826     };
827   }
828   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
829   if ($attrs->{software_limit} ||
830       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
831         $attrs->{software_limit} = 1;
832   } else {
833     $self->throw_exception("rows attribute must be positive if present")
834       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
835     push @args, $attrs->{rows}, $attrs->{offset};
836   }
837   return $self->_execute(@args);
838 }
839
840 =head2 select
841
842 =over 4
843
844 =item Arguments: $ident, $select, $condition, $attrs
845
846 =back
847
848 Handle a SQL select statement.
849
850 =cut
851
852 sub select {
853   my $self = shift;
854   my ($ident, $select, $condition, $attrs) = @_;
855   return $self->cursor->new($self, \@_, $attrs);
856 }
857
858 =head2 select_single
859
860 Performs a select, fetch and return of data - handles a single row
861 only.
862
863 =cut
864
865 # Need to call finish() to work round broken DBDs
866
867 sub select_single {
868   my $self = shift;
869   my ($rv, $sth, @bind) = $self->_select(@_);
870   my @row = $sth->fetchrow_array;
871   $sth->finish();
872   return @row;
873 }
874
875 =head2 sth
876
877 =over 4
878
879 =item Arguments: $sql
880
881 =back
882
883 Returns a L<DBI> sth (statement handle) for the supplied SQL.
884
885 =cut
886
887 sub sth {
888   my ($self, $sql) = @_;
889   # 3 is the if_active parameter which avoids active sth re-use
890   return $self->dbh->prepare_cached($sql, {}, 3);
891 }
892
893 =head2 columns_info_for
894
895 Returns database type info for a given table column.
896
897 =cut
898
899 sub columns_info_for {
900   my ($self, $table) = @_;
901
902   my $dbh = $self->dbh;
903
904   if ($dbh->can('column_info')) {
905     my %result;
906     local $dbh->{RaiseError} = 1;
907     local $dbh->{PrintError} = 0;
908     eval {
909       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
910       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
911       $sth->execute();
912
913       while ( my $info = $sth->fetchrow_hashref() ){
914         my %column_info;
915         $column_info{data_type}   = $info->{TYPE_NAME};
916         $column_info{size}      = $info->{COLUMN_SIZE};
917         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
918         $column_info{default_value} = $info->{COLUMN_DEF};
919         my $col_name = $info->{COLUMN_NAME};
920         $col_name =~ s/^\"(.*)\"$/$1/;
921
922         $result{$col_name} = \%column_info;
923       }
924     };
925     return \%result if !$@ && scalar keys %result;
926   }
927
928   my %result;
929   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
930   $sth->execute;
931   my @columns = @{$sth->{NAME_lc}};
932   for my $i ( 0 .. $#columns ){
933     my %column_info;
934     $column_info{data_type} = $sth->{TYPE}->[$i];
935     $column_info{size} = $sth->{PRECISION}->[$i];
936     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
937
938     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
939       $column_info{data_type} = $1;
940       $column_info{size}    = $2;
941     }
942
943     $result{$columns[$i]} = \%column_info;
944   }
945   $sth->finish;
946
947   foreach my $col (keys %result) {
948     my $colinfo = $result{$col};
949     my $type_num = $colinfo->{data_type};
950     my $type_name;
951     if(defined $type_num && $dbh->can('type_info')) {
952       my $type_info = $dbh->type_info($type_num);
953       $type_name = $type_info->{TYPE_NAME} if $type_info;
954       $colinfo->{data_type} = $type_name if $type_name;
955     }
956   }
957
958   return \%result;
959 }
960
961 =head2 last_insert_id
962
963 Return the row id of the last insert.
964
965 =cut
966
967 sub last_insert_id {
968   my ($self, $row) = @_;
969     
970   return $self->dbh->func('last_insert_rowid');
971
972 }
973
974 =head2 sqlt_type
975
976 Returns the database driver name.
977
978 =cut
979
980 sub sqlt_type { shift->dbh->{Driver}->{Name} }
981
982 =head2 create_ddl_dir (EXPERIMENTAL)
983
984 =over 4
985
986 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
987
988 =back
989
990 Creates a SQL file based on the Schema, for each of the specified
991 database types, in the given directory.
992
993 Note that this feature is currently EXPERIMENTAL and may not work correctly
994 across all databases, or fully handle complex relationships.
995
996 =cut
997
998 sub create_ddl_dir
999 {
1000   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1001
1002   if(!$dir || !-d $dir)
1003   {
1004     warn "No directory given, using ./\n";
1005     $dir = "./";
1006   }
1007   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1008   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1009   $version ||= $schema->VERSION || '1.x';
1010   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1011
1012   eval "use SQL::Translator";
1013   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1014
1015   my $sqlt = SQL::Translator->new($sqltargs);
1016   foreach my $db (@$databases)
1017   {
1018     $sqlt->reset();
1019     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1020 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1021     $sqlt->data($schema);
1022     $sqlt->producer($db);
1023
1024     my $file;
1025     my $filename = $schema->ddl_filename($db, $dir, $version);
1026     if(-e $filename)
1027     {
1028       $self->throw_exception("$filename already exists, skipping $db");
1029       next;
1030     }
1031     open($file, ">$filename") 
1032       or $self->throw_exception("Can't open $filename for writing ($!)");
1033     my $output = $sqlt->translate;
1034 #use Data::Dumper;
1035 #    print join(":", keys %{$schema->source_registrations});
1036 #    print Dumper($sqlt->schema);
1037     if(!$output)
1038     {
1039       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1040       next;
1041     }
1042     print $file $output;
1043     close($file);
1044   }
1045
1046 }
1047
1048 =head2 deployment_statements
1049
1050 =over 4
1051
1052 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1053
1054 =back
1055
1056 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1057 The database driver name is given by C<$type>, though the value from
1058 L</sqlt_type> is used if it is not specified.
1059
1060 C<$directory> is used to return statements from files in a previously created
1061 L</create_ddl_dir> directory and is optional. The filenames are constructed
1062 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1063
1064 If no C<$directory> is specified then the statements are constructed on the
1065 fly using L<SQL::Translator> and C<$version> is ignored.
1066
1067 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1068
1069 =cut
1070
1071 sub deployment_statements {
1072   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1073   # Need to be connected to get the correct sqlt_type
1074   $self->ensure_connected() unless $type;
1075   $type ||= $self->sqlt_type;
1076   $version ||= $schema->VERSION || '1.x';
1077   $dir ||= './';
1078   eval "use SQL::Translator";
1079   if(!$@)
1080   {
1081     eval "use SQL::Translator::Parser::DBIx::Class;";
1082     $self->throw_exception($@) if $@;
1083     eval "use SQL::Translator::Producer::${type};";
1084     $self->throw_exception($@) if $@;
1085     my $tr = SQL::Translator->new(%$sqltargs);
1086     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1087     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1088   }
1089
1090   my $filename = $schema->ddl_filename($type, $dir, $version);
1091   if(!-f $filename)
1092   {
1093 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1094       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1095       return;
1096   }
1097   my $file;
1098   open($file, "<$filename") 
1099       or $self->throw_exception("Can't open $filename ($!)");
1100   my @rows = <$file>;
1101   close($file);
1102
1103   return join('', @rows);
1104   
1105 }
1106
1107 =head2 deploy
1108
1109 Sends the appropriate statements to create or modify tables to the
1110 db. This would normally be called through
1111 L<DBIx::Class::Schema/deploy>.
1112
1113 =cut
1114
1115 sub deploy {
1116   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1117   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1118     for ( split(";\n", $statement)) {
1119       next if($_ =~ /^--/);
1120       next if(!$_);
1121 #      next if($_ =~ /^DROP/m);
1122       next if($_ =~ /^BEGIN TRANSACTION/m);
1123       next if($_ =~ /^COMMIT/m);
1124       next if $_ =~ /^\s+$/; # skip whitespace only
1125       $self->debugobj->query_start($_) if $self->debug;
1126       $self->dbh->do($_) or warn "SQL was:\n $_";
1127       $self->debugobj->query_end($_) if $self->debug;
1128     }
1129   }
1130 }
1131
1132 =head2 datetime_parser
1133
1134 Returns the datetime parser class
1135
1136 =cut
1137
1138 sub datetime_parser {
1139   my $self = shift;
1140   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1141 }
1142
1143 =head2 datetime_parser_type
1144
1145 Defines (returns) the datetime parser class - currently hardwired to
1146 L<DateTime::Format::MySQL>
1147
1148 =cut
1149
1150 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1151
1152 =head2 build_datetime_parser
1153
1154 See L</datetime_parser>
1155
1156 =cut
1157
1158 sub build_datetime_parser {
1159   my $self = shift;
1160   my $type = $self->datetime_parser_type(@_);
1161   eval "use ${type}";
1162   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1163   return $type;
1164 }
1165
1166 sub DESTROY {
1167   # NOTE: if there's a merge conflict here when -current is pushed
1168   #  back to trunk, take -current's version and ignore this trunk one :)
1169   my $self = shift;
1170
1171   if($self->_dbh && $self->_conn_pid != $$) {
1172     $self->_dbh->{InactiveDestroy} = 1;
1173   }
1174
1175   $self->_dbh(undef);
1176 }
1177
1178 1;
1179
1180 =head1 SQL METHODS
1181
1182 The module defines a set of methods within the DBIC::SQL::Abstract
1183 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1184 SQL query functions.
1185
1186 The following methods are extended:-
1187
1188 =over 4
1189
1190 =item delete
1191
1192 =item insert
1193
1194 =item select
1195
1196 =item update
1197
1198 =item limit_dialect
1199
1200 See L</connect_info> for details.
1201 For setting, this method is deprecated in favor of L</connect_info>.
1202
1203 =item quote_char
1204
1205 See L</connect_info> for details.
1206 For setting, this method is deprecated in favor of L</connect_info>.
1207
1208 =item name_sep
1209
1210 See L</connect_info> for details.
1211 For setting, this method is deprecated in favor of L</connect_info>.
1212
1213 =back
1214
1215 =head1 ENVIRONMENT VARIABLES
1216
1217 =head2 DBIC_TRACE
1218
1219 If C<DBIC_TRACE> is set then SQL trace information
1220 is produced (as when the L<debug> method is set).
1221
1222 If the value is of the form C<1=/path/name> then the trace output is
1223 written to the file C</path/name>.
1224
1225 This environment variable is checked when the storage object is first
1226 created (when you call connect on your schema).  So, run-time changes 
1227 to this environment variable will not take effect unless you also 
1228 re-connect on your schema.
1229
1230 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1231
1232 Old name for DBIC_TRACE
1233
1234 =head1 AUTHORS
1235
1236 Matt S. Trout <mst@shadowcatsystems.co.uk>
1237
1238 Andy Grundman <andy@hybridized.org>
1239
1240 =head1 LICENSE
1241
1242 You may distribute this code under the same terms as Perl itself.
1243
1244 =cut