Remove anonymous blesses to avoid major speed hit on Fedora Core 5, or 'the anti...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = {};
279   bless $new, (ref $_[0] || $_[0]);
280
281   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
282   $new->transaction_depth(0);
283
284   $new->debugobj(new DBIx::Class::Storage::Statistics());
285
286   my $fh;
287
288   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
289                   || $ENV{DBIC_TRACE};
290
291   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
292     $fh = IO::File->new($1, 'w')
293       or $new->throw_exception("Cannot open trace file $1");
294   } else {
295     $fh = IO::File->new('>&STDERR');
296   }
297   $new->debugfh($fh);
298   $new->debug(1) if $debug_env;
299   $new->_sql_maker_opts({});
300   return $new;
301 }
302
303 =head2 throw_exception
304
305 Throws an exception - croaks.
306
307 =cut
308
309 sub throw_exception {
310   my ($self, $msg) = @_;
311   croak($msg);
312 }
313
314 =head2 connect_info
315
316 The arguments of C<connect_info> are always a single array reference.
317
318 This is normally accessed via L<DBIx::Class::Schema/connection>, which
319 encapsulates its argument list in an arrayref before calling
320 C<connect_info> here.
321
322 The arrayref can either contain the same set of arguments one would
323 normally pass to L<DBI/connect>, or a lone code reference which returns
324 a connected database handle.
325
326 In either case, if the final argument in your connect_info happens
327 to be a hashref, C<connect_info> will look there for several
328 connection-specific options:
329
330 =over 4
331
332 =item on_connect_do
333
334 This can be set to an arrayref of literal sql statements, which will
335 be executed immediately after making the connection to the database
336 every time we [re-]connect.
337
338 =item limit_dialect 
339
340 Sets the limit dialect. This is useful for JDBC-bridge among others
341 where the remote SQL-dialect cannot be determined by the name of the
342 driver alone.
343
344 =item quote_char
345
346 Specifies what characters to use to quote table and column names. If 
347 you use this you will want to specify L<name_sep> as well.
348
349 quote_char expects either a single character, in which case is it is placed
350 on either side of the table/column, or an arrayref of length 2 in which case the
351 table/column name is placed between the elements.
352
353 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
354 use C<quote_char =E<gt> [qw/[ ]/]>.
355
356 =item name_sep
357
358 This only needs to be used in conjunction with L<quote_char>, and is used to 
359 specify the charecter that seperates elements (schemas, tables, columns) from 
360 each other. In most cases this is simply a C<.>.
361
362 =back
363
364 These options can be mixed in with your other L<DBI> connection attributes,
365 or placed in a seperate hashref after all other normal L<DBI> connection
366 arguments.
367
368 Every time C<connect_info> is invoked, any previous settings for
369 these options will be cleared before setting the new ones, regardless of
370 whether any options are specified in the new C<connect_info>.
371
372 Examples:
373
374   # Simple SQLite connection
375   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
376
377   # Connect via subref
378   ->connect_info([ sub { DBI->connect(...) } ]);
379
380   # A bit more complicated
381   ->connect_info(
382     [
383       'dbi:Pg:dbname=foo',
384       'postgres',
385       'my_pg_password',
386       { AutoCommit => 0 },
387       { quote_char => q{"}, name_sep => q{.} },
388     ]
389   );
390
391   # Equivalent to the previous example
392   ->connect_info(
393     [
394       'dbi:Pg:dbname=foo',
395       'postgres',
396       'my_pg_password',
397       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
398     ]
399   );
400
401   # Subref + DBIC-specific connection options
402   ->connect_info(
403     [
404       sub { DBI->connect(...) },
405       {
406           quote_char => q{`},
407           name_sep => q{@},
408           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
409       },
410     ]
411   );
412
413 =head2 on_connect_do
414
415 This method is deprecated in favor of setting via L</connect_info>.
416
417 =head2 debug
418
419 Causes SQL trace information to be emitted on the C<debugobj> object.
420 (or C<STDERR> if C<debugobj> has not specifically been set).
421
422 This is the equivalent to setting L</DBIC_TRACE> in your
423 shell environment.
424
425 =head2 debugfh
426
427 Set or retrieve the filehandle used for trace/debug output.  This should be
428 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
429 set to be STDERR - although see information on the
430 L<DBIC_TRACE> environment variable.
431
432 =cut
433
434 sub debugfh {
435     my $self = shift;
436
437     if ($self->debugobj->can('debugfh')) {
438         return $self->debugobj->debugfh(@_);
439     }
440 }
441
442 =head2 debugobj
443
444 Sets or retrieves the object used for metric collection. Defaults to an instance
445 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
446 method of using a coderef as a callback.  See the aforementioned Statistics
447 class for more information.
448
449 =head2 debugcb
450
451 Sets a callback to be executed each time a statement is run; takes a sub
452 reference.  Callback is executed as $sub->($op, $info) where $op is
453 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
454
455 See L<debugobj> for a better way.
456
457 =cut
458
459 sub debugcb {
460     my $self = shift;
461
462     if ($self->debugobj->can('callback')) {
463         return $self->debugobj->callback(@_);
464     }
465 }
466
467 =head2 disconnect
468
469 Disconnect the L<DBI> handle, performing a rollback first if the
470 database is not in C<AutoCommit> mode.
471
472 =cut
473
474 sub disconnect {
475   my ($self) = @_;
476
477   if( $self->connected ) {
478     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
479     $self->_dbh->disconnect;
480     $self->_dbh(undef);
481   }
482 }
483
484 =head2 connected
485
486 Check if the L<DBI> handle is connected.  Returns true if the handle
487 is connected.
488
489 =cut
490
491 sub connected { my ($self) = @_;
492
493   if(my $dbh = $self->_dbh) {
494       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
495           return $self->_dbh(undef);
496       }
497       elsif($self->_conn_pid != $$) {
498           $self->_dbh->{InactiveDestroy} = 1;
499           return $self->_dbh(undef);
500       }
501       return ($dbh->FETCH('Active') && $dbh->ping);
502   }
503
504   return 0;
505 }
506
507 =head2 ensure_connected
508
509 Check whether the database handle is connected - if not then make a
510 connection.
511
512 =cut
513
514 sub ensure_connected {
515   my ($self) = @_;
516
517   unless ($self->connected) {
518     $self->_populate_dbh;
519   }
520 }
521
522 =head2 dbh
523
524 Returns the dbh - a data base handle of class L<DBI>.
525
526 =cut
527
528 sub dbh {
529   my ($self) = @_;
530
531   $self->ensure_connected;
532   return $self->_dbh;
533 }
534
535 sub _sql_maker_args {
536     my ($self) = @_;
537     
538     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
539 }
540
541 =head2 sql_maker
542
543 Returns a C<sql_maker> object - normally an object of class
544 C<DBIC::SQL::Abstract>.
545
546 =cut
547
548 sub sql_maker {
549   my ($self) = @_;
550   unless ($self->_sql_maker) {
551     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
552   }
553   return $self->_sql_maker;
554 }
555
556 sub connect_info {
557   my ($self, $info_arg) = @_;
558
559   if($info_arg) {
560     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
561     #  the new set of options
562     $self->_sql_maker(undef);
563     $self->_sql_maker_opts({});
564
565     my $info = [ @$info_arg ]; # copy because we can alter it
566     my $last_info = $info->[-1];
567     if(ref $last_info eq 'HASH') {
568       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
569         $self->on_connect_do($on_connect_do);
570       }
571       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
572         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
573           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
574         }
575       }
576
577       # Get rid of any trailing empty hashref
578       pop(@$info) if !keys %$last_info;
579     }
580
581     $self->_connect_info($info);
582   }
583
584   $self->_connect_info;
585 }
586
587 sub _populate_dbh {
588   my ($self) = @_;
589   my @info = @{$self->_connect_info || []};
590   $self->_dbh($self->_connect(@info));
591
592   if(ref $self eq 'DBIx::Class::Storage::DBI') {
593     my $driver = $self->_dbh->{Driver}->{Name};
594     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
595       bless $self, "DBIx::Class::Storage::DBI::${driver}";
596       $self->_rebless() if $self->can('_rebless');
597     }
598   }
599
600   # if on-connect sql statements are given execute them
601   foreach my $sql_statement (@{$self->on_connect_do || []}) {
602     $self->debugobj->query_start($sql_statement) if $self->debug();
603     $self->_dbh->do($sql_statement);
604     $self->debugobj->query_end($sql_statement) if $self->debug();
605   }
606
607   $self->_conn_pid($$);
608   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
609 }
610
611 sub _connect {
612   my ($self, @info) = @_;
613
614   $self->throw_exception("You failed to provide any connection info")
615       if !@info;
616
617   my ($old_connect_via, $dbh);
618
619   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
620       $old_connect_via = $DBI::connect_via;
621       $DBI::connect_via = 'connect';
622   }
623
624   eval {
625     $dbh = ref $info[0] eq 'CODE'
626          ? &{$info[0]}
627          : DBI->connect(@info);
628   };
629
630   $DBI::connect_via = $old_connect_via if $old_connect_via;
631
632   if (!$dbh || $@) {
633     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
634   }
635
636   $dbh;
637 }
638
639 =head2 txn_begin
640
641 Calls begin_work on the current dbh.
642
643 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
644 an entire code block to be executed transactionally.
645
646 =cut
647
648 sub txn_begin {
649   my $self = shift;
650   if ($self->{transaction_depth}++ == 0) {
651     my $dbh = $self->dbh;
652     if ($dbh->{AutoCommit}) {
653       $self->debugobj->txn_begin()
654         if ($self->debug);
655       $dbh->begin_work;
656     }
657   }
658 }
659
660 =head2 txn_commit
661
662 Issues a commit against the current dbh.
663
664 =cut
665
666 sub txn_commit {
667   my $self = shift;
668   my $dbh = $self->dbh;
669   if ($self->{transaction_depth} == 0) {
670     unless ($dbh->{AutoCommit}) {
671       $self->debugobj->txn_commit()
672         if ($self->debug);
673       $dbh->commit;
674     }
675   }
676   else {
677     if (--$self->{transaction_depth} == 0) {
678       $self->debugobj->txn_commit()
679         if ($self->debug);
680       $dbh->commit;
681     }
682   }
683 }
684
685 =head2 txn_rollback
686
687 Issues a rollback against the current dbh. A nested rollback will
688 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
689 which allows the rollback to propagate to the outermost transaction.
690
691 =cut
692
693 sub txn_rollback {
694   my $self = shift;
695
696   eval {
697     my $dbh = $self->dbh;
698     if ($self->{transaction_depth} == 0) {
699       unless ($dbh->{AutoCommit}) {
700         $self->debugobj->txn_rollback()
701           if ($self->debug);
702         $dbh->rollback;
703       }
704     }
705     else {
706       if (--$self->{transaction_depth} == 0) {
707         $self->debugobj->txn_rollback()
708           if ($self->debug);
709         $dbh->rollback;
710       }
711       else {
712         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
713       }
714     }
715   };
716
717   if ($@) {
718     my $error = $@;
719     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
720     $error =~ /$exception_class/ and $self->throw_exception($error);
721     $self->{transaction_depth} = 0;          # ensure that a failed rollback
722     $self->throw_exception($error);          # resets the transaction depth
723   }
724 }
725
726 sub _execute {
727   my ($self, $op, $extra_bind, $ident, @args) = @_;
728   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
729   unshift(@bind, @$extra_bind) if $extra_bind;
730   if ($self->debug) {
731       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
732       $self->debugobj->query_start($sql, @debug_bind);
733   }
734   my $sth = eval { $self->sth($sql,$op) };
735
736   if (!$sth || $@) {
737     $self->throw_exception(
738       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
739     );
740   }
741   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
742   my $rv;
743   if ($sth) {
744     my $time = time();
745     $rv = eval { $sth->execute(@bind) };
746
747     if ($@ || !$rv) {
748       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
749     }
750   } else {
751     $self->throw_exception("'$sql' did not generate a statement.");
752   }
753   if ($self->debug) {
754       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
755       $self->debugobj->query_end($sql, @debug_bind);
756   }
757   return (wantarray ? ($rv, $sth, @bind) : $rv);
758 }
759
760 sub insert {
761   my ($self, $ident, $to_insert) = @_;
762   $self->throw_exception(
763     "Couldn't insert ".join(', ',
764       map "$_ => $to_insert->{$_}", keys %$to_insert
765     )." into ${ident}"
766   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
767   return $to_insert;
768 }
769
770 sub update {
771   return shift->_execute('update' => [], @_);
772 }
773
774 sub delete {
775   return shift->_execute('delete' => [], @_);
776 }
777
778 sub _select {
779   my ($self, $ident, $select, $condition, $attrs) = @_;
780   my $order = $attrs->{order_by};
781   if (ref $condition eq 'SCALAR') {
782     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
783   }
784   if (exists $attrs->{group_by} || $attrs->{having}) {
785     $order = {
786       group_by => $attrs->{group_by},
787       having => $attrs->{having},
788       ($order ? (order_by => $order) : ())
789     };
790   }
791   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
792   if ($attrs->{software_limit} ||
793       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
794         $attrs->{software_limit} = 1;
795   } else {
796     $self->throw_exception("rows attribute must be positive if present")
797       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
798     push @args, $attrs->{rows}, $attrs->{offset};
799   }
800   return $self->_execute(@args);
801 }
802
803 =head2 select
804
805 Handle a SQL select statement.
806
807 =cut
808
809 sub select {
810   my $self = shift;
811   my ($ident, $select, $condition, $attrs) = @_;
812   return $self->cursor->new($self, \@_, $attrs);
813 }
814
815 =head2 select_single
816
817 Performs a select, fetch and return of data - handles a single row
818 only.
819
820 =cut
821
822 # Need to call finish() to work round broken DBDs
823
824 sub select_single {
825   my $self = shift;
826   my ($rv, $sth, @bind) = $self->_select(@_);
827   my @row = $sth->fetchrow_array;
828   $sth->finish();
829   return @row;
830 }
831
832 =head2 sth
833
834 Returns a L<DBI> sth (statement handle) for the supplied SQL.
835
836 =cut
837
838 sub sth {
839   my ($self, $sql) = @_;
840   # 3 is the if_active parameter which avoids active sth re-use
841   return $self->dbh->prepare_cached($sql, {}, 3);
842 }
843
844 =head2 columns_info_for
845
846 Returns database type info for a given table columns.
847
848 =cut
849
850 sub columns_info_for {
851   my ($self, $table) = @_;
852
853   my $dbh = $self->dbh;
854
855   if ($dbh->can('column_info')) {
856     my %result;
857     local $dbh->{RaiseError} = 1;
858     local $dbh->{PrintError} = 0;
859     eval {
860       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
861       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
862       $sth->execute();
863
864       # Some error occured or there is no information:
865       if($sth->rows <1) {
866           die "column_info returned no rows for $schema, $tab";
867       }
868
869       while ( my $info = $sth->fetchrow_hashref() ){
870         my %column_info;
871         $column_info{data_type}   = $info->{TYPE_NAME};
872         $column_info{size}      = $info->{COLUMN_SIZE};
873         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
874         $column_info{default_value} = $info->{COLUMN_DEF};
875         my $col_name = $info->{COLUMN_NAME};
876         $col_name =~ s/^\"(.*)\"$/$1/;
877
878         $result{$col_name} = \%column_info;
879       }
880     };
881     return \%result if !$@;
882   }
883
884   my %result;
885   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
886   $sth->execute;
887   my @columns = @{$sth->{NAME_lc}};
888   for my $i ( 0 .. $#columns ){
889     my %column_info;
890     my $type_num = $sth->{TYPE}->[$i];
891     my $type_name;
892     if(defined $type_num && $dbh->can('type_info')) {
893       my $type_info = $dbh->type_info($type_num);
894       $type_name = $type_info->{TYPE_NAME} if $type_info;
895     }
896     $column_info{data_type} = $type_name ? $type_name : $type_num;
897     $column_info{size} = $sth->{PRECISION}->[$i];
898     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
899
900     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
901       $column_info{data_type} = $1;
902       $column_info{size}    = $2;
903     }
904
905     $result{$columns[$i]} = \%column_info;
906   }
907
908   return \%result;
909 }
910
911 =head2 last_insert_id
912
913 Return the row id of the last insert.
914
915 =cut
916
917 sub last_insert_id {
918   my ($self, $row) = @_;
919     
920   return $self->dbh->func('last_insert_rowid');
921
922 }
923
924 =head2 sqlt_type
925
926 Returns the database driver name.
927
928 =cut
929
930 sub sqlt_type { shift->dbh->{Driver}->{Name} }
931
932 =head2 create_ddl_dir (EXPERIMENTAL)
933
934 =over 4
935
936 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
937
938 =back
939
940 Creates an SQL file based on the Schema, for each of the specified
941 database types, in the given directory.
942
943 Note that this feature is currently EXPERIMENTAL and may not work correctly
944 across all databases, or fully handle complex relationships.
945
946 =cut
947
948 sub create_ddl_dir
949 {
950   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
951
952   if(!$dir || !-d $dir)
953   {
954     warn "No directory given, using ./\n";
955     $dir = "./";
956   }
957   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
958   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
959   $version ||= $schema->VERSION || '1.x';
960   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
961
962   eval "use SQL::Translator";
963   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
964
965   my $sqlt = SQL::Translator->new($sqltargs);
966   foreach my $db (@$databases)
967   {
968     $sqlt->reset();
969     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
970 #    $sqlt->parser_args({'DBIx::Class' => $schema);
971     $sqlt->data($schema);
972     $sqlt->producer($db);
973
974     my $file;
975     my $filename = $schema->ddl_filename($db, $dir, $version);
976     if(-e $filename)
977     {
978       $self->throw_exception("$filename already exists, skipping $db");
979       next;
980     }
981     open($file, ">$filename") 
982       or $self->throw_exception("Can't open $filename for writing ($!)");
983     my $output = $sqlt->translate;
984 #use Data::Dumper;
985 #    print join(":", keys %{$schema->source_registrations});
986 #    print Dumper($sqlt->schema);
987     if(!$output)
988     {
989       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
990       next;
991     }
992     print $file $output;
993     close($file);
994   }
995
996 }
997
998 =head2 deployment_statements
999
1000 Create the statements for L</deploy> and
1001 L<DBIx::Class::Schema/deploy>.
1002
1003 =cut
1004
1005 sub deployment_statements {
1006   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1007   # Need to be connected to get the correct sqlt_type
1008   $self->ensure_connected() unless $type;
1009   $type ||= $self->sqlt_type;
1010   $version ||= $schema->VERSION || '1.x';
1011   $dir ||= './';
1012   eval "use SQL::Translator";
1013   if(!$@)
1014   {
1015     eval "use SQL::Translator::Parser::DBIx::Class;";
1016     $self->throw_exception($@) if $@;
1017     eval "use SQL::Translator::Producer::${type};";
1018     $self->throw_exception($@) if $@;
1019     my $tr = SQL::Translator->new(%$sqltargs);
1020     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1021     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1022   }
1023
1024   my $filename = $schema->ddl_filename($type, $dir, $version);
1025   if(!-f $filename)
1026   {
1027 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1028       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1029       return;
1030   }
1031   my $file;
1032   open($file, "<$filename") 
1033       or $self->throw_exception("Can't open $filename ($!)");
1034   my @rows = <$file>;
1035   close($file);
1036
1037   return join('', @rows);
1038   
1039 }
1040
1041 =head2 deploy
1042
1043 Sends the appropriate statements to create or modify tables to the
1044 db. This would normally be called through
1045 L<DBIx::Class::Schema/deploy>.
1046
1047 =cut
1048
1049 sub deploy {
1050   my ($self, $schema, $type, $sqltargs) = @_;
1051   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1052     for ( split(";\n", $statement)) {
1053       next if($_ =~ /^--/);
1054       next if(!$_);
1055 #      next if($_ =~ /^DROP/m);
1056       next if($_ =~ /^BEGIN TRANSACTION/m);
1057       next if($_ =~ /^COMMIT/m);
1058       next if $_ =~ /^\s+$/; # skip whitespace only
1059       $self->debugobj->query_start($_) if $self->debug;
1060       $self->dbh->do($_) or warn "SQL was:\n $_";
1061       $self->debugobj->query_end($_) if $self->debug;
1062     }
1063   }
1064 }
1065
1066 =head2 datetime_parser
1067
1068 Returns the datetime parser class
1069
1070 =cut
1071
1072 sub datetime_parser {
1073   my $self = shift;
1074   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1075 }
1076
1077 =head2 datetime_parser_type
1078
1079 Defines (returns) the datetime parser class - currently hardwired to
1080 L<DateTime::Format::MySQL>
1081
1082 =cut
1083
1084 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1085
1086 =head2 build_datetime_parser
1087
1088 See L</datetime_parser>
1089
1090 =cut
1091
1092 sub build_datetime_parser {
1093   my $self = shift;
1094   my $type = $self->datetime_parser_type(@_);
1095   eval "use ${type}";
1096   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1097   return $type;
1098 }
1099
1100 sub DESTROY { shift->disconnect }
1101
1102 1;
1103
1104 =head1 SQL METHODS
1105
1106 The module defines a set of methods within the DBIC::SQL::Abstract
1107 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1108 SQL query functions.
1109
1110 The following methods are extended:-
1111
1112 =over 4
1113
1114 =item delete
1115
1116 =item insert
1117
1118 =item select
1119
1120 =item update
1121
1122 =item limit_dialect
1123
1124 See L</connect_info> for details.
1125 For setting, this method is deprecated in favor of L</connect_info>.
1126
1127 =item quote_char
1128
1129 See L</connect_info> for details.
1130 For setting, this method is deprecated in favor of L</connect_info>.
1131
1132 =item name_sep
1133
1134 See L</connect_info> for details.
1135 For setting, this method is deprecated in favor of L</connect_info>.
1136
1137 =back
1138
1139 =head1 ENVIRONMENT VARIABLES
1140
1141 =head2 DBIC_TRACE
1142
1143 If C<DBIC_TRACE> is set then SQL trace information
1144 is produced (as when the L<debug> method is set).
1145
1146 If the value is of the form C<1=/path/name> then the trace output is
1147 written to the file C</path/name>.
1148
1149 This environment variable is checked when the storage object is first
1150 created (when you call connect on your schema).  So, run-time changes 
1151 to this environment variable will not take effect unless you also 
1152 re-connect on your schema.
1153
1154 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1155
1156 Old name for DBIC_TRACE
1157
1158 =head1 AUTHORS
1159
1160 Matt S. Trout <mst@shadowcatsystems.co.uk>
1161
1162 Andy Grundman <andy@hybridized.org>
1163
1164 =head1 LICENSE
1165
1166 You may distribute this code under the same terms as Perl itself.
1167
1168 =cut
1169