force/assume RaiseError/PrintError
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = bless({}, ref $_[0] || $_[0]);
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281
282   $new->debugobj(new DBIx::Class::Storage::Statistics());
283
284   my $fh;
285
286   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
287                   || $ENV{DBIC_TRACE};
288
289   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
290     $fh = IO::File->new($1, 'w')
291       or $new->throw_exception("Cannot open trace file $1");
292   } else {
293     $fh = IO::File->new('>&STDERR');
294   }
295   $new->debugfh($fh);
296   $new->debug(1) if $debug_env;
297   $new->_sql_maker_opts({});
298   return $new;
299 }
300
301 =head2 throw_exception
302
303 Throws an exception - croaks.
304
305 =cut
306
307 sub throw_exception {
308   my ($self, $msg) = @_;
309   croak($msg);
310 }
311
312 =head2 connect_info
313
314 The arguments of C<connect_info> are always a single array reference.
315
316 This is normally accessed via L<DBIx::Class::Schema/connection>, which
317 encapsulates its argument list in an arrayref before calling
318 C<connect_info> here.
319
320 The arrayref can either contain the same set of arguments one would
321 normally pass to L<DBI/connect>, or a lone code reference which returns
322 a connected database handle.
323
324 In either case, if the final argument in your connect_info happens
325 to be a hashref, C<connect_info> will look there for several
326 connection-specific options:
327
328 =over 4
329
330 =item on_connect_do
331
332 This can be set to an arrayref of literal sql statements, which will
333 be executed immediately after making the connection to the database
334 every time we [re-]connect.
335
336 =item limit_dialect 
337
338 Sets the limit dialect. This is useful for JDBC-bridge among others
339 where the remote SQL-dialect cannot be determined by the name of the
340 driver alone.
341
342 =item quote_char
343
344 Specifies what characters to use to quote table and column names. If 
345 you use this you will want to specify L<name_sep> as well.
346
347 quote_char expects either a single character, in which case is it is placed
348 on either side of the table/column, or an arrayref of length 2 in which case the
349 table/column name is placed between the elements.
350
351 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
352 use C<quote_char =E<gt> [qw/[ ]/]>.
353
354 =item name_sep
355
356 This only needs to be used in conjunction with L<quote_char>, and is used to 
357 specify the charecter that seperates elements (schemas, tables, columns) from 
358 each other. In most cases this is simply a C<.>.
359
360 =back
361
362 These options can be mixed in with your other L<DBI> connection attributes,
363 or placed in a seperate hashref after all other normal L<DBI> connection
364 arguments.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370 Important note:  DBIC expects the returned database handle provided by 
371 a subref argument to have RaiseError set on it.  If it doesn't, things
372 might not work very well, YMMV.  If you don't use a subref, DBIC will
373 force this setting for you anyways.  Setting HandleError to anything
374 other than simple exception object wrapper might cause problems too.
375
376 Examples:
377
378   # Simple SQLite connection
379   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
380
381   # Connect via subref
382   ->connect_info([ sub { DBI->connect(...) } ]);
383
384   # A bit more complicated
385   ->connect_info(
386     [
387       'dbi:Pg:dbname=foo',
388       'postgres',
389       'my_pg_password',
390       { AutoCommit => 0 },
391       { quote_char => q{"}, name_sep => q{.} },
392     ]
393   );
394
395   # Equivalent to the previous example
396   ->connect_info(
397     [
398       'dbi:Pg:dbname=foo',
399       'postgres',
400       'my_pg_password',
401       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
402     ]
403   );
404
405   # Subref + DBIC-specific connection options
406   ->connect_info(
407     [
408       sub { DBI->connect(...) },
409       {
410           quote_char => q{`},
411           name_sep => q{@},
412           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
413       },
414     ]
415   );
416
417 =head2 on_connect_do
418
419 This method is deprecated in favor of setting via L</connect_info>.
420
421 =head2 debug
422
423 Causes SQL trace information to be emitted on the C<debugobj> object.
424 (or C<STDERR> if C<debugobj> has not specifically been set).
425
426 This is the equivalent to setting L</DBIC_TRACE> in your
427 shell environment.
428
429 =head2 debugfh
430
431 Set or retrieve the filehandle used for trace/debug output.  This should be
432 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
433 set to be STDERR - although see information on the
434 L<DBIC_TRACE> environment variable.
435
436 =cut
437
438 sub debugfh {
439     my $self = shift;
440
441     if ($self->debugobj->can('debugfh')) {
442         return $self->debugobj->debugfh(@_);
443     }
444 }
445
446 =head2 debugobj
447
448 Sets or retrieves the object used for metric collection. Defaults to an instance
449 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
450 method of using a coderef as a callback.  See the aforementioned Statistics
451 class for more information.
452
453 =head2 debugcb
454
455 Sets a callback to be executed each time a statement is run; takes a sub
456 reference.  Callback is executed as $sub->($op, $info) where $op is
457 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
458
459 See L<debugobj> for a better way.
460
461 =cut
462
463 sub debugcb {
464     my $self = shift;
465
466     if ($self->debugobj->can('callback')) {
467         return $self->debugobj->callback(@_);
468     }
469 }
470
471 =head2 dbh_do
472
473 Execute the given subref with the underlying database handle as its
474 first argument, using the new exception-based connection management.
475 Example:
476
477   my @stuff = $schema->storage->dbh_do(
478     sub {
479       shift->selectrow_array("SELECT * FROM foo")
480     }
481   );
482
483 =cut
484
485 sub dbh_do {
486   my ($self, $todo) = @_;
487
488   my @result;
489   my $want_array = wantarray;
490
491   eval {
492     $self->_verify_pid if $self->_dbh;
493     $self->_populate_dbh if !$self->_dbh;
494     my $dbh = $self->_dbh;
495     if($want_array) {
496         @result = $todo->($dbh);
497     }
498     elsif(defined $want_array) {
499         $result[0] = $todo->($dbh);
500     }
501     else {
502         $todo->($dbh);
503     }
504   };
505
506   if($@) {
507     my $exception = $@;
508     $self->connected
509       ? $self->throw_exception($exception)
510       : $self->_populate_dbh;
511
512     my $dbh = $self->_dbh;
513     return $todo->($dbh);
514   }
515
516   return $want_array ? @result : $result[0];
517 }
518
519 =head2 disconnect
520
521 Disconnect the L<DBI> handle, performing a rollback first if the
522 database is not in C<AutoCommit> mode.
523
524 =cut
525
526 sub disconnect {
527   my ($self) = @_;
528
529   if( $self->connected ) {
530     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
531     $self->_dbh->disconnect;
532     $self->_dbh(undef);
533   }
534 }
535
536 =head2 connected
537
538 Check if the L<DBI> handle is connected.  Returns true if the handle
539 is connected.
540
541 =cut
542
543 sub connected {
544   my ($self) = @_;
545
546   if(my $dbh = $self->_dbh) {
547       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
548           return $self->_dbh(undef);
549       }
550       else {
551           $self->_verify_pid;
552       }
553       return ($dbh->FETCH('Active') && $dbh->ping);
554   }
555
556   return 0;
557 }
558
559 # handle pid changes correctly
560 #  NOTE: assumes $self->_dbh is a valid $dbh
561 sub _verify_pid {
562   my ($self) = @_;
563
564   return if $self->_conn_pid == $$;
565
566   $self->_dbh->{InactiveDestroy} = 1;
567   $self->_dbh(undef);
568
569   return;
570 }
571
572 =head2 ensure_connected
573
574 Check whether the database handle is connected - if not then make a
575 connection.
576
577 =cut
578
579 sub ensure_connected {
580   my ($self) = @_;
581
582   unless ($self->connected) {
583     $self->_populate_dbh;
584   }
585 }
586
587 =head2 dbh
588
589 Returns the dbh - a data base handle of class L<DBI>.
590
591 =cut
592
593 sub dbh {
594   my ($self) = @_;
595
596   $self->ensure_connected;
597   return $self->_dbh;
598 }
599
600 sub _sql_maker_args {
601     my ($self) = @_;
602     
603     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
604 }
605
606 =head2 sql_maker
607
608 Returns a C<sql_maker> object - normally an object of class
609 C<DBIC::SQL::Abstract>.
610
611 =cut
612
613 sub sql_maker {
614   my ($self) = @_;
615   unless ($self->_sql_maker) {
616     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
617   }
618   return $self->_sql_maker;
619 }
620
621 sub connect_info {
622   my ($self, $info_arg) = @_;
623
624   return $self->_connect_info if !$info_arg;
625
626   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
627   #  the new set of options
628   $self->_sql_maker(undef);
629   $self->_sql_maker_opts({});
630
631   my $info = [ @$info_arg ]; # copy because we can alter it
632   my $last_info = $info->[-1];
633   if(ref $last_info eq 'HASH') {
634     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
635       $self->on_connect_do($on_connect_do);
636     }
637     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
638       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
639         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
640       }
641     }
642
643     # Get rid of any trailing empty hashref
644     pop(@$info) if !keys %$last_info;
645   }
646
647   $self->_connect_info($info);
648 }
649
650 sub _populate_dbh {
651   my ($self) = @_;
652   my @info = @{$self->_connect_info || []};
653   $self->_dbh($self->_connect(@info));
654
655   if(ref $self eq 'DBIx::Class::Storage::DBI') {
656     my $driver = $self->_dbh->{Driver}->{Name};
657     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
658       bless $self, "DBIx::Class::Storage::DBI::${driver}";
659       $self->_rebless() if $self->can('_rebless');
660     }
661   }
662
663   # if on-connect sql statements are given execute them
664   foreach my $sql_statement (@{$self->on_connect_do || []}) {
665     $self->debugobj->query_start($sql_statement) if $self->debug();
666     $self->_dbh->do($sql_statement);
667     $self->debugobj->query_end($sql_statement) if $self->debug();
668   }
669
670   $self->_conn_pid($$);
671   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
672 }
673
674 sub _connect {
675   my ($self, @info) = @_;
676
677   $self->throw_exception("You failed to provide any connection info")
678       if !@info;
679
680   my ($old_connect_via, $dbh);
681
682   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
683       $old_connect_via = $DBI::connect_via;
684       $DBI::connect_via = 'connect';
685   }
686
687   eval {
688     if(ref $info[0] eq 'CODE') {
689        $dbh = &{$info[0]}
690     }
691     else {
692        $dbh = DBI->connect(@info);
693        $dbh->{RaiseError} = 1;
694        $dbh->{PrintError} = 0;
695     }
696   };
697
698   $DBI::connect_via = $old_connect_via if $old_connect_via;
699
700   if (!$dbh || $@) {
701     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
702   }
703
704   $dbh;
705 }
706
707 =head2 txn_begin
708
709 Calls begin_work on the current dbh.
710
711 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
712 an entire code block to be executed transactionally.
713
714 =cut
715
716 sub txn_begin {
717   my $self = shift;
718   if ($self->{transaction_depth}++ == 0) {
719     $self->dbh_do(sub {
720       my $dbh = shift;
721       if ($dbh->{AutoCommit}) {
722         $self->debugobj->txn_begin()
723           if ($self->debug);
724         $dbh->begin_work;
725       }
726     });
727   }
728 }
729
730 =head2 txn_commit
731
732 Issues a commit against the current dbh.
733
734 =cut
735
736 sub txn_commit {
737   my $self = shift;
738   $self->dbh_do(sub {
739     my $dbh = shift;
740     if ($self->{transaction_depth} == 0) {
741       unless ($dbh->{AutoCommit}) {
742         $self->debugobj->txn_commit()
743           if ($self->debug);
744         $dbh->commit;
745       }
746     }
747     else {
748       if (--$self->{transaction_depth} == 0) {
749         $self->debugobj->txn_commit()
750           if ($self->debug);
751         $dbh->commit;
752       }
753     }
754   });
755 }
756
757 =head2 txn_rollback
758
759 Issues a rollback against the current dbh. A nested rollback will
760 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
761 which allows the rollback to propagate to the outermost transaction.
762
763 =cut
764
765 sub txn_rollback {
766   my $self = shift;
767
768   eval {
769     $self->dbh_do(sub {
770       my $dbh = shift;
771       if ($self->{transaction_depth} == 0) {
772         unless ($dbh->{AutoCommit}) {
773           $self->debugobj->txn_rollback()
774             if ($self->debug);
775           $dbh->rollback;
776         }
777       }
778       else {
779         if (--$self->{transaction_depth} == 0) {
780           $self->debugobj->txn_rollback()
781             if ($self->debug);
782           $dbh->rollback;
783         }
784         else {
785           die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
786         }
787       }
788     });
789   };
790
791   if ($@) {
792     my $error = $@;
793     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
794     $error =~ /$exception_class/ and $self->throw_exception($error);
795     $self->{transaction_depth} = 0;          # ensure that a failed rollback
796     $self->throw_exception($error);          # resets the transaction depth
797   }
798 }
799
800 sub _execute {
801   my ($self, $op, $extra_bind, $ident, @args) = @_;
802   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
803   unshift(@bind, @$extra_bind) if $extra_bind;
804   if ($self->debug) {
805       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
806       $self->debugobj->query_start($sql, @debug_bind);
807   }
808   my $sth = eval { $self->sth($sql,$op) };
809
810   if (!$sth || $@) {
811     $self->throw_exception(
812       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
813     );
814   }
815   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
816   my $rv;
817   if ($sth) {
818     my $time = time();
819     $rv = eval { $sth->execute(@bind) };
820
821     if ($@ || !$rv) {
822       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
823     }
824   } else {
825     $self->throw_exception("'$sql' did not generate a statement.");
826   }
827   if ($self->debug) {
828       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
829       $self->debugobj->query_end($sql, @debug_bind);
830   }
831   return (wantarray ? ($rv, $sth, @bind) : $rv);
832 }
833
834 sub insert {
835   my ($self, $ident, $to_insert) = @_;
836   $self->throw_exception(
837     "Couldn't insert ".join(', ',
838       map "$_ => $to_insert->{$_}", keys %$to_insert
839     )." into ${ident}"
840   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
841   return $to_insert;
842 }
843
844 sub update {
845   return shift->_execute('update' => [], @_);
846 }
847
848 sub delete {
849   return shift->_execute('delete' => [], @_);
850 }
851
852 sub _select {
853   my ($self, $ident, $select, $condition, $attrs) = @_;
854   my $order = $attrs->{order_by};
855   if (ref $condition eq 'SCALAR') {
856     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
857   }
858   if (exists $attrs->{group_by} || $attrs->{having}) {
859     $order = {
860       group_by => $attrs->{group_by},
861       having => $attrs->{having},
862       ($order ? (order_by => $order) : ())
863     };
864   }
865   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
866   if ($attrs->{software_limit} ||
867       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
868         $attrs->{software_limit} = 1;
869   } else {
870     $self->throw_exception("rows attribute must be positive if present")
871       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
872     push @args, $attrs->{rows}, $attrs->{offset};
873   }
874   return $self->_execute(@args);
875 }
876
877 =head2 select
878
879 Handle a SQL select statement.
880
881 =cut
882
883 sub select {
884   my $self = shift;
885   my ($ident, $select, $condition, $attrs) = @_;
886   return $self->cursor->new($self, \@_, $attrs);
887 }
888
889 =head2 select_single
890
891 Performs a select, fetch and return of data - handles a single row
892 only.
893
894 =cut
895
896 # Need to call finish() to work round broken DBDs
897
898 sub select_single {
899   my $self = shift;
900   my ($rv, $sth, @bind) = $self->_select(@_);
901   my @row = $sth->fetchrow_array;
902   $sth->finish();
903   return @row;
904 }
905
906 =head2 sth
907
908 Returns a L<DBI> sth (statement handle) for the supplied SQL.
909
910 =cut
911
912 sub sth {
913   my ($self, $sql) = @_;
914   # 3 is the if_active parameter which avoids active sth re-use
915   return $self->dbh_do(sub { shift->prepare_cached($sql, {}, 3) });
916 }
917
918 =head2 columns_info_for
919
920 Returns database type info for a given table columns.
921
922 =cut
923
924 sub columns_info_for {
925   my ($self, $table) = @_;
926
927   $self->dbh_do(sub {
928     my $dbh = shift;
929
930     if ($dbh->can('column_info')) {
931       my %result;
932       eval {
933         my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
934         my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
935         $sth->execute();
936         while ( my $info = $sth->fetchrow_hashref() ){
937           my %column_info;
938           $column_info{data_type}   = $info->{TYPE_NAME};
939           $column_info{size}      = $info->{COLUMN_SIZE};
940           $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
941           $column_info{default_value} = $info->{COLUMN_DEF};
942           my $col_name = $info->{COLUMN_NAME};
943           $col_name =~ s/^\"(.*)\"$/$1/;
944
945           $result{$col_name} = \%column_info;
946         }
947       };
948       return \%result if !$@;
949     }
950
951     my %result;
952     my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
953     $sth->execute;
954     my @columns = @{$sth->{NAME_lc}};
955     for my $i ( 0 .. $#columns ){
956       my %column_info;
957       my $type_num = $sth->{TYPE}->[$i];
958       my $type_name;
959       if(defined $type_num && $dbh->can('type_info')) {
960         my $type_info = $dbh->type_info($type_num);
961         $type_name = $type_info->{TYPE_NAME} if $type_info;
962       }
963       $column_info{data_type} = $type_name ? $type_name : $type_num;
964       $column_info{size} = $sth->{PRECISION}->[$i];
965       $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
966
967       if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
968         $column_info{data_type} = $1;
969         $column_info{size}    = $2;
970       }
971
972       $result{$columns[$i]} = \%column_info;
973     }
974
975     return \%result;
976   });
977 }
978
979 =head2 last_insert_id
980
981 Return the row id of the last insert.
982
983 =cut
984
985 sub last_insert_id {
986   my ($self, $row) = @_;
987     
988   $self->dbh_do(sub { shift->func('last_insert_rowid') });
989 }
990
991 =head2 sqlt_type
992
993 Returns the database driver name.
994
995 =cut
996
997 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
998
999 =head2 create_ddl_dir (EXPERIMENTAL)
1000
1001 =over 4
1002
1003 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1004
1005 =back
1006
1007 Creates an SQL file based on the Schema, for each of the specified
1008 database types, in the given directory.
1009
1010 Note that this feature is currently EXPERIMENTAL and may not work correctly
1011 across all databases, or fully handle complex relationships.
1012
1013 =cut
1014
1015 sub create_ddl_dir
1016 {
1017   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1018
1019   if(!$dir || !-d $dir)
1020   {
1021     warn "No directory given, using ./\n";
1022     $dir = "./";
1023   }
1024   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1025   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1026   $version ||= $schema->VERSION || '1.x';
1027   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1028
1029   eval "use SQL::Translator";
1030   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1031
1032   my $sqlt = SQL::Translator->new($sqltargs);
1033   foreach my $db (@$databases)
1034   {
1035     $sqlt->reset();
1036     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1037 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1038     $sqlt->data($schema);
1039     $sqlt->producer($db);
1040
1041     my $file;
1042     my $filename = $schema->ddl_filename($db, $dir, $version);
1043     if(-e $filename)
1044     {
1045       $self->throw_exception("$filename already exists, skipping $db");
1046       next;
1047     }
1048     open($file, ">$filename") 
1049       or $self->throw_exception("Can't open $filename for writing ($!)");
1050     my $output = $sqlt->translate;
1051 #use Data::Dumper;
1052 #    print join(":", keys %{$schema->source_registrations});
1053 #    print Dumper($sqlt->schema);
1054     if(!$output)
1055     {
1056       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1057       next;
1058     }
1059     print $file $output;
1060     close($file);
1061   }
1062
1063 }
1064
1065 =head2 deployment_statements
1066
1067 Create the statements for L</deploy> and
1068 L<DBIx::Class::Schema/deploy>.
1069
1070 =cut
1071
1072 sub deployment_statements {
1073   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1074   # Need to be connected to get the correct sqlt_type
1075   $self->ensure_connected() unless $type;
1076   $type ||= $self->sqlt_type;
1077   $version ||= $schema->VERSION || '1.x';
1078   $dir ||= './';
1079   eval "use SQL::Translator";
1080   if(!$@)
1081   {
1082     eval "use SQL::Translator::Parser::DBIx::Class;";
1083     $self->throw_exception($@) if $@;
1084     eval "use SQL::Translator::Producer::${type};";
1085     $self->throw_exception($@) if $@;
1086     my $tr = SQL::Translator->new(%$sqltargs);
1087     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1088     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1089   }
1090
1091   my $filename = $schema->ddl_filename($type, $dir, $version);
1092   if(!-f $filename)
1093   {
1094 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1095       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1096       return;
1097   }
1098   my $file;
1099   open($file, "<$filename") 
1100       or $self->throw_exception("Can't open $filename ($!)");
1101   my @rows = <$file>;
1102   close($file);
1103
1104   return join('', @rows);
1105   
1106 }
1107
1108 =head2 deploy
1109
1110 Sends the appropriate statements to create or modify tables to the
1111 db. This would normally be called through
1112 L<DBIx::Class::Schema/deploy>.
1113
1114 =cut
1115
1116 sub deploy {
1117   my ($self, $schema, $type, $sqltargs) = @_;
1118   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1119     for ( split(";\n", $statement)) {
1120       next if($_ =~ /^--/);
1121       next if(!$_);
1122 #      next if($_ =~ /^DROP/m);
1123       next if($_ =~ /^BEGIN TRANSACTION/m);
1124       next if($_ =~ /^COMMIT/m);
1125       next if $_ =~ /^\s+$/; # skip whitespace only
1126       $self->debugobj->query_start($_) if $self->debug;
1127       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1128       $self->debugobj->query_end($_) if $self->debug;
1129     }
1130   }
1131 }
1132
1133 =head2 datetime_parser
1134
1135 Returns the datetime parser class
1136
1137 =cut
1138
1139 sub datetime_parser {
1140   my $self = shift;
1141   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1142 }
1143
1144 =head2 datetime_parser_type
1145
1146 Defines (returns) the datetime parser class - currently hardwired to
1147 L<DateTime::Format::MySQL>
1148
1149 =cut
1150
1151 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1152
1153 =head2 build_datetime_parser
1154
1155 See L</datetime_parser>
1156
1157 =cut
1158
1159 sub build_datetime_parser {
1160   my $self = shift;
1161   my $type = $self->datetime_parser_type(@_);
1162   eval "use ${type}";
1163   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1164   return $type;
1165 }
1166
1167 sub DESTROY {
1168   my $self = shift;
1169   return if !$self->_dbh;
1170
1171   $self->_verify_pid;
1172   $self->_dbh(undef);
1173 }
1174
1175 1;
1176
1177 =head1 SQL METHODS
1178
1179 The module defines a set of methods within the DBIC::SQL::Abstract
1180 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1181 SQL query functions.
1182
1183 The following methods are extended:-
1184
1185 =over 4
1186
1187 =item delete
1188
1189 =item insert
1190
1191 =item select
1192
1193 =item update
1194
1195 =item limit_dialect
1196
1197 See L</connect_info> for details.
1198 For setting, this method is deprecated in favor of L</connect_info>.
1199
1200 =item quote_char
1201
1202 See L</connect_info> for details.
1203 For setting, this method is deprecated in favor of L</connect_info>.
1204
1205 =item name_sep
1206
1207 See L</connect_info> for details.
1208 For setting, this method is deprecated in favor of L</connect_info>.
1209
1210 =back
1211
1212 =head1 ENVIRONMENT VARIABLES
1213
1214 =head2 DBIC_TRACE
1215
1216 If C<DBIC_TRACE> is set then SQL trace information
1217 is produced (as when the L<debug> method is set).
1218
1219 If the value is of the form C<1=/path/name> then the trace output is
1220 written to the file C</path/name>.
1221
1222 This environment variable is checked when the storage object is first
1223 created (when you call connect on your schema).  So, run-time changes 
1224 to this environment variable will not take effect unless you also 
1225 re-connect on your schema.
1226
1227 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1228
1229 Old name for DBIC_TRACE
1230
1231 =head1 AUTHORS
1232
1233 Matt S. Trout <mst@shadowcatsystems.co.uk>
1234
1235 Andy Grundman <andy@hybridized.org>
1236
1237 =head1 LICENSE
1238
1239 You may distribute this code under the same terms as Perl itself.
1240
1241 =cut
1242