further refinements to storage_exceptions
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = bless({}, ref $_[0] || $_[0]);
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281
282   $new->debugobj(new DBIx::Class::Storage::Statistics());
283
284   my $fh;
285
286   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
287                   || $ENV{DBIC_TRACE};
288
289   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
290     $fh = IO::File->new($1, 'w')
291       or $new->throw_exception("Cannot open trace file $1");
292   } else {
293     $fh = IO::File->new('>&STDERR');
294   }
295   $new->debugfh($fh);
296   $new->debug(1) if $debug_env;
297   $new->_sql_maker_opts({});
298   return $new;
299 }
300
301 =head2 throw_exception
302
303 Throws an exception - croaks.
304
305 =cut
306
307 sub throw_exception {
308   my ($self, $msg) = @_;
309   croak($msg);
310 }
311
312 =head2 connect_info
313
314 The arguments of C<connect_info> are always a single array reference.
315
316 This is normally accessed via L<DBIx::Class::Schema/connection>, which
317 encapsulates its argument list in an arrayref before calling
318 C<connect_info> here.
319
320 The arrayref can either contain the same set of arguments one would
321 normally pass to L<DBI/connect>, or a lone code reference which returns
322 a connected database handle.
323
324 In either case, if the final argument in your connect_info happens
325 to be a hashref, C<connect_info> will look there for several
326 connection-specific options:
327
328 =over 4
329
330 =item on_connect_do
331
332 This can be set to an arrayref of literal sql statements, which will
333 be executed immediately after making the connection to the database
334 every time we [re-]connect.
335
336 =item limit_dialect 
337
338 Sets the limit dialect. This is useful for JDBC-bridge among others
339 where the remote SQL-dialect cannot be determined by the name of the
340 driver alone.
341
342 =item quote_char
343
344 Specifies what characters to use to quote table and column names. If 
345 you use this you will want to specify L<name_sep> as well.
346
347 quote_char expects either a single character, in which case is it is placed
348 on either side of the table/column, or an arrayref of length 2 in which case the
349 table/column name is placed between the elements.
350
351 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
352 use C<quote_char =E<gt> [qw/[ ]/]>.
353
354 =item name_sep
355
356 This only needs to be used in conjunction with L<quote_char>, and is used to 
357 specify the charecter that seperates elements (schemas, tables, columns) from 
358 each other. In most cases this is simply a C<.>.
359
360 =back
361
362 These options can be mixed in with your other L<DBI> connection attributes,
363 or placed in a seperate hashref after all other normal L<DBI> connection
364 arguments.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370 Examples:
371
372   # Simple SQLite connection
373   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
374
375   # Connect via subref
376   ->connect_info([ sub { DBI->connect(...) } ]);
377
378   # A bit more complicated
379   ->connect_info(
380     [
381       'dbi:Pg:dbname=foo',
382       'postgres',
383       'my_pg_password',
384       { AutoCommit => 0 },
385       { quote_char => q{"}, name_sep => q{.} },
386     ]
387   );
388
389   # Equivalent to the previous example
390   ->connect_info(
391     [
392       'dbi:Pg:dbname=foo',
393       'postgres',
394       'my_pg_password',
395       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
396     ]
397   );
398
399   # Subref + DBIC-specific connection options
400   ->connect_info(
401     [
402       sub { DBI->connect(...) },
403       {
404           quote_char => q{`},
405           name_sep => q{@},
406           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
407       },
408     ]
409   );
410
411 =head2 on_connect_do
412
413 This method is deprecated in favor of setting via L</connect_info>.
414
415 =head2 debug
416
417 Causes SQL trace information to be emitted on the C<debugobj> object.
418 (or C<STDERR> if C<debugobj> has not specifically been set).
419
420 This is the equivalent to setting L</DBIC_TRACE> in your
421 shell environment.
422
423 =head2 debugfh
424
425 Set or retrieve the filehandle used for trace/debug output.  This should be
426 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
427 set to be STDERR - although see information on the
428 L<DBIC_TRACE> environment variable.
429
430 =cut
431
432 sub debugfh {
433     my $self = shift;
434
435     if ($self->debugobj->can('debugfh')) {
436         return $self->debugobj->debugfh(@_);
437     }
438 }
439
440 =head2 debugobj
441
442 Sets or retrieves the object used for metric collection. Defaults to an instance
443 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
444 method of using a coderef as a callback.  See the aforementioned Statistics
445 class for more information.
446
447 =head2 debugcb
448
449 Sets a callback to be executed each time a statement is run; takes a sub
450 reference.  Callback is executed as $sub->($op, $info) where $op is
451 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
452
453 See L<debugobj> for a better way.
454
455 =cut
456
457 sub debugcb {
458     my $self = shift;
459
460     if ($self->debugobj->can('callback')) {
461         return $self->debugobj->callback(@_);
462     }
463 }
464
465 =head2 dbh_do
466
467 Execute the given subref with the underlying database handle as its
468 first argument, using the new exception-based connection management.
469 Example:
470
471   my @stuff = $schema->storage->dbh_do(
472     sub {
473       shift->selectrow_array("SELECT * FROM foo")
474     }
475   );
476
477 =cut
478
479 sub dbh_do {
480   my ($self, $todo) = @_;
481
482   my @result;
483   my $want_array = wantarray;
484
485   eval {
486     $self->_verify_pid if $self->_dbh;
487     $self->_populate_dbh if !$self->_dbh;
488     my $dbh = $self->_dbh;
489     local $dbh->{RaiseError} = 1;
490     local $dbh->{PrintError} = 0;
491     if($want_array) {
492         @result = $todo->($dbh);
493     }
494     elsif(defined $want_array) {
495         $result[0] = $todo->($dbh);
496     }
497     else {
498         $todo->($dbh);
499     }
500   };
501
502   if($@) {
503     my $exception = $@;
504     $self->connected
505       ? $self->throw_exception($exception)
506       : $self->_populate_dbh;
507
508     my $dbh = $self->_dbh;
509     local $dbh->{RaiseError} = 1;
510     local $dbh->{PrintError} = 0;
511     return $todo->($dbh);
512   }
513
514   return $want_array ? @result : $result[0];
515 }
516
517 =head2 disconnect
518
519 Disconnect the L<DBI> handle, performing a rollback first if the
520 database is not in C<AutoCommit> mode.
521
522 =cut
523
524 sub disconnect {
525   my ($self) = @_;
526
527   if( $self->connected ) {
528     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
529     $self->_dbh->disconnect;
530     $self->_dbh(undef);
531   }
532 }
533
534 =head2 connected
535
536 Check if the L<DBI> handle is connected.  Returns true if the handle
537 is connected.
538
539 =cut
540
541 sub connected {
542   my ($self) = @_;
543
544   if(my $dbh = $self->_dbh) {
545       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
546           return $self->_dbh(undef);
547       }
548       else {
549           $self->_verify_pid;
550       }
551       return ($dbh->FETCH('Active') && $dbh->ping);
552   }
553
554   return 0;
555 }
556
557 # handle pid changes correctly
558 #  NOTE: assumes $self->_dbh is a valid $dbh
559 sub _verify_pid {
560   my ($self) = @_;
561
562   return if $self->_conn_pid == $$;
563
564   $self->_dbh->{InactiveDestroy} = 1;
565   $self->_dbh(undef);
566
567   return;
568 }
569
570 =head2 ensure_connected
571
572 Check whether the database handle is connected - if not then make a
573 connection.
574
575 =cut
576
577 sub ensure_connected {
578   my ($self) = @_;
579
580   unless ($self->connected) {
581     $self->_populate_dbh;
582   }
583 }
584
585 =head2 dbh
586
587 Returns the dbh - a data base handle of class L<DBI>.
588
589 =cut
590
591 sub dbh {
592   my ($self) = @_;
593
594   $self->ensure_connected;
595   return $self->_dbh;
596 }
597
598 sub _sql_maker_args {
599     my ($self) = @_;
600     
601     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
602 }
603
604 =head2 sql_maker
605
606 Returns a C<sql_maker> object - normally an object of class
607 C<DBIC::SQL::Abstract>.
608
609 =cut
610
611 sub sql_maker {
612   my ($self) = @_;
613   unless ($self->_sql_maker) {
614     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
615   }
616   return $self->_sql_maker;
617 }
618
619 sub connect_info {
620   my ($self, $info_arg) = @_;
621
622   return $self->_connect_info if !$info_arg;
623
624   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
625   #  the new set of options
626   $self->_sql_maker(undef);
627   $self->_sql_maker_opts({});
628
629   my $info = [ @$info_arg ]; # copy because we can alter it
630   my $last_info = $info->[-1];
631   if(ref $last_info eq 'HASH') {
632     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
633       $self->on_connect_do($on_connect_do);
634     }
635     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
636       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
637         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
638       }
639     }
640
641     # Get rid of any trailing empty hashref
642     pop(@$info) if !keys %$last_info;
643   }
644
645   $self->_connect_info($info);
646 }
647
648 sub _populate_dbh {
649   my ($self) = @_;
650   my @info = @{$self->_connect_info || []};
651   $self->_dbh($self->_connect(@info));
652
653   if(ref $self eq 'DBIx::Class::Storage::DBI') {
654     my $driver = $self->_dbh->{Driver}->{Name};
655     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
656       bless $self, "DBIx::Class::Storage::DBI::${driver}";
657       $self->_rebless() if $self->can('_rebless');
658     }
659   }
660
661   # if on-connect sql statements are given execute them
662   foreach my $sql_statement (@{$self->on_connect_do || []}) {
663     $self->debugobj->query_start($sql_statement) if $self->debug();
664     $self->_dbh->do($sql_statement);
665     $self->debugobj->query_end($sql_statement) if $self->debug();
666   }
667
668   $self->_conn_pid($$);
669   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
670 }
671
672 sub _connect {
673   my ($self, @info) = @_;
674
675   $self->throw_exception("You failed to provide any connection info")
676       if !@info;
677
678   my ($old_connect_via, $dbh);
679
680   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
681       $old_connect_via = $DBI::connect_via;
682       $DBI::connect_via = 'connect';
683   }
684
685   eval {
686     $dbh = ref $info[0] eq 'CODE'
687          ? &{$info[0]}
688          : DBI->connect(@info);
689   };
690
691   $DBI::connect_via = $old_connect_via if $old_connect_via;
692
693   if (!$dbh || $@) {
694     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
695   }
696
697   $dbh;
698 }
699
700 =head2 txn_begin
701
702 Calls begin_work on the current dbh.
703
704 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
705 an entire code block to be executed transactionally.
706
707 =cut
708
709 sub txn_begin {
710   my $self = shift;
711   if ($self->{transaction_depth}++ == 0) {
712     $self->dbh_do(sub {
713       my $dbh = shift;
714       if ($dbh->{AutoCommit}) {
715         $self->debugobj->txn_begin()
716           if ($self->debug);
717         $dbh->begin_work;
718       }
719     });
720   }
721 }
722
723 =head2 txn_commit
724
725 Issues a commit against the current dbh.
726
727 =cut
728
729 sub txn_commit {
730   my $self = shift;
731   $self->dbh_do(sub {
732     my $dbh = shift;
733     if ($self->{transaction_depth} == 0) {
734       unless ($dbh->{AutoCommit}) {
735         $self->debugobj->txn_commit()
736           if ($self->debug);
737         $dbh->commit;
738       }
739     }
740     else {
741       if (--$self->{transaction_depth} == 0) {
742         $self->debugobj->txn_commit()
743           if ($self->debug);
744         $dbh->commit;
745       }
746     }
747   });
748 }
749
750 =head2 txn_rollback
751
752 Issues a rollback against the current dbh. A nested rollback will
753 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
754 which allows the rollback to propagate to the outermost transaction.
755
756 =cut
757
758 sub txn_rollback {
759   my $self = shift;
760
761   eval {
762     $self->dbh_do(sub {
763       my $dbh = shift;
764       if ($self->{transaction_depth} == 0) {
765         unless ($dbh->{AutoCommit}) {
766           $self->debugobj->txn_rollback()
767             if ($self->debug);
768           $dbh->rollback;
769         }
770       }
771       else {
772         if (--$self->{transaction_depth} == 0) {
773           $self->debugobj->txn_rollback()
774             if ($self->debug);
775           $dbh->rollback;
776         }
777         else {
778           die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
779         }
780       }
781     });
782   };
783
784   if ($@) {
785     my $error = $@;
786     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
787     $error =~ /$exception_class/ and $self->throw_exception($error);
788     $self->{transaction_depth} = 0;          # ensure that a failed rollback
789     $self->throw_exception($error);          # resets the transaction depth
790   }
791 }
792
793 sub _execute {
794   my ($self, $op, $extra_bind, $ident, @args) = @_;
795   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
796   unshift(@bind, @$extra_bind) if $extra_bind;
797   if ($self->debug) {
798       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
799       $self->debugobj->query_start($sql, @debug_bind);
800   }
801   my $sth = eval { $self->sth($sql,$op) };
802
803   if (!$sth || $@) {
804     $self->throw_exception(
805       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
806     );
807   }
808   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
809   my $rv;
810   if ($sth) {
811     my $time = time();
812     $rv = eval { $sth->execute(@bind) };
813
814     if ($@ || !$rv) {
815       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
816     }
817   } else {
818     $self->throw_exception("'$sql' did not generate a statement.");
819   }
820   if ($self->debug) {
821       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
822       $self->debugobj->query_end($sql, @debug_bind);
823   }
824   return (wantarray ? ($rv, $sth, @bind) : $rv);
825 }
826
827 sub insert {
828   my ($self, $ident, $to_insert) = @_;
829   $self->throw_exception(
830     "Couldn't insert ".join(', ',
831       map "$_ => $to_insert->{$_}", keys %$to_insert
832     )." into ${ident}"
833   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
834   return $to_insert;
835 }
836
837 sub update {
838   return shift->_execute('update' => [], @_);
839 }
840
841 sub delete {
842   return shift->_execute('delete' => [], @_);
843 }
844
845 sub _select {
846   my ($self, $ident, $select, $condition, $attrs) = @_;
847   my $order = $attrs->{order_by};
848   if (ref $condition eq 'SCALAR') {
849     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
850   }
851   if (exists $attrs->{group_by} || $attrs->{having}) {
852     $order = {
853       group_by => $attrs->{group_by},
854       having => $attrs->{having},
855       ($order ? (order_by => $order) : ())
856     };
857   }
858   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
859   if ($attrs->{software_limit} ||
860       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
861         $attrs->{software_limit} = 1;
862   } else {
863     $self->throw_exception("rows attribute must be positive if present")
864       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
865     push @args, $attrs->{rows}, $attrs->{offset};
866   }
867   return $self->_execute(@args);
868 }
869
870 =head2 select
871
872 Handle a SQL select statement.
873
874 =cut
875
876 sub select {
877   my $self = shift;
878   my ($ident, $select, $condition, $attrs) = @_;
879   return $self->cursor->new($self, \@_, $attrs);
880 }
881
882 =head2 select_single
883
884 Performs a select, fetch and return of data - handles a single row
885 only.
886
887 =cut
888
889 # Need to call finish() to work round broken DBDs
890
891 sub select_single {
892   my $self = shift;
893   my ($rv, $sth, @bind) = $self->_select(@_);
894   my @row = $sth->fetchrow_array;
895   $sth->finish();
896   return @row;
897 }
898
899 =head2 sth
900
901 Returns a L<DBI> sth (statement handle) for the supplied SQL.
902
903 =cut
904
905 sub sth {
906   my ($self, $sql) = @_;
907   # 3 is the if_active parameter which avoids active sth re-use
908   return $self->dbh_do(sub { shift->prepare_cached($sql, {}, 3) });
909 }
910
911 =head2 columns_info_for
912
913 Returns database type info for a given table columns.
914
915 =cut
916
917 sub columns_info_for {
918   my ($self, $table) = @_;
919
920   $self->dbh_do(sub {
921     my $dbh = shift;
922
923     if ($dbh->can('column_info')) {
924       my %result;
925       eval {
926         my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
927         my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
928         $sth->execute();
929         while ( my $info = $sth->fetchrow_hashref() ){
930           my %column_info;
931           $column_info{data_type}   = $info->{TYPE_NAME};
932           $column_info{size}      = $info->{COLUMN_SIZE};
933           $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
934           $column_info{default_value} = $info->{COLUMN_DEF};
935           my $col_name = $info->{COLUMN_NAME};
936           $col_name =~ s/^\"(.*)\"$/$1/;
937
938           $result{$col_name} = \%column_info;
939         }
940       };
941       return \%result if !$@;
942     }
943
944     my %result;
945     my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
946     $sth->execute;
947     my @columns = @{$sth->{NAME_lc}};
948     for my $i ( 0 .. $#columns ){
949       my %column_info;
950       my $type_num = $sth->{TYPE}->[$i];
951       my $type_name;
952       if(defined $type_num && $dbh->can('type_info')) {
953         my $type_info = $dbh->type_info($type_num);
954         $type_name = $type_info->{TYPE_NAME} if $type_info;
955       }
956       $column_info{data_type} = $type_name ? $type_name : $type_num;
957       $column_info{size} = $sth->{PRECISION}->[$i];
958       $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
959
960       if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
961         $column_info{data_type} = $1;
962         $column_info{size}    = $2;
963       }
964
965       $result{$columns[$i]} = \%column_info;
966     }
967
968     return \%result;
969   });
970 }
971
972 =head2 last_insert_id
973
974 Return the row id of the last insert.
975
976 =cut
977
978 sub last_insert_id {
979   my ($self, $row) = @_;
980     
981   $self->dbh_do(sub { shift->func('last_insert_rowid') });
982 }
983
984 =head2 sqlt_type
985
986 Returns the database driver name.
987
988 =cut
989
990 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
991
992 =head2 create_ddl_dir (EXPERIMENTAL)
993
994 =over 4
995
996 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
997
998 =back
999
1000 Creates an SQL file based on the Schema, for each of the specified
1001 database types, in the given directory.
1002
1003 Note that this feature is currently EXPERIMENTAL and may not work correctly
1004 across all databases, or fully handle complex relationships.
1005
1006 =cut
1007
1008 sub create_ddl_dir
1009 {
1010   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1011
1012   if(!$dir || !-d $dir)
1013   {
1014     warn "No directory given, using ./\n";
1015     $dir = "./";
1016   }
1017   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1018   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1019   $version ||= $schema->VERSION || '1.x';
1020   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1021
1022   eval "use SQL::Translator";
1023   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1024
1025   my $sqlt = SQL::Translator->new($sqltargs);
1026   foreach my $db (@$databases)
1027   {
1028     $sqlt->reset();
1029     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1030 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1031     $sqlt->data($schema);
1032     $sqlt->producer($db);
1033
1034     my $file;
1035     my $filename = $schema->ddl_filename($db, $dir, $version);
1036     if(-e $filename)
1037     {
1038       $self->throw_exception("$filename already exists, skipping $db");
1039       next;
1040     }
1041     open($file, ">$filename") 
1042       or $self->throw_exception("Can't open $filename for writing ($!)");
1043     my $output = $sqlt->translate;
1044 #use Data::Dumper;
1045 #    print join(":", keys %{$schema->source_registrations});
1046 #    print Dumper($sqlt->schema);
1047     if(!$output)
1048     {
1049       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1050       next;
1051     }
1052     print $file $output;
1053     close($file);
1054   }
1055
1056 }
1057
1058 =head2 deployment_statements
1059
1060 Create the statements for L</deploy> and
1061 L<DBIx::Class::Schema/deploy>.
1062
1063 =cut
1064
1065 sub deployment_statements {
1066   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1067   # Need to be connected to get the correct sqlt_type
1068   $self->ensure_connected() unless $type;
1069   $type ||= $self->sqlt_type;
1070   $version ||= $schema->VERSION || '1.x';
1071   $dir ||= './';
1072   eval "use SQL::Translator";
1073   if(!$@)
1074   {
1075     eval "use SQL::Translator::Parser::DBIx::Class;";
1076     $self->throw_exception($@) if $@;
1077     eval "use SQL::Translator::Producer::${type};";
1078     $self->throw_exception($@) if $@;
1079     my $tr = SQL::Translator->new(%$sqltargs);
1080     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1081     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1082   }
1083
1084   my $filename = $schema->ddl_filename($type, $dir, $version);
1085   if(!-f $filename)
1086   {
1087 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1088       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1089       return;
1090   }
1091   my $file;
1092   open($file, "<$filename") 
1093       or $self->throw_exception("Can't open $filename ($!)");
1094   my @rows = <$file>;
1095   close($file);
1096
1097   return join('', @rows);
1098   
1099 }
1100
1101 =head2 deploy
1102
1103 Sends the appropriate statements to create or modify tables to the
1104 db. This would normally be called through
1105 L<DBIx::Class::Schema/deploy>.
1106
1107 =cut
1108
1109 sub deploy {
1110   my ($self, $schema, $type, $sqltargs) = @_;
1111   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1112     for ( split(";\n", $statement)) {
1113       next if($_ =~ /^--/);
1114       next if(!$_);
1115 #      next if($_ =~ /^DROP/m);
1116       next if($_ =~ /^BEGIN TRANSACTION/m);
1117       next if($_ =~ /^COMMIT/m);
1118       next if $_ =~ /^\s+$/; # skip whitespace only
1119       $self->debugobj->query_start($_) if $self->debug;
1120       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1121       $self->debugobj->query_end($_) if $self->debug;
1122     }
1123   }
1124 }
1125
1126 =head2 datetime_parser
1127
1128 Returns the datetime parser class
1129
1130 =cut
1131
1132 sub datetime_parser {
1133   my $self = shift;
1134   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1135 }
1136
1137 =head2 datetime_parser_type
1138
1139 Defines (returns) the datetime parser class - currently hardwired to
1140 L<DateTime::Format::MySQL>
1141
1142 =cut
1143
1144 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1145
1146 =head2 build_datetime_parser
1147
1148 See L</datetime_parser>
1149
1150 =cut
1151
1152 sub build_datetime_parser {
1153   my $self = shift;
1154   my $type = $self->datetime_parser_type(@_);
1155   eval "use ${type}";
1156   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1157   return $type;
1158 }
1159
1160 sub DESTROY { shift->_dbh(undef) }
1161
1162 1;
1163
1164 =head1 SQL METHODS
1165
1166 The module defines a set of methods within the DBIC::SQL::Abstract
1167 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1168 SQL query functions.
1169
1170 The following methods are extended:-
1171
1172 =over 4
1173
1174 =item delete
1175
1176 =item insert
1177
1178 =item select
1179
1180 =item update
1181
1182 =item limit_dialect
1183
1184 See L</connect_info> for details.
1185 For setting, this method is deprecated in favor of L</connect_info>.
1186
1187 =item quote_char
1188
1189 See L</connect_info> for details.
1190 For setting, this method is deprecated in favor of L</connect_info>.
1191
1192 =item name_sep
1193
1194 See L</connect_info> for details.
1195 For setting, this method is deprecated in favor of L</connect_info>.
1196
1197 =back
1198
1199 =head1 ENVIRONMENT VARIABLES
1200
1201 =head2 DBIC_TRACE
1202
1203 If C<DBIC_TRACE> is set then SQL trace information
1204 is produced (as when the L<debug> method is set).
1205
1206 If the value is of the form C<1=/path/name> then the trace output is
1207 written to the file C</path/name>.
1208
1209 This environment variable is checked when the storage object is first
1210 created (when you call connect on your schema).  So, run-time changes 
1211 to this environment variable will not take effect unless you also 
1212 re-connect on your schema.
1213
1214 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1215
1216 Old name for DBIC_TRACE
1217
1218 =head1 AUTHORS
1219
1220 Matt S. Trout <mst@shadowcatsystems.co.uk>
1221
1222 Andy Grundman <andy@hybridized.org>
1223
1224 =head1 LICENSE
1225
1226 You may distribute this code under the same terms as Perl itself.
1227
1228 =cut
1229