using ->rows here was not a reliable thing to do
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = {};
279   bless $new, (ref $_[0] || $_[0]);
280
281   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
282   $new->transaction_depth(0);
283
284   $new->debugobj(new DBIx::Class::Storage::Statistics());
285
286   my $fh;
287
288   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
289                   || $ENV{DBIC_TRACE};
290
291   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
292     $fh = IO::File->new($1, 'w')
293       or $new->throw_exception("Cannot open trace file $1");
294   } else {
295     $fh = IO::File->new('>&STDERR');
296   }
297   $new->debugfh($fh);
298   $new->debug(1) if $debug_env;
299   $new->_sql_maker_opts({});
300   return $new;
301 }
302
303 =head2 throw_exception
304
305 Throws an exception - croaks.
306
307 =cut
308
309 sub throw_exception {
310   my ($self, $msg) = @_;
311   croak($msg);
312 }
313
314 =head2 connect_info
315
316 The arguments of C<connect_info> are always a single array reference.
317
318 This is normally accessed via L<DBIx::Class::Schema/connection>, which
319 encapsulates its argument list in an arrayref before calling
320 C<connect_info> here.
321
322 The arrayref can either contain the same set of arguments one would
323 normally pass to L<DBI/connect>, or a lone code reference which returns
324 a connected database handle.
325
326 In either case, if the final argument in your connect_info happens
327 to be a hashref, C<connect_info> will look there for several
328 connection-specific options:
329
330 =over 4
331
332 =item on_connect_do
333
334 This can be set to an arrayref of literal sql statements, which will
335 be executed immediately after making the connection to the database
336 every time we [re-]connect.
337
338 =item limit_dialect 
339
340 Sets the limit dialect. This is useful for JDBC-bridge among others
341 where the remote SQL-dialect cannot be determined by the name of the
342 driver alone.
343
344 =item quote_char
345
346 Specifies what characters to use to quote table and column names. If 
347 you use this you will want to specify L<name_sep> as well.
348
349 quote_char expects either a single character, in which case is it is placed
350 on either side of the table/column, or an arrayref of length 2 in which case the
351 table/column name is placed between the elements.
352
353 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
354 use C<quote_char =E<gt> [qw/[ ]/]>.
355
356 =item name_sep
357
358 This only needs to be used in conjunction with L<quote_char>, and is used to 
359 specify the charecter that seperates elements (schemas, tables, columns) from 
360 each other. In most cases this is simply a C<.>.
361
362 =back
363
364 These options can be mixed in with your other L<DBI> connection attributes,
365 or placed in a seperate hashref after all other normal L<DBI> connection
366 arguments.
367
368 Every time C<connect_info> is invoked, any previous settings for
369 these options will be cleared before setting the new ones, regardless of
370 whether any options are specified in the new C<connect_info>.
371
372 Examples:
373
374   # Simple SQLite connection
375   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
376
377   # Connect via subref
378   ->connect_info([ sub { DBI->connect(...) } ]);
379
380   # A bit more complicated
381   ->connect_info(
382     [
383       'dbi:Pg:dbname=foo',
384       'postgres',
385       'my_pg_password',
386       { AutoCommit => 0 },
387       { quote_char => q{"}, name_sep => q{.} },
388     ]
389   );
390
391   # Equivalent to the previous example
392   ->connect_info(
393     [
394       'dbi:Pg:dbname=foo',
395       'postgres',
396       'my_pg_password',
397       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
398     ]
399   );
400
401   # Subref + DBIC-specific connection options
402   ->connect_info(
403     [
404       sub { DBI->connect(...) },
405       {
406           quote_char => q{`},
407           name_sep => q{@},
408           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
409       },
410     ]
411   );
412
413 =head2 on_connect_do
414
415 This method is deprecated in favor of setting via L</connect_info>.
416
417 =head2 debug
418
419 Causes SQL trace information to be emitted on the C<debugobj> object.
420 (or C<STDERR> if C<debugobj> has not specifically been set).
421
422 This is the equivalent to setting L</DBIC_TRACE> in your
423 shell environment.
424
425 =head2 debugfh
426
427 Set or retrieve the filehandle used for trace/debug output.  This should be
428 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
429 set to be STDERR - although see information on the
430 L<DBIC_TRACE> environment variable.
431
432 =cut
433
434 sub debugfh {
435     my $self = shift;
436
437     if ($self->debugobj->can('debugfh')) {
438         return $self->debugobj->debugfh(@_);
439     }
440 }
441
442 =head2 debugobj
443
444 Sets or retrieves the object used for metric collection. Defaults to an instance
445 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
446 method of using a coderef as a callback.  See the aforementioned Statistics
447 class for more information.
448
449 =head2 debugcb
450
451 Sets a callback to be executed each time a statement is run; takes a sub
452 reference.  Callback is executed as $sub->($op, $info) where $op is
453 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
454
455 See L<debugobj> for a better way.
456
457 =cut
458
459 sub debugcb {
460     my $self = shift;
461
462     if ($self->debugobj->can('callback')) {
463         return $self->debugobj->callback(@_);
464     }
465 }
466
467 =head2 disconnect
468
469 Disconnect the L<DBI> handle, performing a rollback first if the
470 database is not in C<AutoCommit> mode.
471
472 =cut
473
474 sub disconnect {
475   my ($self) = @_;
476
477   if( $self->connected ) {
478     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
479     $self->_dbh->disconnect;
480     $self->_dbh(undef);
481   }
482 }
483
484 =head2 connected
485
486 Check if the L<DBI> handle is connected.  Returns true if the handle
487 is connected.
488
489 =cut
490
491 sub connected { my ($self) = @_;
492
493   if(my $dbh = $self->_dbh) {
494       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
495           return $self->_dbh(undef);
496       }
497       elsif($self->_conn_pid != $$) {
498           $self->_dbh->{InactiveDestroy} = 1;
499           return $self->_dbh(undef);
500       }
501       return ($dbh->FETCH('Active') && $dbh->ping);
502   }
503
504   return 0;
505 }
506
507 =head2 ensure_connected
508
509 Check whether the database handle is connected - if not then make a
510 connection.
511
512 =cut
513
514 sub ensure_connected {
515   my ($self) = @_;
516
517   unless ($self->connected) {
518     $self->_populate_dbh;
519   }
520 }
521
522 =head2 dbh
523
524 Returns the dbh - a data base handle of class L<DBI>.
525
526 =cut
527
528 sub dbh {
529   my ($self) = @_;
530
531   $self->ensure_connected;
532   return $self->_dbh;
533 }
534
535 sub _sql_maker_args {
536     my ($self) = @_;
537     
538     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
539 }
540
541 =head2 sql_maker
542
543 Returns a C<sql_maker> object - normally an object of class
544 C<DBIC::SQL::Abstract>.
545
546 =cut
547
548 sub sql_maker {
549   my ($self) = @_;
550   unless ($self->_sql_maker) {
551     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
552   }
553   return $self->_sql_maker;
554 }
555
556 sub connect_info {
557   my ($self, $info_arg) = @_;
558
559   if($info_arg) {
560     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
561     #  the new set of options
562     $self->_sql_maker(undef);
563     $self->_sql_maker_opts({});
564
565     my $info = [ @$info_arg ]; # copy because we can alter it
566     my $last_info = $info->[-1];
567     if(ref $last_info eq 'HASH') {
568       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
569         $self->on_connect_do($on_connect_do);
570       }
571       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
572         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
573           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
574         }
575       }
576
577       # Get rid of any trailing empty hashref
578       pop(@$info) if !keys %$last_info;
579     }
580
581     $self->_connect_info($info);
582   }
583
584   $self->_connect_info;
585 }
586
587 sub _populate_dbh {
588   my ($self) = @_;
589   my @info = @{$self->_connect_info || []};
590   $self->_dbh($self->_connect(@info));
591
592   if(ref $self eq 'DBIx::Class::Storage::DBI') {
593     my $driver = $self->_dbh->{Driver}->{Name};
594     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
595       bless $self, "DBIx::Class::Storage::DBI::${driver}";
596       $self->_rebless() if $self->can('_rebless');
597     }
598   }
599
600   # if on-connect sql statements are given execute them
601   foreach my $sql_statement (@{$self->on_connect_do || []}) {
602     $self->debugobj->query_start($sql_statement) if $self->debug();
603     $self->_dbh->do($sql_statement);
604     $self->debugobj->query_end($sql_statement) if $self->debug();
605   }
606
607   $self->_conn_pid($$);
608   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
609 }
610
611 sub _connect {
612   my ($self, @info) = @_;
613
614   $self->throw_exception("You failed to provide any connection info")
615       if !@info;
616
617   my ($old_connect_via, $dbh);
618
619   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
620       $old_connect_via = $DBI::connect_via;
621       $DBI::connect_via = 'connect';
622   }
623
624   eval {
625     $dbh = ref $info[0] eq 'CODE'
626          ? &{$info[0]}
627          : DBI->connect(@info);
628   };
629
630   $DBI::connect_via = $old_connect_via if $old_connect_via;
631
632   if (!$dbh || $@) {
633     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
634   }
635
636   $dbh;
637 }
638
639 =head2 txn_begin
640
641 Calls begin_work on the current dbh.
642
643 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
644 an entire code block to be executed transactionally.
645
646 =cut
647
648 sub txn_begin {
649   my $self = shift;
650   if ($self->{transaction_depth}++ == 0) {
651     my $dbh = $self->dbh;
652     if ($dbh->{AutoCommit}) {
653       $self->debugobj->txn_begin()
654         if ($self->debug);
655       $dbh->begin_work;
656     }
657   }
658 }
659
660 =head2 txn_commit
661
662 Issues a commit against the current dbh.
663
664 =cut
665
666 sub txn_commit {
667   my $self = shift;
668   my $dbh = $self->dbh;
669   if ($self->{transaction_depth} == 0) {
670     unless ($dbh->{AutoCommit}) {
671       $self->debugobj->txn_commit()
672         if ($self->debug);
673       $dbh->commit;
674     }
675   }
676   else {
677     if (--$self->{transaction_depth} == 0) {
678       $self->debugobj->txn_commit()
679         if ($self->debug);
680       $dbh->commit;
681     }
682   }
683 }
684
685 =head2 txn_rollback
686
687 Issues a rollback against the current dbh. A nested rollback will
688 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
689 which allows the rollback to propagate to the outermost transaction.
690
691 =cut
692
693 sub txn_rollback {
694   my $self = shift;
695
696   eval {
697     my $dbh = $self->dbh;
698     if ($self->{transaction_depth} == 0) {
699       unless ($dbh->{AutoCommit}) {
700         $self->debugobj->txn_rollback()
701           if ($self->debug);
702         $dbh->rollback;
703       }
704     }
705     else {
706       if (--$self->{transaction_depth} == 0) {
707         $self->debugobj->txn_rollback()
708           if ($self->debug);
709         $dbh->rollback;
710       }
711       else {
712         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
713       }
714     }
715   };
716
717   if ($@) {
718     my $error = $@;
719     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
720     $error =~ /$exception_class/ and $self->throw_exception($error);
721     $self->{transaction_depth} = 0;          # ensure that a failed rollback
722     $self->throw_exception($error);          # resets the transaction depth
723   }
724 }
725
726 sub _execute {
727   my ($self, $op, $extra_bind, $ident, @args) = @_;
728   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
729   unshift(@bind, @$extra_bind) if $extra_bind;
730   if ($self->debug) {
731       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
732       $self->debugobj->query_start($sql, @debug_bind);
733   }
734   my $sth = eval { $self->sth($sql,$op) };
735
736   if (!$sth || $@) {
737     $self->throw_exception(
738       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
739     );
740   }
741   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
742   my $rv;
743   if ($sth) {
744     my $time = time();
745     $rv = eval { $sth->execute(@bind) };
746
747     if ($@ || !$rv) {
748       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
749     }
750   } else {
751     $self->throw_exception("'$sql' did not generate a statement.");
752   }
753   if ($self->debug) {
754       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
755       $self->debugobj->query_end($sql, @debug_bind);
756   }
757   return (wantarray ? ($rv, $sth, @bind) : $rv);
758 }
759
760 sub insert {
761   my ($self, $ident, $to_insert) = @_;
762   $self->throw_exception(
763     "Couldn't insert ".join(', ',
764       map "$_ => $to_insert->{$_}", keys %$to_insert
765     )." into ${ident}"
766   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
767   return $to_insert;
768 }
769
770 sub update {
771   return shift->_execute('update' => [], @_);
772 }
773
774 sub delete {
775   return shift->_execute('delete' => [], @_);
776 }
777
778 sub _select {
779   my ($self, $ident, $select, $condition, $attrs) = @_;
780   my $order = $attrs->{order_by};
781   if (ref $condition eq 'SCALAR') {
782     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
783   }
784   if (exists $attrs->{group_by} || $attrs->{having}) {
785     $order = {
786       group_by => $attrs->{group_by},
787       having => $attrs->{having},
788       ($order ? (order_by => $order) : ())
789     };
790   }
791   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
792   if ($attrs->{software_limit} ||
793       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
794         $attrs->{software_limit} = 1;
795   } else {
796     $self->throw_exception("rows attribute must be positive if present")
797       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
798     push @args, $attrs->{rows}, $attrs->{offset};
799   }
800   return $self->_execute(@args);
801 }
802
803 =head2 select
804
805 Handle a SQL select statement.
806
807 =cut
808
809 sub select {
810   my $self = shift;
811   my ($ident, $select, $condition, $attrs) = @_;
812   return $self->cursor->new($self, \@_, $attrs);
813 }
814
815 =head2 select_single
816
817 Performs a select, fetch and return of data - handles a single row
818 only.
819
820 =cut
821
822 # Need to call finish() to work round broken DBDs
823
824 sub select_single {
825   my $self = shift;
826   my ($rv, $sth, @bind) = $self->_select(@_);
827   my @row = $sth->fetchrow_array;
828   $sth->finish();
829   return @row;
830 }
831
832 =head2 sth
833
834 Returns a L<DBI> sth (statement handle) for the supplied SQL.
835
836 =cut
837
838 sub sth {
839   my ($self, $sql) = @_;
840   # 3 is the if_active parameter which avoids active sth re-use
841   return $self->dbh->prepare_cached($sql, {}, 3);
842 }
843
844 =head2 columns_info_for
845
846 Returns database type info for a given table columns.
847
848 =cut
849
850 sub columns_info_for {
851   my ($self, $table) = @_;
852
853   my $dbh = $self->dbh;
854
855   if ($dbh->can('column_info')) {
856     my %result;
857     local $dbh->{RaiseError} = 1;
858     local $dbh->{PrintError} = 0;
859     eval {
860       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
861       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
862       $sth->execute();
863
864       while ( my $info = $sth->fetchrow_hashref() ){
865         my %column_info;
866         $column_info{data_type}   = $info->{TYPE_NAME};
867         $column_info{size}      = $info->{COLUMN_SIZE};
868         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
869         $column_info{default_value} = $info->{COLUMN_DEF};
870         my $col_name = $info->{COLUMN_NAME};
871         $col_name =~ s/^\"(.*)\"$/$1/;
872
873         $result{$col_name} = \%column_info;
874       }
875     };
876     return \%result if !$@ && scalar keys %result;
877   }
878
879   my %result;
880   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
881   $sth->execute;
882   my @columns = @{$sth->{NAME_lc}};
883   for my $i ( 0 .. $#columns ){
884     my %column_info;
885     my $type_num = $sth->{TYPE}->[$i];
886     my $type_name;
887     if(defined $type_num && $dbh->can('type_info')) {
888       my $type_info = $dbh->type_info($type_num);
889       $type_name = $type_info->{TYPE_NAME} if $type_info;
890     }
891     $column_info{data_type} = $type_name ? $type_name : $type_num;
892     $column_info{size} = $sth->{PRECISION}->[$i];
893     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
894
895     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
896       $column_info{data_type} = $1;
897       $column_info{size}    = $2;
898     }
899
900     $result{$columns[$i]} = \%column_info;
901   }
902
903   return \%result;
904 }
905
906 =head2 last_insert_id
907
908 Return the row id of the last insert.
909
910 =cut
911
912 sub last_insert_id {
913   my ($self, $row) = @_;
914     
915   return $self->dbh->func('last_insert_rowid');
916
917 }
918
919 =head2 sqlt_type
920
921 Returns the database driver name.
922
923 =cut
924
925 sub sqlt_type { shift->dbh->{Driver}->{Name} }
926
927 =head2 create_ddl_dir (EXPERIMENTAL)
928
929 =over 4
930
931 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
932
933 =back
934
935 Creates an SQL file based on the Schema, for each of the specified
936 database types, in the given directory.
937
938 Note that this feature is currently EXPERIMENTAL and may not work correctly
939 across all databases, or fully handle complex relationships.
940
941 =cut
942
943 sub create_ddl_dir
944 {
945   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
946
947   if(!$dir || !-d $dir)
948   {
949     warn "No directory given, using ./\n";
950     $dir = "./";
951   }
952   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
953   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
954   $version ||= $schema->VERSION || '1.x';
955   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
956
957   eval "use SQL::Translator";
958   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
959
960   my $sqlt = SQL::Translator->new($sqltargs);
961   foreach my $db (@$databases)
962   {
963     $sqlt->reset();
964     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
965 #    $sqlt->parser_args({'DBIx::Class' => $schema);
966     $sqlt->data($schema);
967     $sqlt->producer($db);
968
969     my $file;
970     my $filename = $schema->ddl_filename($db, $dir, $version);
971     if(-e $filename)
972     {
973       $self->throw_exception("$filename already exists, skipping $db");
974       next;
975     }
976     open($file, ">$filename") 
977       or $self->throw_exception("Can't open $filename for writing ($!)");
978     my $output = $sqlt->translate;
979 #use Data::Dumper;
980 #    print join(":", keys %{$schema->source_registrations});
981 #    print Dumper($sqlt->schema);
982     if(!$output)
983     {
984       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
985       next;
986     }
987     print $file $output;
988     close($file);
989   }
990
991 }
992
993 =head2 deployment_statements
994
995 Create the statements for L</deploy> and
996 L<DBIx::Class::Schema/deploy>.
997
998 =cut
999
1000 sub deployment_statements {
1001   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1002   # Need to be connected to get the correct sqlt_type
1003   $self->ensure_connected() unless $type;
1004   $type ||= $self->sqlt_type;
1005   $version ||= $schema->VERSION || '1.x';
1006   $dir ||= './';
1007   eval "use SQL::Translator";
1008   if(!$@)
1009   {
1010     eval "use SQL::Translator::Parser::DBIx::Class;";
1011     $self->throw_exception($@) if $@;
1012     eval "use SQL::Translator::Producer::${type};";
1013     $self->throw_exception($@) if $@;
1014     my $tr = SQL::Translator->new(%$sqltargs);
1015     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1016     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1017   }
1018
1019   my $filename = $schema->ddl_filename($type, $dir, $version);
1020   if(!-f $filename)
1021   {
1022 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1023       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1024       return;
1025   }
1026   my $file;
1027   open($file, "<$filename") 
1028       or $self->throw_exception("Can't open $filename ($!)");
1029   my @rows = <$file>;
1030   close($file);
1031
1032   return join('', @rows);
1033   
1034 }
1035
1036 =head2 deploy
1037
1038 Sends the appropriate statements to create or modify tables to the
1039 db. This would normally be called through
1040 L<DBIx::Class::Schema/deploy>.
1041
1042 =cut
1043
1044 sub deploy {
1045   my ($self, $schema, $type, $sqltargs) = @_;
1046   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1047     for ( split(";\n", $statement)) {
1048       next if($_ =~ /^--/);
1049       next if(!$_);
1050 #      next if($_ =~ /^DROP/m);
1051       next if($_ =~ /^BEGIN TRANSACTION/m);
1052       next if($_ =~ /^COMMIT/m);
1053       next if $_ =~ /^\s+$/; # skip whitespace only
1054       $self->debugobj->query_start($_) if $self->debug;
1055       $self->dbh->do($_) or warn "SQL was:\n $_";
1056       $self->debugobj->query_end($_) if $self->debug;
1057     }
1058   }
1059 }
1060
1061 =head2 datetime_parser
1062
1063 Returns the datetime parser class
1064
1065 =cut
1066
1067 sub datetime_parser {
1068   my $self = shift;
1069   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1070 }
1071
1072 =head2 datetime_parser_type
1073
1074 Defines (returns) the datetime parser class - currently hardwired to
1075 L<DateTime::Format::MySQL>
1076
1077 =cut
1078
1079 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1080
1081 =head2 build_datetime_parser
1082
1083 See L</datetime_parser>
1084
1085 =cut
1086
1087 sub build_datetime_parser {
1088   my $self = shift;
1089   my $type = $self->datetime_parser_type(@_);
1090   eval "use ${type}";
1091   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1092   return $type;
1093 }
1094
1095 sub DESTROY { shift->disconnect }
1096
1097 1;
1098
1099 =head1 SQL METHODS
1100
1101 The module defines a set of methods within the DBIC::SQL::Abstract
1102 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1103 SQL query functions.
1104
1105 The following methods are extended:-
1106
1107 =over 4
1108
1109 =item delete
1110
1111 =item insert
1112
1113 =item select
1114
1115 =item update
1116
1117 =item limit_dialect
1118
1119 See L</connect_info> for details.
1120 For setting, this method is deprecated in favor of L</connect_info>.
1121
1122 =item quote_char
1123
1124 See L</connect_info> for details.
1125 For setting, this method is deprecated in favor of L</connect_info>.
1126
1127 =item name_sep
1128
1129 See L</connect_info> for details.
1130 For setting, this method is deprecated in favor of L</connect_info>.
1131
1132 =back
1133
1134 =head1 ENVIRONMENT VARIABLES
1135
1136 =head2 DBIC_TRACE
1137
1138 If C<DBIC_TRACE> is set then SQL trace information
1139 is produced (as when the L<debug> method is set).
1140
1141 If the value is of the form C<1=/path/name> then the trace output is
1142 written to the file C</path/name>.
1143
1144 This environment variable is checked when the storage object is first
1145 created (when you call connect on your schema).  So, run-time changes 
1146 to this environment variable will not take effect unless you also 
1147 re-connect on your schema.
1148
1149 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1150
1151 Old name for DBIC_TRACE
1152
1153 =head1 AUTHORS
1154
1155 Matt S. Trout <mst@shadowcatsystems.co.uk>
1156
1157 Andy Grundman <andy@hybridized.org>
1158
1159 =head1 LICENSE
1160
1161 You may distribute this code under the same terms as Perl itself.
1162
1163 =cut
1164