Added how to '.. fetch a single (or topmost) row?' to the FAQ
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 sub _RowNumberOver {
33   my ($self, $sql, $order, $rows, $offset ) = @_;
34
35   $offset += 1;
36   my $last = $rows + $offset;
37   my ( $order_by ) = $self->_order_by( $order );
38
39   $sql = <<"";
40 SELECT * FROM
41 (
42    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
43       $sql
44       $order_by
45    ) Q1
46 ) Q2
47 WHERE ROW_NUM BETWEEN $offset AND $last
48
49   return $sql;
50 }
51
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 use Scalar::Util 'blessed';
56 sub _find_syntax {
57   my ($self, $syntax) = @_;
58   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
59 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
60   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
61     return 'RowNumberOver';
62   }
63
64   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
65 }
66
67 sub select {
68   my ($self, $table, $fields, $where, $order, @rest) = @_;
69   $table = $self->_quote($table) unless ref($table);
70   local $self->{rownum_hack_count} = 1
71     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
72   @rest = (-1) unless defined $rest[0];
73   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
74     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
75   local $self->{having_bind} = [];
76   my ($sql, @ret) = $self->SUPER::select(
77     $table, $self->_recurse_fields($fields), $where, $order, @rest
78   );
79   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
80 }
81
82 sub insert {
83   my $self = shift;
84   my $table = shift;
85   $table = $self->_quote($table) unless ref($table);
86   $self->SUPER::insert($table, @_);
87 }
88
89 sub update {
90   my $self = shift;
91   my $table = shift;
92   $table = $self->_quote($table) unless ref($table);
93   $self->SUPER::update($table, @_);
94 }
95
96 sub delete {
97   my $self = shift;
98   my $table = shift;
99   $table = $self->_quote($table) unless ref($table);
100   $self->SUPER::delete($table, @_);
101 }
102
103 sub _emulate_limit {
104   my $self = shift;
105   if ($_[3] == -1) {
106     return $_[1].$self->_order_by($_[2]);
107   } else {
108     return $self->SUPER::_emulate_limit(@_);
109   }
110 }
111
112 sub _recurse_fields {
113   my ($self, $fields) = @_;
114   my $ref = ref $fields;
115   return $self->_quote($fields) unless $ref;
116   return $$fields if $ref eq 'SCALAR';
117
118   if ($ref eq 'ARRAY') {
119     return join(', ', map {
120       $self->_recurse_fields($_)
121       .(exists $self->{rownum_hack_count}
122          ? ' AS col'.$self->{rownum_hack_count}++
123          : '')
124      } @$fields);
125   } elsif ($ref eq 'HASH') {
126     foreach my $func (keys %$fields) {
127       return $self->_sqlcase($func)
128         .'( '.$self->_recurse_fields($fields->{$func}).' )';
129     }
130   }
131 }
132
133 sub _order_by {
134   my $self = shift;
135   my $ret = '';
136   my @extra;
137   if (ref $_[0] eq 'HASH') {
138     if (defined $_[0]->{group_by}) {
139       $ret = $self->_sqlcase(' group by ')
140                .$self->_recurse_fields($_[0]->{group_by});
141     }
142     if (defined $_[0]->{having}) {
143       my $frag;
144       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
145       push(@{$self->{having_bind}}, @extra);
146       $ret .= $self->_sqlcase(' having ').$frag;
147     }
148     if (defined $_[0]->{order_by}) {
149       $ret .= $self->_order_by($_[0]->{order_by});
150     }
151   } elsif (ref $_[0] eq 'SCALAR') {
152     $ret = $self->_sqlcase(' order by ').${ $_[0] };
153   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
154     my @order = @{+shift};
155     $ret = $self->_sqlcase(' order by ')
156           .join(', ', map {
157                         my $r = $self->_order_by($_, @_);
158                         $r =~ s/^ ?ORDER BY //i;
159                         $r;
160                       } @order);
161   } else {
162     $ret = $self->SUPER::_order_by(@_);
163   }
164   return $ret;
165 }
166
167 sub _order_directions {
168   my ($self, $order) = @_;
169   $order = $order->{order_by} if ref $order eq 'HASH';
170   return $self->SUPER::_order_directions($order);
171 }
172
173 sub _table {
174   my ($self, $from) = @_;
175   if (ref $from eq 'ARRAY') {
176     return $self->_recurse_from(@$from);
177   } elsif (ref $from eq 'HASH') {
178     return $self->_make_as($from);
179   } else {
180     return $from; # would love to quote here but _table ends up getting called
181                   # twice during an ->select without a limit clause due to
182                   # the way S::A::Limit->select works. should maybe consider
183                   # bypassing this and doing S::A::select($self, ...) in
184                   # our select method above. meantime, quoting shims have
185                   # been added to select/insert/update/delete here
186   }
187 }
188
189 sub _recurse_from {
190   my ($self, $from, @join) = @_;
191   my @sqlf;
192   push(@sqlf, $self->_make_as($from));
193   foreach my $j (@join) {
194     my ($to, $on) = @$j;
195
196     # check whether a join type exists
197     my $join_clause = '';
198     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
199     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
200       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
201     } else {
202       $join_clause = ' JOIN ';
203     }
204     push(@sqlf, $join_clause);
205
206     if (ref $to eq 'ARRAY') {
207       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
208     } else {
209       push(@sqlf, $self->_make_as($to));
210     }
211     push(@sqlf, ' ON ', $self->_join_condition($on));
212   }
213   return join('', @sqlf);
214 }
215
216 sub _make_as {
217   my ($self, $from) = @_;
218   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
219                      reverse each %{$self->_skip_options($from)});
220 }
221
222 sub _skip_options {
223   my ($self, $hash) = @_;
224   my $clean_hash = {};
225   $clean_hash->{$_} = $hash->{$_}
226     for grep {!/^-/} keys %$hash;
227   return $clean_hash;
228 }
229
230 sub _join_condition {
231   my ($self, $cond) = @_;
232   if (ref $cond eq 'HASH') {
233     my %j;
234     for (keys %$cond) {
235       my $v = $cond->{$_};
236       if (ref $v) {
237         # XXX no throw_exception() in this package and croak() fails with strange results
238         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
239             if ref($v) ne 'SCALAR';
240         $j{$_} = $v;
241       }
242       else {
243         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
244       }
245     };
246     return scalar($self->_recurse_where(\%j));
247   } elsif (ref $cond eq 'ARRAY') {
248     return join(' OR ', map { $self->_join_condition($_) } @$cond);
249   } else {
250     die "Can't handle this yet!";
251   }
252 }
253
254 sub _quote {
255   my ($self, $label) = @_;
256   return '' unless defined $label;
257   return "*" if $label eq '*';
258   return $label unless $self->{quote_char};
259   if(ref $self->{quote_char} eq "ARRAY"){
260     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
261       if !defined $self->{name_sep};
262     my $sep = $self->{name_sep};
263     return join($self->{name_sep},
264         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
265        split(/\Q$sep\E/,$label));
266   }
267   return $self->SUPER::_quote($label);
268 }
269
270 sub limit_dialect {
271     my $self = shift;
272     $self->{limit_dialect} = shift if @_;
273     return $self->{limit_dialect};
274 }
275
276 sub quote_char {
277     my $self = shift;
278     $self->{quote_char} = shift if @_;
279     return $self->{quote_char};
280 }
281
282 sub name_sep {
283     my $self = shift;
284     $self->{name_sep} = shift if @_;
285     return $self->{name_sep};
286 }
287
288 } # End of BEGIN block
289
290 use base qw/DBIx::Class/;
291
292 __PACKAGE__->load_components(qw/AccessorGroup/);
293
294 __PACKAGE__->mk_group_accessors('simple' =>
295   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
296      debug debugobj cursor on_connect_do transaction_depth/);
297
298 =head1 NAME
299
300 DBIx::Class::Storage::DBI - DBI storage handler
301
302 =head1 SYNOPSIS
303
304 =head1 DESCRIPTION
305
306 This class represents the connection to the database
307
308 =head1 METHODS
309
310 =head2 new
311
312 =cut
313
314 sub new {
315   my $new = {};
316   bless $new, (ref $_[0] || $_[0]);
317
318   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
319   $new->transaction_depth(0);
320
321   $new->debugobj(new DBIx::Class::Storage::Statistics());
322
323   my $fh;
324
325   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
326                   || $ENV{DBIC_TRACE};
327
328   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
329     $fh = IO::File->new($1, 'w')
330       or $new->throw_exception("Cannot open trace file $1");
331   } else {
332     $fh = IO::File->new('>&STDERR');
333   }
334   $new->debugfh($fh);
335   $new->debug(1) if $debug_env;
336   $new->_sql_maker_opts({});
337   return $new;
338 }
339
340 =head2 throw_exception
341
342 Throws an exception - croaks.
343
344 =cut
345
346 sub throw_exception {
347   my ($self, $msg) = @_;
348   croak($msg);
349 }
350
351 =head2 connect_info
352
353 The arguments of C<connect_info> are always a single array reference.
354
355 This is normally accessed via L<DBIx::Class::Schema/connection>, which
356 encapsulates its argument list in an arrayref before calling
357 C<connect_info> here.
358
359 The arrayref can either contain the same set of arguments one would
360 normally pass to L<DBI/connect>, or a lone code reference which returns
361 a connected database handle.
362
363 In either case, if the final argument in your connect_info happens
364 to be a hashref, C<connect_info> will look there for several
365 connection-specific options:
366
367 =over 4
368
369 =item on_connect_do
370
371 This can be set to an arrayref of literal sql statements, which will
372 be executed immediately after making the connection to the database
373 every time we [re-]connect.
374
375 =item limit_dialect 
376
377 Sets the limit dialect. This is useful for JDBC-bridge among others
378 where the remote SQL-dialect cannot be determined by the name of the
379 driver alone.
380
381 =item quote_char
382
383 Specifies what characters to use to quote table and column names. If 
384 you use this you will want to specify L<name_sep> as well.
385
386 quote_char expects either a single character, in which case is it is placed
387 on either side of the table/column, or an arrayref of length 2 in which case the
388 table/column name is placed between the elements.
389
390 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
391 use C<quote_char =E<gt> [qw/[ ]/]>.
392
393 =item name_sep
394
395 This only needs to be used in conjunction with L<quote_char>, and is used to 
396 specify the charecter that seperates elements (schemas, tables, columns) from 
397 each other. In most cases this is simply a C<.>.
398
399 =back
400
401 These options can be mixed in with your other L<DBI> connection attributes,
402 or placed in a seperate hashref after all other normal L<DBI> connection
403 arguments.
404
405 Every time C<connect_info> is invoked, any previous settings for
406 these options will be cleared before setting the new ones, regardless of
407 whether any options are specified in the new C<connect_info>.
408
409 Examples:
410
411   # Simple SQLite connection
412   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
413
414   # Connect via subref
415   ->connect_info([ sub { DBI->connect(...) } ]);
416
417   # A bit more complicated
418   ->connect_info(
419     [
420       'dbi:Pg:dbname=foo',
421       'postgres',
422       'my_pg_password',
423       { AutoCommit => 0 },
424       { quote_char => q{"}, name_sep => q{.} },
425     ]
426   );
427
428   # Equivalent to the previous example
429   ->connect_info(
430     [
431       'dbi:Pg:dbname=foo',
432       'postgres',
433       'my_pg_password',
434       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
435     ]
436   );
437
438   # Subref + DBIC-specific connection options
439   ->connect_info(
440     [
441       sub { DBI->connect(...) },
442       {
443           quote_char => q{`},
444           name_sep => q{@},
445           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
446       },
447     ]
448   );
449
450 =head2 on_connect_do
451
452 This method is deprecated in favor of setting via L</connect_info>.
453
454 =head2 debug
455
456 Causes SQL trace information to be emitted on the C<debugobj> object.
457 (or C<STDERR> if C<debugobj> has not specifically been set).
458
459 This is the equivalent to setting L</DBIC_TRACE> in your
460 shell environment.
461
462 =head2 debugfh
463
464 Set or retrieve the filehandle used for trace/debug output.  This should be
465 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
466 set to be STDERR - although see information on the
467 L<DBIC_TRACE> environment variable.
468
469 =cut
470
471 sub debugfh {
472     my $self = shift;
473
474     if ($self->debugobj->can('debugfh')) {
475         return $self->debugobj->debugfh(@_);
476     }
477 }
478
479 =head2 debugobj
480
481 Sets or retrieves the object used for metric collection. Defaults to an instance
482 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
483 method of using a coderef as a callback.  See the aforementioned Statistics
484 class for more information.
485
486 =head2 debugcb
487
488 Sets a callback to be executed each time a statement is run; takes a sub
489 reference.  Callback is executed as $sub->($op, $info) where $op is
490 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
491
492 See L<debugobj> for a better way.
493
494 =cut
495
496 sub debugcb {
497     my $self = shift;
498
499     if ($self->debugobj->can('callback')) {
500         return $self->debugobj->callback(@_);
501     }
502 }
503
504 =head2 disconnect
505
506 Disconnect the L<DBI> handle, performing a rollback first if the
507 database is not in C<AutoCommit> mode.
508
509 =cut
510
511 sub disconnect {
512   my ($self) = @_;
513
514   if( $self->connected ) {
515     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
516     $self->_dbh->disconnect;
517     $self->_dbh(undef);
518   }
519 }
520
521 =head2 connected
522
523 Check if the L<DBI> handle is connected.  Returns true if the handle
524 is connected.
525
526 =cut
527
528 sub connected { my ($self) = @_;
529
530   if(my $dbh = $self->_dbh) {
531       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
532           return $self->_dbh(undef);
533       }
534       elsif($self->_conn_pid != $$) {
535           $self->_dbh->{InactiveDestroy} = 1;
536           return $self->_dbh(undef);
537       }
538       return ($dbh->FETCH('Active') && $dbh->ping);
539   }
540
541   return 0;
542 }
543
544 =head2 ensure_connected
545
546 Check whether the database handle is connected - if not then make a
547 connection.
548
549 =cut
550
551 sub ensure_connected {
552   my ($self) = @_;
553
554   unless ($self->connected) {
555     $self->_populate_dbh;
556   }
557 }
558
559 =head2 dbh
560
561 Returns the dbh - a data base handle of class L<DBI>.
562
563 =cut
564
565 sub dbh {
566   my ($self) = @_;
567
568   $self->ensure_connected;
569   return $self->_dbh;
570 }
571
572 sub _sql_maker_args {
573     my ($self) = @_;
574     
575     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
576 }
577
578 =head2 sql_maker
579
580 Returns a C<sql_maker> object - normally an object of class
581 C<DBIC::SQL::Abstract>.
582
583 =cut
584
585 sub sql_maker {
586   my ($self) = @_;
587   unless ($self->_sql_maker) {
588     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
589   }
590   return $self->_sql_maker;
591 }
592
593 sub connect_info {
594   my ($self, $info_arg) = @_;
595
596   if($info_arg) {
597     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
598     #  the new set of options
599     $self->_sql_maker(undef);
600     $self->_sql_maker_opts({});
601
602     my $info = [ @$info_arg ]; # copy because we can alter it
603     my $last_info = $info->[-1];
604     if(ref $last_info eq 'HASH') {
605       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
606         $self->on_connect_do($on_connect_do);
607       }
608       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
609         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
610           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
611         }
612       }
613
614       # Get rid of any trailing empty hashref
615       pop(@$info) if !keys %$last_info;
616     }
617
618     $self->_connect_info($info);
619   }
620
621   $self->_connect_info;
622 }
623
624 sub _populate_dbh {
625   my ($self) = @_;
626   my @info = @{$self->_connect_info || []};
627   $self->_dbh($self->_connect(@info));
628
629   if(ref $self eq 'DBIx::Class::Storage::DBI') {
630     my $driver = $self->_dbh->{Driver}->{Name};
631     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
632       bless $self, "DBIx::Class::Storage::DBI::${driver}";
633       $self->_rebless() if $self->can('_rebless');
634     }
635   }
636
637   # if on-connect sql statements are given execute them
638   foreach my $sql_statement (@{$self->on_connect_do || []}) {
639     $self->debugobj->query_start($sql_statement) if $self->debug();
640     $self->_dbh->do($sql_statement);
641     $self->debugobj->query_end($sql_statement) if $self->debug();
642   }
643
644   $self->_conn_pid($$);
645   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
646 }
647
648 sub _connect {
649   my ($self, @info) = @_;
650
651   $self->throw_exception("You failed to provide any connection info")
652       if !@info;
653
654   my ($old_connect_via, $dbh);
655
656   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
657       $old_connect_via = $DBI::connect_via;
658       $DBI::connect_via = 'connect';
659   }
660
661   eval {
662     $dbh = ref $info[0] eq 'CODE'
663          ? &{$info[0]}
664          : DBI->connect(@info);
665   };
666
667   $DBI::connect_via = $old_connect_via if $old_connect_via;
668
669   if (!$dbh || $@) {
670     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
671   }
672
673   $dbh;
674 }
675
676 =head2 txn_begin
677
678 Calls begin_work on the current dbh.
679
680 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
681 an entire code block to be executed transactionally.
682
683 =cut
684
685 sub txn_begin {
686   my $self = shift;
687   if ($self->{transaction_depth}++ == 0) {
688     my $dbh = $self->dbh;
689     if ($dbh->{AutoCommit}) {
690       $self->debugobj->txn_begin()
691         if ($self->debug);
692       $dbh->begin_work;
693     }
694   }
695 }
696
697 =head2 txn_commit
698
699 Issues a commit against the current dbh.
700
701 =cut
702
703 sub txn_commit {
704   my $self = shift;
705   my $dbh = $self->dbh;
706   if ($self->{transaction_depth} == 0) {
707     unless ($dbh->{AutoCommit}) {
708       $self->debugobj->txn_commit()
709         if ($self->debug);
710       $dbh->commit;
711     }
712   }
713   else {
714     if (--$self->{transaction_depth} == 0) {
715       $self->debugobj->txn_commit()
716         if ($self->debug);
717       $dbh->commit;
718     }
719   }
720 }
721
722 =head2 txn_rollback
723
724 Issues a rollback against the current dbh. A nested rollback will
725 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
726 which allows the rollback to propagate to the outermost transaction.
727
728 =cut
729
730 sub txn_rollback {
731   my $self = shift;
732
733   eval {
734     my $dbh = $self->dbh;
735     if ($self->{transaction_depth} == 0) {
736       unless ($dbh->{AutoCommit}) {
737         $self->debugobj->txn_rollback()
738           if ($self->debug);
739         $dbh->rollback;
740       }
741     }
742     else {
743       if (--$self->{transaction_depth} == 0) {
744         $self->debugobj->txn_rollback()
745           if ($self->debug);
746         $dbh->rollback;
747       }
748       else {
749         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
750       }
751     }
752   };
753
754   if ($@) {
755     my $error = $@;
756     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
757     $error =~ /$exception_class/ and $self->throw_exception($error);
758     $self->{transaction_depth} = 0;          # ensure that a failed rollback
759     $self->throw_exception($error);          # resets the transaction depth
760   }
761 }
762
763 sub _execute {
764   my ($self, $op, $extra_bind, $ident, @args) = @_;
765   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
766   unshift(@bind, @$extra_bind) if $extra_bind;
767   if ($self->debug) {
768       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
769       $self->debugobj->query_start($sql, @debug_bind);
770   }
771   my $sth = eval { $self->sth($sql,$op) };
772
773   if (!$sth || $@) {
774     $self->throw_exception(
775       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
776     );
777   }
778   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
779   my $rv;
780   if ($sth) {
781     my $time = time();
782     $rv = eval { $sth->execute(@bind) };
783
784     if ($@ || !$rv) {
785       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
786     }
787   } else {
788     $self->throw_exception("'$sql' did not generate a statement.");
789   }
790   if ($self->debug) {
791       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
792       $self->debugobj->query_end($sql, @debug_bind);
793   }
794   return (wantarray ? ($rv, $sth, @bind) : $rv);
795 }
796
797 sub insert {
798   my ($self, $ident, $to_insert) = @_;
799   $self->throw_exception(
800     "Couldn't insert ".join(', ',
801       map "$_ => $to_insert->{$_}", keys %$to_insert
802     )." into ${ident}"
803   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
804   return $to_insert;
805 }
806
807 sub update {
808   return shift->_execute('update' => [], @_);
809 }
810
811 sub delete {
812   return shift->_execute('delete' => [], @_);
813 }
814
815 sub _select {
816   my ($self, $ident, $select, $condition, $attrs) = @_;
817   my $order = $attrs->{order_by};
818   if (ref $condition eq 'SCALAR') {
819     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
820   }
821   if (exists $attrs->{group_by} || $attrs->{having}) {
822     $order = {
823       group_by => $attrs->{group_by},
824       having => $attrs->{having},
825       ($order ? (order_by => $order) : ())
826     };
827   }
828   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
829   if ($attrs->{software_limit} ||
830       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
831         $attrs->{software_limit} = 1;
832   } else {
833     $self->throw_exception("rows attribute must be positive if present")
834       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
835     push @args, $attrs->{rows}, $attrs->{offset};
836   }
837   return $self->_execute(@args);
838 }
839
840 =head2 select
841
842 =over 4
843
844 =item Arguments: $ident, $select, $condition, $attrs
845
846 =back
847
848 Handle a SQL select statement.
849
850 =cut
851
852 sub select {
853   my $self = shift;
854   my ($ident, $select, $condition, $attrs) = @_;
855   return $self->cursor->new($self, \@_, $attrs);
856 }
857
858 =head2 select_single
859
860 Performs a select, fetch and return of data - handles a single row
861 only.
862
863 =cut
864
865 # Need to call finish() to work round broken DBDs
866
867 sub select_single {
868   my $self = shift;
869   my ($rv, $sth, @bind) = $self->_select(@_);
870   my @row = $sth->fetchrow_array;
871   $sth->finish();
872   return @row;
873 }
874
875 =head2 sth
876
877 =over 4
878
879 =item Arguments: $sql
880
881 =back
882
883 Returns a L<DBI> sth (statement handle) for the supplied SQL.
884
885 =cut
886
887 sub sth {
888   my ($self, $sql) = @_;
889   # 3 is the if_active parameter which avoids active sth re-use
890   return $self->dbh->prepare_cached($sql, {}, 3);
891 }
892
893 =head2 columns_info_for
894
895 Returns database type info for a given table column.
896
897 =cut
898
899 sub columns_info_for {
900   my ($self, $table) = @_;
901
902   my $dbh = $self->dbh;
903
904   if ($dbh->can('column_info')) {
905     my %result;
906     local $dbh->{RaiseError} = 1;
907     local $dbh->{PrintError} = 0;
908     eval {
909       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
910       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
911       $sth->execute();
912
913       while ( my $info = $sth->fetchrow_hashref() ){
914         my %column_info;
915         $column_info{data_type}   = $info->{TYPE_NAME};
916         $column_info{size}      = $info->{COLUMN_SIZE};
917         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
918         $column_info{default_value} = $info->{COLUMN_DEF};
919         my $col_name = $info->{COLUMN_NAME};
920         $col_name =~ s/^\"(.*)\"$/$1/;
921
922         $result{$col_name} = \%column_info;
923       }
924     };
925     return \%result if !$@ && scalar keys %result;
926   }
927
928   my %result;
929   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
930   $sth->execute;
931   my @columns = @{$sth->{NAME_lc}};
932   for my $i ( 0 .. $#columns ){
933     my %column_info;
934     my $type_num = $sth->{TYPE}->[$i];
935     my $type_name;
936     if(defined $type_num && $dbh->can('type_info')) {
937       my $type_info = $dbh->type_info($type_num);
938       $type_name = $type_info->{TYPE_NAME} if $type_info;
939     }
940     $column_info{data_type} = $type_name ? $type_name : $type_num;
941     $column_info{size} = $sth->{PRECISION}->[$i];
942     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
943
944     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
945       $column_info{data_type} = $1;
946       $column_info{size}    = $2;
947     }
948
949     $result{$columns[$i]} = \%column_info;
950   }
951
952   return \%result;
953 }
954
955 =head2 last_insert_id
956
957 Return the row id of the last insert.
958
959 =cut
960
961 sub last_insert_id {
962   my ($self, $row) = @_;
963     
964   return $self->dbh->func('last_insert_rowid');
965
966 }
967
968 =head2 sqlt_type
969
970 Returns the database driver name.
971
972 =cut
973
974 sub sqlt_type { shift->dbh->{Driver}->{Name} }
975
976 =head2 create_ddl_dir (EXPERIMENTAL)
977
978 =over 4
979
980 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
981
982 =back
983
984 Creates a SQL file based on the Schema, for each of the specified
985 database types, in the given directory.
986
987 Note that this feature is currently EXPERIMENTAL and may not work correctly
988 across all databases, or fully handle complex relationships.
989
990 =cut
991
992 sub create_ddl_dir
993 {
994   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
995
996   if(!$dir || !-d $dir)
997   {
998     warn "No directory given, using ./\n";
999     $dir = "./";
1000   }
1001   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1002   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1003   $version ||= $schema->VERSION || '1.x';
1004   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1005
1006   eval "use SQL::Translator";
1007   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1008
1009   my $sqlt = SQL::Translator->new($sqltargs);
1010   foreach my $db (@$databases)
1011   {
1012     $sqlt->reset();
1013     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1014 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1015     $sqlt->data($schema);
1016     $sqlt->producer($db);
1017
1018     my $file;
1019     my $filename = $schema->ddl_filename($db, $dir, $version);
1020     if(-e $filename)
1021     {
1022       $self->throw_exception("$filename already exists, skipping $db");
1023       next;
1024     }
1025     open($file, ">$filename") 
1026       or $self->throw_exception("Can't open $filename for writing ($!)");
1027     my $output = $sqlt->translate;
1028 #use Data::Dumper;
1029 #    print join(":", keys %{$schema->source_registrations});
1030 #    print Dumper($sqlt->schema);
1031     if(!$output)
1032     {
1033       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1034       next;
1035     }
1036     print $file $output;
1037     close($file);
1038   }
1039
1040 }
1041
1042 =head2 deployment_statements
1043
1044 =over 4
1045
1046 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1047
1048 =back
1049
1050 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1051 The database driver name is given by C<$type>, though the value from
1052 L</sqlt_type> is used if it is not specified.
1053
1054 C<$directory> is used to return statements from files in a previously created
1055 L</create_ddl_dir> directory and is optional. The filenames are constructed
1056 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1057
1058 If no C<$directory> is specified then the statements are constructed on the
1059 fly using L<SQL::Translator> and C<$version> is ignored.
1060
1061 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1062
1063 =cut
1064
1065 sub deployment_statements {
1066   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1067   # Need to be connected to get the correct sqlt_type
1068   $self->ensure_connected() unless $type;
1069   $type ||= $self->sqlt_type;
1070   $version ||= $schema->VERSION || '1.x';
1071   $dir ||= './';
1072   eval "use SQL::Translator";
1073   if(!$@)
1074   {
1075     eval "use SQL::Translator::Parser::DBIx::Class;";
1076     $self->throw_exception($@) if $@;
1077     eval "use SQL::Translator::Producer::${type};";
1078     $self->throw_exception($@) if $@;
1079     my $tr = SQL::Translator->new(%$sqltargs);
1080     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1081     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1082   }
1083
1084   my $filename = $schema->ddl_filename($type, $dir, $version);
1085   if(!-f $filename)
1086   {
1087 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1088       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1089       return;
1090   }
1091   my $file;
1092   open($file, "<$filename") 
1093       or $self->throw_exception("Can't open $filename ($!)");
1094   my @rows = <$file>;
1095   close($file);
1096
1097   return join('', @rows);
1098   
1099 }
1100
1101 =head2 deploy
1102
1103 Sends the appropriate statements to create or modify tables to the
1104 db. This would normally be called through
1105 L<DBIx::Class::Schema/deploy>.
1106
1107 =cut
1108
1109 sub deploy {
1110   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1111   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1112     for ( split(";\n", $statement)) {
1113       next if($_ =~ /^--/);
1114       next if(!$_);
1115 #      next if($_ =~ /^DROP/m);
1116       next if($_ =~ /^BEGIN TRANSACTION/m);
1117       next if($_ =~ /^COMMIT/m);
1118       next if $_ =~ /^\s+$/; # skip whitespace only
1119       $self->debugobj->query_start($_) if $self->debug;
1120       $self->dbh->do($_) or warn "SQL was:\n $_";
1121       $self->debugobj->query_end($_) if $self->debug;
1122     }
1123   }
1124 }
1125
1126 =head2 datetime_parser
1127
1128 Returns the datetime parser class
1129
1130 =cut
1131
1132 sub datetime_parser {
1133   my $self = shift;
1134   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1135 }
1136
1137 =head2 datetime_parser_type
1138
1139 Defines (returns) the datetime parser class - currently hardwired to
1140 L<DateTime::Format::MySQL>
1141
1142 =cut
1143
1144 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1145
1146 =head2 build_datetime_parser
1147
1148 See L</datetime_parser>
1149
1150 =cut
1151
1152 sub build_datetime_parser {
1153   my $self = shift;
1154   my $type = $self->datetime_parser_type(@_);
1155   eval "use ${type}";
1156   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1157   return $type;
1158 }
1159
1160 sub DESTROY {
1161   # NOTE: if there's a merge conflict here when -current is pushed
1162   #  back to trunk, take -current's version and ignore this trunk one :)
1163   my $self = shift;
1164
1165   if($self->_dbh && $self->_conn_pid != $$) {
1166     $self->_dbh->{InactiveDestroy} = 1;
1167   }
1168
1169   $self->_dbh(undef);
1170 }
1171
1172 1;
1173
1174 =head1 SQL METHODS
1175
1176 The module defines a set of methods within the DBIC::SQL::Abstract
1177 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1178 SQL query functions.
1179
1180 The following methods are extended:-
1181
1182 =over 4
1183
1184 =item delete
1185
1186 =item insert
1187
1188 =item select
1189
1190 =item update
1191
1192 =item limit_dialect
1193
1194 See L</connect_info> for details.
1195 For setting, this method is deprecated in favor of L</connect_info>.
1196
1197 =item quote_char
1198
1199 See L</connect_info> for details.
1200 For setting, this method is deprecated in favor of L</connect_info>.
1201
1202 =item name_sep
1203
1204 See L</connect_info> for details.
1205 For setting, this method is deprecated in favor of L</connect_info>.
1206
1207 =back
1208
1209 =head1 ENVIRONMENT VARIABLES
1210
1211 =head2 DBIC_TRACE
1212
1213 If C<DBIC_TRACE> is set then SQL trace information
1214 is produced (as when the L<debug> method is set).
1215
1216 If the value is of the form C<1=/path/name> then the trace output is
1217 written to the file C</path/name>.
1218
1219 This environment variable is checked when the storage object is first
1220 created (when you call connect on your schema).  So, run-time changes 
1221 to this environment variable will not take effect unless you also 
1222 re-connect on your schema.
1223
1224 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1225
1226 Old name for DBIC_TRACE
1227
1228 =head1 AUTHORS
1229
1230 Matt S. Trout <mst@shadowcatsystems.co.uk>
1231
1232 Andy Grundman <andy@hybridized.org>
1233
1234 =head1 LICENSE
1235
1236 You may distribute this code under the same terms as Perl itself.
1237
1238 =cut