reverted previous deploy change(r8255) - sorry matt, i wont break it again
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 sub _RowNumberOver {
33   my ($self, $sql, $order, $rows, $offset ) = @_;
34
35   $offset += 1;
36   my $last = $rows + $offset;
37   my ( $order_by ) = $self->_order_by( $order );
38
39   $sql = <<"";
40 SELECT * FROM
41 (
42    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
43       $sql
44       $order_by
45    ) Q1
46 ) Q2
47 WHERE ROW_NUM BETWEEN $offset AND $last
48
49   return $sql;
50 }
51
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 use Scalar::Util 'blessed';
56 sub _find_syntax {
57   my ($self, $syntax) = @_;
58   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
59 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
60   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
61     return 'RowNumberOver';
62   }
63
64   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
65 }
66
67 sub select {
68   my ($self, $table, $fields, $where, $order, @rest) = @_;
69   $table = $self->_quote($table) unless ref($table);
70   local $self->{rownum_hack_count} = 1
71     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
72   @rest = (-1) unless defined $rest[0];
73   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
74     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
75   local $self->{having_bind} = [];
76   my ($sql, @ret) = $self->SUPER::select(
77     $table, $self->_recurse_fields($fields), $where, $order, @rest
78   );
79   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
80 }
81
82 sub insert {
83   my $self = shift;
84   my $table = shift;
85   $table = $self->_quote($table) unless ref($table);
86   $self->SUPER::insert($table, @_);
87 }
88
89 sub update {
90   my $self = shift;
91   my $table = shift;
92   $table = $self->_quote($table) unless ref($table);
93   $self->SUPER::update($table, @_);
94 }
95
96 sub delete {
97   my $self = shift;
98   my $table = shift;
99   $table = $self->_quote($table) unless ref($table);
100   $self->SUPER::delete($table, @_);
101 }
102
103 sub _emulate_limit {
104   my $self = shift;
105   if ($_[3] == -1) {
106     return $_[1].$self->_order_by($_[2]);
107   } else {
108     return $self->SUPER::_emulate_limit(@_);
109   }
110 }
111
112 sub _recurse_fields {
113   my ($self, $fields) = @_;
114   my $ref = ref $fields;
115   return $self->_quote($fields) unless $ref;
116   return $$fields if $ref eq 'SCALAR';
117
118   if ($ref eq 'ARRAY') {
119     return join(', ', map {
120       $self->_recurse_fields($_)
121       .(exists $self->{rownum_hack_count}
122          ? ' AS col'.$self->{rownum_hack_count}++
123          : '')
124      } @$fields);
125   } elsif ($ref eq 'HASH') {
126     foreach my $func (keys %$fields) {
127       return $self->_sqlcase($func)
128         .'( '.$self->_recurse_fields($fields->{$func}).' )';
129     }
130   }
131 }
132
133 sub _order_by {
134   my $self = shift;
135   my $ret = '';
136   my @extra;
137   if (ref $_[0] eq 'HASH') {
138     if (defined $_[0]->{group_by}) {
139       $ret = $self->_sqlcase(' group by ')
140                .$self->_recurse_fields($_[0]->{group_by});
141     }
142     if (defined $_[0]->{having}) {
143       my $frag;
144       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
145       push(@{$self->{having_bind}}, @extra);
146       $ret .= $self->_sqlcase(' having ').$frag;
147     }
148     if (defined $_[0]->{order_by}) {
149       $ret .= $self->_order_by($_[0]->{order_by});
150     }
151   } elsif (ref $_[0] eq 'SCALAR') {
152     $ret = $self->_sqlcase(' order by ').${ $_[0] };
153   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
154     my @order = @{+shift};
155     $ret = $self->_sqlcase(' order by ')
156           .join(', ', map {
157                         my $r = $self->_order_by($_, @_);
158                         $r =~ s/^ ?ORDER BY //i;
159                         $r;
160                       } @order);
161   } else {
162     $ret = $self->SUPER::_order_by(@_);
163   }
164   return $ret;
165 }
166
167 sub _order_directions {
168   my ($self, $order) = @_;
169   $order = $order->{order_by} if ref $order eq 'HASH';
170   return $self->SUPER::_order_directions($order);
171 }
172
173 sub _table {
174   my ($self, $from) = @_;
175   if (ref $from eq 'ARRAY') {
176     return $self->_recurse_from(@$from);
177   } elsif (ref $from eq 'HASH') {
178     return $self->_make_as($from);
179   } else {
180     return $from; # would love to quote here but _table ends up getting called
181                   # twice during an ->select without a limit clause due to
182                   # the way S::A::Limit->select works. should maybe consider
183                   # bypassing this and doing S::A::select($self, ...) in
184                   # our select method above. meantime, quoting shims have
185                   # been added to select/insert/update/delete here
186   }
187 }
188
189 sub _recurse_from {
190   my ($self, $from, @join) = @_;
191   my @sqlf;
192   push(@sqlf, $self->_make_as($from));
193   foreach my $j (@join) {
194     my ($to, $on) = @$j;
195
196     # check whether a join type exists
197     my $join_clause = '';
198     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
199     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
200       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
201     } else {
202       $join_clause = ' JOIN ';
203     }
204     push(@sqlf, $join_clause);
205
206     if (ref $to eq 'ARRAY') {
207       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
208     } else {
209       push(@sqlf, $self->_make_as($to));
210     }
211     push(@sqlf, ' ON ', $self->_join_condition($on));
212   }
213   return join('', @sqlf);
214 }
215
216 sub _make_as {
217   my ($self, $from) = @_;
218   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
219                      reverse each %{$self->_skip_options($from)});
220 }
221
222 sub _skip_options {
223   my ($self, $hash) = @_;
224   my $clean_hash = {};
225   $clean_hash->{$_} = $hash->{$_}
226     for grep {!/^-/} keys %$hash;
227   return $clean_hash;
228 }
229
230 sub _join_condition {
231   my ($self, $cond) = @_;
232   if (ref $cond eq 'HASH') {
233     my %j;
234     for (keys %$cond) {
235       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
236     };
237     return $self->_recurse_where(\%j);
238   } elsif (ref $cond eq 'ARRAY') {
239     return join(' OR ', map { $self->_join_condition($_) } @$cond);
240   } else {
241     die "Can't handle this yet!";
242   }
243 }
244
245 sub _quote {
246   my ($self, $label) = @_;
247   return '' unless defined $label;
248   return "*" if $label eq '*';
249   return $label unless $self->{quote_char};
250   if(ref $self->{quote_char} eq "ARRAY"){
251     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
252       if !defined $self->{name_sep};
253     my $sep = $self->{name_sep};
254     return join($self->{name_sep},
255         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
256        split(/\Q$sep\E/,$label));
257   }
258   return $self->SUPER::_quote($label);
259 }
260
261 sub limit_dialect {
262     my $self = shift;
263     $self->{limit_dialect} = shift if @_;
264     return $self->{limit_dialect};
265 }
266
267 sub quote_char {
268     my $self = shift;
269     $self->{quote_char} = shift if @_;
270     return $self->{quote_char};
271 }
272
273 sub name_sep {
274     my $self = shift;
275     $self->{name_sep} = shift if @_;
276     return $self->{name_sep};
277 }
278
279 } # End of BEGIN block
280
281 use base qw/DBIx::Class/;
282
283 __PACKAGE__->load_components(qw/AccessorGroup/);
284
285 __PACKAGE__->mk_group_accessors('simple' =>
286   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
287      debug debugobj cursor on_connect_do transaction_depth/);
288
289 =head1 NAME
290
291 DBIx::Class::Storage::DBI - DBI storage handler
292
293 =head1 SYNOPSIS
294
295 =head1 DESCRIPTION
296
297 This class represents the connection to the database
298
299 =head1 METHODS
300
301 =head2 new
302
303 =cut
304
305 sub new {
306   my $new = {};
307   bless $new, (ref $_[0] || $_[0]);
308
309   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
310   $new->transaction_depth(0);
311
312   $new->debugobj(new DBIx::Class::Storage::Statistics());
313
314   my $fh;
315
316   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
317                   || $ENV{DBIC_TRACE};
318
319   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
320     $fh = IO::File->new($1, 'w')
321       or $new->throw_exception("Cannot open trace file $1");
322   } else {
323     $fh = IO::File->new('>&STDERR');
324   }
325   $new->debugfh($fh);
326   $new->debug(1) if $debug_env;
327   $new->_sql_maker_opts({});
328   return $new;
329 }
330
331 =head2 throw_exception
332
333 Throws an exception - croaks.
334
335 =cut
336
337 sub throw_exception {
338   my ($self, $msg) = @_;
339   croak($msg);
340 }
341
342 =head2 connect_info
343
344 The arguments of C<connect_info> are always a single array reference.
345
346 This is normally accessed via L<DBIx::Class::Schema/connection>, which
347 encapsulates its argument list in an arrayref before calling
348 C<connect_info> here.
349
350 The arrayref can either contain the same set of arguments one would
351 normally pass to L<DBI/connect>, or a lone code reference which returns
352 a connected database handle.
353
354 In either case, if the final argument in your connect_info happens
355 to be a hashref, C<connect_info> will look there for several
356 connection-specific options:
357
358 =over 4
359
360 =item on_connect_do
361
362 This can be set to an arrayref of literal sql statements, which will
363 be executed immediately after making the connection to the database
364 every time we [re-]connect.
365
366 =item limit_dialect 
367
368 Sets the limit dialect. This is useful for JDBC-bridge among others
369 where the remote SQL-dialect cannot be determined by the name of the
370 driver alone.
371
372 =item quote_char
373
374 Specifies what characters to use to quote table and column names. If 
375 you use this you will want to specify L<name_sep> as well.
376
377 quote_char expects either a single character, in which case is it is placed
378 on either side of the table/column, or an arrayref of length 2 in which case the
379 table/column name is placed between the elements.
380
381 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
382 use C<quote_char =E<gt> [qw/[ ]/]>.
383
384 =item name_sep
385
386 This only needs to be used in conjunction with L<quote_char>, and is used to 
387 specify the charecter that seperates elements (schemas, tables, columns) from 
388 each other. In most cases this is simply a C<.>.
389
390 =back
391
392 These options can be mixed in with your other L<DBI> connection attributes,
393 or placed in a seperate hashref after all other normal L<DBI> connection
394 arguments.
395
396 Every time C<connect_info> is invoked, any previous settings for
397 these options will be cleared before setting the new ones, regardless of
398 whether any options are specified in the new C<connect_info>.
399
400 Examples:
401
402   # Simple SQLite connection
403   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
404
405   # Connect via subref
406   ->connect_info([ sub { DBI->connect(...) } ]);
407
408   # A bit more complicated
409   ->connect_info(
410     [
411       'dbi:Pg:dbname=foo',
412       'postgres',
413       'my_pg_password',
414       { AutoCommit => 0 },
415       { quote_char => q{"}, name_sep => q{.} },
416     ]
417   );
418
419   # Equivalent to the previous example
420   ->connect_info(
421     [
422       'dbi:Pg:dbname=foo',
423       'postgres',
424       'my_pg_password',
425       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
426     ]
427   );
428
429   # Subref + DBIC-specific connection options
430   ->connect_info(
431     [
432       sub { DBI->connect(...) },
433       {
434           quote_char => q{`},
435           name_sep => q{@},
436           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
437       },
438     ]
439   );
440
441 =head2 on_connect_do
442
443 This method is deprecated in favor of setting via L</connect_info>.
444
445 =head2 debug
446
447 Causes SQL trace information to be emitted on the C<debugobj> object.
448 (or C<STDERR> if C<debugobj> has not specifically been set).
449
450 This is the equivalent to setting L</DBIC_TRACE> in your
451 shell environment.
452
453 =head2 debugfh
454
455 Set or retrieve the filehandle used for trace/debug output.  This should be
456 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
457 set to be STDERR - although see information on the
458 L<DBIC_TRACE> environment variable.
459
460 =cut
461
462 sub debugfh {
463     my $self = shift;
464
465     if ($self->debugobj->can('debugfh')) {
466         return $self->debugobj->debugfh(@_);
467     }
468 }
469
470 =head2 debugobj
471
472 Sets or retrieves the object used for metric collection. Defaults to an instance
473 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
474 method of using a coderef as a callback.  See the aforementioned Statistics
475 class for more information.
476
477 =head2 debugcb
478
479 Sets a callback to be executed each time a statement is run; takes a sub
480 reference.  Callback is executed as $sub->($op, $info) where $op is
481 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
482
483 See L<debugobj> for a better way.
484
485 =cut
486
487 sub debugcb {
488     my $self = shift;
489
490     if ($self->debugobj->can('callback')) {
491         return $self->debugobj->callback(@_);
492     }
493 }
494
495 =head2 disconnect
496
497 Disconnect the L<DBI> handle, performing a rollback first if the
498 database is not in C<AutoCommit> mode.
499
500 =cut
501
502 sub disconnect {
503   my ($self) = @_;
504
505   if( $self->connected ) {
506     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
507     $self->_dbh->disconnect;
508     $self->_dbh(undef);
509   }
510 }
511
512 =head2 connected
513
514 Check if the L<DBI> handle is connected.  Returns true if the handle
515 is connected.
516
517 =cut
518
519 sub connected { my ($self) = @_;
520
521   if(my $dbh = $self->_dbh) {
522       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
523           return $self->_dbh(undef);
524       }
525       elsif($self->_conn_pid != $$) {
526           $self->_dbh->{InactiveDestroy} = 1;
527           return $self->_dbh(undef);
528       }
529       return ($dbh->FETCH('Active') && $dbh->ping);
530   }
531
532   return 0;
533 }
534
535 =head2 ensure_connected
536
537 Check whether the database handle is connected - if not then make a
538 connection.
539
540 =cut
541
542 sub ensure_connected {
543   my ($self) = @_;
544
545   unless ($self->connected) {
546     $self->_populate_dbh;
547   }
548 }
549
550 =head2 dbh
551
552 Returns the dbh - a data base handle of class L<DBI>.
553
554 =cut
555
556 sub dbh {
557   my ($self) = @_;
558
559   $self->ensure_connected;
560   return $self->_dbh;
561 }
562
563 sub _sql_maker_args {
564     my ($self) = @_;
565     
566     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
567 }
568
569 =head2 sql_maker
570
571 Returns a C<sql_maker> object - normally an object of class
572 C<DBIC::SQL::Abstract>.
573
574 =cut
575
576 sub sql_maker {
577   my ($self) = @_;
578   unless ($self->_sql_maker) {
579     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
580   }
581   return $self->_sql_maker;
582 }
583
584 sub connect_info {
585   my ($self, $info_arg) = @_;
586
587   if($info_arg) {
588     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
589     #  the new set of options
590     $self->_sql_maker(undef);
591     $self->_sql_maker_opts({});
592
593     my $info = [ @$info_arg ]; # copy because we can alter it
594     my $last_info = $info->[-1];
595     if(ref $last_info eq 'HASH') {
596       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
597         $self->on_connect_do($on_connect_do);
598       }
599       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
600         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
601           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
602         }
603       }
604
605       # Get rid of any trailing empty hashref
606       pop(@$info) if !keys %$last_info;
607     }
608
609     $self->_connect_info($info);
610   }
611
612   $self->_connect_info;
613 }
614
615 sub _populate_dbh {
616   my ($self) = @_;
617   my @info = @{$self->_connect_info || []};
618   $self->_dbh($self->_connect(@info));
619
620   if(ref $self eq 'DBIx::Class::Storage::DBI') {
621     my $driver = $self->_dbh->{Driver}->{Name};
622     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
623       bless $self, "DBIx::Class::Storage::DBI::${driver}";
624       $self->_rebless() if $self->can('_rebless');
625     }
626   }
627
628   # if on-connect sql statements are given execute them
629   foreach my $sql_statement (@{$self->on_connect_do || []}) {
630     $self->debugobj->query_start($sql_statement) if $self->debug();
631     $self->_dbh->do($sql_statement);
632     $self->debugobj->query_end($sql_statement) if $self->debug();
633   }
634
635   $self->_conn_pid($$);
636   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
637 }
638
639 sub _connect {
640   my ($self, @info) = @_;
641
642   $self->throw_exception("You failed to provide any connection info")
643       if !@info;
644
645   my ($old_connect_via, $dbh);
646
647   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
648       $old_connect_via = $DBI::connect_via;
649       $DBI::connect_via = 'connect';
650   }
651
652   eval {
653     $dbh = ref $info[0] eq 'CODE'
654          ? &{$info[0]}
655          : DBI->connect(@info);
656   };
657
658   $DBI::connect_via = $old_connect_via if $old_connect_via;
659
660   if (!$dbh || $@) {
661     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
662   }
663
664   $dbh;
665 }
666
667 =head2 txn_begin
668
669 Calls begin_work on the current dbh.
670
671 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
672 an entire code block to be executed transactionally.
673
674 =cut
675
676 sub txn_begin {
677   my $self = shift;
678   if ($self->{transaction_depth}++ == 0) {
679     my $dbh = $self->dbh;
680     if ($dbh->{AutoCommit}) {
681       $self->debugobj->txn_begin()
682         if ($self->debug);
683       $dbh->begin_work;
684     }
685   }
686 }
687
688 =head2 txn_commit
689
690 Issues a commit against the current dbh.
691
692 =cut
693
694 sub txn_commit {
695   my $self = shift;
696   my $dbh = $self->dbh;
697   if ($self->{transaction_depth} == 0) {
698     unless ($dbh->{AutoCommit}) {
699       $self->debugobj->txn_commit()
700         if ($self->debug);
701       $dbh->commit;
702     }
703   }
704   else {
705     if (--$self->{transaction_depth} == 0) {
706       $self->debugobj->txn_commit()
707         if ($self->debug);
708       $dbh->commit;
709     }
710   }
711 }
712
713 =head2 txn_rollback
714
715 Issues a rollback against the current dbh. A nested rollback will
716 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
717 which allows the rollback to propagate to the outermost transaction.
718
719 =cut
720
721 sub txn_rollback {
722   my $self = shift;
723
724   eval {
725     my $dbh = $self->dbh;
726     if ($self->{transaction_depth} == 0) {
727       unless ($dbh->{AutoCommit}) {
728         $self->debugobj->txn_rollback()
729           if ($self->debug);
730         $dbh->rollback;
731       }
732     }
733     else {
734       if (--$self->{transaction_depth} == 0) {
735         $self->debugobj->txn_rollback()
736           if ($self->debug);
737         $dbh->rollback;
738       }
739       else {
740         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
741       }
742     }
743   };
744
745   if ($@) {
746     my $error = $@;
747     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
748     $error =~ /$exception_class/ and $self->throw_exception($error);
749     $self->{transaction_depth} = 0;          # ensure that a failed rollback
750     $self->throw_exception($error);          # resets the transaction depth
751   }
752 }
753
754 sub _execute {
755   my ($self, $op, $extra_bind, $ident, @args) = @_;
756   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
757   unshift(@bind, @$extra_bind) if $extra_bind;
758   if ($self->debug) {
759       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
760       $self->debugobj->query_start($sql, @debug_bind);
761   }
762   my $sth = eval { $self->sth($sql,$op) };
763
764   if (!$sth || $@) {
765     $self->throw_exception(
766       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
767     );
768   }
769   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
770   my $rv;
771   if ($sth) {
772     my $time = time();
773     $rv = eval { $sth->execute(@bind) };
774
775     if ($@ || !$rv) {
776       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
777     }
778   } else {
779     $self->throw_exception("'$sql' did not generate a statement.");
780   }
781   if ($self->debug) {
782       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
783       $self->debugobj->query_end($sql, @debug_bind);
784   }
785   return (wantarray ? ($rv, $sth, @bind) : $rv);
786 }
787
788 sub insert {
789   my ($self, $ident, $to_insert) = @_;
790   $self->throw_exception(
791     "Couldn't insert ".join(', ',
792       map "$_ => $to_insert->{$_}", keys %$to_insert
793     )." into ${ident}"
794   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
795   return $to_insert;
796 }
797
798 sub update {
799   return shift->_execute('update' => [], @_);
800 }
801
802 sub delete {
803   return shift->_execute('delete' => [], @_);
804 }
805
806 sub _select {
807   my ($self, $ident, $select, $condition, $attrs) = @_;
808   my $order = $attrs->{order_by};
809   if (ref $condition eq 'SCALAR') {
810     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
811   }
812   if (exists $attrs->{group_by} || $attrs->{having}) {
813     $order = {
814       group_by => $attrs->{group_by},
815       having => $attrs->{having},
816       ($order ? (order_by => $order) : ())
817     };
818   }
819   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
820   if ($attrs->{software_limit} ||
821       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
822         $attrs->{software_limit} = 1;
823   } else {
824     $self->throw_exception("rows attribute must be positive if present")
825       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
826     push @args, $attrs->{rows}, $attrs->{offset};
827   }
828   return $self->_execute(@args);
829 }
830
831 =head2 select
832
833 =over 4
834
835 =item Arguments: $ident, $select, $condition, $attrs
836
837 =back
838
839 Handle a SQL select statement.
840
841 =cut
842
843 sub select {
844   my $self = shift;
845   my ($ident, $select, $condition, $attrs) = @_;
846   return $self->cursor->new($self, \@_, $attrs);
847 }
848
849 =head2 select_single
850
851 Performs a select, fetch and return of data - handles a single row
852 only.
853
854 =cut
855
856 # Need to call finish() to work round broken DBDs
857
858 sub select_single {
859   my $self = shift;
860   my ($rv, $sth, @bind) = $self->_select(@_);
861   my @row = $sth->fetchrow_array;
862   $sth->finish();
863   return @row;
864 }
865
866 =head2 sth
867
868 =over 4
869
870 =item Arguments: $sql
871
872 =back
873
874 Returns a L<DBI> sth (statement handle) for the supplied SQL.
875
876 =cut
877
878 sub sth {
879   my ($self, $sql) = @_;
880   # 3 is the if_active parameter which avoids active sth re-use
881   return $self->dbh->prepare_cached($sql, {}, 3);
882 }
883
884 =head2 columns_info_for
885
886 Returns database type info for a given table column.
887
888 =cut
889
890 sub columns_info_for {
891   my ($self, $table) = @_;
892
893   my $dbh = $self->dbh;
894
895   if ($dbh->can('column_info')) {
896     my %result;
897     local $dbh->{RaiseError} = 1;
898     local $dbh->{PrintError} = 0;
899     eval {
900       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
901       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
902       $sth->execute();
903
904       while ( my $info = $sth->fetchrow_hashref() ){
905         my %column_info;
906         $column_info{data_type}   = $info->{TYPE_NAME};
907         $column_info{size}      = $info->{COLUMN_SIZE};
908         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
909         $column_info{default_value} = $info->{COLUMN_DEF};
910         my $col_name = $info->{COLUMN_NAME};
911         $col_name =~ s/^\"(.*)\"$/$1/;
912
913         $result{$col_name} = \%column_info;
914       }
915     };
916     return \%result if !$@ && scalar keys %result;
917   }
918
919   my %result;
920   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
921   $sth->execute;
922   my @columns = @{$sth->{NAME_lc}};
923   for my $i ( 0 .. $#columns ){
924     my %column_info;
925     my $type_num = $sth->{TYPE}->[$i];
926     my $type_name;
927     if(defined $type_num && $dbh->can('type_info')) {
928       my $type_info = $dbh->type_info($type_num);
929       $type_name = $type_info->{TYPE_NAME} if $type_info;
930     }
931     $column_info{data_type} = $type_name ? $type_name : $type_num;
932     $column_info{size} = $sth->{PRECISION}->[$i];
933     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
934
935     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
936       $column_info{data_type} = $1;
937       $column_info{size}    = $2;
938     }
939
940     $result{$columns[$i]} = \%column_info;
941   }
942
943   return \%result;
944 }
945
946 =head2 last_insert_id
947
948 Return the row id of the last insert.
949
950 =cut
951
952 sub last_insert_id {
953   my ($self, $row) = @_;
954     
955   return $self->dbh->func('last_insert_rowid');
956
957 }
958
959 =head2 sqlt_type
960
961 Returns the database driver name.
962
963 =cut
964
965 sub sqlt_type { shift->dbh->{Driver}->{Name} }
966
967 =head2 create_ddl_dir (EXPERIMENTAL)
968
969 =over 4
970
971 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
972
973 =back
974
975 Creates a SQL file based on the Schema, for each of the specified
976 database types, in the given directory.
977
978 Note that this feature is currently EXPERIMENTAL and may not work correctly
979 across all databases, or fully handle complex relationships.
980
981 =cut
982
983 sub create_ddl_dir
984 {
985   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
986
987   if(!$dir || !-d $dir)
988   {
989     warn "No directory given, using ./\n";
990     $dir = "./";
991   }
992   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
993   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
994   $version ||= $schema->VERSION || '1.x';
995   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
996
997   eval "use SQL::Translator";
998   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
999
1000   my $sqlt = SQL::Translator->new($sqltargs);
1001   foreach my $db (@$databases)
1002   {
1003     $sqlt->reset();
1004     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1005 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1006     $sqlt->data($schema);
1007     $sqlt->producer($db);
1008
1009     my $file;
1010     my $filename = $schema->ddl_filename($db, $dir, $version);
1011     if(-e $filename)
1012     {
1013       $self->throw_exception("$filename already exists, skipping $db");
1014       next;
1015     }
1016     open($file, ">$filename") 
1017       or $self->throw_exception("Can't open $filename for writing ($!)");
1018     my $output = $sqlt->translate;
1019 #use Data::Dumper;
1020 #    print join(":", keys %{$schema->source_registrations});
1021 #    print Dumper($sqlt->schema);
1022     if(!$output)
1023     {
1024       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1025       next;
1026     }
1027     print $file $output;
1028     close($file);
1029   }
1030
1031 }
1032
1033 =head2 deployment_statements
1034
1035 =over 4
1036
1037 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1038
1039 =back
1040
1041 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1042 The database driver name is given by C<$type>, though the value from
1043 L</sqlt_type> is used if it is not specified.
1044
1045 C<$directory> is used to return statements from files in a previously created
1046 L</create_ddl_dir> directory and is optional. The filenames are constructed
1047 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1048
1049 If no C<$directory> is specified then the statements are constructed on the
1050 fly using L<SQL::Translator> and C<$version> is ignored.
1051
1052 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1053
1054 =cut
1055
1056 sub deployment_statements {
1057   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1058   # Need to be connected to get the correct sqlt_type
1059   $self->ensure_connected() unless $type;
1060   $type ||= $self->sqlt_type;
1061   $version ||= $schema->VERSION || '1.x';
1062   $dir ||= './';
1063   eval "use SQL::Translator";
1064   if(!$@)
1065   {
1066     eval "use SQL::Translator::Parser::DBIx::Class;";
1067     $self->throw_exception($@) if $@;
1068     eval "use SQL::Translator::Producer::${type};";
1069     $self->throw_exception($@) if $@;
1070     my $tr = SQL::Translator->new(%$sqltargs);
1071     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1072     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1073   }
1074
1075   my $filename = $schema->ddl_filename($type, $dir, $version);
1076   if(!-f $filename)
1077   {
1078 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1079       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1080       return;
1081   }
1082   my $file;
1083   open($file, "<$filename") 
1084       or $self->throw_exception("Can't open $filename ($!)");
1085   my @rows = <$file>;
1086   close($file);
1087
1088   return join('', @rows);
1089   
1090 }
1091
1092 =head2 deploy
1093
1094 Sends the appropriate statements to create or modify tables to the
1095 db. This would normally be called through
1096 L<DBIx::Class::Schema/deploy>.
1097
1098 =cut
1099
1100 sub deploy {
1101   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1102   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1103     for ( split(";\n", $statement)) {
1104       next if($_ =~ /^--/);
1105       next if(!$_);
1106 #      next if($_ =~ /^DROP/m);
1107       next if($_ =~ /^BEGIN TRANSACTION/m);
1108       next if($_ =~ /^COMMIT/m);
1109       next if $_ =~ /^\s+$/; # skip whitespace only
1110       $self->debugobj->query_start($_) if $self->debug;
1111       $self->dbh->do($_) or warn "SQL was:\n $_";
1112       $self->debugobj->query_end($_) if $self->debug;
1113     }
1114   }
1115 }
1116
1117 =head2 datetime_parser
1118
1119 Returns the datetime parser class
1120
1121 =cut
1122
1123 sub datetime_parser {
1124   my $self = shift;
1125   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1126 }
1127
1128 =head2 datetime_parser_type
1129
1130 Defines (returns) the datetime parser class - currently hardwired to
1131 L<DateTime::Format::MySQL>
1132
1133 =cut
1134
1135 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1136
1137 =head2 build_datetime_parser
1138
1139 See L</datetime_parser>
1140
1141 =cut
1142
1143 sub build_datetime_parser {
1144   my $self = shift;
1145   my $type = $self->datetime_parser_type(@_);
1146   eval "use ${type}";
1147   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1148   return $type;
1149 }
1150
1151 sub DESTROY {
1152   # NOTE: if there's a merge conflict here when -current is pushed
1153   #  back to trunk, take -current's version and ignore this trunk one :)
1154   my $self = shift;
1155
1156   if($self->_dbh && $self->_conn_pid != $$) {
1157     $self->_dbh->{InactiveDestroy} = 1;
1158   }
1159
1160   $self->_dbh(undef);
1161 }
1162
1163 1;
1164
1165 =head1 SQL METHODS
1166
1167 The module defines a set of methods within the DBIC::SQL::Abstract
1168 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1169 SQL query functions.
1170
1171 The following methods are extended:-
1172
1173 =over 4
1174
1175 =item delete
1176
1177 =item insert
1178
1179 =item select
1180
1181 =item update
1182
1183 =item limit_dialect
1184
1185 See L</connect_info> for details.
1186 For setting, this method is deprecated in favor of L</connect_info>.
1187
1188 =item quote_char
1189
1190 See L</connect_info> for details.
1191 For setting, this method is deprecated in favor of L</connect_info>.
1192
1193 =item name_sep
1194
1195 See L</connect_info> for details.
1196 For setting, this method is deprecated in favor of L</connect_info>.
1197
1198 =back
1199
1200 =head1 ENVIRONMENT VARIABLES
1201
1202 =head2 DBIC_TRACE
1203
1204 If C<DBIC_TRACE> is set then SQL trace information
1205 is produced (as when the L<debug> method is set).
1206
1207 If the value is of the form C<1=/path/name> then the trace output is
1208 written to the file C</path/name>.
1209
1210 This environment variable is checked when the storage object is first
1211 created (when you call connect on your schema).  So, run-time changes 
1212 to this environment variable will not take effect unless you also 
1213 re-connect on your schema.
1214
1215 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1216
1217 Old name for DBIC_TRACE
1218
1219 =head1 AUTHORS
1220
1221 Matt S. Trout <mst@shadowcatsystems.co.uk>
1222
1223 Andy Grundman <andy@hybridized.org>
1224
1225 =head1 LICENSE
1226
1227 You may distribute this code under the same terms as Perl itself.
1228
1229 =cut