Assume scalar refs need fetching in PK::Auto (to allow \'DEFAULT'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 sub _RowNumberOver {
33   my ($self, $sql, $order, $rows, $offset ) = @_;
34
35   $offset += 1;
36   my $last = $rows + $offset;
37   my ( $order_by ) = $self->_order_by( $order );
38
39   $sql = <<"";
40 SELECT * FROM
41 (
42    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
43       $sql
44       $order_by
45    ) Q1
46 ) Q2
47 WHERE ROW_NUM BETWEEN $offset AND $last
48
49   return $sql;
50 }
51
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 sub _find_syntax {
56   my ($self, $syntax) = @_;
57   my $dbhname = eval { $syntax->{Driver}->{Name}} || '';
58   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
59     return 'RowNumberOver';
60   }
61
62   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
63 }
64
65 sub select {
66   my ($self, $table, $fields, $where, $order, @rest) = @_;
67   $table = $self->_quote($table) unless ref($table);
68   local $self->{rownum_hack_count} = 1
69     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
70   @rest = (-1) unless defined $rest[0];
71   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
72     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
73   local $self->{having_bind} = [];
74   my ($sql, @ret) = $self->SUPER::select(
75     $table, $self->_recurse_fields($fields), $where, $order, @rest
76   );
77   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
78 }
79
80 sub insert {
81   my $self = shift;
82   my $table = shift;
83   $table = $self->_quote($table) unless ref($table);
84   $self->SUPER::insert($table, @_);
85 }
86
87 sub update {
88   my $self = shift;
89   my $table = shift;
90   $table = $self->_quote($table) unless ref($table);
91   $self->SUPER::update($table, @_);
92 }
93
94 sub delete {
95   my $self = shift;
96   my $table = shift;
97   $table = $self->_quote($table) unless ref($table);
98   $self->SUPER::delete($table, @_);
99 }
100
101 sub _emulate_limit {
102   my $self = shift;
103   if ($_[3] == -1) {
104     return $_[1].$self->_order_by($_[2]);
105   } else {
106     return $self->SUPER::_emulate_limit(@_);
107   }
108 }
109
110 sub _recurse_fields {
111   my ($self, $fields) = @_;
112   my $ref = ref $fields;
113   return $self->_quote($fields) unless $ref;
114   return $$fields if $ref eq 'SCALAR';
115
116   if ($ref eq 'ARRAY') {
117     return join(', ', map {
118       $self->_recurse_fields($_)
119       .(exists $self->{rownum_hack_count}
120          ? ' AS col'.$self->{rownum_hack_count}++
121          : '')
122      } @$fields);
123   } elsif ($ref eq 'HASH') {
124     foreach my $func (keys %$fields) {
125       return $self->_sqlcase($func)
126         .'( '.$self->_recurse_fields($fields->{$func}).' )';
127     }
128   }
129 }
130
131 sub _order_by {
132   my $self = shift;
133   my $ret = '';
134   my @extra;
135   if (ref $_[0] eq 'HASH') {
136     if (defined $_[0]->{group_by}) {
137       $ret = $self->_sqlcase(' group by ')
138                .$self->_recurse_fields($_[0]->{group_by});
139     }
140     if (defined $_[0]->{having}) {
141       my $frag;
142       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
143       push(@{$self->{having_bind}}, @extra);
144       $ret .= $self->_sqlcase(' having ').$frag;
145     }
146     if (defined $_[0]->{order_by}) {
147       $ret .= $self->_order_by($_[0]->{order_by});
148     }
149   } elsif (ref $_[0] eq 'SCALAR') {
150     $ret = $self->_sqlcase(' order by ').${ $_[0] };
151   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
152     my @order = @{+shift};
153     $ret = $self->_sqlcase(' order by ')
154           .join(', ', map {
155                         my $r = $self->_order_by($_, @_);
156                         $r =~ s/^ ?ORDER BY //i;
157                         $r;
158                       } @order);
159   } else {
160     $ret = $self->SUPER::_order_by(@_);
161   }
162   return $ret;
163 }
164
165 sub _order_directions {
166   my ($self, $order) = @_;
167   $order = $order->{order_by} if ref $order eq 'HASH';
168   return $self->SUPER::_order_directions($order);
169 }
170
171 sub _table {
172   my ($self, $from) = @_;
173   if (ref $from eq 'ARRAY') {
174     return $self->_recurse_from(@$from);
175   } elsif (ref $from eq 'HASH') {
176     return $self->_make_as($from);
177   } else {
178     return $from; # would love to quote here but _table ends up getting called
179                   # twice during an ->select without a limit clause due to
180                   # the way S::A::Limit->select works. should maybe consider
181                   # bypassing this and doing S::A::select($self, ...) in
182                   # our select method above. meantime, quoting shims have
183                   # been added to select/insert/update/delete here
184   }
185 }
186
187 sub _recurse_from {
188   my ($self, $from, @join) = @_;
189   my @sqlf;
190   push(@sqlf, $self->_make_as($from));
191   foreach my $j (@join) {
192     my ($to, $on) = @$j;
193
194     # check whether a join type exists
195     my $join_clause = '';
196     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
197     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
198       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
199     } else {
200       $join_clause = ' JOIN ';
201     }
202     push(@sqlf, $join_clause);
203
204     if (ref $to eq 'ARRAY') {
205       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
206     } else {
207       push(@sqlf, $self->_make_as($to));
208     }
209     push(@sqlf, ' ON ', $self->_join_condition($on));
210   }
211   return join('', @sqlf);
212 }
213
214 sub _make_as {
215   my ($self, $from) = @_;
216   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
217                      reverse each %{$self->_skip_options($from)});
218 }
219
220 sub _skip_options {
221   my ($self, $hash) = @_;
222   my $clean_hash = {};
223   $clean_hash->{$_} = $hash->{$_}
224     for grep {!/^-/} keys %$hash;
225   return $clean_hash;
226 }
227
228 sub _join_condition {
229   my ($self, $cond) = @_;
230   if (ref $cond eq 'HASH') {
231     my %j;
232     for (keys %$cond) {
233       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
234     };
235     return $self->_recurse_where(\%j);
236   } elsif (ref $cond eq 'ARRAY') {
237     return join(' OR ', map { $self->_join_condition($_) } @$cond);
238   } else {
239     die "Can't handle this yet!";
240   }
241 }
242
243 sub _quote {
244   my ($self, $label) = @_;
245   return '' unless defined $label;
246   return "*" if $label eq '*';
247   return $label unless $self->{quote_char};
248   if(ref $self->{quote_char} eq "ARRAY"){
249     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
250       if !defined $self->{name_sep};
251     my $sep = $self->{name_sep};
252     return join($self->{name_sep},
253         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
254        split(/\Q$sep\E/,$label));
255   }
256   return $self->SUPER::_quote($label);
257 }
258
259 sub limit_dialect {
260     my $self = shift;
261     $self->{limit_dialect} = shift if @_;
262     return $self->{limit_dialect};
263 }
264
265 sub quote_char {
266     my $self = shift;
267     $self->{quote_char} = shift if @_;
268     return $self->{quote_char};
269 }
270
271 sub name_sep {
272     my $self = shift;
273     $self->{name_sep} = shift if @_;
274     return $self->{name_sep};
275 }
276
277 } # End of BEGIN block
278
279 use base qw/DBIx::Class/;
280
281 __PACKAGE__->load_components(qw/AccessorGroup/);
282
283 __PACKAGE__->mk_group_accessors('simple' =>
284   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
285      debug debugobj cursor on_connect_do transaction_depth/);
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to the database
296
297 =head1 METHODS
298
299 =head2 new
300
301 =cut
302
303 sub new {
304   my $new = {};
305   bless $new, (ref $_[0] || $_[0]);
306
307   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
308   $new->transaction_depth(0);
309
310   $new->debugobj(new DBIx::Class::Storage::Statistics());
311
312   my $fh;
313
314   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
315                   || $ENV{DBIC_TRACE};
316
317   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
318     $fh = IO::File->new($1, 'w')
319       or $new->throw_exception("Cannot open trace file $1");
320   } else {
321     $fh = IO::File->new('>&STDERR');
322   }
323   $new->debugfh($fh);
324   $new->debug(1) if $debug_env;
325   $new->_sql_maker_opts({});
326   return $new;
327 }
328
329 =head2 throw_exception
330
331 Throws an exception - croaks.
332
333 =cut
334
335 sub throw_exception {
336   my ($self, $msg) = @_;
337   croak($msg);
338 }
339
340 =head2 connect_info
341
342 The arguments of C<connect_info> are always a single array reference.
343
344 This is normally accessed via L<DBIx::Class::Schema/connection>, which
345 encapsulates its argument list in an arrayref before calling
346 C<connect_info> here.
347
348 The arrayref can either contain the same set of arguments one would
349 normally pass to L<DBI/connect>, or a lone code reference which returns
350 a connected database handle.
351
352 In either case, if the final argument in your connect_info happens
353 to be a hashref, C<connect_info> will look there for several
354 connection-specific options:
355
356 =over 4
357
358 =item on_connect_do
359
360 This can be set to an arrayref of literal sql statements, which will
361 be executed immediately after making the connection to the database
362 every time we [re-]connect.
363
364 =item limit_dialect 
365
366 Sets the limit dialect. This is useful for JDBC-bridge among others
367 where the remote SQL-dialect cannot be determined by the name of the
368 driver alone.
369
370 =item quote_char
371
372 Specifies what characters to use to quote table and column names. If 
373 you use this you will want to specify L<name_sep> as well.
374
375 quote_char expects either a single character, in which case is it is placed
376 on either side of the table/column, or an arrayref of length 2 in which case the
377 table/column name is placed between the elements.
378
379 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
380 use C<quote_char =E<gt> [qw/[ ]/]>.
381
382 =item name_sep
383
384 This only needs to be used in conjunction with L<quote_char>, and is used to 
385 specify the charecter that seperates elements (schemas, tables, columns) from 
386 each other. In most cases this is simply a C<.>.
387
388 =back
389
390 These options can be mixed in with your other L<DBI> connection attributes,
391 or placed in a seperate hashref after all other normal L<DBI> connection
392 arguments.
393
394 Every time C<connect_info> is invoked, any previous settings for
395 these options will be cleared before setting the new ones, regardless of
396 whether any options are specified in the new C<connect_info>.
397
398 Examples:
399
400   # Simple SQLite connection
401   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
402
403   # Connect via subref
404   ->connect_info([ sub { DBI->connect(...) } ]);
405
406   # A bit more complicated
407   ->connect_info(
408     [
409       'dbi:Pg:dbname=foo',
410       'postgres',
411       'my_pg_password',
412       { AutoCommit => 0 },
413       { quote_char => q{"}, name_sep => q{.} },
414     ]
415   );
416
417   # Equivalent to the previous example
418   ->connect_info(
419     [
420       'dbi:Pg:dbname=foo',
421       'postgres',
422       'my_pg_password',
423       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
424     ]
425   );
426
427   # Subref + DBIC-specific connection options
428   ->connect_info(
429     [
430       sub { DBI->connect(...) },
431       {
432           quote_char => q{`},
433           name_sep => q{@},
434           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
435       },
436     ]
437   );
438
439 =head2 on_connect_do
440
441 This method is deprecated in favor of setting via L</connect_info>.
442
443 =head2 debug
444
445 Causes SQL trace information to be emitted on the C<debugobj> object.
446 (or C<STDERR> if C<debugobj> has not specifically been set).
447
448 This is the equivalent to setting L</DBIC_TRACE> in your
449 shell environment.
450
451 =head2 debugfh
452
453 Set or retrieve the filehandle used for trace/debug output.  This should be
454 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
455 set to be STDERR - although see information on the
456 L<DBIC_TRACE> environment variable.
457
458 =cut
459
460 sub debugfh {
461     my $self = shift;
462
463     if ($self->debugobj->can('debugfh')) {
464         return $self->debugobj->debugfh(@_);
465     }
466 }
467
468 =head2 debugobj
469
470 Sets or retrieves the object used for metric collection. Defaults to an instance
471 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
472 method of using a coderef as a callback.  See the aforementioned Statistics
473 class for more information.
474
475 =head2 debugcb
476
477 Sets a callback to be executed each time a statement is run; takes a sub
478 reference.  Callback is executed as $sub->($op, $info) where $op is
479 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
480
481 See L<debugobj> for a better way.
482
483 =cut
484
485 sub debugcb {
486     my $self = shift;
487
488     if ($self->debugobj->can('callback')) {
489         return $self->debugobj->callback(@_);
490     }
491 }
492
493 =head2 disconnect
494
495 Disconnect the L<DBI> handle, performing a rollback first if the
496 database is not in C<AutoCommit> mode.
497
498 =cut
499
500 sub disconnect {
501   my ($self) = @_;
502
503   if( $self->connected ) {
504     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
505     $self->_dbh->disconnect;
506     $self->_dbh(undef);
507   }
508 }
509
510 =head2 connected
511
512 Check if the L<DBI> handle is connected.  Returns true if the handle
513 is connected.
514
515 =cut
516
517 sub connected { my ($self) = @_;
518
519   if(my $dbh = $self->_dbh) {
520       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
521           return $self->_dbh(undef);
522       }
523       elsif($self->_conn_pid != $$) {
524           $self->_dbh->{InactiveDestroy} = 1;
525           return $self->_dbh(undef);
526       }
527       return ($dbh->FETCH('Active') && $dbh->ping);
528   }
529
530   return 0;
531 }
532
533 =head2 ensure_connected
534
535 Check whether the database handle is connected - if not then make a
536 connection.
537
538 =cut
539
540 sub ensure_connected {
541   my ($self) = @_;
542
543   unless ($self->connected) {
544     $self->_populate_dbh;
545   }
546 }
547
548 =head2 dbh
549
550 Returns the dbh - a data base handle of class L<DBI>.
551
552 =cut
553
554 sub dbh {
555   my ($self) = @_;
556
557   $self->ensure_connected;
558   return $self->_dbh;
559 }
560
561 sub _sql_maker_args {
562     my ($self) = @_;
563     
564     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
565 }
566
567 =head2 sql_maker
568
569 Returns a C<sql_maker> object - normally an object of class
570 C<DBIC::SQL::Abstract>.
571
572 =cut
573
574 sub sql_maker {
575   my ($self) = @_;
576   unless ($self->_sql_maker) {
577     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
578   }
579   return $self->_sql_maker;
580 }
581
582 sub connect_info {
583   my ($self, $info_arg) = @_;
584
585   if($info_arg) {
586     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
587     #  the new set of options
588     $self->_sql_maker(undef);
589     $self->_sql_maker_opts({});
590
591     my $info = [ @$info_arg ]; # copy because we can alter it
592     my $last_info = $info->[-1];
593     if(ref $last_info eq 'HASH') {
594       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
595         $self->on_connect_do($on_connect_do);
596       }
597       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
598         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
599           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
600         }
601       }
602
603       # Get rid of any trailing empty hashref
604       pop(@$info) if !keys %$last_info;
605     }
606
607     $self->_connect_info($info);
608   }
609
610   $self->_connect_info;
611 }
612
613 sub _populate_dbh {
614   my ($self) = @_;
615   my @info = @{$self->_connect_info || []};
616   $self->_dbh($self->_connect(@info));
617
618   if(ref $self eq 'DBIx::Class::Storage::DBI') {
619     my $driver = $self->_dbh->{Driver}->{Name};
620     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
621       bless $self, "DBIx::Class::Storage::DBI::${driver}";
622       $self->_rebless() if $self->can('_rebless');
623     }
624   }
625
626   # if on-connect sql statements are given execute them
627   foreach my $sql_statement (@{$self->on_connect_do || []}) {
628     $self->debugobj->query_start($sql_statement) if $self->debug();
629     $self->_dbh->do($sql_statement);
630     $self->debugobj->query_end($sql_statement) if $self->debug();
631   }
632
633   $self->_conn_pid($$);
634   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
635 }
636
637 sub _connect {
638   my ($self, @info) = @_;
639
640   $self->throw_exception("You failed to provide any connection info")
641       if !@info;
642
643   my ($old_connect_via, $dbh);
644
645   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
646       $old_connect_via = $DBI::connect_via;
647       $DBI::connect_via = 'connect';
648   }
649
650   eval {
651     $dbh = ref $info[0] eq 'CODE'
652          ? &{$info[0]}
653          : DBI->connect(@info);
654   };
655
656   $DBI::connect_via = $old_connect_via if $old_connect_via;
657
658   if (!$dbh || $@) {
659     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
660   }
661
662   $dbh;
663 }
664
665 =head2 txn_begin
666
667 Calls begin_work on the current dbh.
668
669 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
670 an entire code block to be executed transactionally.
671
672 =cut
673
674 sub txn_begin {
675   my $self = shift;
676   if ($self->{transaction_depth}++ == 0) {
677     my $dbh = $self->dbh;
678     if ($dbh->{AutoCommit}) {
679       $self->debugobj->txn_begin()
680         if ($self->debug);
681       $dbh->begin_work;
682     }
683   }
684 }
685
686 =head2 txn_commit
687
688 Issues a commit against the current dbh.
689
690 =cut
691
692 sub txn_commit {
693   my $self = shift;
694   my $dbh = $self->dbh;
695   if ($self->{transaction_depth} == 0) {
696     unless ($dbh->{AutoCommit}) {
697       $self->debugobj->txn_commit()
698         if ($self->debug);
699       $dbh->commit;
700     }
701   }
702   else {
703     if (--$self->{transaction_depth} == 0) {
704       $self->debugobj->txn_commit()
705         if ($self->debug);
706       $dbh->commit;
707     }
708   }
709 }
710
711 =head2 txn_rollback
712
713 Issues a rollback against the current dbh. A nested rollback will
714 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
715 which allows the rollback to propagate to the outermost transaction.
716
717 =cut
718
719 sub txn_rollback {
720   my $self = shift;
721
722   eval {
723     my $dbh = $self->dbh;
724     if ($self->{transaction_depth} == 0) {
725       unless ($dbh->{AutoCommit}) {
726         $self->debugobj->txn_rollback()
727           if ($self->debug);
728         $dbh->rollback;
729       }
730     }
731     else {
732       if (--$self->{transaction_depth} == 0) {
733         $self->debugobj->txn_rollback()
734           if ($self->debug);
735         $dbh->rollback;
736       }
737       else {
738         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
739       }
740     }
741   };
742
743   if ($@) {
744     my $error = $@;
745     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
746     $error =~ /$exception_class/ and $self->throw_exception($error);
747     $self->{transaction_depth} = 0;          # ensure that a failed rollback
748     $self->throw_exception($error);          # resets the transaction depth
749   }
750 }
751
752 sub _execute {
753   my ($self, $op, $extra_bind, $ident, @args) = @_;
754   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
755   unshift(@bind, @$extra_bind) if $extra_bind;
756   if ($self->debug) {
757       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
758       $self->debugobj->query_start($sql, @debug_bind);
759   }
760   my $sth = eval { $self->sth($sql,$op) };
761
762   if (!$sth || $@) {
763     $self->throw_exception(
764       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
765     );
766   }
767   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
768   my $rv;
769   if ($sth) {
770     my $time = time();
771     $rv = eval { $sth->execute(@bind) };
772
773     if ($@ || !$rv) {
774       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
775     }
776   } else {
777     $self->throw_exception("'$sql' did not generate a statement.");
778   }
779   if ($self->debug) {
780       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
781       $self->debugobj->query_end($sql, @debug_bind);
782   }
783   return (wantarray ? ($rv, $sth, @bind) : $rv);
784 }
785
786 sub insert {
787   my ($self, $ident, $to_insert) = @_;
788   $self->throw_exception(
789     "Couldn't insert ".join(', ',
790       map "$_ => $to_insert->{$_}", keys %$to_insert
791     )." into ${ident}"
792   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
793   return $to_insert;
794 }
795
796 sub update {
797   return shift->_execute('update' => [], @_);
798 }
799
800 sub delete {
801   return shift->_execute('delete' => [], @_);
802 }
803
804 sub _select {
805   my ($self, $ident, $select, $condition, $attrs) = @_;
806   my $order = $attrs->{order_by};
807   if (ref $condition eq 'SCALAR') {
808     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
809   }
810   if (exists $attrs->{group_by} || $attrs->{having}) {
811     $order = {
812       group_by => $attrs->{group_by},
813       having => $attrs->{having},
814       ($order ? (order_by => $order) : ())
815     };
816   }
817   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
818   if ($attrs->{software_limit} ||
819       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
820         $attrs->{software_limit} = 1;
821   } else {
822     $self->throw_exception("rows attribute must be positive if present")
823       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
824     push @args, $attrs->{rows}, $attrs->{offset};
825   }
826   return $self->_execute(@args);
827 }
828
829 =head2 select
830
831 Handle a SQL select statement.
832
833 =cut
834
835 sub select {
836   my $self = shift;
837   my ($ident, $select, $condition, $attrs) = @_;
838   return $self->cursor->new($self, \@_, $attrs);
839 }
840
841 =head2 select_single
842
843 Performs a select, fetch and return of data - handles a single row
844 only.
845
846 =cut
847
848 # Need to call finish() to work round broken DBDs
849
850 sub select_single {
851   my $self = shift;
852   my ($rv, $sth, @bind) = $self->_select(@_);
853   my @row = $sth->fetchrow_array;
854   $sth->finish();
855   return @row;
856 }
857
858 =head2 sth
859
860 Returns a L<DBI> sth (statement handle) for the supplied SQL.
861
862 =cut
863
864 sub sth {
865   my ($self, $sql) = @_;
866   # 3 is the if_active parameter which avoids active sth re-use
867   return $self->dbh->prepare_cached($sql, {}, 3);
868 }
869
870 =head2 columns_info_for
871
872 Returns database type info for a given table columns.
873
874 =cut
875
876 sub columns_info_for {
877   my ($self, $table) = @_;
878
879   my $dbh = $self->dbh;
880
881   if ($dbh->can('column_info')) {
882     my %result;
883     local $dbh->{RaiseError} = 1;
884     local $dbh->{PrintError} = 0;
885     eval {
886       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
887       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
888       $sth->execute();
889
890       while ( my $info = $sth->fetchrow_hashref() ){
891         my %column_info;
892         $column_info{data_type}   = $info->{TYPE_NAME};
893         $column_info{size}      = $info->{COLUMN_SIZE};
894         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
895         $column_info{default_value} = $info->{COLUMN_DEF};
896         my $col_name = $info->{COLUMN_NAME};
897         $col_name =~ s/^\"(.*)\"$/$1/;
898
899         $result{$col_name} = \%column_info;
900       }
901     };
902     return \%result if !$@ && scalar keys %result;
903   }
904
905   my %result;
906   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
907   $sth->execute;
908   my @columns = @{$sth->{NAME_lc}};
909   for my $i ( 0 .. $#columns ){
910     my %column_info;
911     my $type_num = $sth->{TYPE}->[$i];
912     my $type_name;
913     if(defined $type_num && $dbh->can('type_info')) {
914       my $type_info = $dbh->type_info($type_num);
915       $type_name = $type_info->{TYPE_NAME} if $type_info;
916     }
917     $column_info{data_type} = $type_name ? $type_name : $type_num;
918     $column_info{size} = $sth->{PRECISION}->[$i];
919     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
920
921     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
922       $column_info{data_type} = $1;
923       $column_info{size}    = $2;
924     }
925
926     $result{$columns[$i]} = \%column_info;
927   }
928
929   return \%result;
930 }
931
932 =head2 last_insert_id
933
934 Return the row id of the last insert.
935
936 =cut
937
938 sub last_insert_id {
939   my ($self, $row) = @_;
940     
941   return $self->dbh->func('last_insert_rowid');
942
943 }
944
945 =head2 sqlt_type
946
947 Returns the database driver name.
948
949 =cut
950
951 sub sqlt_type { shift->dbh->{Driver}->{Name} }
952
953 =head2 create_ddl_dir (EXPERIMENTAL)
954
955 =over 4
956
957 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
958
959 =back
960
961 Creates an SQL file based on the Schema, for each of the specified
962 database types, in the given directory.
963
964 Note that this feature is currently EXPERIMENTAL and may not work correctly
965 across all databases, or fully handle complex relationships.
966
967 =cut
968
969 sub create_ddl_dir
970 {
971   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
972
973   if(!$dir || !-d $dir)
974   {
975     warn "No directory given, using ./\n";
976     $dir = "./";
977   }
978   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
979   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
980   $version ||= $schema->VERSION || '1.x';
981   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
982
983   eval "use SQL::Translator";
984   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
985
986   my $sqlt = SQL::Translator->new($sqltargs);
987   foreach my $db (@$databases)
988   {
989     $sqlt->reset();
990     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
991 #    $sqlt->parser_args({'DBIx::Class' => $schema);
992     $sqlt->data($schema);
993     $sqlt->producer($db);
994
995     my $file;
996     my $filename = $schema->ddl_filename($db, $dir, $version);
997     if(-e $filename)
998     {
999       $self->throw_exception("$filename already exists, skipping $db");
1000       next;
1001     }
1002     open($file, ">$filename") 
1003       or $self->throw_exception("Can't open $filename for writing ($!)");
1004     my $output = $sqlt->translate;
1005 #use Data::Dumper;
1006 #    print join(":", keys %{$schema->source_registrations});
1007 #    print Dumper($sqlt->schema);
1008     if(!$output)
1009     {
1010       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1011       next;
1012     }
1013     print $file $output;
1014     close($file);
1015   }
1016
1017 }
1018
1019 =head2 deployment_statements
1020
1021 Create the statements for L</deploy> and
1022 L<DBIx::Class::Schema/deploy>.
1023
1024 =cut
1025
1026 sub deployment_statements {
1027   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1028   # Need to be connected to get the correct sqlt_type
1029   $self->ensure_connected() unless $type;
1030   $type ||= $self->sqlt_type;
1031   $version ||= $schema->VERSION || '1.x';
1032   $dir ||= './';
1033   eval "use SQL::Translator";
1034   if(!$@)
1035   {
1036     eval "use SQL::Translator::Parser::DBIx::Class;";
1037     $self->throw_exception($@) if $@;
1038     eval "use SQL::Translator::Producer::${type};";
1039     $self->throw_exception($@) if $@;
1040     my $tr = SQL::Translator->new(%$sqltargs);
1041     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1042     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1043   }
1044
1045   my $filename = $schema->ddl_filename($type, $dir, $version);
1046   if(!-f $filename)
1047   {
1048 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1049       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1050       return;
1051   }
1052   my $file;
1053   open($file, "<$filename") 
1054       or $self->throw_exception("Can't open $filename ($!)");
1055   my @rows = <$file>;
1056   close($file);
1057
1058   return join('', @rows);
1059   
1060 }
1061
1062 =head2 deploy
1063
1064 Sends the appropriate statements to create or modify tables to the
1065 db. This would normally be called through
1066 L<DBIx::Class::Schema/deploy>.
1067
1068 =cut
1069
1070 sub deploy {
1071   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1072   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1073     for ( split(";\n", $statement)) {
1074       next if($_ =~ /^--/);
1075       next if(!$_);
1076 #      next if($_ =~ /^DROP/m);
1077       next if($_ =~ /^BEGIN TRANSACTION/m);
1078       next if($_ =~ /^COMMIT/m);
1079       next if $_ =~ /^\s+$/; # skip whitespace only
1080       $self->debugobj->query_start($_) if $self->debug;
1081       $self->dbh->do($_) or warn "SQL was:\n $_";
1082       $self->debugobj->query_end($_) if $self->debug;
1083     }
1084   }
1085 }
1086
1087 =head2 datetime_parser
1088
1089 Returns the datetime parser class
1090
1091 =cut
1092
1093 sub datetime_parser {
1094   my $self = shift;
1095   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1096 }
1097
1098 =head2 datetime_parser_type
1099
1100 Defines (returns) the datetime parser class - currently hardwired to
1101 L<DateTime::Format::MySQL>
1102
1103 =cut
1104
1105 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1106
1107 =head2 build_datetime_parser
1108
1109 See L</datetime_parser>
1110
1111 =cut
1112
1113 sub build_datetime_parser {
1114   my $self = shift;
1115   my $type = $self->datetime_parser_type(@_);
1116   eval "use ${type}";
1117   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1118   return $type;
1119 }
1120
1121 sub DESTROY {
1122   # NOTE: if there's a merge conflict here when -current is pushed
1123   #  back to trunk, take -current's version and ignore this trunk one :)
1124   my $self = shift;
1125
1126   if($self->_dbh && $self->_conn_pid != $$) {
1127     $self->_dbh->{InactiveDestroy} = 1;
1128   }
1129
1130   $self->_dbh(undef);
1131 }
1132
1133 1;
1134
1135 =head1 SQL METHODS
1136
1137 The module defines a set of methods within the DBIC::SQL::Abstract
1138 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1139 SQL query functions.
1140
1141 The following methods are extended:-
1142
1143 =over 4
1144
1145 =item delete
1146
1147 =item insert
1148
1149 =item select
1150
1151 =item update
1152
1153 =item limit_dialect
1154
1155 See L</connect_info> for details.
1156 For setting, this method is deprecated in favor of L</connect_info>.
1157
1158 =item quote_char
1159
1160 See L</connect_info> for details.
1161 For setting, this method is deprecated in favor of L</connect_info>.
1162
1163 =item name_sep
1164
1165 See L</connect_info> for details.
1166 For setting, this method is deprecated in favor of L</connect_info>.
1167
1168 =back
1169
1170 =head1 ENVIRONMENT VARIABLES
1171
1172 =head2 DBIC_TRACE
1173
1174 If C<DBIC_TRACE> is set then SQL trace information
1175 is produced (as when the L<debug> method is set).
1176
1177 If the value is of the form C<1=/path/name> then the trace output is
1178 written to the file C</path/name>.
1179
1180 This environment variable is checked when the storage object is first
1181 created (when you call connect on your schema).  So, run-time changes 
1182 to this environment variable will not take effect unless you also 
1183 re-connect on your schema.
1184
1185 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1186
1187 Old name for DBIC_TRACE
1188
1189 =head1 AUTHORS
1190
1191 Matt S. Trout <mst@shadowcatsystems.co.uk>
1192
1193 Andy Grundman <andy@hybridized.org>
1194
1195 =head1 LICENSE
1196
1197 You may distribute this code under the same terms as Perl itself.
1198
1199 =cut
1200