Documentation fixes.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 sub _RowNumberOver {
33   my ($self, $sql, $order, $rows, $offset ) = @_;
34
35   $offset += 1;
36   my $last = $rows + $offset;
37   my ( $order_by ) = $self->_order_by( $order );
38
39   $sql = <<"";
40 SELECT * FROM
41 (
42    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
43       $sql
44       $order_by
45    ) Q1
46 ) Q2
47 WHERE ROW_NUM BETWEEN $offset AND $last
48
49   return $sql;
50 }
51
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 sub _find_syntax {
56   my ($self, $syntax) = @_;
57   my $dbhname = ref $syntax eq 'HASH' ? $syntax->{Driver}{Name} : '';
58   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
59     return 'RowNumberOver';
60   }
61
62   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
63 }
64
65 sub select {
66   my ($self, $table, $fields, $where, $order, @rest) = @_;
67   $table = $self->_quote($table) unless ref($table);
68   local $self->{rownum_hack_count} = 1
69     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
70   @rest = (-1) unless defined $rest[0];
71   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
72     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
73   local $self->{having_bind} = [];
74   my ($sql, @ret) = $self->SUPER::select(
75     $table, $self->_recurse_fields($fields), $where, $order, @rest
76   );
77   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
78 }
79
80 sub insert {
81   my $self = shift;
82   my $table = shift;
83   $table = $self->_quote($table) unless ref($table);
84   $self->SUPER::insert($table, @_);
85 }
86
87 sub update {
88   my $self = shift;
89   my $table = shift;
90   $table = $self->_quote($table) unless ref($table);
91   $self->SUPER::update($table, @_);
92 }
93
94 sub delete {
95   my $self = shift;
96   my $table = shift;
97   $table = $self->_quote($table) unless ref($table);
98   $self->SUPER::delete($table, @_);
99 }
100
101 sub _emulate_limit {
102   my $self = shift;
103   if ($_[3] == -1) {
104     return $_[1].$self->_order_by($_[2]);
105   } else {
106     return $self->SUPER::_emulate_limit(@_);
107   }
108 }
109
110 sub _recurse_fields {
111   my ($self, $fields) = @_;
112   my $ref = ref $fields;
113   return $self->_quote($fields) unless $ref;
114   return $$fields if $ref eq 'SCALAR';
115
116   if ($ref eq 'ARRAY') {
117     return join(', ', map {
118       $self->_recurse_fields($_)
119       .(exists $self->{rownum_hack_count}
120          ? ' AS col'.$self->{rownum_hack_count}++
121          : '')
122      } @$fields);
123   } elsif ($ref eq 'HASH') {
124     foreach my $func (keys %$fields) {
125       return $self->_sqlcase($func)
126         .'( '.$self->_recurse_fields($fields->{$func}).' )';
127     }
128   }
129 }
130
131 sub _order_by {
132   my $self = shift;
133   my $ret = '';
134   my @extra;
135   if (ref $_[0] eq 'HASH') {
136     if (defined $_[0]->{group_by}) {
137       $ret = $self->_sqlcase(' group by ')
138                .$self->_recurse_fields($_[0]->{group_by});
139     }
140     if (defined $_[0]->{having}) {
141       my $frag;
142       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
143       push(@{$self->{having_bind}}, @extra);
144       $ret .= $self->_sqlcase(' having ').$frag;
145     }
146     if (defined $_[0]->{order_by}) {
147       $ret .= $self->_order_by($_[0]->{order_by});
148     }
149   } elsif (ref $_[0] eq 'SCALAR') {
150     $ret = $self->_sqlcase(' order by ').${ $_[0] };
151   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
152     my @order = @{+shift};
153     $ret = $self->_sqlcase(' order by ')
154           .join(', ', map {
155                         my $r = $self->_order_by($_, @_);
156                         $r =~ s/^ ?ORDER BY //i;
157                         $r;
158                       } @order);
159   } else {
160     $ret = $self->SUPER::_order_by(@_);
161   }
162   return $ret;
163 }
164
165 sub _order_directions {
166   my ($self, $order) = @_;
167   $order = $order->{order_by} if ref $order eq 'HASH';
168   return $self->SUPER::_order_directions($order);
169 }
170
171 sub _table {
172   my ($self, $from) = @_;
173   if (ref $from eq 'ARRAY') {
174     return $self->_recurse_from(@$from);
175   } elsif (ref $from eq 'HASH') {
176     return $self->_make_as($from);
177   } else {
178     return $from; # would love to quote here but _table ends up getting called
179                   # twice during an ->select without a limit clause due to
180                   # the way S::A::Limit->select works. should maybe consider
181                   # bypassing this and doing S::A::select($self, ...) in
182                   # our select method above. meantime, quoting shims have
183                   # been added to select/insert/update/delete here
184   }
185 }
186
187 sub _recurse_from {
188   my ($self, $from, @join) = @_;
189   my @sqlf;
190   push(@sqlf, $self->_make_as($from));
191   foreach my $j (@join) {
192     my ($to, $on) = @$j;
193
194     # check whether a join type exists
195     my $join_clause = '';
196     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
197     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
198       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
199     } else {
200       $join_clause = ' JOIN ';
201     }
202     push(@sqlf, $join_clause);
203
204     if (ref $to eq 'ARRAY') {
205       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
206     } else {
207       push(@sqlf, $self->_make_as($to));
208     }
209     push(@sqlf, ' ON ', $self->_join_condition($on));
210   }
211   return join('', @sqlf);
212 }
213
214 sub _make_as {
215   my ($self, $from) = @_;
216   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
217                      reverse each %{$self->_skip_options($from)});
218 }
219
220 sub _skip_options {
221   my ($self, $hash) = @_;
222   my $clean_hash = {};
223   $clean_hash->{$_} = $hash->{$_}
224     for grep {!/^-/} keys %$hash;
225   return $clean_hash;
226 }
227
228 sub _join_condition {
229   my ($self, $cond) = @_;
230   if (ref $cond eq 'HASH') {
231     my %j;
232     for (keys %$cond) {
233       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
234     };
235     return $self->_recurse_where(\%j);
236   } elsif (ref $cond eq 'ARRAY') {
237     return join(' OR ', map { $self->_join_condition($_) } @$cond);
238   } else {
239     die "Can't handle this yet!";
240   }
241 }
242
243 sub _quote {
244   my ($self, $label) = @_;
245   return '' unless defined $label;
246   return "*" if $label eq '*';
247   return $label unless $self->{quote_char};
248   if(ref $self->{quote_char} eq "ARRAY"){
249     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
250       if !defined $self->{name_sep};
251     my $sep = $self->{name_sep};
252     return join($self->{name_sep},
253         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
254        split(/\Q$sep\E/,$label));
255   }
256   return $self->SUPER::_quote($label);
257 }
258
259 sub limit_dialect {
260     my $self = shift;
261     $self->{limit_dialect} = shift if @_;
262     return $self->{limit_dialect};
263 }
264
265 sub quote_char {
266     my $self = shift;
267     $self->{quote_char} = shift if @_;
268     return $self->{quote_char};
269 }
270
271 sub name_sep {
272     my $self = shift;
273     $self->{name_sep} = shift if @_;
274     return $self->{name_sep};
275 }
276
277 } # End of BEGIN block
278
279 use base qw/DBIx::Class/;
280
281 __PACKAGE__->load_components(qw/AccessorGroup/);
282
283 __PACKAGE__->mk_group_accessors('simple' =>
284   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
285      debug debugobj cursor on_connect_do transaction_depth/);
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to the database
296
297 =head1 METHODS
298
299 =head2 new
300
301 =cut
302
303 sub new {
304   my $new = {};
305   bless $new, (ref $_[0] || $_[0]);
306
307   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
308   $new->transaction_depth(0);
309
310   $new->debugobj(new DBIx::Class::Storage::Statistics());
311
312   my $fh;
313
314   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
315                   || $ENV{DBIC_TRACE};
316
317   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
318     $fh = IO::File->new($1, 'w')
319       or $new->throw_exception("Cannot open trace file $1");
320   } else {
321     $fh = IO::File->new('>&STDERR');
322   }
323   $new->debugfh($fh);
324   $new->debug(1) if $debug_env;
325   $new->_sql_maker_opts({});
326   return $new;
327 }
328
329 =head2 throw_exception
330
331 Throws an exception - croaks.
332
333 =cut
334
335 sub throw_exception {
336   my ($self, $msg) = @_;
337   croak($msg);
338 }
339
340 =head2 connect_info
341
342 The arguments of C<connect_info> are always a single array reference.
343
344 This is normally accessed via L<DBIx::Class::Schema/connection>, which
345 encapsulates its argument list in an arrayref before calling
346 C<connect_info> here.
347
348 The arrayref can either contain the same set of arguments one would
349 normally pass to L<DBI/connect>, or a lone code reference which returns
350 a connected database handle.
351
352 In either case, if the final argument in your connect_info happens
353 to be a hashref, C<connect_info> will look there for several
354 connection-specific options:
355
356 =over 4
357
358 =item on_connect_do
359
360 This can be set to an arrayref of literal sql statements, which will
361 be executed immediately after making the connection to the database
362 every time we [re-]connect.
363
364 =item limit_dialect 
365
366 Sets the limit dialect. This is useful for JDBC-bridge among others
367 where the remote SQL-dialect cannot be determined by the name of the
368 driver alone.
369
370 =item quote_char
371
372 Specifies what characters to use to quote table and column names. If 
373 you use this you will want to specify L<name_sep> as well.
374
375 quote_char expects either a single character, in which case is it is placed
376 on either side of the table/column, or an arrayref of length 2 in which case the
377 table/column name is placed between the elements.
378
379 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
380 use C<quote_char =E<gt> [qw/[ ]/]>.
381
382 =item name_sep
383
384 This only needs to be used in conjunction with L<quote_char>, and is used to 
385 specify the charecter that seperates elements (schemas, tables, columns) from 
386 each other. In most cases this is simply a C<.>.
387
388 =back
389
390 These options can be mixed in with your other L<DBI> connection attributes,
391 or placed in a seperate hashref after all other normal L<DBI> connection
392 arguments.
393
394 Every time C<connect_info> is invoked, any previous settings for
395 these options will be cleared before setting the new ones, regardless of
396 whether any options are specified in the new C<connect_info>.
397
398 Examples:
399
400   # Simple SQLite connection
401   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
402
403   # Connect via subref
404   ->connect_info([ sub { DBI->connect(...) } ]);
405
406   # A bit more complicated
407   ->connect_info(
408     [
409       'dbi:Pg:dbname=foo',
410       'postgres',
411       'my_pg_password',
412       { AutoCommit => 0 },
413       { quote_char => q{"}, name_sep => q{.} },
414     ]
415   );
416
417   # Equivalent to the previous example
418   ->connect_info(
419     [
420       'dbi:Pg:dbname=foo',
421       'postgres',
422       'my_pg_password',
423       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
424     ]
425   );
426
427   # Subref + DBIC-specific connection options
428   ->connect_info(
429     [
430       sub { DBI->connect(...) },
431       {
432           quote_char => q{`},
433           name_sep => q{@},
434           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
435       },
436     ]
437   );
438
439 =head2 on_connect_do
440
441 This method is deprecated in favor of setting via L</connect_info>.
442
443 =head2 debug
444
445 Causes SQL trace information to be emitted on the C<debugobj> object.
446 (or C<STDERR> if C<debugobj> has not specifically been set).
447
448 This is the equivalent to setting L</DBIC_TRACE> in your
449 shell environment.
450
451 =head2 debugfh
452
453 Set or retrieve the filehandle used for trace/debug output.  This should be
454 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
455 set to be STDERR - although see information on the
456 L<DBIC_TRACE> environment variable.
457
458 =cut
459
460 sub debugfh {
461     my $self = shift;
462
463     if ($self->debugobj->can('debugfh')) {
464         return $self->debugobj->debugfh(@_);
465     }
466 }
467
468 =head2 debugobj
469
470 Sets or retrieves the object used for metric collection. Defaults to an instance
471 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
472 method of using a coderef as a callback.  See the aforementioned Statistics
473 class for more information.
474
475 =head2 debugcb
476
477 Sets a callback to be executed each time a statement is run; takes a sub
478 reference.  Callback is executed as $sub->($op, $info) where $op is
479 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
480
481 See L<debugobj> for a better way.
482
483 =cut
484
485 sub debugcb {
486     my $self = shift;
487
488     if ($self->debugobj->can('callback')) {
489         return $self->debugobj->callback(@_);
490     }
491 }
492
493 =head2 disconnect
494
495 Disconnect the L<DBI> handle, performing a rollback first if the
496 database is not in C<AutoCommit> mode.
497
498 =cut
499
500 sub disconnect {
501   my ($self) = @_;
502
503   if( $self->connected ) {
504     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
505     $self->_dbh->disconnect;
506     $self->_dbh(undef);
507   }
508 }
509
510 =head2 connected
511
512 Check if the L<DBI> handle is connected.  Returns true if the handle
513 is connected.
514
515 =cut
516
517 sub connected { my ($self) = @_;
518
519   if(my $dbh = $self->_dbh) {
520       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
521           return $self->_dbh(undef);
522       }
523       elsif($self->_conn_pid != $$) {
524           $self->_dbh->{InactiveDestroy} = 1;
525           return $self->_dbh(undef);
526       }
527       return ($dbh->FETCH('Active') && $dbh->ping);
528   }
529
530   return 0;
531 }
532
533 =head2 ensure_connected
534
535 Check whether the database handle is connected - if not then make a
536 connection.
537
538 =cut
539
540 sub ensure_connected {
541   my ($self) = @_;
542
543   unless ($self->connected) {
544     $self->_populate_dbh;
545   }
546 }
547
548 =head2 dbh
549
550 Returns the dbh - a data base handle of class L<DBI>.
551
552 =cut
553
554 sub dbh {
555   my ($self) = @_;
556
557   $self->ensure_connected;
558   return $self->_dbh;
559 }
560
561 sub _sql_maker_args {
562     my ($self) = @_;
563     
564     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
565 }
566
567 =head2 sql_maker
568
569 Returns a C<sql_maker> object - normally an object of class
570 C<DBIC::SQL::Abstract>.
571
572 =cut
573
574 sub sql_maker {
575   my ($self) = @_;
576   unless ($self->_sql_maker) {
577     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
578   }
579   return $self->_sql_maker;
580 }
581
582 sub connect_info {
583   my ($self, $info_arg) = @_;
584
585   if($info_arg) {
586     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
587     #  the new set of options
588     $self->_sql_maker(undef);
589     $self->_sql_maker_opts({});
590
591     my $info = [ @$info_arg ]; # copy because we can alter it
592     my $last_info = $info->[-1];
593     if(ref $last_info eq 'HASH') {
594       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
595         $self->on_connect_do($on_connect_do);
596       }
597       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
598         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
599           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
600         }
601       }
602
603       # Get rid of any trailing empty hashref
604       pop(@$info) if !keys %$last_info;
605     }
606
607     $self->_connect_info($info);
608   }
609
610   $self->_connect_info;
611 }
612
613 sub _populate_dbh {
614   my ($self) = @_;
615   my @info = @{$self->_connect_info || []};
616   $self->_dbh($self->_connect(@info));
617
618   if(ref $self eq 'DBIx::Class::Storage::DBI') {
619     my $driver = $self->_dbh->{Driver}->{Name};
620     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
621       bless $self, "DBIx::Class::Storage::DBI::${driver}";
622       $self->_rebless() if $self->can('_rebless');
623     }
624   }
625
626   # if on-connect sql statements are given execute them
627   foreach my $sql_statement (@{$self->on_connect_do || []}) {
628     $self->debugobj->query_start($sql_statement) if $self->debug();
629     $self->_dbh->do($sql_statement);
630     $self->debugobj->query_end($sql_statement) if $self->debug();
631   }
632
633   $self->_conn_pid($$);
634   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
635 }
636
637 sub _connect {
638   my ($self, @info) = @_;
639
640   $self->throw_exception("You failed to provide any connection info")
641       if !@info;
642
643   my ($old_connect_via, $dbh);
644
645   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
646       $old_connect_via = $DBI::connect_via;
647       $DBI::connect_via = 'connect';
648   }
649
650   eval {
651     $dbh = ref $info[0] eq 'CODE'
652          ? &{$info[0]}
653          : DBI->connect(@info);
654   };
655
656   $DBI::connect_via = $old_connect_via if $old_connect_via;
657
658   if (!$dbh || $@) {
659     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
660   }
661
662   $dbh;
663 }
664
665 =head2 txn_begin
666
667 Calls begin_work on the current dbh.
668
669 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
670 an entire code block to be executed transactionally.
671
672 =cut
673
674 sub txn_begin {
675   my $self = shift;
676   if ($self->{transaction_depth}++ == 0) {
677     my $dbh = $self->dbh;
678     if ($dbh->{AutoCommit}) {
679       $self->debugobj->txn_begin()
680         if ($self->debug);
681       $dbh->begin_work;
682     }
683   }
684 }
685
686 =head2 txn_commit
687
688 Issues a commit against the current dbh.
689
690 =cut
691
692 sub txn_commit {
693   my $self = shift;
694   my $dbh = $self->dbh;
695   if ($self->{transaction_depth} == 0) {
696     unless ($dbh->{AutoCommit}) {
697       $self->debugobj->txn_commit()
698         if ($self->debug);
699       $dbh->commit;
700     }
701   }
702   else {
703     if (--$self->{transaction_depth} == 0) {
704       $self->debugobj->txn_commit()
705         if ($self->debug);
706       $dbh->commit;
707     }
708   }
709 }
710
711 =head2 txn_rollback
712
713 Issues a rollback against the current dbh. A nested rollback will
714 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
715 which allows the rollback to propagate to the outermost transaction.
716
717 =cut
718
719 sub txn_rollback {
720   my $self = shift;
721
722   eval {
723     my $dbh = $self->dbh;
724     if ($self->{transaction_depth} == 0) {
725       unless ($dbh->{AutoCommit}) {
726         $self->debugobj->txn_rollback()
727           if ($self->debug);
728         $dbh->rollback;
729       }
730     }
731     else {
732       if (--$self->{transaction_depth} == 0) {
733         $self->debugobj->txn_rollback()
734           if ($self->debug);
735         $dbh->rollback;
736       }
737       else {
738         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
739       }
740     }
741   };
742
743   if ($@) {
744     my $error = $@;
745     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
746     $error =~ /$exception_class/ and $self->throw_exception($error);
747     $self->{transaction_depth} = 0;          # ensure that a failed rollback
748     $self->throw_exception($error);          # resets the transaction depth
749   }
750 }
751
752 sub _execute {
753   my ($self, $op, $extra_bind, $ident, @args) = @_;
754   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
755   unshift(@bind, @$extra_bind) if $extra_bind;
756   if ($self->debug) {
757       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
758       $self->debugobj->query_start($sql, @debug_bind);
759   }
760   my $sth = eval { $self->sth($sql,$op) };
761
762   if (!$sth || $@) {
763     $self->throw_exception(
764       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
765     );
766   }
767   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
768   my $rv;
769   if ($sth) {
770     my $time = time();
771     $rv = eval { $sth->execute(@bind) };
772
773     if ($@ || !$rv) {
774       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
775     }
776   } else {
777     $self->throw_exception("'$sql' did not generate a statement.");
778   }
779   if ($self->debug) {
780       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
781       $self->debugobj->query_end($sql, @debug_bind);
782   }
783   return (wantarray ? ($rv, $sth, @bind) : $rv);
784 }
785
786 sub insert {
787   my ($self, $ident, $to_insert) = @_;
788   $self->throw_exception(
789     "Couldn't insert ".join(', ',
790       map "$_ => $to_insert->{$_}", keys %$to_insert
791     )." into ${ident}"
792   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
793   return $to_insert;
794 }
795
796 sub update {
797   return shift->_execute('update' => [], @_);
798 }
799
800 sub delete {
801   return shift->_execute('delete' => [], @_);
802 }
803
804 sub _select {
805   my ($self, $ident, $select, $condition, $attrs) = @_;
806   my $order = $attrs->{order_by};
807   if (ref $condition eq 'SCALAR') {
808     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
809   }
810   if (exists $attrs->{group_by} || $attrs->{having}) {
811     $order = {
812       group_by => $attrs->{group_by},
813       having => $attrs->{having},
814       ($order ? (order_by => $order) : ())
815     };
816   }
817   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
818   if ($attrs->{software_limit} ||
819       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
820         $attrs->{software_limit} = 1;
821   } else {
822     $self->throw_exception("rows attribute must be positive if present")
823       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
824     push @args, $attrs->{rows}, $attrs->{offset};
825   }
826   return $self->_execute(@args);
827 }
828
829 =head2 select
830
831 =over 4
832
833 =item Arguments: $ident, $select, $condition, $attrs
834
835 =back
836
837 Handle a SQL select statement.
838
839 =cut
840
841 sub select {
842   my $self = shift;
843   my ($ident, $select, $condition, $attrs) = @_;
844   return $self->cursor->new($self, \@_, $attrs);
845 }
846
847 =head2 select_single
848
849 Performs a select, fetch and return of data - handles a single row
850 only.
851
852 =cut
853
854 # Need to call finish() to work round broken DBDs
855
856 sub select_single {
857   my $self = shift;
858   my ($rv, $sth, @bind) = $self->_select(@_);
859   my @row = $sth->fetchrow_array;
860   $sth->finish();
861   return @row;
862 }
863
864 =head2 sth
865
866 =over 4
867
868 =item Arguments: $sql
869
870 =back
871
872 Returns a L<DBI> sth (statement handle) for the supplied SQL.
873
874 =cut
875
876 sub sth {
877   my ($self, $sql) = @_;
878   # 3 is the if_active parameter which avoids active sth re-use
879   return $self->dbh->prepare_cached($sql, {}, 3);
880 }
881
882 =head2 columns_info_for
883
884 Returns database type info for a given table column.
885
886 =cut
887
888 sub columns_info_for {
889   my ($self, $table) = @_;
890
891   my $dbh = $self->dbh;
892
893   if ($dbh->can('column_info')) {
894     my %result;
895     local $dbh->{RaiseError} = 1;
896     local $dbh->{PrintError} = 0;
897     eval {
898       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
899       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
900       $sth->execute();
901
902       while ( my $info = $sth->fetchrow_hashref() ){
903         my %column_info;
904         $column_info{data_type}   = $info->{TYPE_NAME};
905         $column_info{size}      = $info->{COLUMN_SIZE};
906         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
907         $column_info{default_value} = $info->{COLUMN_DEF};
908         my $col_name = $info->{COLUMN_NAME};
909         $col_name =~ s/^\"(.*)\"$/$1/;
910
911         $result{$col_name} = \%column_info;
912       }
913     };
914     return \%result if !$@ && scalar keys %result;
915   }
916
917   my %result;
918   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
919   $sth->execute;
920   my @columns = @{$sth->{NAME_lc}};
921   for my $i ( 0 .. $#columns ){
922     my %column_info;
923     my $type_num = $sth->{TYPE}->[$i];
924     my $type_name;
925     if(defined $type_num && $dbh->can('type_info')) {
926       my $type_info = $dbh->type_info($type_num);
927       $type_name = $type_info->{TYPE_NAME} if $type_info;
928     }
929     $column_info{data_type} = $type_name ? $type_name : $type_num;
930     $column_info{size} = $sth->{PRECISION}->[$i];
931     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
932
933     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
934       $column_info{data_type} = $1;
935       $column_info{size}    = $2;
936     }
937
938     $result{$columns[$i]} = \%column_info;
939   }
940
941   return \%result;
942 }
943
944 =head2 last_insert_id
945
946 Return the row id of the last insert.
947
948 =cut
949
950 sub last_insert_id {
951   my ($self, $row) = @_;
952     
953   return $self->dbh->func('last_insert_rowid');
954
955 }
956
957 =head2 sqlt_type
958
959 Returns the database driver name.
960
961 =cut
962
963 sub sqlt_type { shift->dbh->{Driver}->{Name} }
964
965 =head2 create_ddl_dir (EXPERIMENTAL)
966
967 =over 4
968
969 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
970
971 =back
972
973 Creates a SQL file based on the Schema, for each of the specified
974 database types, in the given directory.
975
976 Note that this feature is currently EXPERIMENTAL and may not work correctly
977 across all databases, or fully handle complex relationships.
978
979 =cut
980
981 sub create_ddl_dir
982 {
983   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
984
985   if(!$dir || !-d $dir)
986   {
987     warn "No directory given, using ./\n";
988     $dir = "./";
989   }
990   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
991   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
992   $version ||= $schema->VERSION || '1.x';
993   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
994
995   eval "use SQL::Translator";
996   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
997
998   my $sqlt = SQL::Translator->new($sqltargs);
999   foreach my $db (@$databases)
1000   {
1001     $sqlt->reset();
1002     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1003 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1004     $sqlt->data($schema);
1005     $sqlt->producer($db);
1006
1007     my $file;
1008     my $filename = $schema->ddl_filename($db, $dir, $version);
1009     if(-e $filename)
1010     {
1011       $self->throw_exception("$filename already exists, skipping $db");
1012       next;
1013     }
1014     open($file, ">$filename") 
1015       or $self->throw_exception("Can't open $filename for writing ($!)");
1016     my $output = $sqlt->translate;
1017 #use Data::Dumper;
1018 #    print join(":", keys %{$schema->source_registrations});
1019 #    print Dumper($sqlt->schema);
1020     if(!$output)
1021     {
1022       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1023       next;
1024     }
1025     print $file $output;
1026     close($file);
1027   }
1028
1029 }
1030
1031 =head2 deployment_statements
1032
1033 =over 4
1034
1035 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1036
1037 =back
1038
1039 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1040 The database driver name is given by C<$type>, though the value from
1041 L</sqlt_type> is used if it is not specified.
1042
1043 C<$directory> is used to return statements from files in a previously created
1044 L</create_ddl_dir> directory and is optional. The filenames are constructed
1045 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1046
1047 If no C<$directory> is specified then the statements are constructed on the
1048 fly using L<SQL::Translator> and C<$version> is ignored.
1049
1050 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1051
1052 =cut
1053
1054 sub deployment_statements {
1055   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1056   # Need to be connected to get the correct sqlt_type
1057   $self->ensure_connected() unless $type;
1058   $type ||= $self->sqlt_type;
1059   $version ||= $schema->VERSION || '1.x';
1060   $dir ||= './';
1061   eval "use SQL::Translator";
1062   if(!$@)
1063   {
1064     eval "use SQL::Translator::Parser::DBIx::Class;";
1065     $self->throw_exception($@) if $@;
1066     eval "use SQL::Translator::Producer::${type};";
1067     $self->throw_exception($@) if $@;
1068     my $tr = SQL::Translator->new(%$sqltargs);
1069     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1070     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1071   }
1072
1073   my $filename = $schema->ddl_filename($type, $dir, $version);
1074   if(!-f $filename)
1075   {
1076 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1077       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1078       return;
1079   }
1080   my $file;
1081   open($file, "<$filename") 
1082       or $self->throw_exception("Can't open $filename ($!)");
1083   my @rows = <$file>;
1084   close($file);
1085
1086   return join('', @rows);
1087   
1088 }
1089
1090 =head2 deploy
1091
1092 Sends the appropriate statements to create or modify tables to the
1093 db. This would normally be called through
1094 L<DBIx::Class::Schema/deploy>.
1095
1096 =cut
1097
1098 sub deploy {
1099   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1100   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1101     for ( split(";\n", $statement)) {
1102       next if($_ =~ /^--/);
1103       next if(!$_);
1104 #      next if($_ =~ /^DROP/m);
1105       next if($_ =~ /^BEGIN TRANSACTION/m);
1106       next if($_ =~ /^COMMIT/m);
1107       next if $_ =~ /^\s+$/; # skip whitespace only
1108       $self->debugobj->query_start($_) if $self->debug;
1109       $self->dbh->do($_) or warn "SQL was:\n $_";
1110       $self->debugobj->query_end($_) if $self->debug;
1111     }
1112   }
1113 }
1114
1115 =head2 datetime_parser
1116
1117 Returns the datetime parser class
1118
1119 =cut
1120
1121 sub datetime_parser {
1122   my $self = shift;
1123   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1124 }
1125
1126 =head2 datetime_parser_type
1127
1128 Defines (returns) the datetime parser class - currently hardwired to
1129 L<DateTime::Format::MySQL>
1130
1131 =cut
1132
1133 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1134
1135 =head2 build_datetime_parser
1136
1137 See L</datetime_parser>
1138
1139 =cut
1140
1141 sub build_datetime_parser {
1142   my $self = shift;
1143   my $type = $self->datetime_parser_type(@_);
1144   eval "use ${type}";
1145   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1146   return $type;
1147 }
1148
1149 sub DESTROY {
1150   # NOTE: if there's a merge conflict here when -current is pushed
1151   #  back to trunk, take -current's version and ignore this trunk one :)
1152   my $self = shift;
1153
1154   if($self->_dbh && $self->_conn_pid != $$) {
1155     $self->_dbh->{InactiveDestroy} = 1;
1156   }
1157
1158   $self->_dbh(undef);
1159 }
1160
1161 1;
1162
1163 =head1 SQL METHODS
1164
1165 The module defines a set of methods within the DBIC::SQL::Abstract
1166 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1167 SQL query functions.
1168
1169 The following methods are extended:-
1170
1171 =over 4
1172
1173 =item delete
1174
1175 =item insert
1176
1177 =item select
1178
1179 =item update
1180
1181 =item limit_dialect
1182
1183 See L</connect_info> for details.
1184 For setting, this method is deprecated in favor of L</connect_info>.
1185
1186 =item quote_char
1187
1188 See L</connect_info> for details.
1189 For setting, this method is deprecated in favor of L</connect_info>.
1190
1191 =item name_sep
1192
1193 See L</connect_info> for details.
1194 For setting, this method is deprecated in favor of L</connect_info>.
1195
1196 =back
1197
1198 =head1 ENVIRONMENT VARIABLES
1199
1200 =head2 DBIC_TRACE
1201
1202 If C<DBIC_TRACE> is set then SQL trace information
1203 is produced (as when the L<debug> method is set).
1204
1205 If the value is of the form C<1=/path/name> then the trace output is
1206 written to the file C</path/name>.
1207
1208 This environment variable is checked when the storage object is first
1209 created (when you call connect on your schema).  So, run-time changes 
1210 to this environment variable will not take effect unless you also 
1211 re-connect on your schema.
1212
1213 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1214
1215 Old name for DBIC_TRACE
1216
1217 =head1 AUTHORS
1218
1219 Matt S. Trout <mst@shadowcatsystems.co.uk>
1220
1221 Andy Grundman <andy@hybridized.org>
1222
1223 =head1 LICENSE
1224
1225 You may distribute this code under the same terms as Perl itself.
1226
1227 =cut