reshuffling the division of labor between Storage and Storage::DBI (not complete)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14
15 __PACKAGE__->mk_group_accessors(
16   'simple' =>
17     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
18        cursor on_connect_do transaction_depth/
19 );
20
21 BEGIN {
22
23 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
24
25 use base qw/SQL::Abstract::Limit/;
26
27 # This prevents the caching of $dbh in S::A::L, I believe
28 sub new {
29   my $self = shift->SUPER::new(@_);
30
31   # If limit_dialect is a ref (like a $dbh), go ahead and replace
32   #   it with what it resolves to:
33   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
34     if ref $self->{limit_dialect};
35
36   $self;
37 }
38
39 # While we're at it, this should make LIMIT queries more efficient,
40 #  without digging into things too deeply
41 sub _find_syntax {
42   my ($self, $syntax) = @_;
43   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
44 }
45
46 sub select {
47   my ($self, $table, $fields, $where, $order, @rest) = @_;
48   $table = $self->_quote($table) unless ref($table);
49   local $self->{rownum_hack_count} = 1
50     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
51   @rest = (-1) unless defined $rest[0];
52   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
53     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
54   local $self->{having_bind} = [];
55   my ($sql, @ret) = $self->SUPER::select(
56     $table, $self->_recurse_fields($fields), $where, $order, @rest
57   );
58   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
59 }
60
61 sub insert {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::insert($table, @_);
66 }
67
68 sub update {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::update($table, @_);
73 }
74
75 sub delete {
76   my $self = shift;
77   my $table = shift;
78   $table = $self->_quote($table) unless ref($table);
79   $self->SUPER::delete($table, @_);
80 }
81
82 sub _emulate_limit {
83   my $self = shift;
84   if ($_[3] == -1) {
85     return $_[1].$self->_order_by($_[2]);
86   } else {
87     return $self->SUPER::_emulate_limit(@_);
88   }
89 }
90
91 sub _recurse_fields {
92   my ($self, $fields) = @_;
93   my $ref = ref $fields;
94   return $self->_quote($fields) unless $ref;
95   return $$fields if $ref eq 'SCALAR';
96
97   if ($ref eq 'ARRAY') {
98     return join(', ', map {
99       $self->_recurse_fields($_)
100       .(exists $self->{rownum_hack_count}
101          ? ' AS col'.$self->{rownum_hack_count}++
102          : '')
103      } @$fields);
104   } elsif ($ref eq 'HASH') {
105     foreach my $func (keys %$fields) {
106       return $self->_sqlcase($func)
107         .'( '.$self->_recurse_fields($fields->{$func}).' )';
108     }
109   }
110 }
111
112 sub _order_by {
113   my $self = shift;
114   my $ret = '';
115   my @extra;
116   if (ref $_[0] eq 'HASH') {
117     if (defined $_[0]->{group_by}) {
118       $ret = $self->_sqlcase(' group by ')
119                .$self->_recurse_fields($_[0]->{group_by});
120     }
121     if (defined $_[0]->{having}) {
122       my $frag;
123       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
124       push(@{$self->{having_bind}}, @extra);
125       $ret .= $self->_sqlcase(' having ').$frag;
126     }
127     if (defined $_[0]->{order_by}) {
128       $ret .= $self->_order_by($_[0]->{order_by});
129     }
130   } elsif (ref $_[0] eq 'SCALAR') {
131     $ret = $self->_sqlcase(' order by ').${ $_[0] };
132   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
133     my @order = @{+shift};
134     $ret = $self->_sqlcase(' order by ')
135           .join(', ', map {
136                         my $r = $self->_order_by($_, @_);
137                         $r =~ s/^ ?ORDER BY //i;
138                         $r;
139                       } @order);
140   } else {
141     $ret = $self->SUPER::_order_by(@_);
142   }
143   return $ret;
144 }
145
146 sub _order_directions {
147   my ($self, $order) = @_;
148   $order = $order->{order_by} if ref $order eq 'HASH';
149   return $self->SUPER::_order_directions($order);
150 }
151
152 sub _table {
153   my ($self, $from) = @_;
154   if (ref $from eq 'ARRAY') {
155     return $self->_recurse_from(@$from);
156   } elsif (ref $from eq 'HASH') {
157     return $self->_make_as($from);
158   } else {
159     return $from; # would love to quote here but _table ends up getting called
160                   # twice during an ->select without a limit clause due to
161                   # the way S::A::Limit->select works. should maybe consider
162                   # bypassing this and doing S::A::select($self, ...) in
163                   # our select method above. meantime, quoting shims have
164                   # been added to select/insert/update/delete here
165   }
166 }
167
168 sub _recurse_from {
169   my ($self, $from, @join) = @_;
170   my @sqlf;
171   push(@sqlf, $self->_make_as($from));
172   foreach my $j (@join) {
173     my ($to, $on) = @$j;
174
175     # check whether a join type exists
176     my $join_clause = '';
177     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
178     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
179       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
180     } else {
181       $join_clause = ' JOIN ';
182     }
183     push(@sqlf, $join_clause);
184
185     if (ref $to eq 'ARRAY') {
186       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
187     } else {
188       push(@sqlf, $self->_make_as($to));
189     }
190     push(@sqlf, ' ON ', $self->_join_condition($on));
191   }
192   return join('', @sqlf);
193 }
194
195 sub _make_as {
196   my ($self, $from) = @_;
197   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
198                      reverse each %{$self->_skip_options($from)});
199 }
200
201 sub _skip_options {
202   my ($self, $hash) = @_;
203   my $clean_hash = {};
204   $clean_hash->{$_} = $hash->{$_}
205     for grep {!/^-/} keys %$hash;
206   return $clean_hash;
207 }
208
209 sub _join_condition {
210   my ($self, $cond) = @_;
211   if (ref $cond eq 'HASH') {
212     my %j;
213     for (keys %$cond) {
214       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
215     };
216     return $self->_recurse_where(\%j);
217   } elsif (ref $cond eq 'ARRAY') {
218     return join(' OR ', map { $self->_join_condition($_) } @$cond);
219   } else {
220     die "Can't handle this yet!";
221   }
222 }
223
224 sub _quote {
225   my ($self, $label) = @_;
226   return '' unless defined $label;
227   return "*" if $label eq '*';
228   return $label unless $self->{quote_char};
229   if(ref $self->{quote_char} eq "ARRAY"){
230     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
231       if !defined $self->{name_sep};
232     my $sep = $self->{name_sep};
233     return join($self->{name_sep},
234         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
235        split(/\Q$sep\E/,$label));
236   }
237   return $self->SUPER::_quote($label);
238 }
239
240 sub limit_dialect {
241     my $self = shift;
242     $self->{limit_dialect} = shift if @_;
243     return $self->{limit_dialect};
244 }
245
246 sub quote_char {
247     my $self = shift;
248     $self->{quote_char} = shift if @_;
249     return $self->{quote_char};
250 }
251
252 sub name_sep {
253     my $self = shift;
254     $self->{name_sep} = shift if @_;
255     return $self->{name_sep};
256 }
257
258 } # End of BEGIN block
259
260 =head1 NAME
261
262 DBIx::Class::Storage::DBI - DBI storage handler
263
264 =head1 SYNOPSIS
265
266 =head1 DESCRIPTION
267
268 This class represents the connection to an RDBMS via L<DBI>.  See
269 L<DBIx::Class::Storage> for general information.  This pod only
270 documents DBI-specific methods and behaviors.
271
272 =head1 METHODS
273
274 =cut
275
276 sub new {
277   my $new = shift->next::method(@_);
278
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281   $new->_sql_maker_opts({});
282
283   $new;
284 }
285
286 =head2 connect_info
287
288 The arguments of C<connect_info> are always a single array reference.
289
290 This is normally accessed via L<DBIx::Class::Schema/connection>, which
291 encapsulates its argument list in an arrayref before calling
292 C<connect_info> here.
293
294 The arrayref can either contain the same set of arguments one would
295 normally pass to L<DBI/connect>, or a lone code reference which returns
296 a connected database handle.
297
298 In either case, if the final argument in your connect_info happens
299 to be a hashref, C<connect_info> will look there for several
300 connection-specific options:
301
302 =over 4
303
304 =item on_connect_do
305
306 This can be set to an arrayref of literal sql statements, which will
307 be executed immediately after making the connection to the database
308 every time we [re-]connect.
309
310 =item limit_dialect 
311
312 Sets the limit dialect. This is useful for JDBC-bridge among others
313 where the remote SQL-dialect cannot be determined by the name of the
314 driver alone.
315
316 =item quote_char
317
318 Specifies what characters to use to quote table and column names. If 
319 you use this you will want to specify L<name_sep> as well.
320
321 quote_char expects either a single character, in which case is it is placed
322 on either side of the table/column, or an arrayref of length 2 in which case the
323 table/column name is placed between the elements.
324
325 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
326 use C<quote_char =E<gt> [qw/[ ]/]>.
327
328 =item name_sep
329
330 This only needs to be used in conjunction with L<quote_char>, and is used to 
331 specify the charecter that seperates elements (schemas, tables, columns) from 
332 each other. In most cases this is simply a C<.>.
333
334 =back
335
336 These options can be mixed in with your other L<DBI> connection attributes,
337 or placed in a seperate hashref after all other normal L<DBI> connection
338 arguments.
339
340 Every time C<connect_info> is invoked, any previous settings for
341 these options will be cleared before setting the new ones, regardless of
342 whether any options are specified in the new C<connect_info>.
343
344 Important note:  DBIC expects the returned database handle provided by 
345 a subref argument to have RaiseError set on it.  If it doesn't, things
346 might not work very well, YMMV.  If you don't use a subref, DBIC will
347 force this setting for you anyways.  Setting HandleError to anything
348 other than simple exception object wrapper might cause problems too.
349
350 Examples:
351
352   # Simple SQLite connection
353   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
354
355   # Connect via subref
356   ->connect_info([ sub { DBI->connect(...) } ]);
357
358   # A bit more complicated
359   ->connect_info(
360     [
361       'dbi:Pg:dbname=foo',
362       'postgres',
363       'my_pg_password',
364       { AutoCommit => 0 },
365       { quote_char => q{"}, name_sep => q{.} },
366     ]
367   );
368
369   # Equivalent to the previous example
370   ->connect_info(
371     [
372       'dbi:Pg:dbname=foo',
373       'postgres',
374       'my_pg_password',
375       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
376     ]
377   );
378
379   # Subref + DBIC-specific connection options
380   ->connect_info(
381     [
382       sub { DBI->connect(...) },
383       {
384           quote_char => q{`},
385           name_sep => q{@},
386           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
387       },
388     ]
389   );
390
391 =cut
392
393 sub connect_info {
394   my ($self, $info_arg) = @_;
395
396   return $self->_connect_info if !$info_arg;
397
398   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
399   #  the new set of options
400   $self->_sql_maker(undef);
401   $self->_sql_maker_opts({});
402
403   my $info = [ @$info_arg ]; # copy because we can alter it
404   my $last_info = $info->[-1];
405   if(ref $last_info eq 'HASH') {
406     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
407       $self->on_connect_do($on_connect_do);
408     }
409     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
410       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
411         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
412       }
413     }
414
415     # Get rid of any trailing empty hashref
416     pop(@$info) if !keys %$last_info;
417   }
418
419   $self->_connect_info($info);
420 }
421
422 =head2 on_connect_do
423
424 This method is deprecated in favor of setting via L</connect_info>.
425
426 =head2 dbh_do
427
428 Arguments: $subref, @extra_coderef_args?
429
430 Execute the given subref with the underlying database handle as its
431 first argument, using the new exception-based connection management.
432
433 Any additional arguments will be passed verbatim to the called subref
434 as arguments 2 and onwards.
435
436 Example:
437
438   my @stuff = $schema->storage->dbh_do(
439     sub {
440       my $dbh = shift;
441       my $cols = join(q{, }, @_);
442       shift->selectrow_array("SELECT $cols FROM foo")
443     },
444     @column_list
445   );
446
447 =cut
448
449 sub dbh_do {
450   my $self = shift;
451   my $todo = shift;
452
453   my @result;
454   my $want_array = wantarray;
455
456   eval {
457     $self->_verify_pid if $self->_dbh;
458     $self->_populate_dbh if !$self->_dbh;
459     my $dbh = $self->_dbh;
460     if($want_array) {
461         @result = $todo->($dbh, @_);
462     }
463     elsif(defined $want_array) {
464         $result[0] = $todo->($dbh, @_);
465     }
466     else {
467         $todo->($dbh, @_);
468     }
469   };
470
471   if($@) {
472     my $exception = $@;
473     $self->connected
474       ? $self->throw_exception($exception)
475       : $self->_populate_dbh;
476
477     my $dbh = $self->_dbh;
478     return $todo->($dbh, @_);
479   }
480
481   return $want_array ? @result : $result[0];
482 }
483
484 =head2 disconnect
485
486 Our C<disconnect> method also performs a rollback first if the
487 database is not in C<AutoCommit> mode.
488
489 =cut
490
491 sub disconnect {
492   my ($self) = @_;
493
494   if( $self->connected ) {
495     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
496     $self->_dbh->disconnect;
497     $self->_dbh(undef);
498   }
499 }
500
501 sub connected {
502   my ($self) = @_;
503
504   if(my $dbh = $self->_dbh) {
505       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
506           return $self->_dbh(undef);
507       }
508       else {
509           $self->_verify_pid;
510       }
511       return ($dbh->FETCH('Active') && $dbh->ping);
512   }
513
514   return 0;
515 }
516
517 # handle pid changes correctly
518 #  NOTE: assumes $self->_dbh is a valid $dbh
519 sub _verify_pid {
520   my ($self) = @_;
521
522   return if $self->_conn_pid == $$;
523
524   $self->_dbh->{InactiveDestroy} = 1;
525   $self->_dbh(undef);
526
527   return;
528 }
529
530 sub ensure_connected {
531   my ($self) = @_;
532
533   unless ($self->connected) {
534     $self->_populate_dbh;
535   }
536 }
537
538 =head2 dbh
539
540 Returns the dbh - a data base handle of class L<DBI>.
541
542 =cut
543
544 sub dbh {
545   my ($self) = @_;
546
547   $self->ensure_connected;
548   return $self->_dbh;
549 }
550
551 sub _sql_maker_args {
552     my ($self) = @_;
553     
554     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
555 }
556
557 sub sql_maker {
558   my ($self) = @_;
559   unless ($self->_sql_maker) {
560     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
561   }
562   return $self->_sql_maker;
563 }
564
565 sub _populate_dbh {
566   my ($self) = @_;
567   my @info = @{$self->_connect_info || []};
568   $self->_dbh($self->_connect(@info));
569
570   if(ref $self eq 'DBIx::Class::Storage::DBI') {
571     my $driver = $self->_dbh->{Driver}->{Name};
572     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
573       bless $self, "DBIx::Class::Storage::DBI::${driver}";
574       $self->_rebless() if $self->can('_rebless');
575     }
576   }
577
578   # if on-connect sql statements are given execute them
579   foreach my $sql_statement (@{$self->on_connect_do || []}) {
580     $self->debugobj->query_start($sql_statement) if $self->debug();
581     $self->_dbh->do($sql_statement);
582     $self->debugobj->query_end($sql_statement) if $self->debug();
583   }
584
585   $self->_conn_pid($$);
586   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
587 }
588
589 sub _connect {
590   my ($self, @info) = @_;
591
592   $self->throw_exception("You failed to provide any connection info")
593       if !@info;
594
595   my ($old_connect_via, $dbh);
596
597   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
598       $old_connect_via = $DBI::connect_via;
599       $DBI::connect_via = 'connect';
600   }
601
602   eval {
603     if(ref $info[0] eq 'CODE') {
604        $dbh = &{$info[0]}
605     }
606     else {
607        $dbh = DBI->connect(@info);
608        $dbh->{RaiseError} = 1;
609        $dbh->{PrintError} = 0;
610     }
611   };
612
613   $DBI::connect_via = $old_connect_via if $old_connect_via;
614
615   if (!$dbh || $@) {
616     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
617   }
618
619   $dbh;
620 }
621
622 sub txn_begin {
623   my $self = shift;
624   if ($self->{transaction_depth}++ == 0) {
625     $self->dbh_do(sub {
626       my $dbh = shift;
627       if ($dbh->{AutoCommit}) {
628         $self->debugobj->txn_begin()
629           if ($self->debug);
630         $dbh->begin_work;
631       }
632     });
633   }
634 }
635
636 sub txn_commit {
637   my $self = shift;
638   $self->dbh_do(sub {
639     my $dbh = shift;
640     if ($self->{transaction_depth} == 0) {
641       unless ($dbh->{AutoCommit}) {
642         $self->debugobj->txn_commit()
643           if ($self->debug);
644         $dbh->commit;
645       }
646     }
647     else {
648       if (--$self->{transaction_depth} == 0) {
649         $self->debugobj->txn_commit()
650           if ($self->debug);
651         $dbh->commit;
652       }
653     }
654   });
655 }
656
657 sub txn_rollback {
658   my $self = shift;
659
660   eval {
661     $self->dbh_do(sub {
662       my $dbh = shift;
663       if ($self->{transaction_depth} == 0) {
664         unless ($dbh->{AutoCommit}) {
665           $self->debugobj->txn_rollback()
666             if ($self->debug);
667           $dbh->rollback;
668         }
669       }
670       else {
671         if (--$self->{transaction_depth} == 0) {
672           $self->debugobj->txn_rollback()
673             if ($self->debug);
674           $dbh->rollback;
675         }
676         else {
677           die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
678         }
679       }
680     });
681   };
682
683   if ($@) {
684     my $error = $@;
685     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
686     $error =~ /$exception_class/ and $self->throw_exception($error);
687     $self->{transaction_depth} = 0;          # ensure that a failed rollback
688     $self->throw_exception($error);          # resets the transaction depth
689   }
690 }
691
692 sub _execute {
693   my ($self, $op, $extra_bind, $ident, @args) = @_;
694   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
695   unshift(@bind, @$extra_bind) if $extra_bind;
696   if ($self->debug) {
697       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
698       $self->debugobj->query_start($sql, @debug_bind);
699   }
700   my $sth = eval { $self->sth($sql,$op) };
701
702   if (!$sth || $@) {
703     $self->throw_exception(
704       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
705     );
706   }
707   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
708   my $rv;
709   if ($sth) {
710     my $time = time();
711     $rv = eval { $sth->execute(@bind) };
712
713     if ($@ || !$rv) {
714       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
715     }
716   } else {
717     $self->throw_exception("'$sql' did not generate a statement.");
718   }
719   if ($self->debug) {
720       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
721       $self->debugobj->query_end($sql, @debug_bind);
722   }
723   return (wantarray ? ($rv, $sth, @bind) : $rv);
724 }
725
726 sub insert {
727   my ($self, $ident, $to_insert) = @_;
728   $self->throw_exception(
729     "Couldn't insert ".join(', ',
730       map "$_ => $to_insert->{$_}", keys %$to_insert
731     )." into ${ident}"
732   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
733   return $to_insert;
734 }
735
736 sub update {
737   return shift->_execute('update' => [], @_);
738 }
739
740 sub delete {
741   return shift->_execute('delete' => [], @_);
742 }
743
744 sub _select {
745   my ($self, $ident, $select, $condition, $attrs) = @_;
746   my $order = $attrs->{order_by};
747   if (ref $condition eq 'SCALAR') {
748     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
749   }
750   if (exists $attrs->{group_by} || $attrs->{having}) {
751     $order = {
752       group_by => $attrs->{group_by},
753       having => $attrs->{having},
754       ($order ? (order_by => $order) : ())
755     };
756   }
757   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
758   if ($attrs->{software_limit} ||
759       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
760         $attrs->{software_limit} = 1;
761   } else {
762     $self->throw_exception("rows attribute must be positive if present")
763       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
764     push @args, $attrs->{rows}, $attrs->{offset};
765   }
766   return $self->_execute(@args);
767 }
768
769 =head2 select
770
771 Handle a SQL select statement.
772
773 =cut
774
775 sub select {
776   my $self = shift;
777   my ($ident, $select, $condition, $attrs) = @_;
778   return $self->cursor->new($self, \@_, $attrs);
779 }
780
781 =head2 select_single
782
783 Performs a select, fetch and return of data - handles a single row
784 only.
785
786 =cut
787
788 # Need to call finish() to work round broken DBDs
789
790 sub select_single {
791   my $self = shift;
792   my ($rv, $sth, @bind) = $self->_select(@_);
793   my @row = $sth->fetchrow_array;
794   $sth->finish();
795   return @row;
796 }
797
798 =head2 sth
799
800 Returns a L<DBI> sth (statement handle) for the supplied SQL.
801
802 =cut
803
804 sub sth {
805   my ($self, $sql) = @_;
806   # 3 is the if_active parameter which avoids active sth re-use
807   return $self->dbh_do(sub { shift->prepare_cached($sql, {}, 3) });
808 }
809
810 =head2 columns_info_for
811
812 Returns database type info for a given table columns.
813
814 =cut
815
816 sub columns_info_for {
817   my ($self, $table) = @_;
818
819   $self->dbh_do(sub {
820     my $dbh = shift;
821
822     if ($dbh->can('column_info')) {
823       my %result;
824       eval {
825         my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
826         my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
827         $sth->execute();
828         while ( my $info = $sth->fetchrow_hashref() ){
829           my %column_info;
830           $column_info{data_type}   = $info->{TYPE_NAME};
831           $column_info{size}      = $info->{COLUMN_SIZE};
832           $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
833           $column_info{default_value} = $info->{COLUMN_DEF};
834           my $col_name = $info->{COLUMN_NAME};
835           $col_name =~ s/^\"(.*)\"$/$1/;
836
837           $result{$col_name} = \%column_info;
838         }
839       };
840       return \%result if !$@;
841     }
842
843     my %result;
844     my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
845     $sth->execute;
846     my @columns = @{$sth->{NAME_lc}};
847     for my $i ( 0 .. $#columns ){
848       my %column_info;
849       my $type_num = $sth->{TYPE}->[$i];
850       my $type_name;
851       if(defined $type_num && $dbh->can('type_info')) {
852         my $type_info = $dbh->type_info($type_num);
853         $type_name = $type_info->{TYPE_NAME} if $type_info;
854       }
855       $column_info{data_type} = $type_name ? $type_name : $type_num;
856       $column_info{size} = $sth->{PRECISION}->[$i];
857       $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
858
859       if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
860         $column_info{data_type} = $1;
861         $column_info{size}    = $2;
862       }
863
864       $result{$columns[$i]} = \%column_info;
865     }
866
867     return \%result;
868   });
869 }
870
871 =head2 last_insert_id
872
873 Return the row id of the last insert.
874
875 =cut
876
877 sub last_insert_id {
878   my ($self, $row) = @_;
879     
880   $self->dbh_do(sub { shift->func('last_insert_rowid') });
881 }
882
883 =head2 sqlt_type
884
885 Returns the database driver name.
886
887 =cut
888
889 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
890
891 =head2 create_ddl_dir (EXPERIMENTAL)
892
893 =over 4
894
895 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
896
897 =back
898
899 Creates an SQL file based on the Schema, for each of the specified
900 database types, in the given directory.
901
902 Note that this feature is currently EXPERIMENTAL and may not work correctly
903 across all databases, or fully handle complex relationships.
904
905 =cut
906
907 sub create_ddl_dir
908 {
909   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
910
911   if(!$dir || !-d $dir)
912   {
913     warn "No directory given, using ./\n";
914     $dir = "./";
915   }
916   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
917   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
918   $version ||= $schema->VERSION || '1.x';
919   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
920
921   eval "use SQL::Translator";
922   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
923
924   my $sqlt = SQL::Translator->new($sqltargs);
925   foreach my $db (@$databases)
926   {
927     $sqlt->reset();
928     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
929 #    $sqlt->parser_args({'DBIx::Class' => $schema);
930     $sqlt->data($schema);
931     $sqlt->producer($db);
932
933     my $file;
934     my $filename = $schema->ddl_filename($db, $dir, $version);
935     if(-e $filename)
936     {
937       $self->throw_exception("$filename already exists, skipping $db");
938       next;
939     }
940     open($file, ">$filename") 
941       or $self->throw_exception("Can't open $filename for writing ($!)");
942     my $output = $sqlt->translate;
943 #use Data::Dumper;
944 #    print join(":", keys %{$schema->source_registrations});
945 #    print Dumper($sqlt->schema);
946     if(!$output)
947     {
948       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
949       next;
950     }
951     print $file $output;
952     close($file);
953   }
954
955 }
956
957 =head2 deployment_statements
958
959 Create the statements for L</deploy> and
960 L<DBIx::Class::Schema/deploy>.
961
962 =cut
963
964 sub deployment_statements {
965   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
966   # Need to be connected to get the correct sqlt_type
967   $self->ensure_connected() unless $type;
968   $type ||= $self->sqlt_type;
969   $version ||= $schema->VERSION || '1.x';
970   $dir ||= './';
971   eval "use SQL::Translator";
972   if(!$@)
973   {
974     eval "use SQL::Translator::Parser::DBIx::Class;";
975     $self->throw_exception($@) if $@;
976     eval "use SQL::Translator::Producer::${type};";
977     $self->throw_exception($@) if $@;
978     my $tr = SQL::Translator->new(%$sqltargs);
979     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
980     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
981   }
982
983   my $filename = $schema->ddl_filename($type, $dir, $version);
984   if(!-f $filename)
985   {
986 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
987       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
988       return;
989   }
990   my $file;
991   open($file, "<$filename") 
992       or $self->throw_exception("Can't open $filename ($!)");
993   my @rows = <$file>;
994   close($file);
995
996   return join('', @rows);
997   
998 }
999
1000 sub deploy {
1001   my ($self, $schema, $type, $sqltargs) = @_;
1002   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1003     for ( split(";\n", $statement)) {
1004       next if($_ =~ /^--/);
1005       next if(!$_);
1006 #      next if($_ =~ /^DROP/m);
1007       next if($_ =~ /^BEGIN TRANSACTION/m);
1008       next if($_ =~ /^COMMIT/m);
1009       next if $_ =~ /^\s+$/; # skip whitespace only
1010       $self->debugobj->query_start($_) if $self->debug;
1011       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1012       $self->debugobj->query_end($_) if $self->debug;
1013     }
1014   }
1015 }
1016
1017 =head2 datetime_parser
1018
1019 Returns the datetime parser class
1020
1021 =cut
1022
1023 sub datetime_parser {
1024   my $self = shift;
1025   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1026 }
1027
1028 =head2 datetime_parser_type
1029
1030 Defines (returns) the datetime parser class - currently hardwired to
1031 L<DateTime::Format::MySQL>
1032
1033 =cut
1034
1035 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1036
1037 =head2 build_datetime_parser
1038
1039 See L</datetime_parser>
1040
1041 =cut
1042
1043 sub build_datetime_parser {
1044   my $self = shift;
1045   my $type = $self->datetime_parser_type(@_);
1046   eval "use ${type}";
1047   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1048   return $type;
1049 }
1050
1051 sub DESTROY {
1052   my $self = shift;
1053   return if !$self->_dbh;
1054
1055   $self->_verify_pid;
1056   $self->_dbh(undef);
1057 }
1058
1059 1;
1060
1061 =head1 SQL METHODS
1062
1063 The module defines a set of methods within the DBIC::SQL::Abstract
1064 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1065 SQL query functions.
1066
1067 The following methods are extended:-
1068
1069 =over 4
1070
1071 =item delete
1072
1073 =item insert
1074
1075 =item select
1076
1077 =item update
1078
1079 =item limit_dialect
1080
1081 See L</connect_info> for details.
1082 For setting, this method is deprecated in favor of L</connect_info>.
1083
1084 =item quote_char
1085
1086 See L</connect_info> for details.
1087 For setting, this method is deprecated in favor of L</connect_info>.
1088
1089 =item name_sep
1090
1091 See L</connect_info> for details.
1092 For setting, this method is deprecated in favor of L</connect_info>.
1093
1094 =back
1095
1096 =head1 AUTHORS
1097
1098 Matt S. Trout <mst@shadowcatsystems.co.uk>
1099
1100 Andy Grundman <andy@hybridized.org>
1101
1102 =head1 LICENSE
1103
1104 You may distribute this code under the same terms as Perl itself.
1105
1106 =cut
1107