Merge 'storage_exceptions' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = bless({}, ref $_[0] || $_[0]);
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281
282   $new->debugobj(new DBIx::Class::Storage::Statistics());
283
284   my $fh;
285
286   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
287                   || $ENV{DBIC_TRACE};
288
289   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
290     $fh = IO::File->new($1, 'w')
291       or $new->throw_exception("Cannot open trace file $1");
292   } else {
293     $fh = IO::File->new('>&STDERR');
294   }
295   $new->debugfh($fh);
296   $new->debug(1) if $debug_env;
297   $new->_sql_maker_opts({});
298   return $new;
299 }
300
301 =head2 throw_exception
302
303 Throws an exception - croaks.
304
305 =cut
306
307 sub throw_exception {
308   my ($self, $msg) = @_;
309   croak($msg);
310 }
311
312 =head2 connect_info
313
314 The arguments of C<connect_info> are always a single array reference.
315
316 This is normally accessed via L<DBIx::Class::Schema/connection>, which
317 encapsulates its argument list in an arrayref before calling
318 C<connect_info> here.
319
320 The arrayref can either contain the same set of arguments one would
321 normally pass to L<DBI/connect>, or a lone code reference which returns
322 a connected database handle.
323
324 In either case, if the final argument in your connect_info happens
325 to be a hashref, C<connect_info> will look there for several
326 connection-specific options:
327
328 =over 4
329
330 =item on_connect_do
331
332 This can be set to an arrayref of literal sql statements, which will
333 be executed immediately after making the connection to the database
334 every time we [re-]connect.
335
336 =item limit_dialect 
337
338 Sets the limit dialect. This is useful for JDBC-bridge among others
339 where the remote SQL-dialect cannot be determined by the name of the
340 driver alone.
341
342 =item quote_char
343
344 Specifies what characters to use to quote table and column names. If 
345 you use this you will want to specify L<name_sep> as well.
346
347 quote_char expects either a single character, in which case is it is placed
348 on either side of the table/column, or an arrayref of length 2 in which case the
349 table/column name is placed between the elements.
350
351 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
352 use C<quote_char =E<gt> [qw/[ ]/]>.
353
354 =item name_sep
355
356 This only needs to be used in conjunction with L<quote_char>, and is used to 
357 specify the charecter that seperates elements (schemas, tables, columns) from 
358 each other. In most cases this is simply a C<.>.
359
360 =back
361
362 These options can be mixed in with your other L<DBI> connection attributes,
363 or placed in a seperate hashref after all other normal L<DBI> connection
364 arguments.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370 Examples:
371
372   # Simple SQLite connection
373   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
374
375   # Connect via subref
376   ->connect_info([ sub { DBI->connect(...) } ]);
377
378   # A bit more complicated
379   ->connect_info(
380     [
381       'dbi:Pg:dbname=foo',
382       'postgres',
383       'my_pg_password',
384       { AutoCommit => 0 },
385       { quote_char => q{"}, name_sep => q{.} },
386     ]
387   );
388
389   # Equivalent to the previous example
390   ->connect_info(
391     [
392       'dbi:Pg:dbname=foo',
393       'postgres',
394       'my_pg_password',
395       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
396     ]
397   );
398
399   # Subref + DBIC-specific connection options
400   ->connect_info(
401     [
402       sub { DBI->connect(...) },
403       {
404           quote_char => q{`},
405           name_sep => q{@},
406           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
407       },
408     ]
409   );
410
411 =head2 on_connect_do
412
413 This method is deprecated in favor of setting via L</connect_info>.
414
415 =head2 debug
416
417 Causes SQL trace information to be emitted on the C<debugobj> object.
418 (or C<STDERR> if C<debugobj> has not specifically been set).
419
420 This is the equivalent to setting L</DBIC_TRACE> in your
421 shell environment.
422
423 =head2 debugfh
424
425 Set or retrieve the filehandle used for trace/debug output.  This should be
426 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
427 set to be STDERR - although see information on the
428 L<DBIC_TRACE> environment variable.
429
430 =cut
431
432 sub debugfh {
433     my $self = shift;
434
435     if ($self->debugobj->can('debugfh')) {
436         return $self->debugobj->debugfh(@_);
437     }
438 }
439
440 =head2 debugobj
441
442 Sets or retrieves the object used for metric collection. Defaults to an instance
443 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
444 method of using a coderef as a callback.  See the aforementioned Statistics
445 class for more information.
446
447 =head2 debugcb
448
449 Sets a callback to be executed each time a statement is run; takes a sub
450 reference.  Callback is executed as $sub->($op, $info) where $op is
451 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
452
453 See L<debugobj> for a better way.
454
455 =cut
456
457 sub debugcb {
458     my $self = shift;
459
460     if ($self->debugobj->can('callback')) {
461         return $self->debugobj->callback(@_);
462     }
463 }
464
465 =head2 dbh_do
466
467 Execute the given subref with the underlying
468 database handle as its first argument, using our
469 normal exception-based connection management.  Example:
470
471   $schema->storage->dbh_do(sub { shift->do("blah blah") });
472
473 =cut
474
475 sub dbh_do {
476   my ($self, $todo) = @_;
477
478   my @result;
479   my $want_array = wantarray;
480
481   eval {
482     $self->_verify_pid;
483     $self->_populate_dbh if !$self->_dbh;
484     my $dbh = $self->_dbh;
485     local $dbh->{RaiseError} = 1;
486     local $dbh->{PrintError} = 0;
487     if($want_array) {
488         @result = $todo->($dbh);
489     }
490     else {
491         $result[0] = $todo->($dbh);
492     }
493   };
494   if($@) {
495     my $exception = $@;
496     $self->connected
497       ? $self->throw_exception($exception)
498       : $self->_populate_dbh;
499
500     my $dbh = $self->_dbh;
501     local $dbh->{RaiseError} = 1;
502     local $dbh->{PrintError} = 0;
503     return $todo->($self->_dbh);
504   }
505   return $want_array ? @result : $result[0];
506 }
507
508 =head2 disconnect
509
510 Disconnect the L<DBI> handle, performing a rollback first if the
511 database is not in C<AutoCommit> mode.
512
513 =cut
514
515 sub disconnect {
516   my ($self) = @_;
517
518   if( $self->connected ) {
519     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
520     $self->_dbh->disconnect;
521     $self->_dbh(undef);
522   }
523 }
524
525 =head2 connected
526
527 Check if the L<DBI> handle is connected.  Returns true if the handle
528 is connected.
529
530 =cut
531
532 sub connected {
533   my ($self) = @_;
534
535   if(my $dbh = $self->_dbh) {
536       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
537           return $self->_dbh(undef);
538       }
539       $self->_verify_pid;
540       return ($dbh->FETCH('Active') && $dbh->ping);
541   }
542
543   return 0;
544 }
545
546 # handle pid changes correctly
547 sub _verify_pid {
548   my ($self) = @_;
549
550   return if !$self->_dbh || $self->_conn_pid == $$;
551
552   $self->_dbh->{InactiveDestroy} = 1;
553   $self->_dbh(undef);
554
555   return;
556 }
557
558 =head2 ensure_connected
559
560 Check whether the database handle is connected - if not then make a
561 connection.
562
563 =cut
564
565 sub ensure_connected {
566   my ($self) = @_;
567
568   unless ($self->connected) {
569     $self->_populate_dbh;
570   }
571 }
572
573 =head2 dbh
574
575 Returns the dbh - a data base handle of class L<DBI>.
576
577 =cut
578
579 sub dbh {
580   my ($self) = @_;
581
582   $self->ensure_connected;
583   return $self->_dbh;
584 }
585
586 sub _sql_maker_args {
587     my ($self) = @_;
588     
589     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
590 }
591
592 =head2 sql_maker
593
594 Returns a C<sql_maker> object - normally an object of class
595 C<DBIC::SQL::Abstract>.
596
597 =cut
598
599 sub sql_maker {
600   my ($self) = @_;
601   unless ($self->_sql_maker) {
602     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
603   }
604   return $self->_sql_maker;
605 }
606
607 sub connect_info {
608   my ($self, $info_arg) = @_;
609
610   return $self->_connect_info if !$info_arg;
611
612   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
613   #  the new set of options
614   $self->_sql_maker(undef);
615   $self->_sql_maker_opts({});
616
617   my $info = [ @$info_arg ]; # copy because we can alter it
618   my $last_info = $info->[-1];
619   if(ref $last_info eq 'HASH') {
620     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
621       $self->on_connect_do($on_connect_do);
622     }
623     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
624       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
625         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
626       }
627     }
628
629     # Get rid of any trailing empty hashref
630     pop(@$info) if !keys %$last_info;
631   }
632
633   $self->_connect_info($info);
634 }
635
636 sub _populate_dbh {
637   my ($self) = @_;
638   my @info = @{$self->_connect_info || []};
639   $self->_dbh($self->_connect(@info));
640
641   if(ref $self eq 'DBIx::Class::Storage::DBI') {
642     my $driver = $self->_dbh->{Driver}->{Name};
643     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
644       bless $self, "DBIx::Class::Storage::DBI::${driver}";
645       $self->_rebless() if $self->can('_rebless');
646     }
647   }
648
649   # if on-connect sql statements are given execute them
650   foreach my $sql_statement (@{$self->on_connect_do || []}) {
651     $self->debugobj->query_start($sql_statement) if $self->debug();
652     $self->_dbh->do($sql_statement);
653     $self->debugobj->query_end($sql_statement) if $self->debug();
654   }
655
656   $self->_conn_pid($$);
657   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
658 }
659
660 sub _connect {
661   my ($self, @info) = @_;
662
663   $self->throw_exception("You failed to provide any connection info")
664       if !@info;
665
666   my ($old_connect_via, $dbh);
667
668   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
669       $old_connect_via = $DBI::connect_via;
670       $DBI::connect_via = 'connect';
671   }
672
673   eval {
674     $dbh = ref $info[0] eq 'CODE'
675          ? &{$info[0]}
676          : DBI->connect(@info);
677   };
678
679   $DBI::connect_via = $old_connect_via if $old_connect_via;
680
681   if (!$dbh || $@) {
682     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
683   }
684
685   $dbh;
686 }
687
688 =head2 txn_begin
689
690 Calls begin_work on the current dbh.
691
692 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
693 an entire code block to be executed transactionally.
694
695 =cut
696
697 sub txn_begin {
698   my $self = shift;
699   if ($self->{transaction_depth}++ == 0) {
700     $self->dbh_do(sub {
701       my $dbh = shift;
702       if ($dbh->{AutoCommit}) {
703         $self->debugobj->txn_begin()
704           if ($self->debug);
705         $dbh->begin_work;
706       }
707     });
708   }
709 }
710
711 =head2 txn_commit
712
713 Issues a commit against the current dbh.
714
715 =cut
716
717 sub txn_commit {
718   my $self = shift;
719   $self->dbh_do(sub {
720     my $dbh = shift;
721     if ($self->{transaction_depth} == 0) {
722       unless ($dbh->{AutoCommit}) {
723         $self->debugobj->txn_commit()
724           if ($self->debug);
725         $dbh->commit;
726       }
727     }
728     else {
729       if (--$self->{transaction_depth} == 0) {
730         $self->debugobj->txn_commit()
731           if ($self->debug);
732         $dbh->commit;
733       }
734     }
735   });
736 }
737
738 =head2 txn_rollback
739
740 Issues a rollback against the current dbh. A nested rollback will
741 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
742 which allows the rollback to propagate to the outermost transaction.
743
744 =cut
745
746 sub txn_rollback {
747   my $self = shift;
748
749   eval {
750     $self->dbh_do(sub {
751       my $dbh = shift;
752       if ($self->{transaction_depth} == 0) {
753         unless ($dbh->{AutoCommit}) {
754           $self->debugobj->txn_rollback()
755             if ($self->debug);
756           $dbh->rollback;
757         }
758       }
759       else {
760         if (--$self->{transaction_depth} == 0) {
761           $self->debugobj->txn_rollback()
762             if ($self->debug);
763           $dbh->rollback;
764         }
765         else {
766           die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
767         }
768       }
769     });
770   };
771
772   if ($@) {
773     my $error = $@;
774     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
775     $error =~ /$exception_class/ and $self->throw_exception($error);
776     $self->{transaction_depth} = 0;          # ensure that a failed rollback
777     $self->throw_exception($error);          # resets the transaction depth
778   }
779 }
780
781 sub _execute {
782   my ($self, $op, $extra_bind, $ident, @args) = @_;
783   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
784   unshift(@bind, @$extra_bind) if $extra_bind;
785   if ($self->debug) {
786       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
787       $self->debugobj->query_start($sql, @debug_bind);
788   }
789   my $sth = eval { $self->sth($sql,$op) };
790
791   if (!$sth || $@) {
792     $self->throw_exception(
793       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
794     );
795   }
796   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
797   my $rv;
798   if ($sth) {
799     my $time = time();
800     $rv = eval { $sth->execute(@bind) };
801
802     if ($@ || !$rv) {
803       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
804     }
805   } else {
806     $self->throw_exception("'$sql' did not generate a statement.");
807   }
808   if ($self->debug) {
809       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
810       $self->debugobj->query_end($sql, @debug_bind);
811   }
812   return (wantarray ? ($rv, $sth, @bind) : $rv);
813 }
814
815 sub insert {
816   my ($self, $ident, $to_insert) = @_;
817   $self->throw_exception(
818     "Couldn't insert ".join(', ',
819       map "$_ => $to_insert->{$_}", keys %$to_insert
820     )." into ${ident}"
821   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
822   return $to_insert;
823 }
824
825 sub update {
826   return shift->_execute('update' => [], @_);
827 }
828
829 sub delete {
830   return shift->_execute('delete' => [], @_);
831 }
832
833 sub _select {
834   my ($self, $ident, $select, $condition, $attrs) = @_;
835   my $order = $attrs->{order_by};
836   if (ref $condition eq 'SCALAR') {
837     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
838   }
839   if (exists $attrs->{group_by} || $attrs->{having}) {
840     $order = {
841       group_by => $attrs->{group_by},
842       having => $attrs->{having},
843       ($order ? (order_by => $order) : ())
844     };
845   }
846   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
847   if ($attrs->{software_limit} ||
848       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
849         $attrs->{software_limit} = 1;
850   } else {
851     $self->throw_exception("rows attribute must be positive if present")
852       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
853     push @args, $attrs->{rows}, $attrs->{offset};
854   }
855   return $self->_execute(@args);
856 }
857
858 =head2 select
859
860 Handle a SQL select statement.
861
862 =cut
863
864 sub select {
865   my $self = shift;
866   my ($ident, $select, $condition, $attrs) = @_;
867   return $self->cursor->new($self, \@_, $attrs);
868 }
869
870 =head2 select_single
871
872 Performs a select, fetch and return of data - handles a single row
873 only.
874
875 =cut
876
877 # Need to call finish() to work round broken DBDs
878
879 sub select_single {
880   my $self = shift;
881   my ($rv, $sth, @bind) = $self->_select(@_);
882   my @row = $sth->fetchrow_array;
883   $sth->finish();
884   return @row;
885 }
886
887 =head2 sth
888
889 Returns a L<DBI> sth (statement handle) for the supplied SQL.
890
891 =cut
892
893 sub sth {
894   my ($self, $sql) = @_;
895   # 3 is the if_active parameter which avoids active sth re-use
896   return $self->dbh_do(sub { shift->prepare_cached($sql, {}, 3) });
897 }
898
899 =head2 columns_info_for
900
901 Returns database type info for a given table columns.
902
903 =cut
904
905 sub columns_info_for {
906   my ($self, $table) = @_;
907
908   my $dbh = $self->dbh;
909
910   if ($dbh->can('column_info')) {
911     my %result;
912     local $dbh->{RaiseError} = 1;
913     local $dbh->{PrintError} = 0;
914     eval {
915       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
916       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
917       $sth->execute();
918       while ( my $info = $sth->fetchrow_hashref() ){
919         my %column_info;
920         $column_info{data_type}   = $info->{TYPE_NAME};
921         $column_info{size}      = $info->{COLUMN_SIZE};
922         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
923         $column_info{default_value} = $info->{COLUMN_DEF};
924         my $col_name = $info->{COLUMN_NAME};
925         $col_name =~ s/^\"(.*)\"$/$1/;
926
927         $result{$col_name} = \%column_info;
928       }
929     };
930     return \%result if !$@;
931   }
932
933   my %result;
934   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
935   $sth->execute;
936   my @columns = @{$sth->{NAME_lc}};
937   for my $i ( 0 .. $#columns ){
938     my %column_info;
939     my $type_num = $sth->{TYPE}->[$i];
940     my $type_name;
941     if(defined $type_num && $dbh->can('type_info')) {
942       my $type_info = $dbh->type_info($type_num);
943       $type_name = $type_info->{TYPE_NAME} if $type_info;
944     }
945     $column_info{data_type} = $type_name ? $type_name : $type_num;
946     $column_info{size} = $sth->{PRECISION}->[$i];
947     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
948
949     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
950       $column_info{data_type} = $1;
951       $column_info{size}    = $2;
952     }
953
954     $result{$columns[$i]} = \%column_info;
955   }
956
957   return \%result;
958 }
959
960 =head2 last_insert_id
961
962 Return the row id of the last insert.
963
964 =cut
965
966 sub last_insert_id {
967   my ($self, $row) = @_;
968     
969   $self->dbh_do(sub { shift->func('last_insert_rowid') });
970 }
971
972 =head2 sqlt_type
973
974 Returns the database driver name.
975
976 =cut
977
978 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
979
980 =head2 create_ddl_dir (EXPERIMENTAL)
981
982 =over 4
983
984 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
985
986 =back
987
988 Creates an SQL file based on the Schema, for each of the specified
989 database types, in the given directory.
990
991 Note that this feature is currently EXPERIMENTAL and may not work correctly
992 across all databases, or fully handle complex relationships.
993
994 =cut
995
996 sub create_ddl_dir
997 {
998   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
999
1000   if(!$dir || !-d $dir)
1001   {
1002     warn "No directory given, using ./\n";
1003     $dir = "./";
1004   }
1005   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1006   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1007   $version ||= $schema->VERSION || '1.x';
1008   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1009
1010   eval "use SQL::Translator";
1011   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1012
1013   my $sqlt = SQL::Translator->new($sqltargs);
1014   foreach my $db (@$databases)
1015   {
1016     $sqlt->reset();
1017     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1018 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1019     $sqlt->data($schema);
1020     $sqlt->producer($db);
1021
1022     my $file;
1023     my $filename = $schema->ddl_filename($db, $dir, $version);
1024     if(-e $filename)
1025     {
1026       $self->throw_exception("$filename already exists, skipping $db");
1027       next;
1028     }
1029     open($file, ">$filename") 
1030       or $self->throw_exception("Can't open $filename for writing ($!)");
1031     my $output = $sqlt->translate;
1032 #use Data::Dumper;
1033 #    print join(":", keys %{$schema->source_registrations});
1034 #    print Dumper($sqlt->schema);
1035     if(!$output)
1036     {
1037       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1038       next;
1039     }
1040     print $file $output;
1041     close($file);
1042   }
1043
1044 }
1045
1046 =head2 deployment_statements
1047
1048 Create the statements for L</deploy> and
1049 L<DBIx::Class::Schema/deploy>.
1050
1051 =cut
1052
1053 sub deployment_statements {
1054   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1055   # Need to be connected to get the correct sqlt_type
1056   $self->ensure_connected() unless $type;
1057   $type ||= $self->sqlt_type;
1058   $version ||= $schema->VERSION || '1.x';
1059   $dir ||= './';
1060   eval "use SQL::Translator";
1061   if(!$@)
1062   {
1063     eval "use SQL::Translator::Parser::DBIx::Class;";
1064     $self->throw_exception($@) if $@;
1065     eval "use SQL::Translator::Producer::${type};";
1066     $self->throw_exception($@) if $@;
1067     my $tr = SQL::Translator->new(%$sqltargs);
1068     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1069     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1070   }
1071
1072   my $filename = $schema->ddl_filename($type, $dir, $version);
1073   if(!-f $filename)
1074   {
1075 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1076       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1077       return;
1078   }
1079   my $file;
1080   open($file, "<$filename") 
1081       or $self->throw_exception("Can't open $filename ($!)");
1082   my @rows = <$file>;
1083   close($file);
1084
1085   return join('', @rows);
1086   
1087 }
1088
1089 =head2 deploy
1090
1091 Sends the appropriate statements to create or modify tables to the
1092 db. This would normally be called through
1093 L<DBIx::Class::Schema/deploy>.
1094
1095 =cut
1096
1097 sub deploy {
1098   my ($self, $schema, $type, $sqltargs) = @_;
1099   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1100     for ( split(";\n", $statement)) {
1101       next if($_ =~ /^--/);
1102       next if(!$_);
1103 #      next if($_ =~ /^DROP/m);
1104       next if($_ =~ /^BEGIN TRANSACTION/m);
1105       next if($_ =~ /^COMMIT/m);
1106       next if $_ =~ /^\s+$/; # skip whitespace only
1107       $self->debugobj->query_start($_) if $self->debug;
1108       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1109       $self->debugobj->query_end($_) if $self->debug;
1110     }
1111   }
1112 }
1113
1114 =head2 datetime_parser
1115
1116 Returns the datetime parser class
1117
1118 =cut
1119
1120 sub datetime_parser {
1121   my $self = shift;
1122   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1123 }
1124
1125 =head2 datetime_parser_type
1126
1127 Defines (returns) the datetime parser class - currently hardwired to
1128 L<DateTime::Format::MySQL>
1129
1130 =cut
1131
1132 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1133
1134 =head2 build_datetime_parser
1135
1136 See L</datetime_parser>
1137
1138 =cut
1139
1140 sub build_datetime_parser {
1141   my $self = shift;
1142   my $type = $self->datetime_parser_type(@_);
1143   eval "use ${type}";
1144   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1145   return $type;
1146 }
1147
1148 sub DESTROY { shift->_dbh(undef) }
1149
1150 1;
1151
1152 =head1 SQL METHODS
1153
1154 The module defines a set of methods within the DBIC::SQL::Abstract
1155 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1156 SQL query functions.
1157
1158 The following methods are extended:-
1159
1160 =over 4
1161
1162 =item delete
1163
1164 =item insert
1165
1166 =item select
1167
1168 =item update
1169
1170 =item limit_dialect
1171
1172 See L</connect_info> for details.
1173 For setting, this method is deprecated in favor of L</connect_info>.
1174
1175 =item quote_char
1176
1177 See L</connect_info> for details.
1178 For setting, this method is deprecated in favor of L</connect_info>.
1179
1180 =item name_sep
1181
1182 See L</connect_info> for details.
1183 For setting, this method is deprecated in favor of L</connect_info>.
1184
1185 =back
1186
1187 =head1 ENVIRONMENT VARIABLES
1188
1189 =head2 DBIC_TRACE
1190
1191 If C<DBIC_TRACE> is set then SQL trace information
1192 is produced (as when the L<debug> method is set).
1193
1194 If the value is of the form C<1=/path/name> then the trace output is
1195 written to the file C</path/name>.
1196
1197 This environment variable is checked when the storage object is first
1198 created (when you call connect on your schema).  So, run-time changes 
1199 to this environment variable will not take effect unless you also 
1200 re-connect on your schema.
1201
1202 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1203
1204 Old name for DBIC_TRACE
1205
1206 =head1 AUTHORS
1207
1208 Matt S. Trout <mst@shadowcatsystems.co.uk>
1209
1210 Andy Grundman <andy@hybridized.org>
1211
1212 =head1 LICENSE
1213
1214 You may distribute this code under the same terms as Perl itself.
1215
1216 =cut
1217