Prevent S::A::L caching of $dbh
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   @rest = (-1) unless defined $rest[0];
43   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
44     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
45   local $self->{having_bind} = [];
46   my ($sql, @ret) = $self->SUPER::select(
47     $table, $self->_recurse_fields($fields), $where, $order, @rest
48   );
49   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
50 }
51
52 sub insert {
53   my $self = shift;
54   my $table = shift;
55   $table = $self->_quote($table) unless ref($table);
56   $self->SUPER::insert($table, @_);
57 }
58
59 sub update {
60   my $self = shift;
61   my $table = shift;
62   $table = $self->_quote($table) unless ref($table);
63   $self->SUPER::update($table, @_);
64 }
65
66 sub delete {
67   my $self = shift;
68   my $table = shift;
69   $table = $self->_quote($table) unless ref($table);
70   $self->SUPER::delete($table, @_);
71 }
72
73 sub _emulate_limit {
74   my $self = shift;
75   if ($_[3] == -1) {
76     return $_[1].$self->_order_by($_[2]);
77   } else {
78     return $self->SUPER::_emulate_limit(@_);
79   }
80 }
81
82 sub _recurse_fields {
83   my ($self, $fields) = @_;
84   my $ref = ref $fields;
85   return $self->_quote($fields) unless $ref;
86   return $$fields if $ref eq 'SCALAR';
87
88   if ($ref eq 'ARRAY') {
89     return join(', ', map { $self->_recurse_fields($_) } @$fields);
90   } elsif ($ref eq 'HASH') {
91     foreach my $func (keys %$fields) {
92       return $self->_sqlcase($func)
93         .'( '.$self->_recurse_fields($fields->{$func}).' )';
94     }
95   }
96 }
97
98 sub _order_by {
99   my $self = shift;
100   my $ret = '';
101   my @extra;
102   if (ref $_[0] eq 'HASH') {
103     if (defined $_[0]->{group_by}) {
104       $ret = $self->_sqlcase(' group by ')
105                .$self->_recurse_fields($_[0]->{group_by});
106     }
107     if (defined $_[0]->{having}) {
108       my $frag;
109       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
110       push(@{$self->{having_bind}}, @extra);
111       $ret .= $self->_sqlcase(' having ').$frag;
112     }
113     if (defined $_[0]->{order_by}) {
114       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
115     }
116   } elsif(ref $_[0] eq 'SCALAR') {
117     $ret = $self->_sqlcase(' order by ').${ $_[0] };
118   } else {
119     $ret = $self->SUPER::_order_by(@_);
120   }
121   return $ret;
122 }
123
124 sub _order_directions {
125   my ($self, $order) = @_;
126   $order = $order->{order_by} if ref $order eq 'HASH';
127   return $self->SUPER::_order_directions($order);
128 }
129
130 sub _table {
131   my ($self, $from) = @_;
132   if (ref $from eq 'ARRAY') {
133     return $self->_recurse_from(@$from);
134   } elsif (ref $from eq 'HASH') {
135     return $self->_make_as($from);
136   } else {
137     return $from; # would love to quote here but _table ends up getting called
138                   # twice during an ->select without a limit clause due to
139                   # the way S::A::Limit->select works. should maybe consider
140                   # bypassing this and doing S::A::select($self, ...) in
141                   # our select method above. meantime, quoting shims have
142                   # been added to select/insert/update/delete here
143   }
144 }
145
146 sub _recurse_from {
147   my ($self, $from, @join) = @_;
148   my @sqlf;
149   push(@sqlf, $self->_make_as($from));
150   foreach my $j (@join) {
151     my ($to, $on) = @$j;
152
153     # check whether a join type exists
154     my $join_clause = '';
155     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
156     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
157       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
158     } else {
159       $join_clause = ' JOIN ';
160     }
161     push(@sqlf, $join_clause);
162
163     if (ref $to eq 'ARRAY') {
164       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
165     } else {
166       push(@sqlf, $self->_make_as($to));
167     }
168     push(@sqlf, ' ON ', $self->_join_condition($on));
169   }
170   return join('', @sqlf);
171 }
172
173 sub _make_as {
174   my ($self, $from) = @_;
175   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
176                      reverse each %{$self->_skip_options($from)});
177 }
178
179 sub _skip_options {
180   my ($self, $hash) = @_;
181   my $clean_hash = {};
182   $clean_hash->{$_} = $hash->{$_}
183     for grep {!/^-/} keys %$hash;
184   return $clean_hash;
185 }
186
187 sub _join_condition {
188   my ($self, $cond) = @_;
189   if (ref $cond eq 'HASH') {
190     my %j;
191     for (keys %$cond) {
192       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
193     };
194     return $self->_recurse_where(\%j);
195   } elsif (ref $cond eq 'ARRAY') {
196     return join(' OR ', map { $self->_join_condition($_) } @$cond);
197   } else {
198     die "Can't handle this yet!";
199   }
200 }
201
202 sub _quote {
203   my ($self, $label) = @_;
204   return '' unless defined $label;
205   return "*" if $label eq '*';
206   return $label unless $self->{quote_char};
207   if(ref $self->{quote_char} eq "ARRAY"){
208     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
209       if !defined $self->{name_sep};
210     my $sep = $self->{name_sep};
211     return join($self->{name_sep},
212         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
213        split(/\Q$sep\E/,$label));
214   }
215   return $self->SUPER::_quote($label);
216 }
217
218 sub _RowNum {
219    my $self = shift;
220    my $c;
221    $_[0] =~ s/SELECT (.*?) FROM/
222      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
223    $self->SUPER::_RowNum(@_);
224 }
225
226 sub limit_dialect {
227     my $self = shift;
228     $self->{limit_dialect} = shift if @_;
229     return $self->{limit_dialect};
230 }
231
232 sub quote_char {
233     my $self = shift;
234     $self->{quote_char} = shift if @_;
235     return $self->{quote_char};
236 }
237
238 sub name_sep {
239     my $self = shift;
240     $self->{name_sep} = shift if @_;
241     return $self->{name_sep};
242 }
243
244 } # End of BEGIN block
245
246 use base qw/DBIx::Class/;
247
248 __PACKAGE__->load_components(qw/AccessorGroup/);
249
250 __PACKAGE__->mk_group_accessors('simple' =>
251   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
252      debug debugobj cursor on_connect_do transaction_depth/);
253
254 =head1 NAME
255
256 DBIx::Class::Storage::DBI - DBI storage handler
257
258 =head1 SYNOPSIS
259
260 =head1 DESCRIPTION
261
262 This class represents the connection to the database
263
264 =head1 METHODS
265
266 =head2 new
267
268 =cut
269
270 sub new {
271   my $new = bless({}, ref $_[0] || $_[0]);
272   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
273   $new->transaction_depth(0);
274
275   $new->debugobj(new DBIx::Class::Storage::Statistics());
276
277   my $fh;
278
279   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
280                   || $ENV{DBIC_TRACE};
281
282   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
283     $fh = IO::File->new($1, 'w')
284       or $new->throw_exception("Cannot open trace file $1");
285   } else {
286     $fh = IO::File->new('>&STDERR');
287   }
288   $new->debugfh($fh);
289   $new->debug(1) if $debug_env;
290   $new->_sql_maker_opts({});
291   return $new;
292 }
293
294 =head2 throw_exception
295
296 Throws an exception - croaks.
297
298 =cut
299
300 sub throw_exception {
301   my ($self, $msg) = @_;
302   croak($msg);
303 }
304
305 =head2 connect_info
306
307 The arguments of C<connect_info> are always a single array reference.
308
309 This is normally accessed via L<DBIx::Class::Schema/connection>, which
310 encapsulates its argument list in an arrayref before calling
311 C<connect_info> here.
312
313 The arrayref can either contain the same set of arguments one would
314 normally pass to L<DBI/connect>, or a lone code reference which returns
315 a connected database handle.
316
317 In either case, if the final argument in your connect_info happens
318 to be a hashref, C<connect_info> will look there for several
319 connection-specific options:
320
321 =over 4
322
323 =item on_connect_do
324
325 This can be set to an arrayref of literal sql statements, which will
326 be executed immediately after making the connection to the database
327 every time we [re-]connect.
328
329 =item limit_dialect 
330
331 Sets the limit dialect. This is useful for JDBC-bridge among others
332 where the remote SQL-dialect cannot be determined by the name of the
333 driver alone.
334
335 =item quote_char
336
337 Specifies what characters to use to quote table and column names. If 
338 you use this you will want to specify L<name_sep> as well.
339
340 quote_char expects either a single character, in which case is it is placed
341 on either side of the table/column, or an arrayref of length 2 in which case the
342 table/column name is placed between the elements.
343
344 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
345 use C<quote_char =E<gt> [qw/[ ]/]>.
346
347 =item name_sep
348
349 This only needs to be used in conjunction with L<quote_char>, and is used to 
350 specify the charecter that seperates elements (schemas, tables, columns) from 
351 each other. In most cases this is simply a C<.>.
352
353 =back
354
355 These options can be mixed in with your other L<DBI> connection attributes,
356 or placed in a seperate hashref after all other normal L<DBI> connection
357 arguments.
358
359 Every time C<connect_info> is invoked, any previous settings for
360 these options will be cleared before setting the new ones, regardless of
361 whether any options are specified in the new C<connect_info>.
362
363 Examples:
364
365   # Simple SQLite connection
366   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
367
368   # Connect via subref
369   ->connect_info([ sub { DBI->connect(...) } ]);
370
371   # A bit more complicated
372   ->connect_info(
373     [
374       'dbi:Pg:dbname=foo',
375       'postgres',
376       'my_pg_password',
377       { AutoCommit => 0 },
378       { quote_char => q{"}, name_sep => q{.} },
379     ]
380   );
381
382   # Equivalent to the previous example
383   ->connect_info(
384     [
385       'dbi:Pg:dbname=foo',
386       'postgres',
387       'my_pg_password',
388       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
389     ]
390   );
391
392   # Subref + DBIC-specific connection options
393   ->connect_info(
394     [
395       sub { DBI->connect(...) },
396       {
397           quote_char => q{`},
398           name_sep => q{@},
399           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
400       },
401     ]
402   );
403
404 =head2 on_connect_do
405
406 This method is deprecated in favor of setting via L</connect_info>.
407
408 =head2 debug
409
410 Causes SQL trace information to be emitted on the C<debugobj> object.
411 (or C<STDERR> if C<debugobj> has not specifically been set).
412
413 =head2 debugfh
414
415 Set or retrieve the filehandle used for trace/debug output.  This should be
416 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
417 set to be STDERR - although see information on the
418 L<DBIC_TRACE> environment variable.
419
420 =cut
421
422 sub debugfh {
423     my $self = shift;
424
425     if ($self->debugobj->can('debugfh')) {
426         return $self->debugobj->debugfh(@_);
427     }
428 }
429
430 =head2 debugobj
431
432 Sets or retrieves the object used for metric collection. Defaults to an instance
433 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
434 method of using a coderef as a callback.  See the aforementioned Statistics
435 class for more information.
436
437 =head2 debugcb
438
439 Sets a callback to be executed each time a statement is run; takes a sub
440 reference.  Callback is executed as $sub->($op, $info) where $op is
441 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
442
443 See L<debugobj> for a better way.
444
445 =cut
446
447 sub debugcb {
448     my $self = shift;
449
450     if ($self->debugobj->can('callback')) {
451         return $self->debugobj->callback(@_);
452     }
453 }
454
455 =head2 disconnect
456
457 Disconnect the L<DBI> handle, performing a rollback first if the
458 database is not in C<AutoCommit> mode.
459
460 =cut
461
462 sub disconnect {
463   my ($self) = @_;
464
465   if( $self->connected ) {
466     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
467     $self->_dbh->disconnect;
468     $self->_dbh(undef);
469   }
470 }
471
472 =head2 connected
473
474 Check if the L<DBI> handle is connected.  Returns true if the handle
475 is connected.
476
477 =cut
478
479 sub connected { my ($self) = @_;
480
481   if(my $dbh = $self->_dbh) {
482       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
483           return $self->_dbh(undef);
484       }
485       elsif($self->_conn_pid != $$) {
486           $self->_dbh->{InactiveDestroy} = 1;
487           return $self->_dbh(undef);
488       }
489       return ($dbh->FETCH('Active') && $dbh->ping);
490   }
491
492   return 0;
493 }
494
495 =head2 ensure_connected
496
497 Check whether the database handle is connected - if not then make a
498 connection.
499
500 =cut
501
502 sub ensure_connected {
503   my ($self) = @_;
504
505   unless ($self->connected) {
506     $self->_populate_dbh;
507   }
508 }
509
510 =head2 dbh
511
512 Returns the dbh - a data base handle of class L<DBI>.
513
514 =cut
515
516 sub dbh {
517   my ($self) = @_;
518
519   $self->ensure_connected;
520   return $self->_dbh;
521 }
522
523 sub _sql_maker_args {
524     my ($self) = @_;
525     
526     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
527 }
528
529 =head2 sql_maker
530
531 Returns a C<sql_maker> object - normally an object of class
532 C<DBIC::SQL::Abstract>.
533
534 =cut
535
536 sub sql_maker {
537   my ($self) = @_;
538   unless ($self->_sql_maker) {
539     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
540   }
541   return $self->_sql_maker;
542 }
543
544 sub connect_info {
545   my ($self, $info_arg) = @_;
546
547   if($info_arg) {
548     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
549     #  the new set of options
550     $self->_sql_maker(undef);
551     $self->_sql_maker_opts({});
552
553     my $info = [ @$info_arg ]; # copy because we can alter it
554     my $last_info = $info->[-1];
555     if(ref $last_info eq 'HASH') {
556       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
557         $self->on_connect_do($on_connect_do);
558       }
559       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
560         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
561           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
562         }
563       }
564
565       # Get rid of any trailing empty hashref
566       pop(@$info) if !keys %$last_info;
567     }
568
569     $self->_connect_info($info);
570   }
571
572   $self->_connect_info;
573 }
574
575 sub _populate_dbh {
576   my ($self) = @_;
577   my @info = @{$self->_connect_info || []};
578   $self->_dbh($self->_connect(@info));
579
580   if(ref $self eq 'DBIx::Class::Storage::DBI') {
581     my $driver = $self->_dbh->{Driver}->{Name};
582     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
583       bless $self, "DBIx::Class::Storage::DBI::${driver}";
584       $self->_rebless() if $self->can('_rebless');
585     }
586   }
587
588   # if on-connect sql statements are given execute them
589   foreach my $sql_statement (@{$self->on_connect_do || []}) {
590     $self->debugobj->query_start($sql_statement) if $self->debug();
591     $self->_dbh->do($sql_statement);
592     $self->debugobj->query_end($sql_statement) if $self->debug();
593   }
594
595   $self->_conn_pid($$);
596   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
597 }
598
599 sub _connect {
600   my ($self, @info) = @_;
601
602   $self->throw_exception("You failed to provide any connection info")
603       if !@info;
604
605   my ($old_connect_via, $dbh);
606
607   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
608       $old_connect_via = $DBI::connect_via;
609       $DBI::connect_via = 'connect';
610   }
611
612   eval {
613     $dbh = ref $info[0] eq 'CODE'
614          ? &{$info[0]}
615          : DBI->connect(@info);
616   };
617
618   $DBI::connect_via = $old_connect_via if $old_connect_via;
619
620   if (!$dbh || $@) {
621     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
622   }
623
624   $dbh;
625 }
626
627 =head2 txn_begin
628
629 Calls begin_work on the current dbh.
630
631 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
632 an entire code block to be executed transactionally.
633
634 =cut
635
636 sub txn_begin {
637   my $self = shift;
638   if ($self->{transaction_depth}++ == 0) {
639     my $dbh = $self->dbh;
640     if ($dbh->{AutoCommit}) {
641       $self->debugobj->txn_begin()
642         if ($self->debug);
643       $dbh->begin_work;
644     }
645   }
646 }
647
648 =head2 txn_commit
649
650 Issues a commit against the current dbh.
651
652 =cut
653
654 sub txn_commit {
655   my $self = shift;
656   my $dbh = $self->dbh;
657   if ($self->{transaction_depth} == 0) {
658     unless ($dbh->{AutoCommit}) {
659       $self->debugobj->txn_commit()
660         if ($self->debug);
661       $dbh->commit;
662     }
663   }
664   else {
665     if (--$self->{transaction_depth} == 0) {
666       $self->debugobj->txn_commit()
667         if ($self->debug);
668       $dbh->commit;
669     }
670   }
671 }
672
673 =head2 txn_rollback
674
675 Issues a rollback against the current dbh. A nested rollback will
676 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
677 which allows the rollback to propagate to the outermost transaction.
678
679 =cut
680
681 sub txn_rollback {
682   my $self = shift;
683
684   eval {
685     my $dbh = $self->dbh;
686     if ($self->{transaction_depth} == 0) {
687       unless ($dbh->{AutoCommit}) {
688         $self->debugobj->txn_rollback()
689           if ($self->debug);
690         $dbh->rollback;
691       }
692     }
693     else {
694       if (--$self->{transaction_depth} == 0) {
695         $self->debugobj->txn_rollback()
696           if ($self->debug);
697         $dbh->rollback;
698       }
699       else {
700         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
701       }
702     }
703   };
704
705   if ($@) {
706     my $error = $@;
707     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
708     $error =~ /$exception_class/ and $self->throw_exception($error);
709     $self->{transaction_depth} = 0;          # ensure that a failed rollback
710     $self->throw_exception($error);          # resets the transaction depth
711   }
712 }
713
714 sub _execute {
715   my ($self, $op, $extra_bind, $ident, @args) = @_;
716   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
717   unshift(@bind, @$extra_bind) if $extra_bind;
718   if ($self->debug) {
719       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
720       $self->debugobj->query_start($sql, @debug_bind);
721   }
722   my $sth = eval { $self->sth($sql,$op) };
723
724   if (!$sth || $@) {
725     $self->throw_exception(
726       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
727     );
728   }
729   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
730   my $rv;
731   if ($sth) {
732     my $time = time();
733     $rv = eval { $sth->execute(@bind) };
734
735     if ($@ || !$rv) {
736       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
737     }
738   } else {
739     $self->throw_exception("'$sql' did not generate a statement.");
740   }
741   if ($self->debug) {
742       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
743       $self->debugobj->query_end($sql, @debug_bind);
744   }
745   return (wantarray ? ($rv, $sth, @bind) : $rv);
746 }
747
748 sub insert {
749   my ($self, $ident, $to_insert) = @_;
750   $self->throw_exception(
751     "Couldn't insert ".join(', ',
752       map "$_ => $to_insert->{$_}", keys %$to_insert
753     )." into ${ident}"
754   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
755   return $to_insert;
756 }
757
758 sub update {
759   return shift->_execute('update' => [], @_);
760 }
761
762 sub delete {
763   return shift->_execute('delete' => [], @_);
764 }
765
766 sub _select {
767   my ($self, $ident, $select, $condition, $attrs) = @_;
768   my $order = $attrs->{order_by};
769   if (ref $condition eq 'SCALAR') {
770     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
771   }
772   if (exists $attrs->{group_by} || $attrs->{having}) {
773     $order = {
774       group_by => $attrs->{group_by},
775       having => $attrs->{having},
776       ($order ? (order_by => $order) : ())
777     };
778   }
779   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
780   if ($attrs->{software_limit} ||
781       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
782         $attrs->{software_limit} = 1;
783   } else {
784     $self->throw_exception("rows attribute must be positive if present")
785       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
786     push @args, $attrs->{rows}, $attrs->{offset};
787   }
788   return $self->_execute(@args);
789 }
790
791 =head2 select
792
793 Handle a SQL select statement.
794
795 =cut
796
797 sub select {
798   my $self = shift;
799   my ($ident, $select, $condition, $attrs) = @_;
800   return $self->cursor->new($self, \@_, $attrs);
801 }
802
803 =head2 select_single
804
805 Performs a select, fetch and return of data - handles a single row
806 only.
807
808 =cut
809
810 # Need to call finish() to work round broken DBDs
811
812 sub select_single {
813   my $self = shift;
814   my ($rv, $sth, @bind) = $self->_select(@_);
815   my @row = $sth->fetchrow_array;
816   $sth->finish();
817   return @row;
818 }
819
820 =head2 sth
821
822 Returns a L<DBI> sth (statement handle) for the supplied SQL.
823
824 =cut
825
826 sub sth {
827   my ($self, $sql) = @_;
828   # 3 is the if_active parameter which avoids active sth re-use
829   return $self->dbh->prepare_cached($sql, {}, 3);
830 }
831
832 =head2 columns_info_for
833
834 Returns database type info for a given table columns.
835
836 =cut
837
838 sub columns_info_for {
839   my ($self, $table) = @_;
840
841   my $dbh = $self->dbh;
842
843   if ($dbh->can('column_info')) {
844     my %result;
845     my $old_raise_err = $dbh->{RaiseError};
846     my $old_print_err = $dbh->{PrintError};
847     $dbh->{RaiseError} = 1;
848     $dbh->{PrintError} = 0;
849     eval {
850       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
851       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
852       $sth->execute();
853       while ( my $info = $sth->fetchrow_hashref() ){
854         my %column_info;
855         $column_info{data_type}   = $info->{TYPE_NAME};
856         $column_info{size}      = $info->{COLUMN_SIZE};
857         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
858         $column_info{default_value} = $info->{COLUMN_DEF};
859         my $col_name = $info->{COLUMN_NAME};
860         $col_name =~ s/^\"(.*)\"$/$1/;
861
862         $result{$col_name} = \%column_info;
863       }
864     };
865     $dbh->{RaiseError} = $old_raise_err;
866     $dbh->{PrintError} = $old_print_err;
867     return \%result if !$@;
868   }
869
870   my %result;
871   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
872   $sth->execute;
873   my @columns = @{$sth->{NAME_lc}};
874   for my $i ( 0 .. $#columns ){
875     my %column_info;
876     my $type_num = $sth->{TYPE}->[$i];
877     my $type_name;
878     if(defined $type_num && $dbh->can('type_info')) {
879       my $type_info = $dbh->type_info($type_num);
880       $type_name = $type_info->{TYPE_NAME} if $type_info;
881     }
882     $column_info{data_type} = $type_name ? $type_name : $type_num;
883     $column_info{size} = $sth->{PRECISION}->[$i];
884     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
885
886     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
887       $column_info{data_type} = $1;
888       $column_info{size}    = $2;
889     }
890
891     $result{$columns[$i]} = \%column_info;
892   }
893
894   return \%result;
895 }
896
897 =head2 last_insert_id
898
899 Return the row id of the last insert.
900
901 =cut
902
903 sub last_insert_id {
904   my ($self, $row) = @_;
905     
906   return $self->dbh->func('last_insert_rowid');
907
908 }
909
910 =head2 sqlt_type
911
912 Returns the database driver name.
913
914 =cut
915
916 sub sqlt_type { shift->dbh->{Driver}->{Name} }
917
918 =head2 create_ddl_dir (EXPERIMENTAL)
919
920 =over 4
921
922 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
923
924 =back
925
926 Creates an SQL file based on the Schema, for each of the specified
927 database types, in the given directory.
928
929 Note that this feature is currently EXPERIMENTAL and may not work correctly
930 across all databases, or fully handle complex relationships.
931
932 =cut
933
934 sub create_ddl_dir
935 {
936   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
937
938   if(!$dir || !-d $dir)
939   {
940     warn "No directory given, using ./\n";
941     $dir = "./";
942   }
943   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
944   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
945   $version ||= $schema->VERSION || '1.x';
946
947   eval "use SQL::Translator";
948   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
949
950   my $sqlt = SQL::Translator->new({
951 #      debug => 1,
952       add_drop_table => 1,
953   });
954   foreach my $db (@$databases)
955   {
956     $sqlt->reset();
957     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
958 #    $sqlt->parser_args({'DBIx::Class' => $schema);
959     $sqlt->data($schema);
960     $sqlt->producer($db);
961
962     my $file;
963     my $filename = $schema->ddl_filename($db, $dir, $version);
964     if(-e $filename)
965     {
966       $self->throw_exception("$filename already exists, skipping $db");
967       next;
968     }
969     open($file, ">$filename") 
970       or $self->throw_exception("Can't open $filename for writing ($!)");
971     my $output = $sqlt->translate;
972 #use Data::Dumper;
973 #    print join(":", keys %{$schema->source_registrations});
974 #    print Dumper($sqlt->schema);
975     if(!$output)
976     {
977       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
978       next;
979     }
980     print $file $output;
981     close($file);
982   }
983
984 }
985
986 =head2 deployment_statements
987
988 Create the statements for L</deploy> and
989 L<DBIx::Class::Schema/deploy>.
990
991 =cut
992
993 sub deployment_statements {
994   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
995   # Need to be connected to get the correct sqlt_type
996   $self->ensure_connected() unless $type;
997   $type ||= $self->sqlt_type;
998   $version ||= $schema->VERSION || '1.x';
999   $dir ||= './';
1000   eval "use SQL::Translator";
1001   if(!$@)
1002   {
1003     eval "use SQL::Translator::Parser::DBIx::Class;";
1004     $self->throw_exception($@) if $@;
1005     eval "use SQL::Translator::Producer::${type};";
1006     $self->throw_exception($@) if $@;
1007     my $tr = SQL::Translator->new(%$sqltargs);
1008     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1009     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1010   }
1011
1012   my $filename = $schema->ddl_filename($type, $dir, $version);
1013   if(!-f $filename)
1014   {
1015 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1016       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1017       return;
1018   }
1019   my $file;
1020   open($file, "<$filename") 
1021       or $self->throw_exception("Can't open $filename ($!)");
1022   my @rows = <$file>;
1023   close($file);
1024
1025   return join('', @rows);
1026   
1027 }
1028
1029 =head2 deploy
1030
1031 Sends the appropriate statements to create or modify tables to the
1032 db. This would normally be called through
1033 L<DBIx::Class::Schema/deploy>.
1034
1035 =cut
1036
1037 sub deploy {
1038   my ($self, $schema, $type, $sqltargs) = @_;
1039   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
1040     for ( split(";\n", $statement)) {
1041       next if($_ =~ /^--/);
1042       next if(!$_);
1043 #      next if($_ =~ /^DROP/m);
1044       next if($_ =~ /^BEGIN TRANSACTION/m);
1045       next if($_ =~ /^COMMIT/m);
1046       $self->debugobj->query_start($_) if $self->debug;
1047       $self->dbh->do($_) or warn "SQL was:\n $_";
1048       $self->debugobj->query_end($_) if $self->debug;
1049     }
1050   }
1051 }
1052
1053 =head2 datetime_parser
1054
1055 Returns the datetime parser class
1056
1057 =cut
1058
1059 sub datetime_parser {
1060   my $self = shift;
1061   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1062 }
1063
1064 =head2 datetime_parser_type
1065
1066 Defines (returns) the datetime parser class - currently hardwired to
1067 L<DateTime::Format::MySQL>
1068
1069 =cut
1070
1071 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1072
1073 =head2 build_datetime_parser
1074
1075 See L</datetime_parser>
1076
1077 =cut
1078
1079 sub build_datetime_parser {
1080   my $self = shift;
1081   my $type = $self->datetime_parser_type(@_);
1082   eval "use ${type}";
1083   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1084   return $type;
1085 }
1086
1087 sub DESTROY { shift->disconnect }
1088
1089 1;
1090
1091 =head1 SQL METHODS
1092
1093 The module defines a set of methods within the DBIC::SQL::Abstract
1094 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1095 SQL query functions.
1096
1097 The following methods are extended:-
1098
1099 =over 4
1100
1101 =item delete
1102
1103 =item insert
1104
1105 =item select
1106
1107 =item update
1108
1109 =item limit_dialect
1110
1111 See L</connect_info> for details.
1112 For setting, this method is deprecated in favor of L</connect_info>.
1113
1114 =item quote_char
1115
1116 See L</connect_info> for details.
1117 For setting, this method is deprecated in favor of L</connect_info>.
1118
1119 =item name_sep
1120
1121 See L</connect_info> for details.
1122 For setting, this method is deprecated in favor of L</connect_info>.
1123
1124 =back
1125
1126 =head1 ENVIRONMENT VARIABLES
1127
1128 =head2 DBIC_TRACE
1129
1130 If C<DBIC_TRACE> is set then SQL trace information
1131 is produced (as when the L<debug> method is set).
1132
1133 If the value is of the form C<1=/path/name> then the trace output is
1134 written to the file C</path/name>.
1135
1136 This environment variable is checked when the storage object is first
1137 created (when you call connect on your schema).  So, run-time changes 
1138 to this environment variable will not take effect unless you also 
1139 re-connect on your schema.
1140
1141 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1142
1143 Old name for DBIC_TRACE
1144
1145 =head1 AUTHORS
1146
1147 Matt S. Trout <mst@shadowcatsystems.co.uk>
1148
1149 Andy Grundman <andy@hybridized.org>
1150
1151 =head1 LICENSE
1152
1153 You may distribute this code under the same terms as Perl itself.
1154
1155 =cut
1156