d2da6bc2b4e79b494d705409e098243dea20ebc9
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 # While we're at it, this should make LIMIT queries more efficient,
39 #  without digging into things too deeply
40 sub _find_syntax {
41   my ($self, $syntax) = @_;
42   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
43 }
44
45 sub select {
46   my ($self, $table, $fields, $where, $order, @rest) = @_;
47   $table = $self->_quote($table) unless ref($table);
48   local $self->{rownum_hack_count} = 1
49     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
50   @rest = (-1) unless defined $rest[0];
51   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
52     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
53   local $self->{having_bind} = [];
54   my ($sql, @ret) = $self->SUPER::select(
55     $table, $self->_recurse_fields($fields), $where, $order, @rest
56   );
57   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
58 }
59
60 sub insert {
61   my $self = shift;
62   my $table = shift;
63   $table = $self->_quote($table) unless ref($table);
64   $self->SUPER::insert($table, @_);
65 }
66
67 sub update {
68   my $self = shift;
69   my $table = shift;
70   $table = $self->_quote($table) unless ref($table);
71   $self->SUPER::update($table, @_);
72 }
73
74 sub delete {
75   my $self = shift;
76   my $table = shift;
77   $table = $self->_quote($table) unless ref($table);
78   $self->SUPER::delete($table, @_);
79 }
80
81 sub _emulate_limit {
82   my $self = shift;
83   if ($_[3] == -1) {
84     return $_[1].$self->_order_by($_[2]);
85   } else {
86     return $self->SUPER::_emulate_limit(@_);
87   }
88 }
89
90 sub _recurse_fields {
91   my ($self, $fields) = @_;
92   my $ref = ref $fields;
93   return $self->_quote($fields) unless $ref;
94   return $$fields if $ref eq 'SCALAR';
95
96   if ($ref eq 'ARRAY') {
97     return join(', ', map {
98       $self->_recurse_fields($_)
99       .(exists $self->{rownum_hack_count}
100          ? ' AS col'.$self->{rownum_hack_count}++
101          : '')
102      } @$fields);
103   } elsif ($ref eq 'HASH') {
104     foreach my $func (keys %$fields) {
105       return $self->_sqlcase($func)
106         .'( '.$self->_recurse_fields($fields->{$func}).' )';
107     }
108   }
109 }
110
111 sub _order_by {
112   my $self = shift;
113   my $ret = '';
114   my @extra;
115   if (ref $_[0] eq 'HASH') {
116     if (defined $_[0]->{group_by}) {
117       $ret = $self->_sqlcase(' group by ')
118                .$self->_recurse_fields($_[0]->{group_by});
119     }
120     if (defined $_[0]->{having}) {
121       my $frag;
122       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
123       push(@{$self->{having_bind}}, @extra);
124       $ret .= $self->_sqlcase(' having ').$frag;
125     }
126     if (defined $_[0]->{order_by}) {
127       $ret .= $self->_order_by($_[0]->{order_by});
128     }
129   } elsif (ref $_[0] eq 'SCALAR') {
130     $ret = $self->_sqlcase(' order by ').${ $_[0] };
131   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
132     my @order = @{+shift};
133     $ret = $self->_sqlcase(' order by ')
134           .join(', ', map {
135                         my $r = $self->_order_by($_, @_);
136                         $r =~ s/^ ?ORDER BY //i;
137                         $r;
138                       } @order);
139   } else {
140     $ret = $self->SUPER::_order_by(@_);
141   }
142   return $ret;
143 }
144
145 sub _order_directions {
146   my ($self, $order) = @_;
147   $order = $order->{order_by} if ref $order eq 'HASH';
148   return $self->SUPER::_order_directions($order);
149 }
150
151 sub _table {
152   my ($self, $from) = @_;
153   if (ref $from eq 'ARRAY') {
154     return $self->_recurse_from(@$from);
155   } elsif (ref $from eq 'HASH') {
156     return $self->_make_as($from);
157   } else {
158     return $from; # would love to quote here but _table ends up getting called
159                   # twice during an ->select without a limit clause due to
160                   # the way S::A::Limit->select works. should maybe consider
161                   # bypassing this and doing S::A::select($self, ...) in
162                   # our select method above. meantime, quoting shims have
163                   # been added to select/insert/update/delete here
164   }
165 }
166
167 sub _recurse_from {
168   my ($self, $from, @join) = @_;
169   my @sqlf;
170   push(@sqlf, $self->_make_as($from));
171   foreach my $j (@join) {
172     my ($to, $on) = @$j;
173
174     # check whether a join type exists
175     my $join_clause = '';
176     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
177     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
178       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
179     } else {
180       $join_clause = ' JOIN ';
181     }
182     push(@sqlf, $join_clause);
183
184     if (ref $to eq 'ARRAY') {
185       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
186     } else {
187       push(@sqlf, $self->_make_as($to));
188     }
189     push(@sqlf, ' ON ', $self->_join_condition($on));
190   }
191   return join('', @sqlf);
192 }
193
194 sub _make_as {
195   my ($self, $from) = @_;
196   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
197                      reverse each %{$self->_skip_options($from)});
198 }
199
200 sub _skip_options {
201   my ($self, $hash) = @_;
202   my $clean_hash = {};
203   $clean_hash->{$_} = $hash->{$_}
204     for grep {!/^-/} keys %$hash;
205   return $clean_hash;
206 }
207
208 sub _join_condition {
209   my ($self, $cond) = @_;
210   if (ref $cond eq 'HASH') {
211     my %j;
212     for (keys %$cond) {
213       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
214     };
215     return $self->_recurse_where(\%j);
216   } elsif (ref $cond eq 'ARRAY') {
217     return join(' OR ', map { $self->_join_condition($_) } @$cond);
218   } else {
219     die "Can't handle this yet!";
220   }
221 }
222
223 sub _quote {
224   my ($self, $label) = @_;
225   return '' unless defined $label;
226   return "*" if $label eq '*';
227   return $label unless $self->{quote_char};
228   if(ref $self->{quote_char} eq "ARRAY"){
229     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
230       if !defined $self->{name_sep};
231     my $sep = $self->{name_sep};
232     return join($self->{name_sep},
233         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
234        split(/\Q$sep\E/,$label));
235   }
236   return $self->SUPER::_quote($label);
237 }
238
239 sub limit_dialect {
240     my $self = shift;
241     $self->{limit_dialect} = shift if @_;
242     return $self->{limit_dialect};
243 }
244
245 sub quote_char {
246     my $self = shift;
247     $self->{quote_char} = shift if @_;
248     return $self->{quote_char};
249 }
250
251 sub name_sep {
252     my $self = shift;
253     $self->{name_sep} = shift if @_;
254     return $self->{name_sep};
255 }
256
257 } # End of BEGIN block
258
259 =head1 NAME
260
261 DBIx::Class::Storage::DBI - DBI storage handler
262
263 =head1 SYNOPSIS
264
265 =head1 DESCRIPTION
266
267 This class represents the connection to an RDBMS via L<DBI>.  See
268 L<DBIx::Class::Storage> for general information.  This pod only
269 documents DBI-specific methods and behaviors.
270
271 =head1 METHODS
272
273 =cut
274
275 sub new {
276   my $new = shift->next::method(@_);
277
278   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
279   $new->transaction_depth(0);
280   $new->_sql_maker_opts({});
281
282   $new;
283 }
284
285 =head2 connect_info
286
287 The arguments of C<connect_info> are always a single array reference.
288
289 This is normally accessed via L<DBIx::Class::Schema/connection>, which
290 encapsulates its argument list in an arrayref before calling
291 C<connect_info> here.
292
293 The arrayref can either contain the same set of arguments one would
294 normally pass to L<DBI/connect>, or a lone code reference which returns
295 a connected database handle.
296
297 In either case, if the final argument in your connect_info happens
298 to be a hashref, C<connect_info> will look there for several
299 connection-specific options:
300
301 =over 4
302
303 =item on_connect_do
304
305 This can be set to an arrayref of literal sql statements, which will
306 be executed immediately after making the connection to the database
307 every time we [re-]connect.
308
309 =item limit_dialect 
310
311 Sets the limit dialect. This is useful for JDBC-bridge among others
312 where the remote SQL-dialect cannot be determined by the name of the
313 driver alone.
314
315 =item quote_char
316
317 Specifies what characters to use to quote table and column names. If 
318 you use this you will want to specify L<name_sep> as well.
319
320 quote_char expects either a single character, in which case is it is placed
321 on either side of the table/column, or an arrayref of length 2 in which case the
322 table/column name is placed between the elements.
323
324 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
325 use C<quote_char =E<gt> [qw/[ ]/]>.
326
327 =item name_sep
328
329 This only needs to be used in conjunction with L<quote_char>, and is used to 
330 specify the charecter that seperates elements (schemas, tables, columns) from 
331 each other. In most cases this is simply a C<.>.
332
333 =back
334
335 These options can be mixed in with your other L<DBI> connection attributes,
336 or placed in a seperate hashref after all other normal L<DBI> connection
337 arguments.
338
339 Every time C<connect_info> is invoked, any previous settings for
340 these options will be cleared before setting the new ones, regardless of
341 whether any options are specified in the new C<connect_info>.
342
343 Important note:  DBIC expects the returned database handle provided by 
344 a subref argument to have RaiseError set on it.  If it doesn't, things
345 might not work very well, YMMV.  If you don't use a subref, DBIC will
346 force this setting for you anyways.  Setting HandleError to anything
347 other than simple exception object wrapper might cause problems too.
348
349 Examples:
350
351   # Simple SQLite connection
352   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
353
354   # Connect via subref
355   ->connect_info([ sub { DBI->connect(...) } ]);
356
357   # A bit more complicated
358   ->connect_info(
359     [
360       'dbi:Pg:dbname=foo',
361       'postgres',
362       'my_pg_password',
363       { AutoCommit => 0 },
364       { quote_char => q{"}, name_sep => q{.} },
365     ]
366   );
367
368   # Equivalent to the previous example
369   ->connect_info(
370     [
371       'dbi:Pg:dbname=foo',
372       'postgres',
373       'my_pg_password',
374       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
375     ]
376   );
377
378   # Subref + DBIC-specific connection options
379   ->connect_info(
380     [
381       sub { DBI->connect(...) },
382       {
383           quote_char => q{`},
384           name_sep => q{@},
385           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
386       },
387     ]
388   );
389
390 =cut
391
392 sub connect_info {
393   my ($self, $info_arg) = @_;
394
395   return $self->_connect_info if !$info_arg;
396
397   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
398   #  the new set of options
399   $self->_sql_maker(undef);
400   $self->_sql_maker_opts({});
401
402   my $info = [ @$info_arg ]; # copy because we can alter it
403   my $last_info = $info->[-1];
404   if(ref $last_info eq 'HASH') {
405     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
406       $self->on_connect_do($on_connect_do);
407     }
408     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
409       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
410         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
411       }
412     }
413
414     # Get rid of any trailing empty hashref
415     pop(@$info) if !keys %$last_info;
416   }
417
418   $self->_connect_info($info);
419 }
420
421 =head2 on_connect_do
422
423 This method is deprecated in favor of setting via L</connect_info>.
424
425 =head2 dbh_do
426
427 Arguments: $subref, @extra_coderef_args?
428
429 Execute the given subref with the underlying database handle as its
430 first argument, using the new exception-based connection management.
431
432 Any additional arguments will be passed verbatim to the called subref
433 as arguments 2 and onwards.
434
435 Example:
436
437   my @stuff = $schema->storage->dbh_do(
438     sub {
439       my $dbh = shift;
440       my $cols = join(q{, }, @_);
441       shift->selectrow_array("SELECT $cols FROM foo")
442     },
443     @column_list
444   );
445
446 =cut
447
448 sub dbh_do {
449   my $self = shift;
450   my $coderef = shift;
451
452   return $coderef->($self->_dbh, @_) if $self->{_in_txn_do};
453
454   ref $coderef eq 'CODE' or $self->throw_exception
455     ('$coderef must be a CODE reference');
456
457   my @result;
458   my $want_array = wantarray;
459
460   eval {
461     $self->_verify_pid if $self->_dbh;
462     $self->_populate_dbh if !$self->_dbh;
463     if($want_array) {
464         @result = $coderef->($self->_dbh, @_);
465     }
466     elsif(defined $want_array) {
467         $result[0] = $coderef->($self->_dbh, @_);
468     }
469     else {
470         $coderef->($self->_dbh, @_);
471     }
472   };
473
474   my $exception = $@;
475   if(!$exception) { return $want_array ? @result : $result[0] }
476
477   $self->throw_exception($exception) if $self->connected;
478
479   # We were not connected - reconnect and retry, but let any
480   #  exception fall right through this time
481   $self->_populate_dbh;
482   $coderef->($self->_dbh, @_);
483 }
484
485 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
486 # It also informs dbh_do to bypass itself while under the direction of txn_do,
487 #  via $self->{_in_txn_do} (this saves some redundant eval and errorcheck, etc)
488 sub txn_do {
489   my $self = shift;
490   my $coderef = shift;
491
492   ref $coderef eq 'CODE' or $self->throw_exception
493     ('$coderef must be a CODE reference');
494
495   local $self->{_in_txn_do} = 1;
496
497   my $tried = 0;
498
499   my @result;
500   my $want_array = wantarray;
501
502   START_TXN: eval {
503     $self->_verify_pid if $self->_dbh;
504     $self->_populate_dbh if !$self->_dbh;
505
506     $self->txn_begin;
507     if($want_array) {
508         @result = $coderef->(@_);
509     }
510     elsif(defined $want_array) {
511         $result[0] = $coderef->(@_);
512     }
513     else {
514         $coderef->(@_);
515     }
516     $self->txn_commit;
517   };
518
519   my $exception = $@;
520   if(!$exception) { return $want_array ? @result : $result[0] }
521
522   if($tried++ > 0 || $self->connected) {
523     eval { $self->txn_rollback };
524     my $rollback_exception = $@;
525     if($rollback_exception) {
526       my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
527       $self->throw_exception($exception)  # propagate nested rollback
528         if $rollback_exception =~ /$exception_class/;
529
530       $self->throw_exception(
531         "Transaction aborted: ${exception}. "
532         . "Rollback failed: ${rollback_exception}"
533       );
534     }
535     $self->throw_exception($exception)
536   }
537
538   # We were not connected, and was first try - reconnect and retry
539   # XXX I know, gotos are evil.  If you can find a better way
540   #  to write this that doesn't duplicate a lot of code/structure,
541   #  and behaves identically, feel free...
542
543   $self->_populate_dbh;
544   goto START_TXN;
545 }
546
547 =head2 disconnect
548
549 Our C<disconnect> method also performs a rollback first if the
550 database is not in C<AutoCommit> mode.
551
552 =cut
553
554 sub disconnect {
555   my ($self) = @_;
556
557   if( $self->connected ) {
558     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
559     $self->_dbh->disconnect;
560     $self->_dbh(undef);
561   }
562 }
563
564 sub connected {
565   my ($self) = @_;
566
567   if(my $dbh = $self->_dbh) {
568       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
569           return $self->_dbh(undef);
570       }
571       else {
572           $self->_verify_pid;
573       }
574       return ($dbh->FETCH('Active') && $dbh->ping);
575   }
576
577   return 0;
578 }
579
580 # handle pid changes correctly
581 #  NOTE: assumes $self->_dbh is a valid $dbh
582 sub _verify_pid {
583   my ($self) = @_;
584
585   return if $self->_conn_pid == $$;
586
587   $self->_dbh->{InactiveDestroy} = 1;
588   $self->_dbh(undef);
589
590   return;
591 }
592
593 sub ensure_connected {
594   my ($self) = @_;
595
596   unless ($self->connected) {
597     $self->_populate_dbh;
598   }
599 }
600
601 =head2 dbh
602
603 Returns the dbh - a data base handle of class L<DBI>.
604
605 =cut
606
607 sub dbh {
608   my ($self) = @_;
609
610   $self->ensure_connected;
611   return $self->_dbh;
612 }
613
614 sub _sql_maker_args {
615     my ($self) = @_;
616     
617     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
618 }
619
620 sub sql_maker {
621   my ($self) = @_;
622   unless ($self->_sql_maker) {
623     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
624   }
625   return $self->_sql_maker;
626 }
627
628 sub _populate_dbh {
629   my ($self) = @_;
630   my @info = @{$self->_connect_info || []};
631   $self->_dbh($self->_connect(@info));
632
633   if(ref $self eq 'DBIx::Class::Storage::DBI') {
634     my $driver = $self->_dbh->{Driver}->{Name};
635     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
636       bless $self, "DBIx::Class::Storage::DBI::${driver}";
637       $self->_rebless() if $self->can('_rebless');
638     }
639   }
640
641   # if on-connect sql statements are given execute them
642   foreach my $sql_statement (@{$self->on_connect_do || []}) {
643     $self->debugobj->query_start($sql_statement) if $self->debug();
644     $self->_dbh->do($sql_statement);
645     $self->debugobj->query_end($sql_statement) if $self->debug();
646   }
647
648   $self->_conn_pid($$);
649   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
650 }
651
652 sub _connect {
653   my ($self, @info) = @_;
654
655   $self->throw_exception("You failed to provide any connection info")
656       if !@info;
657
658   my ($old_connect_via, $dbh);
659
660   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
661       $old_connect_via = $DBI::connect_via;
662       $DBI::connect_via = 'connect';
663   }
664
665   eval {
666     if(ref $info[0] eq 'CODE') {
667        $dbh = &{$info[0]}
668     }
669     else {
670        $dbh = DBI->connect(@info);
671        $dbh->{RaiseError} = 1;
672        $dbh->{PrintError} = 0;
673     }
674   };
675
676   $DBI::connect_via = $old_connect_via if $old_connect_via;
677
678   if (!$dbh || $@) {
679     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
680   }
681
682   $dbh;
683 }
684
685 sub __txn_begin {
686   my ($dbh, $self) = @_;
687   if ($dbh->{AutoCommit}) {
688     $self->debugobj->txn_begin()
689       if ($self->debug);
690     $dbh->begin_work;
691   }
692 }
693
694 sub txn_begin {
695   my $self = shift;
696   $self->dbh_do(\&__txn_begin, $self)
697     if $self->{transaction_depth}++ == 0;
698 }
699
700 sub __txn_commit {
701   my ($dbh, $self) = @_;
702   if ($self->{transaction_depth} == 0) {
703     unless ($dbh->{AutoCommit}) {
704       $self->debugobj->txn_commit()
705         if ($self->debug);
706       $dbh->commit;
707     }
708   }
709   else {
710     if (--$self->{transaction_depth} == 0) {
711       $self->debugobj->txn_commit()
712         if ($self->debug);
713       $dbh->commit;
714     }
715   }
716 }
717
718 sub txn_commit {
719   my $self = shift;
720   $self->dbh_do(\&__txn_commit, $self);
721 }
722
723 sub __txn_rollback {
724   my ($dbh, $self) = @_;
725   if ($self->{transaction_depth} == 0) {
726     unless ($dbh->{AutoCommit}) {
727       $self->debugobj->txn_rollback()
728         if ($self->debug);
729       $dbh->rollback;
730     }
731   }
732   else {
733     if (--$self->{transaction_depth} == 0) {
734       $self->debugobj->txn_rollback()
735         if ($self->debug);
736       $dbh->rollback;
737     }
738     else {
739       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
740     }
741   }
742 }
743
744 sub txn_rollback {
745   my $self = shift;
746   eval { $self->dbh_do(\&__txn_rollback, $self) };
747   if ($@) {
748     my $error = $@;
749     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
750     $error =~ /$exception_class/ and $self->throw_exception($error);
751     $self->{transaction_depth} = 0;          # ensure that a failed rollback
752     $self->throw_exception($error);          # resets the transaction depth
753   }
754 }
755
756 sub _execute {
757   my ($self, $op, $extra_bind, $ident, @args) = @_;
758   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
759   unshift(@bind, @$extra_bind) if $extra_bind;
760   if ($self->debug) {
761       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
762       $self->debugobj->query_start($sql, @debug_bind);
763   }
764   my $sth = eval { $self->sth($sql,$op) };
765
766   if (!$sth || $@) {
767     $self->throw_exception(
768       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
769     );
770   }
771   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
772   my $rv;
773   if ($sth) {
774     my $time = time();
775     $rv = eval { $sth->execute(@bind) };
776
777     if ($@ || !$rv) {
778       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
779     }
780   } else {
781     $self->throw_exception("'$sql' did not generate a statement.");
782   }
783   if ($self->debug) {
784       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
785       $self->debugobj->query_end($sql, @debug_bind);
786   }
787   return (wantarray ? ($rv, $sth, @bind) : $rv);
788 }
789
790 sub insert {
791   my ($self, $ident, $to_insert) = @_;
792   $self->throw_exception(
793     "Couldn't insert ".join(', ',
794       map "$_ => $to_insert->{$_}", keys %$to_insert
795     )." into ${ident}"
796   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
797   return $to_insert;
798 }
799
800 sub update {
801   return shift->_execute('update' => [], @_);
802 }
803
804 sub delete {
805   return shift->_execute('delete' => [], @_);
806 }
807
808 sub _select {
809   my ($self, $ident, $select, $condition, $attrs) = @_;
810   my $order = $attrs->{order_by};
811   if (ref $condition eq 'SCALAR') {
812     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
813   }
814   if (exists $attrs->{group_by} || $attrs->{having}) {
815     $order = {
816       group_by => $attrs->{group_by},
817       having => $attrs->{having},
818       ($order ? (order_by => $order) : ())
819     };
820   }
821   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
822   if ($attrs->{software_limit} ||
823       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
824         $attrs->{software_limit} = 1;
825   } else {
826     $self->throw_exception("rows attribute must be positive if present")
827       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
828     push @args, $attrs->{rows}, $attrs->{offset};
829   }
830   return $self->_execute(@args);
831 }
832
833 sub select {
834   my $self = shift;
835   my ($ident, $select, $condition, $attrs) = @_;
836   return $self->cursor->new($self, \@_, $attrs);
837 }
838
839 sub select_single {
840   my $self = shift;
841   my ($rv, $sth, @bind) = $self->_select(@_);
842   my @row = $sth->fetchrow_array;
843   # Need to call finish() to work round broken DBDs
844   $sth->finish();
845   return @row;
846 }
847
848 =head2 sth
849
850 Returns a L<DBI> sth (statement handle) for the supplied SQL.
851
852 =cut
853
854 sub __sth {
855   my ($dbh, $sql) = @_;
856   # 3 is the if_active parameter which avoids active sth re-use
857   $dbh->prepare_cached($sql, {}, 3);
858 }
859
860 sub sth {
861   my ($self, $sql) = @_;
862   $self->dbh_do(\&__sth, $sql);
863 }
864
865
866 sub __columns_info_for {
867   my ($dbh, $self, $table) = @_;
868
869   if ($dbh->can('column_info')) {
870     my %result;
871     eval {
872       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
873       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
874       $sth->execute();
875       while ( my $info = $sth->fetchrow_hashref() ){
876         my %column_info;
877         $column_info{data_type}   = $info->{TYPE_NAME};
878         $column_info{size}      = $info->{COLUMN_SIZE};
879         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
880         $column_info{default_value} = $info->{COLUMN_DEF};
881         my $col_name = $info->{COLUMN_NAME};
882         $col_name =~ s/^\"(.*)\"$/$1/;
883
884         $result{$col_name} = \%column_info;
885       }
886     };
887     return \%result if !$@ && scalar keys %result;
888   }
889
890   my %result;
891   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
892   $sth->execute;
893   my @columns = @{$sth->{NAME_lc}};
894   for my $i ( 0 .. $#columns ){
895     my %column_info;
896     my $type_num = $sth->{TYPE}->[$i];
897     my $type_name;
898     if(defined $type_num && $dbh->can('type_info')) {
899       my $type_info = $dbh->type_info($type_num);
900       $type_name = $type_info->{TYPE_NAME} if $type_info;
901     }
902     $column_info{data_type} = $type_name ? $type_name : $type_num;
903     $column_info{size} = $sth->{PRECISION}->[$i];
904     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
905
906     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
907       $column_info{data_type} = $1;
908       $column_info{size}    = $2;
909     }
910
911     $result{$columns[$i]} = \%column_info;
912   }
913
914   return \%result;
915 }
916
917 sub columns_info_for {
918   my ($self, $table) = @_;
919   $self->dbh_do(\&__columns_info_for, $self, $table);
920 }
921
922 =head2 last_insert_id
923
924 Return the row id of the last insert.
925
926 =cut
927
928 sub last_insert_id {
929   my ($self, $row) = @_;
930     
931   $self->dbh_do(sub { shift->func('last_insert_rowid') });
932 }
933
934 =head2 sqlt_type
935
936 Returns the database driver name.
937
938 =cut
939
940 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
941
942 =head2 create_ddl_dir (EXPERIMENTAL)
943
944 =over 4
945
946 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
947
948 =back
949
950 Creates an SQL file based on the Schema, for each of the specified
951 database types, in the given directory.
952
953 Note that this feature is currently EXPERIMENTAL and may not work correctly
954 across all databases, or fully handle complex relationships.
955
956 =cut
957
958 sub create_ddl_dir
959 {
960   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
961
962   if(!$dir || !-d $dir)
963   {
964     warn "No directory given, using ./\n";
965     $dir = "./";
966   }
967   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
968   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
969   $version ||= $schema->VERSION || '1.x';
970   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
971
972   eval "use SQL::Translator";
973   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
974
975   my $sqlt = SQL::Translator->new($sqltargs);
976   foreach my $db (@$databases)
977   {
978     $sqlt->reset();
979     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
980 #    $sqlt->parser_args({'DBIx::Class' => $schema);
981     $sqlt->data($schema);
982     $sqlt->producer($db);
983
984     my $file;
985     my $filename = $schema->ddl_filename($db, $dir, $version);
986     if(-e $filename)
987     {
988       $self->throw_exception("$filename already exists, skipping $db");
989       next;
990     }
991     open($file, ">$filename") 
992       or $self->throw_exception("Can't open $filename for writing ($!)");
993     my $output = $sqlt->translate;
994 #use Data::Dumper;
995 #    print join(":", keys %{$schema->source_registrations});
996 #    print Dumper($sqlt->schema);
997     if(!$output)
998     {
999       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1000       next;
1001     }
1002     print $file $output;
1003     close($file);
1004   }
1005
1006 }
1007
1008 =head2 deployment_statements
1009
1010 Create the statements for L</deploy> and
1011 L<DBIx::Class::Schema/deploy>.
1012
1013 =cut
1014
1015 sub deployment_statements {
1016   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1017   # Need to be connected to get the correct sqlt_type
1018   $self->ensure_connected() unless $type;
1019   $type ||= $self->sqlt_type;
1020   $version ||= $schema->VERSION || '1.x';
1021   $dir ||= './';
1022   eval "use SQL::Translator";
1023   if(!$@)
1024   {
1025     eval "use SQL::Translator::Parser::DBIx::Class;";
1026     $self->throw_exception($@) if $@;
1027     eval "use SQL::Translator::Producer::${type};";
1028     $self->throw_exception($@) if $@;
1029     my $tr = SQL::Translator->new(%$sqltargs);
1030     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1031     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1032   }
1033
1034   my $filename = $schema->ddl_filename($type, $dir, $version);
1035   if(!-f $filename)
1036   {
1037 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1038       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1039       return;
1040   }
1041   my $file;
1042   open($file, "<$filename") 
1043       or $self->throw_exception("Can't open $filename ($!)");
1044   my @rows = <$file>;
1045   close($file);
1046
1047   return join('', @rows);
1048   
1049 }
1050
1051 sub deploy {
1052   my ($self, $schema, $type, $sqltargs) = @_;
1053   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1054     for ( split(";\n", $statement)) {
1055       next if($_ =~ /^--/);
1056       next if(!$_);
1057 #      next if($_ =~ /^DROP/m);
1058       next if($_ =~ /^BEGIN TRANSACTION/m);
1059       next if($_ =~ /^COMMIT/m);
1060       next if $_ =~ /^\s+$/; # skip whitespace only
1061       $self->debugobj->query_start($_) if $self->debug;
1062       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1063       $self->debugobj->query_end($_) if $self->debug;
1064     }
1065   }
1066 }
1067
1068 =head2 datetime_parser
1069
1070 Returns the datetime parser class
1071
1072 =cut
1073
1074 sub datetime_parser {
1075   my $self = shift;
1076   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1077 }
1078
1079 =head2 datetime_parser_type
1080
1081 Defines (returns) the datetime parser class - currently hardwired to
1082 L<DateTime::Format::MySQL>
1083
1084 =cut
1085
1086 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1087
1088 =head2 build_datetime_parser
1089
1090 See L</datetime_parser>
1091
1092 =cut
1093
1094 sub build_datetime_parser {
1095   my $self = shift;
1096   my $type = $self->datetime_parser_type(@_);
1097   eval "use ${type}";
1098   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1099   return $type;
1100 }
1101
1102 sub DESTROY {
1103   my $self = shift;
1104   return if !$self->_dbh;
1105
1106   $self->_verify_pid;
1107   $self->_dbh(undef);
1108 }
1109
1110 1;
1111
1112 =head1 SQL METHODS
1113
1114 The module defines a set of methods within the DBIC::SQL::Abstract
1115 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1116 SQL query functions.
1117
1118 The following methods are extended:-
1119
1120 =over 4
1121
1122 =item delete
1123
1124 =item insert
1125
1126 =item select
1127
1128 =item update
1129
1130 =item limit_dialect
1131
1132 See L</connect_info> for details.
1133 For setting, this method is deprecated in favor of L</connect_info>.
1134
1135 =item quote_char
1136
1137 See L</connect_info> for details.
1138 For setting, this method is deprecated in favor of L</connect_info>.
1139
1140 =item name_sep
1141
1142 See L</connect_info> for details.
1143 For setting, this method is deprecated in favor of L</connect_info>.
1144
1145 =back
1146
1147 =head1 AUTHORS
1148
1149 Matt S. Trout <mst@shadowcatsystems.co.uk>
1150
1151 Andy Grundman <andy@hybridized.org>
1152
1153 =head1 LICENSE
1154
1155 You may distribute this code under the same terms as Perl itself.
1156
1157 =cut
1158