fixups to ORDER BY, tweaks to deepen some copies in ResultSet
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   @rest = (-1) unless defined $rest[0];
43   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
44     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
45   local $self->{having_bind} = [];
46   my ($sql, @ret) = $self->SUPER::select(
47     $table, $self->_recurse_fields($fields), $where, $order, @rest
48   );
49   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
50 }
51
52 sub insert {
53   my $self = shift;
54   my $table = shift;
55   $table = $self->_quote($table) unless ref($table);
56   $self->SUPER::insert($table, @_);
57 }
58
59 sub update {
60   my $self = shift;
61   my $table = shift;
62   $table = $self->_quote($table) unless ref($table);
63   $self->SUPER::update($table, @_);
64 }
65
66 sub delete {
67   my $self = shift;
68   my $table = shift;
69   $table = $self->_quote($table) unless ref($table);
70   $self->SUPER::delete($table, @_);
71 }
72
73 sub _emulate_limit {
74   my $self = shift;
75   if ($_[3] == -1) {
76     return $_[1].$self->_order_by($_[2]);
77   } else {
78     return $self->SUPER::_emulate_limit(@_);
79   }
80 }
81
82 sub _recurse_fields {
83   my ($self, $fields) = @_;
84   my $ref = ref $fields;
85   return $self->_quote($fields) unless $ref;
86   return $$fields if $ref eq 'SCALAR';
87
88   if ($ref eq 'ARRAY') {
89     return join(', ', map { $self->_recurse_fields($_) } @$fields);
90   } elsif ($ref eq 'HASH') {
91     foreach my $func (keys %$fields) {
92       return $self->_sqlcase($func)
93         .'( '.$self->_recurse_fields($fields->{$func}).' )';
94     }
95   }
96 }
97
98 sub _order_by {
99   my $self = shift;
100   my $ret = '';
101   my @extra;
102   if (ref $_[0] eq 'HASH') {
103     if (defined $_[0]->{group_by}) {
104       $ret = $self->_sqlcase(' group by ')
105                .$self->_recurse_fields($_[0]->{group_by});
106     }
107     if (defined $_[0]->{having}) {
108       my $frag;
109       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
110       push(@{$self->{having_bind}}, @extra);
111       $ret .= $self->_sqlcase(' having ').$frag;
112     }
113     if (defined $_[0]->{order_by}) {
114       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
115     }
116   } elsif (ref $_[0] eq 'SCALAR') {
117     $ret = $self->_sqlcase(' order by ').${ $_[0] };
118   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
119     my @order = @{+shift};
120     $ret = $self->_sqlcase(' order by ')
121           .join(', ', map {
122                         my $r = $self->_order_by($_, @_);
123                         $r =~ s/^ ?ORDER BY //i;
124                         $r;
125                       } @order);
126   } else {
127     $ret = $self->SUPER::_order_by(@_);
128   }
129   return $ret;
130 }
131
132 sub _order_directions {
133   my ($self, $order) = @_;
134   $order = $order->{order_by} if ref $order eq 'HASH';
135   return $self->SUPER::_order_directions($order);
136 }
137
138 sub _table {
139   my ($self, $from) = @_;
140   if (ref $from eq 'ARRAY') {
141     return $self->_recurse_from(@$from);
142   } elsif (ref $from eq 'HASH') {
143     return $self->_make_as($from);
144   } else {
145     return $from; # would love to quote here but _table ends up getting called
146                   # twice during an ->select without a limit clause due to
147                   # the way S::A::Limit->select works. should maybe consider
148                   # bypassing this and doing S::A::select($self, ...) in
149                   # our select method above. meantime, quoting shims have
150                   # been added to select/insert/update/delete here
151   }
152 }
153
154 sub _recurse_from {
155   my ($self, $from, @join) = @_;
156   my @sqlf;
157   push(@sqlf, $self->_make_as($from));
158   foreach my $j (@join) {
159     my ($to, $on) = @$j;
160
161     # check whether a join type exists
162     my $join_clause = '';
163     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
164     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
165       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
166     } else {
167       $join_clause = ' JOIN ';
168     }
169     push(@sqlf, $join_clause);
170
171     if (ref $to eq 'ARRAY') {
172       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
173     } else {
174       push(@sqlf, $self->_make_as($to));
175     }
176     push(@sqlf, ' ON ', $self->_join_condition($on));
177   }
178   return join('', @sqlf);
179 }
180
181 sub _make_as {
182   my ($self, $from) = @_;
183   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
184                      reverse each %{$self->_skip_options($from)});
185 }
186
187 sub _skip_options {
188   my ($self, $hash) = @_;
189   my $clean_hash = {};
190   $clean_hash->{$_} = $hash->{$_}
191     for grep {!/^-/} keys %$hash;
192   return $clean_hash;
193 }
194
195 sub _join_condition {
196   my ($self, $cond) = @_;
197   if (ref $cond eq 'HASH') {
198     my %j;
199     for (keys %$cond) {
200       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
201     };
202     return $self->_recurse_where(\%j);
203   } elsif (ref $cond eq 'ARRAY') {
204     return join(' OR ', map { $self->_join_condition($_) } @$cond);
205   } else {
206     die "Can't handle this yet!";
207   }
208 }
209
210 sub _quote {
211   my ($self, $label) = @_;
212   return '' unless defined $label;
213   return "*" if $label eq '*';
214   return $label unless $self->{quote_char};
215   if(ref $self->{quote_char} eq "ARRAY"){
216     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
217       if !defined $self->{name_sep};
218     my $sep = $self->{name_sep};
219     return join($self->{name_sep},
220         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
221        split(/\Q$sep\E/,$label));
222   }
223   return $self->SUPER::_quote($label);
224 }
225
226 sub _RowNum {
227    my $self = shift;
228    my $c;
229    $_[0] =~ s/SELECT (.*?) FROM/
230      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
231    $self->SUPER::_RowNum(@_);
232 }
233
234 sub limit_dialect {
235     my $self = shift;
236     $self->{limit_dialect} = shift if @_;
237     return $self->{limit_dialect};
238 }
239
240 sub quote_char {
241     my $self = shift;
242     $self->{quote_char} = shift if @_;
243     return $self->{quote_char};
244 }
245
246 sub name_sep {
247     my $self = shift;
248     $self->{name_sep} = shift if @_;
249     return $self->{name_sep};
250 }
251
252 } # End of BEGIN block
253
254 use base qw/DBIx::Class/;
255
256 __PACKAGE__->load_components(qw/AccessorGroup/);
257
258 __PACKAGE__->mk_group_accessors('simple' =>
259   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
260      debug debugobj cursor on_connect_do transaction_depth/);
261
262 =head1 NAME
263
264 DBIx::Class::Storage::DBI - DBI storage handler
265
266 =head1 SYNOPSIS
267
268 =head1 DESCRIPTION
269
270 This class represents the connection to the database
271
272 =head1 METHODS
273
274 =head2 new
275
276 =cut
277
278 sub new {
279   my $new = bless({}, ref $_[0] || $_[0]);
280   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
281   $new->transaction_depth(0);
282
283   $new->debugobj(new DBIx::Class::Storage::Statistics());
284
285   my $fh;
286
287   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
288                   || $ENV{DBIC_TRACE};
289
290   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
291     $fh = IO::File->new($1, 'w')
292       or $new->throw_exception("Cannot open trace file $1");
293   } else {
294     $fh = IO::File->new('>&STDERR');
295   }
296   $new->debugfh($fh);
297   $new->debug(1) if $debug_env;
298   $new->_sql_maker_opts({});
299   return $new;
300 }
301
302 =head2 throw_exception
303
304 Throws an exception - croaks.
305
306 =cut
307
308 sub throw_exception {
309   my ($self, $msg) = @_;
310   croak($msg);
311 }
312
313 =head2 connect_info
314
315 The arguments of C<connect_info> are always a single array reference.
316
317 This is normally accessed via L<DBIx::Class::Schema/connection>, which
318 encapsulates its argument list in an arrayref before calling
319 C<connect_info> here.
320
321 The arrayref can either contain the same set of arguments one would
322 normally pass to L<DBI/connect>, or a lone code reference which returns
323 a connected database handle.
324
325 In either case, if the final argument in your connect_info happens
326 to be a hashref, C<connect_info> will look there for several
327 connection-specific options:
328
329 =over 4
330
331 =item on_connect_do
332
333 This can be set to an arrayref of literal sql statements, which will
334 be executed immediately after making the connection to the database
335 every time we [re-]connect.
336
337 =item limit_dialect 
338
339 Sets the limit dialect. This is useful for JDBC-bridge among others
340 where the remote SQL-dialect cannot be determined by the name of the
341 driver alone.
342
343 =item quote_char
344
345 Specifies what characters to use to quote table and column names. If 
346 you use this you will want to specify L<name_sep> as well.
347
348 quote_char expects either a single character, in which case is it is placed
349 on either side of the table/column, or an arrayref of length 2 in which case the
350 table/column name is placed between the elements.
351
352 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
353 use C<quote_char =E<gt> [qw/[ ]/]>.
354
355 =item name_sep
356
357 This only needs to be used in conjunction with L<quote_char>, and is used to 
358 specify the charecter that seperates elements (schemas, tables, columns) from 
359 each other. In most cases this is simply a C<.>.
360
361 =back
362
363 These options can be mixed in with your other L<DBI> connection attributes,
364 or placed in a seperate hashref after all other normal L<DBI> connection
365 arguments.
366
367 Every time C<connect_info> is invoked, any previous settings for
368 these options will be cleared before setting the new ones, regardless of
369 whether any options are specified in the new C<connect_info>.
370
371 Examples:
372
373   # Simple SQLite connection
374   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
375
376   # Connect via subref
377   ->connect_info([ sub { DBI->connect(...) } ]);
378
379   # A bit more complicated
380   ->connect_info(
381     [
382       'dbi:Pg:dbname=foo',
383       'postgres',
384       'my_pg_password',
385       { AutoCommit => 0 },
386       { quote_char => q{"}, name_sep => q{.} },
387     ]
388   );
389
390   # Equivalent to the previous example
391   ->connect_info(
392     [
393       'dbi:Pg:dbname=foo',
394       'postgres',
395       'my_pg_password',
396       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
397     ]
398   );
399
400   # Subref + DBIC-specific connection options
401   ->connect_info(
402     [
403       sub { DBI->connect(...) },
404       {
405           quote_char => q{`},
406           name_sep => q{@},
407           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
408       },
409     ]
410   );
411
412 =head2 on_connect_do
413
414 This method is deprecated in favor of setting via L</connect_info>.
415
416 =head2 debug
417
418 Causes SQL trace information to be emitted on the C<debugobj> object.
419 (or C<STDERR> if C<debugobj> has not specifically been set).
420
421 =head2 debugfh
422
423 Set or retrieve the filehandle used for trace/debug output.  This should be
424 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
425 set to be STDERR - although see information on the
426 L<DBIC_TRACE> environment variable.
427
428 =cut
429
430 sub debugfh {
431     my $self = shift;
432
433     if ($self->debugobj->can('debugfh')) {
434         return $self->debugobj->debugfh(@_);
435     }
436 }
437
438 =head2 debugobj
439
440 Sets or retrieves the object used for metric collection. Defaults to an instance
441 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
442 method of using a coderef as a callback.  See the aforementioned Statistics
443 class for more information.
444
445 =head2 debugcb
446
447 Sets a callback to be executed each time a statement is run; takes a sub
448 reference.  Callback is executed as $sub->($op, $info) where $op is
449 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
450
451 See L<debugobj> for a better way.
452
453 =cut
454
455 sub debugcb {
456     my $self = shift;
457
458     if ($self->debugobj->can('callback')) {
459         return $self->debugobj->callback(@_);
460     }
461 }
462
463 =head2 disconnect
464
465 Disconnect the L<DBI> handle, performing a rollback first if the
466 database is not in C<AutoCommit> mode.
467
468 =cut
469
470 sub disconnect {
471   my ($self) = @_;
472
473   if( $self->connected ) {
474     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
475     $self->_dbh->disconnect;
476     $self->_dbh(undef);
477   }
478 }
479
480 =head2 connected
481
482 Check if the L<DBI> handle is connected.  Returns true if the handle
483 is connected.
484
485 =cut
486
487 sub connected { my ($self) = @_;
488
489   if(my $dbh = $self->_dbh) {
490       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
491           return $self->_dbh(undef);
492       }
493       elsif($self->_conn_pid != $$) {
494           $self->_dbh->{InactiveDestroy} = 1;
495           return $self->_dbh(undef);
496       }
497       return ($dbh->FETCH('Active') && $dbh->ping);
498   }
499
500   return 0;
501 }
502
503 =head2 ensure_connected
504
505 Check whether the database handle is connected - if not then make a
506 connection.
507
508 =cut
509
510 sub ensure_connected {
511   my ($self) = @_;
512
513   unless ($self->connected) {
514     $self->_populate_dbh;
515   }
516 }
517
518 =head2 dbh
519
520 Returns the dbh - a data base handle of class L<DBI>.
521
522 =cut
523
524 sub dbh {
525   my ($self) = @_;
526
527   $self->ensure_connected;
528   return $self->_dbh;
529 }
530
531 sub _sql_maker_args {
532     my ($self) = @_;
533     
534     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
535 }
536
537 =head2 sql_maker
538
539 Returns a C<sql_maker> object - normally an object of class
540 C<DBIC::SQL::Abstract>.
541
542 =cut
543
544 sub sql_maker {
545   my ($self) = @_;
546   unless ($self->_sql_maker) {
547     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
548   }
549   return $self->_sql_maker;
550 }
551
552 sub connect_info {
553   my ($self, $info_arg) = @_;
554
555   if($info_arg) {
556     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
557     #  the new set of options
558     $self->_sql_maker(undef);
559     $self->_sql_maker_opts({});
560
561     my $info = [ @$info_arg ]; # copy because we can alter it
562     my $last_info = $info->[-1];
563     if(ref $last_info eq 'HASH') {
564       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
565         $self->on_connect_do($on_connect_do);
566       }
567       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
568         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
569           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
570         }
571       }
572
573       # Get rid of any trailing empty hashref
574       pop(@$info) if !keys %$last_info;
575     }
576
577     $self->_connect_info($info);
578   }
579
580   $self->_connect_info;
581 }
582
583 sub _populate_dbh {
584   my ($self) = @_;
585   my @info = @{$self->_connect_info || []};
586   $self->_dbh($self->_connect(@info));
587
588   if(ref $self eq 'DBIx::Class::Storage::DBI') {
589     my $driver = $self->_dbh->{Driver}->{Name};
590     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
591       bless $self, "DBIx::Class::Storage::DBI::${driver}";
592       $self->_rebless() if $self->can('_rebless');
593     }
594   }
595
596   # if on-connect sql statements are given execute them
597   foreach my $sql_statement (@{$self->on_connect_do || []}) {
598     $self->debugobj->query_start($sql_statement) if $self->debug();
599     $self->_dbh->do($sql_statement);
600     $self->debugobj->query_end($sql_statement) if $self->debug();
601   }
602
603   $self->_conn_pid($$);
604   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
605 }
606
607 sub _connect {
608   my ($self, @info) = @_;
609
610   $self->throw_exception("You failed to provide any connection info")
611       if !@info;
612
613   my ($old_connect_via, $dbh);
614
615   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
616       $old_connect_via = $DBI::connect_via;
617       $DBI::connect_via = 'connect';
618   }
619
620   eval {
621     $dbh = ref $info[0] eq 'CODE'
622          ? &{$info[0]}
623          : DBI->connect(@info);
624   };
625
626   $DBI::connect_via = $old_connect_via if $old_connect_via;
627
628   if (!$dbh || $@) {
629     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
630   }
631
632   $dbh;
633 }
634
635 =head2 txn_begin
636
637 Calls begin_work on the current dbh.
638
639 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
640 an entire code block to be executed transactionally.
641
642 =cut
643
644 sub txn_begin {
645   my $self = shift;
646   if ($self->{transaction_depth}++ == 0) {
647     my $dbh = $self->dbh;
648     if ($dbh->{AutoCommit}) {
649       $self->debugobj->txn_begin()
650         if ($self->debug);
651       $dbh->begin_work;
652     }
653   }
654 }
655
656 =head2 txn_commit
657
658 Issues a commit against the current dbh.
659
660 =cut
661
662 sub txn_commit {
663   my $self = shift;
664   my $dbh = $self->dbh;
665   if ($self->{transaction_depth} == 0) {
666     unless ($dbh->{AutoCommit}) {
667       $self->debugobj->txn_commit()
668         if ($self->debug);
669       $dbh->commit;
670     }
671   }
672   else {
673     if (--$self->{transaction_depth} == 0) {
674       $self->debugobj->txn_commit()
675         if ($self->debug);
676       $dbh->commit;
677     }
678   }
679 }
680
681 =head2 txn_rollback
682
683 Issues a rollback against the current dbh. A nested rollback will
684 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
685 which allows the rollback to propagate to the outermost transaction.
686
687 =cut
688
689 sub txn_rollback {
690   my $self = shift;
691
692   eval {
693     my $dbh = $self->dbh;
694     if ($self->{transaction_depth} == 0) {
695       unless ($dbh->{AutoCommit}) {
696         $self->debugobj->txn_rollback()
697           if ($self->debug);
698         $dbh->rollback;
699       }
700     }
701     else {
702       if (--$self->{transaction_depth} == 0) {
703         $self->debugobj->txn_rollback()
704           if ($self->debug);
705         $dbh->rollback;
706       }
707       else {
708         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
709       }
710     }
711   };
712
713   if ($@) {
714     my $error = $@;
715     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
716     $error =~ /$exception_class/ and $self->throw_exception($error);
717     $self->{transaction_depth} = 0;          # ensure that a failed rollback
718     $self->throw_exception($error);          # resets the transaction depth
719   }
720 }
721
722 sub _execute {
723   my ($self, $op, $extra_bind, $ident, @args) = @_;
724   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
725   unshift(@bind, @$extra_bind) if $extra_bind;
726   if ($self->debug) {
727       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
728       $self->debugobj->query_start($sql, @debug_bind);
729   }
730   my $sth = eval { $self->sth($sql,$op) };
731
732   if (!$sth || $@) {
733     $self->throw_exception(
734       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
735     );
736   }
737   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
738   my $rv;
739   if ($sth) {
740     my $time = time();
741     $rv = eval { $sth->execute(@bind) };
742
743     if ($@ || !$rv) {
744       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
745     }
746   } else {
747     $self->throw_exception("'$sql' did not generate a statement.");
748   }
749   if ($self->debug) {
750       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
751       $self->debugobj->query_end($sql, @debug_bind);
752   }
753   return (wantarray ? ($rv, $sth, @bind) : $rv);
754 }
755
756 sub insert {
757   my ($self, $ident, $to_insert) = @_;
758   $self->throw_exception(
759     "Couldn't insert ".join(', ',
760       map "$_ => $to_insert->{$_}", keys %$to_insert
761     )." into ${ident}"
762   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
763   return $to_insert;
764 }
765
766 sub update {
767   return shift->_execute('update' => [], @_);
768 }
769
770 sub delete {
771   return shift->_execute('delete' => [], @_);
772 }
773
774 sub _select {
775   my ($self, $ident, $select, $condition, $attrs) = @_;
776   my $order = $attrs->{order_by};
777   if (ref $condition eq 'SCALAR') {
778     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
779   }
780   if (exists $attrs->{group_by} || $attrs->{having}) {
781     $order = {
782       group_by => $attrs->{group_by},
783       having => $attrs->{having},
784       ($order ? (order_by => $order) : ())
785     };
786   }
787   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
788   if ($attrs->{software_limit} ||
789       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
790         $attrs->{software_limit} = 1;
791   } else {
792     $self->throw_exception("rows attribute must be positive if present")
793       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
794     push @args, $attrs->{rows}, $attrs->{offset};
795   }
796   return $self->_execute(@args);
797 }
798
799 =head2 select
800
801 Handle a SQL select statement.
802
803 =cut
804
805 sub select {
806   my $self = shift;
807   my ($ident, $select, $condition, $attrs) = @_;
808   return $self->cursor->new($self, \@_, $attrs);
809 }
810
811 =head2 select_single
812
813 Performs a select, fetch and return of data - handles a single row
814 only.
815
816 =cut
817
818 # Need to call finish() to work round broken DBDs
819
820 sub select_single {
821   my $self = shift;
822   my ($rv, $sth, @bind) = $self->_select(@_);
823   my @row = $sth->fetchrow_array;
824   $sth->finish();
825   return @row;
826 }
827
828 =head2 sth
829
830 Returns a L<DBI> sth (statement handle) for the supplied SQL.
831
832 =cut
833
834 sub sth {
835   my ($self, $sql) = @_;
836   # 3 is the if_active parameter which avoids active sth re-use
837   return $self->dbh->prepare_cached($sql, {}, 3);
838 }
839
840 =head2 columns_info_for
841
842 Returns database type info for a given table columns.
843
844 =cut
845
846 sub columns_info_for {
847   my ($self, $table) = @_;
848
849   my $dbh = $self->dbh;
850
851   if ($dbh->can('column_info')) {
852     my %result;
853     my $old_raise_err = $dbh->{RaiseError};
854     my $old_print_err = $dbh->{PrintError};
855     $dbh->{RaiseError} = 1;
856     $dbh->{PrintError} = 0;
857     eval {
858       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
859       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
860       $sth->execute();
861       while ( my $info = $sth->fetchrow_hashref() ){
862         my %column_info;
863         $column_info{data_type}   = $info->{TYPE_NAME};
864         $column_info{size}      = $info->{COLUMN_SIZE};
865         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
866         $column_info{default_value} = $info->{COLUMN_DEF};
867         my $col_name = $info->{COLUMN_NAME};
868         $col_name =~ s/^\"(.*)\"$/$1/;
869
870         $result{$col_name} = \%column_info;
871       }
872     };
873     $dbh->{RaiseError} = $old_raise_err;
874     $dbh->{PrintError} = $old_print_err;
875     return \%result if !$@;
876   }
877
878   my %result;
879   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
880   $sth->execute;
881   my @columns = @{$sth->{NAME_lc}};
882   for my $i ( 0 .. $#columns ){
883     my %column_info;
884     my $type_num = $sth->{TYPE}->[$i];
885     my $type_name;
886     if(defined $type_num && $dbh->can('type_info')) {
887       my $type_info = $dbh->type_info($type_num);
888       $type_name = $type_info->{TYPE_NAME} if $type_info;
889     }
890     $column_info{data_type} = $type_name ? $type_name : $type_num;
891     $column_info{size} = $sth->{PRECISION}->[$i];
892     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
893
894     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
895       $column_info{data_type} = $1;
896       $column_info{size}    = $2;
897     }
898
899     $result{$columns[$i]} = \%column_info;
900   }
901
902   return \%result;
903 }
904
905 =head2 last_insert_id
906
907 Return the row id of the last insert.
908
909 =cut
910
911 sub last_insert_id {
912   my ($self, $row) = @_;
913     
914   return $self->dbh->func('last_insert_rowid');
915
916 }
917
918 =head2 sqlt_type
919
920 Returns the database driver name.
921
922 =cut
923
924 sub sqlt_type { shift->dbh->{Driver}->{Name} }
925
926 =head2 create_ddl_dir (EXPERIMENTAL)
927
928 =over 4
929
930 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
931
932 =back
933
934 Creates an SQL file based on the Schema, for each of the specified
935 database types, in the given directory.
936
937 Note that this feature is currently EXPERIMENTAL and may not work correctly
938 across all databases, or fully handle complex relationships.
939
940 =cut
941
942 sub create_ddl_dir
943 {
944   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
945
946   if(!$dir || !-d $dir)
947   {
948     warn "No directory given, using ./\n";
949     $dir = "./";
950   }
951   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
952   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
953   $version ||= $schema->VERSION || '1.x';
954
955   eval "use SQL::Translator";
956   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
957
958   my $sqlt = SQL::Translator->new({
959 #      debug => 1,
960       add_drop_table => 1,
961   });
962   foreach my $db (@$databases)
963   {
964     $sqlt->reset();
965     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
966 #    $sqlt->parser_args({'DBIx::Class' => $schema);
967     $sqlt->data($schema);
968     $sqlt->producer($db);
969
970     my $file;
971     my $filename = $schema->ddl_filename($db, $dir, $version);
972     if(-e $filename)
973     {
974       $self->throw_exception("$filename already exists, skipping $db");
975       next;
976     }
977     open($file, ">$filename") 
978       or $self->throw_exception("Can't open $filename for writing ($!)");
979     my $output = $sqlt->translate;
980 #use Data::Dumper;
981 #    print join(":", keys %{$schema->source_registrations});
982 #    print Dumper($sqlt->schema);
983     if(!$output)
984     {
985       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
986       next;
987     }
988     print $file $output;
989     close($file);
990   }
991
992 }
993
994 =head2 deployment_statements
995
996 Create the statements for L</deploy> and
997 L<DBIx::Class::Schema/deploy>.
998
999 =cut
1000
1001 sub deployment_statements {
1002   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1003   # Need to be connected to get the correct sqlt_type
1004   $self->ensure_connected() unless $type;
1005   $type ||= $self->sqlt_type;
1006   $version ||= $schema->VERSION || '1.x';
1007   $dir ||= './';
1008   eval "use SQL::Translator";
1009   if(!$@)
1010   {
1011     eval "use SQL::Translator::Parser::DBIx::Class;";
1012     $self->throw_exception($@) if $@;
1013     eval "use SQL::Translator::Producer::${type};";
1014     $self->throw_exception($@) if $@;
1015     my $tr = SQL::Translator->new(%$sqltargs);
1016     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1017     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1018   }
1019
1020   my $filename = $schema->ddl_filename($type, $dir, $version);
1021   if(!-f $filename)
1022   {
1023 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1024       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1025       return;
1026   }
1027   my $file;
1028   open($file, "<$filename") 
1029       or $self->throw_exception("Can't open $filename ($!)");
1030   my @rows = <$file>;
1031   close($file);
1032
1033   return join('', @rows);
1034   
1035 }
1036
1037 =head2 deploy
1038
1039 Sends the appropriate statements to create or modify tables to the
1040 db. This would normally be called through
1041 L<DBIx::Class::Schema/deploy>.
1042
1043 =cut
1044
1045 sub deploy {
1046   my ($self, $schema, $type, $sqltargs) = @_;
1047   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %$sqltargs }) ) {
1048     for ( split(";\n", $statement)) {
1049       next if($_ =~ /^--/);
1050       next if(!$_);
1051 #      next if($_ =~ /^DROP/m);
1052       next if($_ =~ /^BEGIN TRANSACTION/m);
1053       next if($_ =~ /^COMMIT/m);
1054       $self->debugobj->query_start($_) if $self->debug;
1055       $self->dbh->do($_) or warn "SQL was:\n $_";
1056       $self->debugobj->query_end($_) if $self->debug;
1057     }
1058   }
1059 }
1060
1061 =head2 datetime_parser
1062
1063 Returns the datetime parser class
1064
1065 =cut
1066
1067 sub datetime_parser {
1068   my $self = shift;
1069   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1070 }
1071
1072 =head2 datetime_parser_type
1073
1074 Defines (returns) the datetime parser class - currently hardwired to
1075 L<DateTime::Format::MySQL>
1076
1077 =cut
1078
1079 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1080
1081 =head2 build_datetime_parser
1082
1083 See L</datetime_parser>
1084
1085 =cut
1086
1087 sub build_datetime_parser {
1088   my $self = shift;
1089   my $type = $self->datetime_parser_type(@_);
1090   eval "use ${type}";
1091   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1092   return $type;
1093 }
1094
1095 sub DESTROY { shift->disconnect }
1096
1097 1;
1098
1099 =head1 SQL METHODS
1100
1101 The module defines a set of methods within the DBIC::SQL::Abstract
1102 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1103 SQL query functions.
1104
1105 The following methods are extended:-
1106
1107 =over 4
1108
1109 =item delete
1110
1111 =item insert
1112
1113 =item select
1114
1115 =item update
1116
1117 =item limit_dialect
1118
1119 See L</connect_info> for details.
1120 For setting, this method is deprecated in favor of L</connect_info>.
1121
1122 =item quote_char
1123
1124 See L</connect_info> for details.
1125 For setting, this method is deprecated in favor of L</connect_info>.
1126
1127 =item name_sep
1128
1129 See L</connect_info> for details.
1130 For setting, this method is deprecated in favor of L</connect_info>.
1131
1132 =back
1133
1134 =head1 ENVIRONMENT VARIABLES
1135
1136 =head2 DBIC_TRACE
1137
1138 If C<DBIC_TRACE> is set then SQL trace information
1139 is produced (as when the L<debug> method is set).
1140
1141 If the value is of the form C<1=/path/name> then the trace output is
1142 written to the file C</path/name>.
1143
1144 This environment variable is checked when the storage object is first
1145 created (when you call connect on your schema).  So, run-time changes 
1146 to this environment variable will not take effect unless you also 
1147 re-connect on your schema.
1148
1149 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1150
1151 Old name for DBIC_TRACE
1152
1153 =head1 AUTHORS
1154
1155 Matt S. Trout <mst@shadowcatsystems.co.uk>
1156
1157 Andy Grundman <andy@hybridized.org>
1158
1159 =head1 LICENSE
1160
1161 You may distribute this code under the same terms as Perl itself.
1162
1163 =cut
1164