1) Add an explicit error to columns_info_for if the given schema/table combination...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = bless({}, ref $_[0] || $_[0]);
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281
282   $new->debugobj(new DBIx::Class::Storage::Statistics());
283
284   my $fh;
285
286   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
287                   || $ENV{DBIC_TRACE};
288
289   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
290     $fh = IO::File->new($1, 'w')
291       or $new->throw_exception("Cannot open trace file $1");
292   } else {
293     $fh = IO::File->new('>&STDERR');
294   }
295   $new->debugfh($fh);
296   $new->debug(1) if $debug_env;
297   $new->_sql_maker_opts({});
298   return $new;
299 }
300
301 =head2 throw_exception
302
303 Throws an exception - croaks.
304
305 =cut
306
307 sub throw_exception {
308   my ($self, $msg) = @_;
309   croak($msg);
310 }
311
312 =head2 connect_info
313
314 The arguments of C<connect_info> are always a single array reference.
315
316 This is normally accessed via L<DBIx::Class::Schema/connection>, which
317 encapsulates its argument list in an arrayref before calling
318 C<connect_info> here.
319
320 The arrayref can either contain the same set of arguments one would
321 normally pass to L<DBI/connect>, or a lone code reference which returns
322 a connected database handle.
323
324 In either case, if the final argument in your connect_info happens
325 to be a hashref, C<connect_info> will look there for several
326 connection-specific options:
327
328 =over 4
329
330 =item on_connect_do
331
332 This can be set to an arrayref of literal sql statements, which will
333 be executed immediately after making the connection to the database
334 every time we [re-]connect.
335
336 =item limit_dialect 
337
338 Sets the limit dialect. This is useful for JDBC-bridge among others
339 where the remote SQL-dialect cannot be determined by the name of the
340 driver alone.
341
342 =item quote_char
343
344 Specifies what characters to use to quote table and column names. If 
345 you use this you will want to specify L<name_sep> as well.
346
347 quote_char expects either a single character, in which case is it is placed
348 on either side of the table/column, or an arrayref of length 2 in which case the
349 table/column name is placed between the elements.
350
351 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
352 use C<quote_char =E<gt> [qw/[ ]/]>.
353
354 =item name_sep
355
356 This only needs to be used in conjunction with L<quote_char>, and is used to 
357 specify the charecter that seperates elements (schemas, tables, columns) from 
358 each other. In most cases this is simply a C<.>.
359
360 =back
361
362 These options can be mixed in with your other L<DBI> connection attributes,
363 or placed in a seperate hashref after all other normal L<DBI> connection
364 arguments.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370 Examples:
371
372   # Simple SQLite connection
373   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
374
375   # Connect via subref
376   ->connect_info([ sub { DBI->connect(...) } ]);
377
378   # A bit more complicated
379   ->connect_info(
380     [
381       'dbi:Pg:dbname=foo',
382       'postgres',
383       'my_pg_password',
384       { AutoCommit => 0 },
385       { quote_char => q{"}, name_sep => q{.} },
386     ]
387   );
388
389   # Equivalent to the previous example
390   ->connect_info(
391     [
392       'dbi:Pg:dbname=foo',
393       'postgres',
394       'my_pg_password',
395       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
396     ]
397   );
398
399   # Subref + DBIC-specific connection options
400   ->connect_info(
401     [
402       sub { DBI->connect(...) },
403       {
404           quote_char => q{`},
405           name_sep => q{@},
406           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
407       },
408     ]
409   );
410
411 =head2 on_connect_do
412
413 This method is deprecated in favor of setting via L</connect_info>.
414
415 =head2 debug
416
417 Causes SQL trace information to be emitted on the C<debugobj> object.
418 (or C<STDERR> if C<debugobj> has not specifically been set).
419
420 This is the equivalent to setting L</DBIC_TRACE> in your
421 shell environment.
422
423 =head2 debugfh
424
425 Set or retrieve the filehandle used for trace/debug output.  This should be
426 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
427 set to be STDERR - although see information on the
428 L<DBIC_TRACE> environment variable.
429
430 =cut
431
432 sub debugfh {
433     my $self = shift;
434
435     if ($self->debugobj->can('debugfh')) {
436         return $self->debugobj->debugfh(@_);
437     }
438 }
439
440 =head2 debugobj
441
442 Sets or retrieves the object used for metric collection. Defaults to an instance
443 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
444 method of using a coderef as a callback.  See the aforementioned Statistics
445 class for more information.
446
447 =head2 debugcb
448
449 Sets a callback to be executed each time a statement is run; takes a sub
450 reference.  Callback is executed as $sub->($op, $info) where $op is
451 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
452
453 See L<debugobj> for a better way.
454
455 =cut
456
457 sub debugcb {
458     my $self = shift;
459
460     if ($self->debugobj->can('callback')) {
461         return $self->debugobj->callback(@_);
462     }
463 }
464
465 =head2 disconnect
466
467 Disconnect the L<DBI> handle, performing a rollback first if the
468 database is not in C<AutoCommit> mode.
469
470 =cut
471
472 sub disconnect {
473   my ($self) = @_;
474
475   if( $self->connected ) {
476     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
477     $self->_dbh->disconnect;
478     $self->_dbh(undef);
479   }
480 }
481
482 =head2 connected
483
484 Check if the L<DBI> handle is connected.  Returns true if the handle
485 is connected.
486
487 =cut
488
489 sub connected { my ($self) = @_;
490
491   if(my $dbh = $self->_dbh) {
492       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
493           return $self->_dbh(undef);
494       }
495       elsif($self->_conn_pid != $$) {
496           $self->_dbh->{InactiveDestroy} = 1;
497           return $self->_dbh(undef);
498       }
499       return ($dbh->FETCH('Active') && $dbh->ping);
500   }
501
502   return 0;
503 }
504
505 =head2 ensure_connected
506
507 Check whether the database handle is connected - if not then make a
508 connection.
509
510 =cut
511
512 sub ensure_connected {
513   my ($self) = @_;
514
515   unless ($self->connected) {
516     $self->_populate_dbh;
517   }
518 }
519
520 =head2 dbh
521
522 Returns the dbh - a data base handle of class L<DBI>.
523
524 =cut
525
526 sub dbh {
527   my ($self) = @_;
528
529   $self->ensure_connected;
530   return $self->_dbh;
531 }
532
533 sub _sql_maker_args {
534     my ($self) = @_;
535     
536     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
537 }
538
539 =head2 sql_maker
540
541 Returns a C<sql_maker> object - normally an object of class
542 C<DBIC::SQL::Abstract>.
543
544 =cut
545
546 sub sql_maker {
547   my ($self) = @_;
548   unless ($self->_sql_maker) {
549     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
550   }
551   return $self->_sql_maker;
552 }
553
554 sub connect_info {
555   my ($self, $info_arg) = @_;
556
557   if($info_arg) {
558     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
559     #  the new set of options
560     $self->_sql_maker(undef);
561     $self->_sql_maker_opts({});
562
563     my $info = [ @$info_arg ]; # copy because we can alter it
564     my $last_info = $info->[-1];
565     if(ref $last_info eq 'HASH') {
566       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
567         $self->on_connect_do($on_connect_do);
568       }
569       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
570         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
571           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
572         }
573       }
574
575       # Get rid of any trailing empty hashref
576       pop(@$info) if !keys %$last_info;
577     }
578
579     $self->_connect_info($info);
580   }
581
582   $self->_connect_info;
583 }
584
585 sub _populate_dbh {
586   my ($self) = @_;
587   my @info = @{$self->_connect_info || []};
588   $self->_dbh($self->_connect(@info));
589
590   if(ref $self eq 'DBIx::Class::Storage::DBI') {
591     my $driver = $self->_dbh->{Driver}->{Name};
592     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
593       bless $self, "DBIx::Class::Storage::DBI::${driver}";
594       $self->_rebless() if $self->can('_rebless');
595     }
596   }
597
598   # if on-connect sql statements are given execute them
599   foreach my $sql_statement (@{$self->on_connect_do || []}) {
600     $self->debugobj->query_start($sql_statement) if $self->debug();
601     $self->_dbh->do($sql_statement);
602     $self->debugobj->query_end($sql_statement) if $self->debug();
603   }
604
605   $self->_conn_pid($$);
606   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
607 }
608
609 sub _connect {
610   my ($self, @info) = @_;
611
612   $self->throw_exception("You failed to provide any connection info")
613       if !@info;
614
615   my ($old_connect_via, $dbh);
616
617   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
618       $old_connect_via = $DBI::connect_via;
619       $DBI::connect_via = 'connect';
620   }
621
622   eval {
623     $dbh = ref $info[0] eq 'CODE'
624          ? &{$info[0]}
625          : DBI->connect(@info);
626   };
627
628   $DBI::connect_via = $old_connect_via if $old_connect_via;
629
630   if (!$dbh || $@) {
631     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
632   }
633
634   $dbh;
635 }
636
637 =head2 txn_begin
638
639 Calls begin_work on the current dbh.
640
641 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
642 an entire code block to be executed transactionally.
643
644 =cut
645
646 sub txn_begin {
647   my $self = shift;
648   if ($self->{transaction_depth}++ == 0) {
649     my $dbh = $self->dbh;
650     if ($dbh->{AutoCommit}) {
651       $self->debugobj->txn_begin()
652         if ($self->debug);
653       $dbh->begin_work;
654     }
655   }
656 }
657
658 =head2 txn_commit
659
660 Issues a commit against the current dbh.
661
662 =cut
663
664 sub txn_commit {
665   my $self = shift;
666   my $dbh = $self->dbh;
667   if ($self->{transaction_depth} == 0) {
668     unless ($dbh->{AutoCommit}) {
669       $self->debugobj->txn_commit()
670         if ($self->debug);
671       $dbh->commit;
672     }
673   }
674   else {
675     if (--$self->{transaction_depth} == 0) {
676       $self->debugobj->txn_commit()
677         if ($self->debug);
678       $dbh->commit;
679     }
680   }
681 }
682
683 =head2 txn_rollback
684
685 Issues a rollback against the current dbh. A nested rollback will
686 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
687 which allows the rollback to propagate to the outermost transaction.
688
689 =cut
690
691 sub txn_rollback {
692   my $self = shift;
693
694   eval {
695     my $dbh = $self->dbh;
696     if ($self->{transaction_depth} == 0) {
697       unless ($dbh->{AutoCommit}) {
698         $self->debugobj->txn_rollback()
699           if ($self->debug);
700         $dbh->rollback;
701       }
702     }
703     else {
704       if (--$self->{transaction_depth} == 0) {
705         $self->debugobj->txn_rollback()
706           if ($self->debug);
707         $dbh->rollback;
708       }
709       else {
710         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
711       }
712     }
713   };
714
715   if ($@) {
716     my $error = $@;
717     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
718     $error =~ /$exception_class/ and $self->throw_exception($error);
719     $self->{transaction_depth} = 0;          # ensure that a failed rollback
720     $self->throw_exception($error);          # resets the transaction depth
721   }
722 }
723
724 sub _execute {
725   my ($self, $op, $extra_bind, $ident, @args) = @_;
726   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
727   unshift(@bind, @$extra_bind) if $extra_bind;
728   if ($self->debug) {
729       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
730       $self->debugobj->query_start($sql, @debug_bind);
731   }
732   my $sth = eval { $self->sth($sql,$op) };
733
734   if (!$sth || $@) {
735     $self->throw_exception(
736       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
737     );
738   }
739   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
740   my $rv;
741   if ($sth) {
742     my $time = time();
743     $rv = eval { $sth->execute(@bind) };
744
745     if ($@ || !$rv) {
746       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
747     }
748   } else {
749     $self->throw_exception("'$sql' did not generate a statement.");
750   }
751   if ($self->debug) {
752       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
753       $self->debugobj->query_end($sql, @debug_bind);
754   }
755   return (wantarray ? ($rv, $sth, @bind) : $rv);
756 }
757
758 sub insert {
759   my ($self, $ident, $to_insert) = @_;
760   $self->throw_exception(
761     "Couldn't insert ".join(', ',
762       map "$_ => $to_insert->{$_}", keys %$to_insert
763     )." into ${ident}"
764   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
765   return $to_insert;
766 }
767
768 sub update {
769   return shift->_execute('update' => [], @_);
770 }
771
772 sub delete {
773   return shift->_execute('delete' => [], @_);
774 }
775
776 sub _select {
777   my ($self, $ident, $select, $condition, $attrs) = @_;
778   my $order = $attrs->{order_by};
779   if (ref $condition eq 'SCALAR') {
780     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
781   }
782   if (exists $attrs->{group_by} || $attrs->{having}) {
783     $order = {
784       group_by => $attrs->{group_by},
785       having => $attrs->{having},
786       ($order ? (order_by => $order) : ())
787     };
788   }
789   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
790   if ($attrs->{software_limit} ||
791       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
792         $attrs->{software_limit} = 1;
793   } else {
794     $self->throw_exception("rows attribute must be positive if present")
795       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
796     push @args, $attrs->{rows}, $attrs->{offset};
797   }
798   return $self->_execute(@args);
799 }
800
801 =head2 select
802
803 Handle a SQL select statement.
804
805 =cut
806
807 sub select {
808   my $self = shift;
809   my ($ident, $select, $condition, $attrs) = @_;
810   return $self->cursor->new($self, \@_, $attrs);
811 }
812
813 =head2 select_single
814
815 Performs a select, fetch and return of data - handles a single row
816 only.
817
818 =cut
819
820 # Need to call finish() to work round broken DBDs
821
822 sub select_single {
823   my $self = shift;
824   my ($rv, $sth, @bind) = $self->_select(@_);
825   my @row = $sth->fetchrow_array;
826   $sth->finish();
827   return @row;
828 }
829
830 =head2 sth
831
832 Returns a L<DBI> sth (statement handle) for the supplied SQL.
833
834 =cut
835
836 sub sth {
837   my ($self, $sql) = @_;
838   # 3 is the if_active parameter which avoids active sth re-use
839   return $self->dbh->prepare_cached($sql, {}, 3);
840 }
841
842 =head2 columns_info_for
843
844 Returns database type info for a given table columns.
845
846 =cut
847
848 sub columns_info_for {
849   my ($self, $table) = @_;
850
851   my $dbh = $self->dbh;
852
853   if ($dbh->can('column_info')) {
854     my %result;
855     my $old_raise_err = $dbh->{RaiseError};
856     my $old_print_err = $dbh->{PrintError};
857     $dbh->{RaiseError} = 1;
858     $dbh->{PrintError} = 0;
859     eval {
860       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
861       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
862       $sth->execute();
863
864       # Some error occured or there is no information:
865       if($sth->rows <1) {
866           die "column_info returned no rows for $schema, $tab";
867       }
868
869       while ( my $info = $sth->fetchrow_hashref() ){
870         my %column_info;
871         $column_info{data_type}   = $info->{TYPE_NAME};
872         $column_info{size}      = $info->{COLUMN_SIZE};
873         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
874         $column_info{default_value} = $info->{COLUMN_DEF};
875         my $col_name = $info->{COLUMN_NAME};
876         $col_name =~ s/^\"(.*)\"$/$1/;
877
878         $result{$col_name} = \%column_info;
879       }
880     };
881     $dbh->{RaiseError} = $old_raise_err;
882     $dbh->{PrintError} = $old_print_err;
883     return \%result if !$@;
884   }
885
886   my %result;
887   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
888   $sth->execute;
889   my @columns = @{$sth->{NAME_lc}};
890   for my $i ( 0 .. $#columns ){
891     my %column_info;
892     my $type_num = $sth->{TYPE}->[$i];
893     my $type_name;
894     if(defined $type_num && $dbh->can('type_info')) {
895       my $type_info = $dbh->type_info($type_num);
896       $type_name = $type_info->{TYPE_NAME} if $type_info;
897     }
898     $column_info{data_type} = $type_name ? $type_name : $type_num;
899     $column_info{size} = $sth->{PRECISION}->[$i];
900     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
901
902     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
903       $column_info{data_type} = $1;
904       $column_info{size}    = $2;
905     }
906
907     $result{$columns[$i]} = \%column_info;
908   }
909
910   return \%result;
911 }
912
913 =head2 last_insert_id
914
915 Return the row id of the last insert.
916
917 =cut
918
919 sub last_insert_id {
920   my ($self, $row) = @_;
921     
922   return $self->dbh->func('last_insert_rowid');
923
924 }
925
926 =head2 sqlt_type
927
928 Returns the database driver name.
929
930 =cut
931
932 sub sqlt_type { shift->dbh->{Driver}->{Name} }
933
934 =head2 create_ddl_dir (EXPERIMENTAL)
935
936 =over 4
937
938 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
939
940 =back
941
942 Creates an SQL file based on the Schema, for each of the specified
943 database types, in the given directory.
944
945 Note that this feature is currently EXPERIMENTAL and may not work correctly
946 across all databases, or fully handle complex relationships.
947
948 =cut
949
950 sub create_ddl_dir
951 {
952   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
953
954   if(!$dir || !-d $dir)
955   {
956     warn "No directory given, using ./\n";
957     $dir = "./";
958   }
959   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
960   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
961   $version ||= $schema->VERSION || '1.x';
962   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
963
964   eval "use SQL::Translator";
965   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
966
967   my $sqlt = SQL::Translator->new($sqltargs);
968   foreach my $db (@$databases)
969   {
970     $sqlt->reset();
971     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
972 #    $sqlt->parser_args({'DBIx::Class' => $schema);
973     $sqlt->data($schema);
974     $sqlt->producer($db);
975
976     my $file;
977     my $filename = $schema->ddl_filename($db, $dir, $version);
978     if(-e $filename)
979     {
980       $self->throw_exception("$filename already exists, skipping $db");
981       next;
982     }
983     open($file, ">$filename") 
984       or $self->throw_exception("Can't open $filename for writing ($!)");
985     my $output = $sqlt->translate;
986 #use Data::Dumper;
987 #    print join(":", keys %{$schema->source_registrations});
988 #    print Dumper($sqlt->schema);
989     if(!$output)
990     {
991       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
992       next;
993     }
994     print $file $output;
995     close($file);
996   }
997
998 }
999
1000 =head2 deployment_statements
1001
1002 Create the statements for L</deploy> and
1003 L<DBIx::Class::Schema/deploy>.
1004
1005 =cut
1006
1007 sub deployment_statements {
1008   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1009   # Need to be connected to get the correct sqlt_type
1010   $self->ensure_connected() unless $type;
1011   $type ||= $self->sqlt_type;
1012   $version ||= $schema->VERSION || '1.x';
1013   $dir ||= './';
1014   eval "use SQL::Translator";
1015   if(!$@)
1016   {
1017     eval "use SQL::Translator::Parser::DBIx::Class;";
1018     $self->throw_exception($@) if $@;
1019     eval "use SQL::Translator::Producer::${type};";
1020     $self->throw_exception($@) if $@;
1021     my $tr = SQL::Translator->new(%$sqltargs);
1022     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1023     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1024   }
1025
1026   my $filename = $schema->ddl_filename($type, $dir, $version);
1027   if(!-f $filename)
1028   {
1029 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1030       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1031       return;
1032   }
1033   my $file;
1034   open($file, "<$filename") 
1035       or $self->throw_exception("Can't open $filename ($!)");
1036   my @rows = <$file>;
1037   close($file);
1038
1039   return join('', @rows);
1040   
1041 }
1042
1043 =head2 deploy
1044
1045 Sends the appropriate statements to create or modify tables to the
1046 db. This would normally be called through
1047 L<DBIx::Class::Schema/deploy>.
1048
1049 =cut
1050
1051 sub deploy {
1052   my ($self, $schema, $type, $sqltargs) = @_;
1053   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1054     for ( split(";\n", $statement)) {
1055       next if($_ =~ /^--/);
1056       next if(!$_);
1057 #      next if($_ =~ /^DROP/m);
1058       next if($_ =~ /^BEGIN TRANSACTION/m);
1059       next if($_ =~ /^COMMIT/m);
1060       next if $_ =~ /^\s+$/; # skip whitespace only
1061       $self->debugobj->query_start($_) if $self->debug;
1062       $self->dbh->do($_) or warn "SQL was:\n $_";
1063       $self->debugobj->query_end($_) if $self->debug;
1064     }
1065   }
1066 }
1067
1068 =head2 datetime_parser
1069
1070 Returns the datetime parser class
1071
1072 =cut
1073
1074 sub datetime_parser {
1075   my $self = shift;
1076   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1077 }
1078
1079 =head2 datetime_parser_type
1080
1081 Defines (returns) the datetime parser class - currently hardwired to
1082 L<DateTime::Format::MySQL>
1083
1084 =cut
1085
1086 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1087
1088 =head2 build_datetime_parser
1089
1090 See L</datetime_parser>
1091
1092 =cut
1093
1094 sub build_datetime_parser {
1095   my $self = shift;
1096   my $type = $self->datetime_parser_type(@_);
1097   eval "use ${type}";
1098   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1099   return $type;
1100 }
1101
1102 sub DESTROY { shift->disconnect }
1103
1104 1;
1105
1106 =head1 SQL METHODS
1107
1108 The module defines a set of methods within the DBIC::SQL::Abstract
1109 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1110 SQL query functions.
1111
1112 The following methods are extended:-
1113
1114 =over 4
1115
1116 =item delete
1117
1118 =item insert
1119
1120 =item select
1121
1122 =item update
1123
1124 =item limit_dialect
1125
1126 See L</connect_info> for details.
1127 For setting, this method is deprecated in favor of L</connect_info>.
1128
1129 =item quote_char
1130
1131 See L</connect_info> for details.
1132 For setting, this method is deprecated in favor of L</connect_info>.
1133
1134 =item name_sep
1135
1136 See L</connect_info> for details.
1137 For setting, this method is deprecated in favor of L</connect_info>.
1138
1139 =back
1140
1141 =head1 ENVIRONMENT VARIABLES
1142
1143 =head2 DBIC_TRACE
1144
1145 If C<DBIC_TRACE> is set then SQL trace information
1146 is produced (as when the L<debug> method is set).
1147
1148 If the value is of the form C<1=/path/name> then the trace output is
1149 written to the file C</path/name>.
1150
1151 This environment variable is checked when the storage object is first
1152 created (when you call connect on your schema).  So, run-time changes 
1153 to this environment variable will not take effect unless you also 
1154 re-connect on your schema.
1155
1156 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1157
1158 Old name for DBIC_TRACE
1159
1160 =head1 AUTHORS
1161
1162 Matt S. Trout <mst@shadowcatsystems.co.uk>
1163
1164 Andy Grundman <andy@hybridized.org>
1165
1166 =head1 LICENSE
1167
1168 You may distribute this code under the same terms as Perl itself.
1169
1170 =cut
1171