saner usage of dbh_do in Storage::DBI
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14
15 __PACKAGE__->mk_group_accessors(
16   'simple' =>
17     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
18        cursor on_connect_do transaction_depth/
19 );
20
21 BEGIN {
22
23 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
24
25 use base qw/SQL::Abstract::Limit/;
26
27 # This prevents the caching of $dbh in S::A::L, I believe
28 sub new {
29   my $self = shift->SUPER::new(@_);
30
31   # If limit_dialect is a ref (like a $dbh), go ahead and replace
32   #   it with what it resolves to:
33   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
34     if ref $self->{limit_dialect};
35
36   $self;
37 }
38
39 # While we're at it, this should make LIMIT queries more efficient,
40 #  without digging into things too deeply
41 sub _find_syntax {
42   my ($self, $syntax) = @_;
43   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
44 }
45
46 sub select {
47   my ($self, $table, $fields, $where, $order, @rest) = @_;
48   $table = $self->_quote($table) unless ref($table);
49   local $self->{rownum_hack_count} = 1
50     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
51   @rest = (-1) unless defined $rest[0];
52   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
53     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
54   local $self->{having_bind} = [];
55   my ($sql, @ret) = $self->SUPER::select(
56     $table, $self->_recurse_fields($fields), $where, $order, @rest
57   );
58   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
59 }
60
61 sub insert {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::insert($table, @_);
66 }
67
68 sub update {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::update($table, @_);
73 }
74
75 sub delete {
76   my $self = shift;
77   my $table = shift;
78   $table = $self->_quote($table) unless ref($table);
79   $self->SUPER::delete($table, @_);
80 }
81
82 sub _emulate_limit {
83   my $self = shift;
84   if ($_[3] == -1) {
85     return $_[1].$self->_order_by($_[2]);
86   } else {
87     return $self->SUPER::_emulate_limit(@_);
88   }
89 }
90
91 sub _recurse_fields {
92   my ($self, $fields) = @_;
93   my $ref = ref $fields;
94   return $self->_quote($fields) unless $ref;
95   return $$fields if $ref eq 'SCALAR';
96
97   if ($ref eq 'ARRAY') {
98     return join(', ', map {
99       $self->_recurse_fields($_)
100       .(exists $self->{rownum_hack_count}
101          ? ' AS col'.$self->{rownum_hack_count}++
102          : '')
103      } @$fields);
104   } elsif ($ref eq 'HASH') {
105     foreach my $func (keys %$fields) {
106       return $self->_sqlcase($func)
107         .'( '.$self->_recurse_fields($fields->{$func}).' )';
108     }
109   }
110 }
111
112 sub _order_by {
113   my $self = shift;
114   my $ret = '';
115   my @extra;
116   if (ref $_[0] eq 'HASH') {
117     if (defined $_[0]->{group_by}) {
118       $ret = $self->_sqlcase(' group by ')
119                .$self->_recurse_fields($_[0]->{group_by});
120     }
121     if (defined $_[0]->{having}) {
122       my $frag;
123       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
124       push(@{$self->{having_bind}}, @extra);
125       $ret .= $self->_sqlcase(' having ').$frag;
126     }
127     if (defined $_[0]->{order_by}) {
128       $ret .= $self->_order_by($_[0]->{order_by});
129     }
130   } elsif (ref $_[0] eq 'SCALAR') {
131     $ret = $self->_sqlcase(' order by ').${ $_[0] };
132   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
133     my @order = @{+shift};
134     $ret = $self->_sqlcase(' order by ')
135           .join(', ', map {
136                         my $r = $self->_order_by($_, @_);
137                         $r =~ s/^ ?ORDER BY //i;
138                         $r;
139                       } @order);
140   } else {
141     $ret = $self->SUPER::_order_by(@_);
142   }
143   return $ret;
144 }
145
146 sub _order_directions {
147   my ($self, $order) = @_;
148   $order = $order->{order_by} if ref $order eq 'HASH';
149   return $self->SUPER::_order_directions($order);
150 }
151
152 sub _table {
153   my ($self, $from) = @_;
154   if (ref $from eq 'ARRAY') {
155     return $self->_recurse_from(@$from);
156   } elsif (ref $from eq 'HASH') {
157     return $self->_make_as($from);
158   } else {
159     return $from; # would love to quote here but _table ends up getting called
160                   # twice during an ->select without a limit clause due to
161                   # the way S::A::Limit->select works. should maybe consider
162                   # bypassing this and doing S::A::select($self, ...) in
163                   # our select method above. meantime, quoting shims have
164                   # been added to select/insert/update/delete here
165   }
166 }
167
168 sub _recurse_from {
169   my ($self, $from, @join) = @_;
170   my @sqlf;
171   push(@sqlf, $self->_make_as($from));
172   foreach my $j (@join) {
173     my ($to, $on) = @$j;
174
175     # check whether a join type exists
176     my $join_clause = '';
177     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
178     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
179       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
180     } else {
181       $join_clause = ' JOIN ';
182     }
183     push(@sqlf, $join_clause);
184
185     if (ref $to eq 'ARRAY') {
186       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
187     } else {
188       push(@sqlf, $self->_make_as($to));
189     }
190     push(@sqlf, ' ON ', $self->_join_condition($on));
191   }
192   return join('', @sqlf);
193 }
194
195 sub _make_as {
196   my ($self, $from) = @_;
197   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
198                      reverse each %{$self->_skip_options($from)});
199 }
200
201 sub _skip_options {
202   my ($self, $hash) = @_;
203   my $clean_hash = {};
204   $clean_hash->{$_} = $hash->{$_}
205     for grep {!/^-/} keys %$hash;
206   return $clean_hash;
207 }
208
209 sub _join_condition {
210   my ($self, $cond) = @_;
211   if (ref $cond eq 'HASH') {
212     my %j;
213     for (keys %$cond) {
214       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
215     };
216     return $self->_recurse_where(\%j);
217   } elsif (ref $cond eq 'ARRAY') {
218     return join(' OR ', map { $self->_join_condition($_) } @$cond);
219   } else {
220     die "Can't handle this yet!";
221   }
222 }
223
224 sub _quote {
225   my ($self, $label) = @_;
226   return '' unless defined $label;
227   return "*" if $label eq '*';
228   return $label unless $self->{quote_char};
229   if(ref $self->{quote_char} eq "ARRAY"){
230     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
231       if !defined $self->{name_sep};
232     my $sep = $self->{name_sep};
233     return join($self->{name_sep},
234         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
235        split(/\Q$sep\E/,$label));
236   }
237   return $self->SUPER::_quote($label);
238 }
239
240 sub limit_dialect {
241     my $self = shift;
242     $self->{limit_dialect} = shift if @_;
243     return $self->{limit_dialect};
244 }
245
246 sub quote_char {
247     my $self = shift;
248     $self->{quote_char} = shift if @_;
249     return $self->{quote_char};
250 }
251
252 sub name_sep {
253     my $self = shift;
254     $self->{name_sep} = shift if @_;
255     return $self->{name_sep};
256 }
257
258 } # End of BEGIN block
259
260 =head1 NAME
261
262 DBIx::Class::Storage::DBI - DBI storage handler
263
264 =head1 SYNOPSIS
265
266 =head1 DESCRIPTION
267
268 This class represents the connection to an RDBMS via L<DBI>.  See
269 L<DBIx::Class::Storage> for general information.  This pod only
270 documents DBI-specific methods and behaviors.
271
272 =head1 METHODS
273
274 =cut
275
276 sub new {
277   my $new = shift->next::method(@_);
278
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281   $new->_sql_maker_opts({});
282
283   $new;
284 }
285
286 =head2 connect_info
287
288 The arguments of C<connect_info> are always a single array reference.
289
290 This is normally accessed via L<DBIx::Class::Schema/connection>, which
291 encapsulates its argument list in an arrayref before calling
292 C<connect_info> here.
293
294 The arrayref can either contain the same set of arguments one would
295 normally pass to L<DBI/connect>, or a lone code reference which returns
296 a connected database handle.
297
298 In either case, if the final argument in your connect_info happens
299 to be a hashref, C<connect_info> will look there for several
300 connection-specific options:
301
302 =over 4
303
304 =item on_connect_do
305
306 This can be set to an arrayref of literal sql statements, which will
307 be executed immediately after making the connection to the database
308 every time we [re-]connect.
309
310 =item limit_dialect 
311
312 Sets the limit dialect. This is useful for JDBC-bridge among others
313 where the remote SQL-dialect cannot be determined by the name of the
314 driver alone.
315
316 =item quote_char
317
318 Specifies what characters to use to quote table and column names. If 
319 you use this you will want to specify L<name_sep> as well.
320
321 quote_char expects either a single character, in which case is it is placed
322 on either side of the table/column, or an arrayref of length 2 in which case the
323 table/column name is placed between the elements.
324
325 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
326 use C<quote_char =E<gt> [qw/[ ]/]>.
327
328 =item name_sep
329
330 This only needs to be used in conjunction with L<quote_char>, and is used to 
331 specify the charecter that seperates elements (schemas, tables, columns) from 
332 each other. In most cases this is simply a C<.>.
333
334 =back
335
336 These options can be mixed in with your other L<DBI> connection attributes,
337 or placed in a seperate hashref after all other normal L<DBI> connection
338 arguments.
339
340 Every time C<connect_info> is invoked, any previous settings for
341 these options will be cleared before setting the new ones, regardless of
342 whether any options are specified in the new C<connect_info>.
343
344 Important note:  DBIC expects the returned database handle provided by 
345 a subref argument to have RaiseError set on it.  If it doesn't, things
346 might not work very well, YMMV.  If you don't use a subref, DBIC will
347 force this setting for you anyways.  Setting HandleError to anything
348 other than simple exception object wrapper might cause problems too.
349
350 Examples:
351
352   # Simple SQLite connection
353   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
354
355   # Connect via subref
356   ->connect_info([ sub { DBI->connect(...) } ]);
357
358   # A bit more complicated
359   ->connect_info(
360     [
361       'dbi:Pg:dbname=foo',
362       'postgres',
363       'my_pg_password',
364       { AutoCommit => 0 },
365       { quote_char => q{"}, name_sep => q{.} },
366     ]
367   );
368
369   # Equivalent to the previous example
370   ->connect_info(
371     [
372       'dbi:Pg:dbname=foo',
373       'postgres',
374       'my_pg_password',
375       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
376     ]
377   );
378
379   # Subref + DBIC-specific connection options
380   ->connect_info(
381     [
382       sub { DBI->connect(...) },
383       {
384           quote_char => q{`},
385           name_sep => q{@},
386           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
387       },
388     ]
389   );
390
391 =cut
392
393 sub connect_info {
394   my ($self, $info_arg) = @_;
395
396   return $self->_connect_info if !$info_arg;
397
398   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
399   #  the new set of options
400   $self->_sql_maker(undef);
401   $self->_sql_maker_opts({});
402
403   my $info = [ @$info_arg ]; # copy because we can alter it
404   my $last_info = $info->[-1];
405   if(ref $last_info eq 'HASH') {
406     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
407       $self->on_connect_do($on_connect_do);
408     }
409     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
410       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
411         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
412       }
413     }
414
415     # Get rid of any trailing empty hashref
416     pop(@$info) if !keys %$last_info;
417   }
418
419   $self->_connect_info($info);
420 }
421
422 =head2 on_connect_do
423
424 This method is deprecated in favor of setting via L</connect_info>.
425
426 =head2 dbh_do
427
428 Arguments: $subref, @extra_coderef_args?
429
430 Execute the given subref with the underlying database handle as its
431 first argument, using the new exception-based connection management.
432
433 Any additional arguments will be passed verbatim to the called subref
434 as arguments 2 and onwards.
435
436 Example:
437
438   my @stuff = $schema->storage->dbh_do(
439     sub {
440       my $dbh = shift;
441       my $cols = join(q{, }, @_);
442       shift->selectrow_array("SELECT $cols FROM foo")
443     },
444     @column_list
445   );
446
447 =cut
448
449 sub dbh_do {
450   my $self = shift;
451   my $todo = shift;
452
453   my @result;
454   my $want_array = wantarray;
455
456   eval {
457     $self->_verify_pid if $self->_dbh;
458     $self->_populate_dbh if !$self->_dbh;
459     my $dbh = $self->_dbh;
460     if($want_array) {
461         @result = $todo->($dbh, @_);
462     }
463     elsif(defined $want_array) {
464         $result[0] = $todo->($dbh, @_);
465     }
466     else {
467         $todo->($dbh, @_);
468     }
469   };
470
471   if($@) {
472     my $exception = $@;
473     $self->connected
474       ? $self->throw_exception($exception)
475       : $self->_populate_dbh;
476
477     my $dbh = $self->_dbh;
478     return $todo->($dbh, @_);
479   }
480
481   return $want_array ? @result : $result[0];
482 }
483
484 =head2 disconnect
485
486 Our C<disconnect> method also performs a rollback first if the
487 database is not in C<AutoCommit> mode.
488
489 =cut
490
491 sub disconnect {
492   my ($self) = @_;
493
494   if( $self->connected ) {
495     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
496     $self->_dbh->disconnect;
497     $self->_dbh(undef);
498   }
499 }
500
501 sub connected {
502   my ($self) = @_;
503
504   if(my $dbh = $self->_dbh) {
505       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
506           return $self->_dbh(undef);
507       }
508       else {
509           $self->_verify_pid;
510       }
511       return ($dbh->FETCH('Active') && $dbh->ping);
512   }
513
514   return 0;
515 }
516
517 # handle pid changes correctly
518 #  NOTE: assumes $self->_dbh is a valid $dbh
519 sub _verify_pid {
520   my ($self) = @_;
521
522   return if $self->_conn_pid == $$;
523
524   $self->_dbh->{InactiveDestroy} = 1;
525   $self->_dbh(undef);
526
527   return;
528 }
529
530 sub ensure_connected {
531   my ($self) = @_;
532
533   unless ($self->connected) {
534     $self->_populate_dbh;
535   }
536 }
537
538 =head2 dbh
539
540 Returns the dbh - a data base handle of class L<DBI>.
541
542 =cut
543
544 sub dbh {
545   my ($self) = @_;
546
547   $self->ensure_connected;
548   return $self->_dbh;
549 }
550
551 sub _sql_maker_args {
552     my ($self) = @_;
553     
554     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
555 }
556
557 sub sql_maker {
558   my ($self) = @_;
559   unless ($self->_sql_maker) {
560     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
561   }
562   return $self->_sql_maker;
563 }
564
565 sub _populate_dbh {
566   my ($self) = @_;
567   my @info = @{$self->_connect_info || []};
568   $self->_dbh($self->_connect(@info));
569
570   if(ref $self eq 'DBIx::Class::Storage::DBI') {
571     my $driver = $self->_dbh->{Driver}->{Name};
572     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
573       bless $self, "DBIx::Class::Storage::DBI::${driver}";
574       $self->_rebless() if $self->can('_rebless');
575     }
576   }
577
578   # if on-connect sql statements are given execute them
579   foreach my $sql_statement (@{$self->on_connect_do || []}) {
580     $self->debugobj->query_start($sql_statement) if $self->debug();
581     $self->_dbh->do($sql_statement);
582     $self->debugobj->query_end($sql_statement) if $self->debug();
583   }
584
585   $self->_conn_pid($$);
586   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
587 }
588
589 sub _connect {
590   my ($self, @info) = @_;
591
592   $self->throw_exception("You failed to provide any connection info")
593       if !@info;
594
595   my ($old_connect_via, $dbh);
596
597   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
598       $old_connect_via = $DBI::connect_via;
599       $DBI::connect_via = 'connect';
600   }
601
602   eval {
603     if(ref $info[0] eq 'CODE') {
604        $dbh = &{$info[0]}
605     }
606     else {
607        $dbh = DBI->connect(@info);
608        $dbh->{RaiseError} = 1;
609        $dbh->{PrintError} = 0;
610     }
611   };
612
613   $DBI::connect_via = $old_connect_via if $old_connect_via;
614
615   if (!$dbh || $@) {
616     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
617   }
618
619   $dbh;
620 }
621
622
623 sub __txn_begin {
624   my ($dbh, $self) = @_;
625   if ($dbh->{AutoCommit}) {
626     $self->debugobj->txn_begin()
627       if ($self->debug);
628     $dbh->begin_work;
629   }
630 }
631
632 sub txn_begin {
633   my $self = shift;
634   $self->dbh_do(\&__txn_begin, $self)
635     if $self->{transaction_depth}++ == 0;
636 }
637
638 sub __txn_commit {
639   my ($dbh, $self) = @_;
640   if ($self->{transaction_depth} == 0) {
641     unless ($dbh->{AutoCommit}) {
642       $self->debugobj->txn_commit()
643         if ($self->debug);
644       $dbh->commit;
645     }
646   }
647   else {
648     if (--$self->{transaction_depth} == 0) {
649       $self->debugobj->txn_commit()
650         if ($self->debug);
651       $dbh->commit;
652     }
653   }
654 }
655
656 sub txn_commit {
657   my $self = shift;
658   $self->dbh_do(\&__txn_commit, $self);
659 }
660
661 sub __txn_rollback {
662   my ($dbh, $self) = @_;
663   if ($self->{transaction_depth} == 0) {
664     unless ($dbh->{AutoCommit}) {
665       $self->debugobj->txn_rollback()
666         if ($self->debug);
667       $dbh->rollback;
668     }
669   }
670   else {
671     if (--$self->{transaction_depth} == 0) {
672       $self->debugobj->txn_rollback()
673         if ($self->debug);
674       $dbh->rollback;
675     }
676     else {
677       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
678     }
679   }
680 }
681
682 sub txn_rollback {
683   my $self = shift;
684   eval { $self->dbh_do(\&__txn_rollback, $self) };
685   if ($@) {
686     my $error = $@;
687     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
688     $error =~ /$exception_class/ and $self->throw_exception($error);
689     $self->{transaction_depth} = 0;          # ensure that a failed rollback
690     $self->throw_exception($error);          # resets the transaction depth
691   }
692 }
693
694 sub _execute {
695   my ($self, $op, $extra_bind, $ident, @args) = @_;
696   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
697   unshift(@bind, @$extra_bind) if $extra_bind;
698   if ($self->debug) {
699       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
700       $self->debugobj->query_start($sql, @debug_bind);
701   }
702   my $sth = eval { $self->sth($sql,$op) };
703
704   if (!$sth || $@) {
705     $self->throw_exception(
706       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
707     );
708   }
709   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
710   my $rv;
711   if ($sth) {
712     my $time = time();
713     $rv = eval { $sth->execute(@bind) };
714
715     if ($@ || !$rv) {
716       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
717     }
718   } else {
719     $self->throw_exception("'$sql' did not generate a statement.");
720   }
721   if ($self->debug) {
722       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
723       $self->debugobj->query_end($sql, @debug_bind);
724   }
725   return (wantarray ? ($rv, $sth, @bind) : $rv);
726 }
727
728 sub insert {
729   my ($self, $ident, $to_insert) = @_;
730   $self->throw_exception(
731     "Couldn't insert ".join(', ',
732       map "$_ => $to_insert->{$_}", keys %$to_insert
733     )." into ${ident}"
734   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
735   return $to_insert;
736 }
737
738 sub update {
739   return shift->_execute('update' => [], @_);
740 }
741
742 sub delete {
743   return shift->_execute('delete' => [], @_);
744 }
745
746 sub _select {
747   my ($self, $ident, $select, $condition, $attrs) = @_;
748   my $order = $attrs->{order_by};
749   if (ref $condition eq 'SCALAR') {
750     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
751   }
752   if (exists $attrs->{group_by} || $attrs->{having}) {
753     $order = {
754       group_by => $attrs->{group_by},
755       having => $attrs->{having},
756       ($order ? (order_by => $order) : ())
757     };
758   }
759   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
760   if ($attrs->{software_limit} ||
761       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
762         $attrs->{software_limit} = 1;
763   } else {
764     $self->throw_exception("rows attribute must be positive if present")
765       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
766     push @args, $attrs->{rows}, $attrs->{offset};
767   }
768   return $self->_execute(@args);
769 }
770
771 sub select {
772   my $self = shift;
773   my ($ident, $select, $condition, $attrs) = @_;
774   return $self->cursor->new($self, \@_, $attrs);
775 }
776
777 sub select_single {
778   my $self = shift;
779   my ($rv, $sth, @bind) = $self->_select(@_);
780   my @row = $sth->fetchrow_array;
781   # Need to call finish() to work round broken DBDs
782   $sth->finish();
783   return @row;
784 }
785
786 =head2 sth
787
788 Returns a L<DBI> sth (statement handle) for the supplied SQL.
789
790 =cut
791
792 sub __sth {
793   my ($dbh, $sql) = @_;
794   # 3 is the if_active parameter which avoids active sth re-use
795   $dbh->prepare_cached($sql, {}, 3);
796 }
797
798 sub sth {
799   my ($self, $sql) = @_;
800   $self->dbh_do(\&__sth, $sql);
801 }
802
803
804 sub __columns_info_for {
805   my ($dbh, $self, $table) = @_;
806
807   if ($dbh->can('column_info')) {
808     my %result;
809     eval {
810       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
811       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
812       $sth->execute();
813       while ( my $info = $sth->fetchrow_hashref() ){
814         my %column_info;
815         $column_info{data_type}   = $info->{TYPE_NAME};
816         $column_info{size}      = $info->{COLUMN_SIZE};
817         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
818         $column_info{default_value} = $info->{COLUMN_DEF};
819         my $col_name = $info->{COLUMN_NAME};
820         $col_name =~ s/^\"(.*)\"$/$1/;
821
822         $result{$col_name} = \%column_info;
823       }
824     };
825     return \%result if !$@;
826   }
827
828   my %result;
829   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
830   $sth->execute;
831   my @columns = @{$sth->{NAME_lc}};
832   for my $i ( 0 .. $#columns ){
833     my %column_info;
834     my $type_num = $sth->{TYPE}->[$i];
835     my $type_name;
836     if(defined $type_num && $dbh->can('type_info')) {
837       my $type_info = $dbh->type_info($type_num);
838       $type_name = $type_info->{TYPE_NAME} if $type_info;
839     }
840     $column_info{data_type} = $type_name ? $type_name : $type_num;
841     $column_info{size} = $sth->{PRECISION}->[$i];
842     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
843
844     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
845       $column_info{data_type} = $1;
846       $column_info{size}    = $2;
847     }
848
849     $result{$columns[$i]} = \%column_info;
850   }
851
852   return \%result;
853 }
854
855 sub columns_info_for {
856   my ($self, $table) = @_;
857   $self->dbh_do(\&__columns_info_for, $self, $table);
858 }
859
860 =head2 last_insert_id
861
862 Return the row id of the last insert.
863
864 =cut
865
866 sub last_insert_id {
867   my ($self, $row) = @_;
868     
869   $self->dbh_do(sub { shift->func('last_insert_rowid') });
870 }
871
872 =head2 sqlt_type
873
874 Returns the database driver name.
875
876 =cut
877
878 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
879
880 =head2 create_ddl_dir (EXPERIMENTAL)
881
882 =over 4
883
884 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
885
886 =back
887
888 Creates an SQL file based on the Schema, for each of the specified
889 database types, in the given directory.
890
891 Note that this feature is currently EXPERIMENTAL and may not work correctly
892 across all databases, or fully handle complex relationships.
893
894 =cut
895
896 sub create_ddl_dir
897 {
898   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
899
900   if(!$dir || !-d $dir)
901   {
902     warn "No directory given, using ./\n";
903     $dir = "./";
904   }
905   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
906   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
907   $version ||= $schema->VERSION || '1.x';
908   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
909
910   eval "use SQL::Translator";
911   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
912
913   my $sqlt = SQL::Translator->new($sqltargs);
914   foreach my $db (@$databases)
915   {
916     $sqlt->reset();
917     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
918 #    $sqlt->parser_args({'DBIx::Class' => $schema);
919     $sqlt->data($schema);
920     $sqlt->producer($db);
921
922     my $file;
923     my $filename = $schema->ddl_filename($db, $dir, $version);
924     if(-e $filename)
925     {
926       $self->throw_exception("$filename already exists, skipping $db");
927       next;
928     }
929     open($file, ">$filename") 
930       or $self->throw_exception("Can't open $filename for writing ($!)");
931     my $output = $sqlt->translate;
932 #use Data::Dumper;
933 #    print join(":", keys %{$schema->source_registrations});
934 #    print Dumper($sqlt->schema);
935     if(!$output)
936     {
937       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
938       next;
939     }
940     print $file $output;
941     close($file);
942   }
943
944 }
945
946 =head2 deployment_statements
947
948 Create the statements for L</deploy> and
949 L<DBIx::Class::Schema/deploy>.
950
951 =cut
952
953 sub deployment_statements {
954   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
955   # Need to be connected to get the correct sqlt_type
956   $self->ensure_connected() unless $type;
957   $type ||= $self->sqlt_type;
958   $version ||= $schema->VERSION || '1.x';
959   $dir ||= './';
960   eval "use SQL::Translator";
961   if(!$@)
962   {
963     eval "use SQL::Translator::Parser::DBIx::Class;";
964     $self->throw_exception($@) if $@;
965     eval "use SQL::Translator::Producer::${type};";
966     $self->throw_exception($@) if $@;
967     my $tr = SQL::Translator->new(%$sqltargs);
968     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
969     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
970   }
971
972   my $filename = $schema->ddl_filename($type, $dir, $version);
973   if(!-f $filename)
974   {
975 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
976       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
977       return;
978   }
979   my $file;
980   open($file, "<$filename") 
981       or $self->throw_exception("Can't open $filename ($!)");
982   my @rows = <$file>;
983   close($file);
984
985   return join('', @rows);
986   
987 }
988
989 sub deploy {
990   my ($self, $schema, $type, $sqltargs) = @_;
991   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
992     for ( split(";\n", $statement)) {
993       next if($_ =~ /^--/);
994       next if(!$_);
995 #      next if($_ =~ /^DROP/m);
996       next if($_ =~ /^BEGIN TRANSACTION/m);
997       next if($_ =~ /^COMMIT/m);
998       next if $_ =~ /^\s+$/; # skip whitespace only
999       $self->debugobj->query_start($_) if $self->debug;
1000       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1001       $self->debugobj->query_end($_) if $self->debug;
1002     }
1003   }
1004 }
1005
1006 =head2 datetime_parser
1007
1008 Returns the datetime parser class
1009
1010 =cut
1011
1012 sub datetime_parser {
1013   my $self = shift;
1014   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1015 }
1016
1017 =head2 datetime_parser_type
1018
1019 Defines (returns) the datetime parser class - currently hardwired to
1020 L<DateTime::Format::MySQL>
1021
1022 =cut
1023
1024 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1025
1026 =head2 build_datetime_parser
1027
1028 See L</datetime_parser>
1029
1030 =cut
1031
1032 sub build_datetime_parser {
1033   my $self = shift;
1034   my $type = $self->datetime_parser_type(@_);
1035   eval "use ${type}";
1036   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1037   return $type;
1038 }
1039
1040 sub DESTROY {
1041   my $self = shift;
1042   return if !$self->_dbh;
1043
1044   $self->_verify_pid;
1045   $self->_dbh(undef);
1046 }
1047
1048 1;
1049
1050 =head1 SQL METHODS
1051
1052 The module defines a set of methods within the DBIC::SQL::Abstract
1053 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1054 SQL query functions.
1055
1056 The following methods are extended:-
1057
1058 =over 4
1059
1060 =item delete
1061
1062 =item insert
1063
1064 =item select
1065
1066 =item update
1067
1068 =item limit_dialect
1069
1070 See L</connect_info> for details.
1071 For setting, this method is deprecated in favor of L</connect_info>.
1072
1073 =item quote_char
1074
1075 See L</connect_info> for details.
1076 For setting, this method is deprecated in favor of L</connect_info>.
1077
1078 =item name_sep
1079
1080 See L</connect_info> for details.
1081 For setting, this method is deprecated in favor of L</connect_info>.
1082
1083 =back
1084
1085 =head1 AUTHORS
1086
1087 Matt S. Trout <mst@shadowcatsystems.co.uk>
1088
1089 Andy Grundman <andy@hybridized.org>
1090
1091 =head1 LICENSE
1092
1093 You may distribute this code under the same terms as Perl itself.
1094
1095 =cut
1096