more dbh_do/txn_do improvements
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14
15 __PACKAGE__->mk_group_accessors(
16   'simple' =>
17     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
18        cursor on_connect_do transaction_depth/
19 );
20
21 BEGIN {
22
23 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
24
25 use base qw/SQL::Abstract::Limit/;
26
27 # This prevents the caching of $dbh in S::A::L, I believe
28 sub new {
29   my $self = shift->SUPER::new(@_);
30
31   # If limit_dialect is a ref (like a $dbh), go ahead and replace
32   #   it with what it resolves to:
33   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
34     if ref $self->{limit_dialect};
35
36   $self;
37 }
38
39 # While we're at it, this should make LIMIT queries more efficient,
40 #  without digging into things too deeply
41 sub _find_syntax {
42   my ($self, $syntax) = @_;
43   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
44 }
45
46 sub select {
47   my ($self, $table, $fields, $where, $order, @rest) = @_;
48   $table = $self->_quote($table) unless ref($table);
49   local $self->{rownum_hack_count} = 1
50     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
51   @rest = (-1) unless defined $rest[0];
52   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
53     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
54   local $self->{having_bind} = [];
55   my ($sql, @ret) = $self->SUPER::select(
56     $table, $self->_recurse_fields($fields), $where, $order, @rest
57   );
58   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
59 }
60
61 sub insert {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::insert($table, @_);
66 }
67
68 sub update {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::update($table, @_);
73 }
74
75 sub delete {
76   my $self = shift;
77   my $table = shift;
78   $table = $self->_quote($table) unless ref($table);
79   $self->SUPER::delete($table, @_);
80 }
81
82 sub _emulate_limit {
83   my $self = shift;
84   if ($_[3] == -1) {
85     return $_[1].$self->_order_by($_[2]);
86   } else {
87     return $self->SUPER::_emulate_limit(@_);
88   }
89 }
90
91 sub _recurse_fields {
92   my ($self, $fields) = @_;
93   my $ref = ref $fields;
94   return $self->_quote($fields) unless $ref;
95   return $$fields if $ref eq 'SCALAR';
96
97   if ($ref eq 'ARRAY') {
98     return join(', ', map {
99       $self->_recurse_fields($_)
100       .(exists $self->{rownum_hack_count}
101          ? ' AS col'.$self->{rownum_hack_count}++
102          : '')
103      } @$fields);
104   } elsif ($ref eq 'HASH') {
105     foreach my $func (keys %$fields) {
106       return $self->_sqlcase($func)
107         .'( '.$self->_recurse_fields($fields->{$func}).' )';
108     }
109   }
110 }
111
112 sub _order_by {
113   my $self = shift;
114   my $ret = '';
115   my @extra;
116   if (ref $_[0] eq 'HASH') {
117     if (defined $_[0]->{group_by}) {
118       $ret = $self->_sqlcase(' group by ')
119                .$self->_recurse_fields($_[0]->{group_by});
120     }
121     if (defined $_[0]->{having}) {
122       my $frag;
123       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
124       push(@{$self->{having_bind}}, @extra);
125       $ret .= $self->_sqlcase(' having ').$frag;
126     }
127     if (defined $_[0]->{order_by}) {
128       $ret .= $self->_order_by($_[0]->{order_by});
129     }
130   } elsif (ref $_[0] eq 'SCALAR') {
131     $ret = $self->_sqlcase(' order by ').${ $_[0] };
132   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
133     my @order = @{+shift};
134     $ret = $self->_sqlcase(' order by ')
135           .join(', ', map {
136                         my $r = $self->_order_by($_, @_);
137                         $r =~ s/^ ?ORDER BY //i;
138                         $r;
139                       } @order);
140   } else {
141     $ret = $self->SUPER::_order_by(@_);
142   }
143   return $ret;
144 }
145
146 sub _order_directions {
147   my ($self, $order) = @_;
148   $order = $order->{order_by} if ref $order eq 'HASH';
149   return $self->SUPER::_order_directions($order);
150 }
151
152 sub _table {
153   my ($self, $from) = @_;
154   if (ref $from eq 'ARRAY') {
155     return $self->_recurse_from(@$from);
156   } elsif (ref $from eq 'HASH') {
157     return $self->_make_as($from);
158   } else {
159     return $from; # would love to quote here but _table ends up getting called
160                   # twice during an ->select without a limit clause due to
161                   # the way S::A::Limit->select works. should maybe consider
162                   # bypassing this and doing S::A::select($self, ...) in
163                   # our select method above. meantime, quoting shims have
164                   # been added to select/insert/update/delete here
165   }
166 }
167
168 sub _recurse_from {
169   my ($self, $from, @join) = @_;
170   my @sqlf;
171   push(@sqlf, $self->_make_as($from));
172   foreach my $j (@join) {
173     my ($to, $on) = @$j;
174
175     # check whether a join type exists
176     my $join_clause = '';
177     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
178     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
179       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
180     } else {
181       $join_clause = ' JOIN ';
182     }
183     push(@sqlf, $join_clause);
184
185     if (ref $to eq 'ARRAY') {
186       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
187     } else {
188       push(@sqlf, $self->_make_as($to));
189     }
190     push(@sqlf, ' ON ', $self->_join_condition($on));
191   }
192   return join('', @sqlf);
193 }
194
195 sub _make_as {
196   my ($self, $from) = @_;
197   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
198                      reverse each %{$self->_skip_options($from)});
199 }
200
201 sub _skip_options {
202   my ($self, $hash) = @_;
203   my $clean_hash = {};
204   $clean_hash->{$_} = $hash->{$_}
205     for grep {!/^-/} keys %$hash;
206   return $clean_hash;
207 }
208
209 sub _join_condition {
210   my ($self, $cond) = @_;
211   if (ref $cond eq 'HASH') {
212     my %j;
213     for (keys %$cond) {
214       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
215     };
216     return $self->_recurse_where(\%j);
217   } elsif (ref $cond eq 'ARRAY') {
218     return join(' OR ', map { $self->_join_condition($_) } @$cond);
219   } else {
220     die "Can't handle this yet!";
221   }
222 }
223
224 sub _quote {
225   my ($self, $label) = @_;
226   return '' unless defined $label;
227   return "*" if $label eq '*';
228   return $label unless $self->{quote_char};
229   if(ref $self->{quote_char} eq "ARRAY"){
230     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
231       if !defined $self->{name_sep};
232     my $sep = $self->{name_sep};
233     return join($self->{name_sep},
234         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
235        split(/\Q$sep\E/,$label));
236   }
237   return $self->SUPER::_quote($label);
238 }
239
240 sub limit_dialect {
241     my $self = shift;
242     $self->{limit_dialect} = shift if @_;
243     return $self->{limit_dialect};
244 }
245
246 sub quote_char {
247     my $self = shift;
248     $self->{quote_char} = shift if @_;
249     return $self->{quote_char};
250 }
251
252 sub name_sep {
253     my $self = shift;
254     $self->{name_sep} = shift if @_;
255     return $self->{name_sep};
256 }
257
258 } # End of BEGIN block
259
260 =head1 NAME
261
262 DBIx::Class::Storage::DBI - DBI storage handler
263
264 =head1 SYNOPSIS
265
266 =head1 DESCRIPTION
267
268 This class represents the connection to an RDBMS via L<DBI>.  See
269 L<DBIx::Class::Storage> for general information.  This pod only
270 documents DBI-specific methods and behaviors.
271
272 =head1 METHODS
273
274 =cut
275
276 sub new {
277   my $new = shift->next::method(@_);
278
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281   $new->_sql_maker_opts({});
282
283   $new;
284 }
285
286 =head2 connect_info
287
288 The arguments of C<connect_info> are always a single array reference.
289
290 This is normally accessed via L<DBIx::Class::Schema/connection>, which
291 encapsulates its argument list in an arrayref before calling
292 C<connect_info> here.
293
294 The arrayref can either contain the same set of arguments one would
295 normally pass to L<DBI/connect>, or a lone code reference which returns
296 a connected database handle.
297
298 In either case, if the final argument in your connect_info happens
299 to be a hashref, C<connect_info> will look there for several
300 connection-specific options:
301
302 =over 4
303
304 =item on_connect_do
305
306 This can be set to an arrayref of literal sql statements, which will
307 be executed immediately after making the connection to the database
308 every time we [re-]connect.
309
310 =item limit_dialect 
311
312 Sets the limit dialect. This is useful for JDBC-bridge among others
313 where the remote SQL-dialect cannot be determined by the name of the
314 driver alone.
315
316 =item quote_char
317
318 Specifies what characters to use to quote table and column names. If 
319 you use this you will want to specify L<name_sep> as well.
320
321 quote_char expects either a single character, in which case is it is placed
322 on either side of the table/column, or an arrayref of length 2 in which case the
323 table/column name is placed between the elements.
324
325 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
326 use C<quote_char =E<gt> [qw/[ ]/]>.
327
328 =item name_sep
329
330 This only needs to be used in conjunction with L<quote_char>, and is used to 
331 specify the charecter that seperates elements (schemas, tables, columns) from 
332 each other. In most cases this is simply a C<.>.
333
334 =back
335
336 These options can be mixed in with your other L<DBI> connection attributes,
337 or placed in a seperate hashref after all other normal L<DBI> connection
338 arguments.
339
340 Every time C<connect_info> is invoked, any previous settings for
341 these options will be cleared before setting the new ones, regardless of
342 whether any options are specified in the new C<connect_info>.
343
344 Important note:  DBIC expects the returned database handle provided by 
345 a subref argument to have RaiseError set on it.  If it doesn't, things
346 might not work very well, YMMV.  If you don't use a subref, DBIC will
347 force this setting for you anyways.  Setting HandleError to anything
348 other than simple exception object wrapper might cause problems too.
349
350 Examples:
351
352   # Simple SQLite connection
353   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
354
355   # Connect via subref
356   ->connect_info([ sub { DBI->connect(...) } ]);
357
358   # A bit more complicated
359   ->connect_info(
360     [
361       'dbi:Pg:dbname=foo',
362       'postgres',
363       'my_pg_password',
364       { AutoCommit => 0 },
365       { quote_char => q{"}, name_sep => q{.} },
366     ]
367   );
368
369   # Equivalent to the previous example
370   ->connect_info(
371     [
372       'dbi:Pg:dbname=foo',
373       'postgres',
374       'my_pg_password',
375       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
376     ]
377   );
378
379   # Subref + DBIC-specific connection options
380   ->connect_info(
381     [
382       sub { DBI->connect(...) },
383       {
384           quote_char => q{`},
385           name_sep => q{@},
386           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
387       },
388     ]
389   );
390
391 =cut
392
393 sub connect_info {
394   my ($self, $info_arg) = @_;
395
396   return $self->_connect_info if !$info_arg;
397
398   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
399   #  the new set of options
400   $self->_sql_maker(undef);
401   $self->_sql_maker_opts({});
402
403   my $info = [ @$info_arg ]; # copy because we can alter it
404   my $last_info = $info->[-1];
405   if(ref $last_info eq 'HASH') {
406     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
407       $self->on_connect_do($on_connect_do);
408     }
409     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
410       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
411         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
412       }
413     }
414
415     # Get rid of any trailing empty hashref
416     pop(@$info) if !keys %$last_info;
417   }
418
419   $self->_connect_info($info);
420 }
421
422 =head2 on_connect_do
423
424 This method is deprecated in favor of setting via L</connect_info>.
425
426 =head2 dbh_do
427
428 Arguments: $subref, @extra_coderef_args?
429
430 Execute the given subref with the underlying database handle as its
431 first argument, using the new exception-based connection management.
432
433 Any additional arguments will be passed verbatim to the called subref
434 as arguments 2 and onwards.
435
436 Example:
437
438   my @stuff = $schema->storage->dbh_do(
439     sub {
440       my $dbh = shift;
441       my $cols = join(q{, }, @_);
442       shift->selectrow_array("SELECT $cols FROM foo")
443     },
444     @column_list
445   );
446
447 =cut
448
449 sub dbh_do {
450   my $self = shift;
451   my $coderef = shift;
452
453   return $coderef->($self->_dbh, @_) if $self->{_in_txn_do};
454
455   ref $coderef eq 'CODE' or $self->throw_exception
456     ('$coderef must be a CODE reference');
457
458   my @result;
459   my $want_array = wantarray;
460
461   eval {
462     $self->_verify_pid if $self->_dbh;
463     $self->_populate_dbh if !$self->_dbh;
464     if($want_array) {
465         @result = $coderef->($self->_dbh, @_);
466     }
467     elsif(defined $want_array) {
468         $result[0] = $coderef->($self->_dbh, @_);
469     }
470     else {
471         $coderef->($self->_dbh, @_);
472     }
473   };
474
475   my $exception = $@;
476   if(!$exception) { return $want_array ? @result : $result[0] }
477
478   $self->throw_exception($exception) if $self->connected;
479
480   # We were not connected - reconnect and retry, but let any
481   #  exception fall right through this time
482   $self->_populate_dbh;
483   $coderef->($self->_dbh, @_);
484 }
485
486 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
487 # It also informs dbh_do to bypass itself while under the direction of txn_do,
488 #  via $self->{_in_txn_do} (this saves some redundant eval and errorcheck, etc)
489 sub txn_do {
490   my $self = shift;
491   my $coderef = shift;
492
493   ref $coderef eq 'CODE' or $self->throw_exception
494     ('$coderef must be a CODE reference');
495
496   local $self->{_in_txn_do} = 1;
497
498   my $tried = 0;
499
500   my @result;
501   my $want_array = wantarray;
502
503   START_TXN: eval {
504     $self->_verify_pid if $self->_dbh;
505     $self->_populate_dbh if !$self->_dbh;
506
507     $self->txn_begin;
508     if($want_array) {
509         @result = $coderef->(@_);
510     }
511     elsif(defined $want_array) {
512         $result[0] = $coderef->(@_);
513     }
514     else {
515         $coderef->(@_);
516     }
517     $self->txn_commit;
518   };
519
520   my $exception = $@;
521   if(!$exception) { return $want_array ? @result : $result[0] }
522
523   if($tried++ > 0 || $self->connected) {
524     eval { $self->txn_rollback };
525     my $rollback_exception = $@;
526     if($rollback_exception) {
527       my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
528       $self->throw_exception($exception)  # propagate nested rollback
529         if $rollback_exception =~ /$exception_class/;
530
531       $self->throw_exception(
532         "Transaction aborted: ${exception}. "
533         . "Rollback failed: ${rollback_exception}"
534       );
535     }
536     $self->throw_exception($exception)
537   }
538
539   # We were not connected, and was first try - reconnect and retry
540   # XXX I know, gotos are evil.  If you can find a better way
541   #  to write this that doesn't duplicate a lot of code/structure,
542   #  and behaves identically, feel free...
543
544   $self->_populate_dbh;
545   goto START_TXN;
546 }
547
548 =head2 disconnect
549
550 Our C<disconnect> method also performs a rollback first if the
551 database is not in C<AutoCommit> mode.
552
553 =cut
554
555 sub disconnect {
556   my ($self) = @_;
557
558   if( $self->connected ) {
559     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
560     $self->_dbh->disconnect;
561     $self->_dbh(undef);
562   }
563 }
564
565 sub connected {
566   my ($self) = @_;
567
568   if(my $dbh = $self->_dbh) {
569       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
570           return $self->_dbh(undef);
571       }
572       else {
573           $self->_verify_pid;
574       }
575       return ($dbh->FETCH('Active') && $dbh->ping);
576   }
577
578   return 0;
579 }
580
581 # handle pid changes correctly
582 #  NOTE: assumes $self->_dbh is a valid $dbh
583 sub _verify_pid {
584   my ($self) = @_;
585
586   return if $self->_conn_pid == $$;
587
588   $self->_dbh->{InactiveDestroy} = 1;
589   $self->_dbh(undef);
590
591   return;
592 }
593
594 sub ensure_connected {
595   my ($self) = @_;
596
597   unless ($self->connected) {
598     $self->_populate_dbh;
599   }
600 }
601
602 =head2 dbh
603
604 Returns the dbh - a data base handle of class L<DBI>.
605
606 =cut
607
608 sub dbh {
609   my ($self) = @_;
610
611   $self->ensure_connected;
612   return $self->_dbh;
613 }
614
615 sub _sql_maker_args {
616     my ($self) = @_;
617     
618     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
619 }
620
621 sub sql_maker {
622   my ($self) = @_;
623   unless ($self->_sql_maker) {
624     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
625   }
626   return $self->_sql_maker;
627 }
628
629 sub _populate_dbh {
630   my ($self) = @_;
631   my @info = @{$self->_connect_info || []};
632   $self->_dbh($self->_connect(@info));
633
634   if(ref $self eq 'DBIx::Class::Storage::DBI') {
635     my $driver = $self->_dbh->{Driver}->{Name};
636     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
637       bless $self, "DBIx::Class::Storage::DBI::${driver}";
638       $self->_rebless() if $self->can('_rebless');
639     }
640   }
641
642   # if on-connect sql statements are given execute them
643   foreach my $sql_statement (@{$self->on_connect_do || []}) {
644     $self->debugobj->query_start($sql_statement) if $self->debug();
645     $self->_dbh->do($sql_statement);
646     $self->debugobj->query_end($sql_statement) if $self->debug();
647   }
648
649   $self->_conn_pid($$);
650   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
651 }
652
653 sub _connect {
654   my ($self, @info) = @_;
655
656   $self->throw_exception("You failed to provide any connection info")
657       if !@info;
658
659   my ($old_connect_via, $dbh);
660
661   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
662       $old_connect_via = $DBI::connect_via;
663       $DBI::connect_via = 'connect';
664   }
665
666   eval {
667     if(ref $info[0] eq 'CODE') {
668        $dbh = &{$info[0]}
669     }
670     else {
671        $dbh = DBI->connect(@info);
672        $dbh->{RaiseError} = 1;
673        $dbh->{PrintError} = 0;
674     }
675   };
676
677   $DBI::connect_via = $old_connect_via if $old_connect_via;
678
679   if (!$dbh || $@) {
680     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
681   }
682
683   $dbh;
684 }
685
686 sub __txn_begin {
687   my ($dbh, $self) = @_;
688   if ($dbh->{AutoCommit}) {
689     $self->debugobj->txn_begin()
690       if ($self->debug);
691     $dbh->begin_work;
692   }
693 }
694
695 sub txn_begin {
696   my $self = shift;
697   $self->dbh_do(\&__txn_begin, $self)
698     if $self->{transaction_depth}++ == 0;
699 }
700
701 sub __txn_commit {
702   my ($dbh, $self) = @_;
703   if ($self->{transaction_depth} == 0) {
704     unless ($dbh->{AutoCommit}) {
705       $self->debugobj->txn_commit()
706         if ($self->debug);
707       $dbh->commit;
708     }
709   }
710   else {
711     if (--$self->{transaction_depth} == 0) {
712       $self->debugobj->txn_commit()
713         if ($self->debug);
714       $dbh->commit;
715     }
716   }
717 }
718
719 sub txn_commit {
720   my $self = shift;
721   $self->dbh_do(\&__txn_commit, $self);
722 }
723
724 sub __txn_rollback {
725   my ($dbh, $self) = @_;
726   if ($self->{transaction_depth} == 0) {
727     unless ($dbh->{AutoCommit}) {
728       $self->debugobj->txn_rollback()
729         if ($self->debug);
730       $dbh->rollback;
731     }
732   }
733   else {
734     if (--$self->{transaction_depth} == 0) {
735       $self->debugobj->txn_rollback()
736         if ($self->debug);
737       $dbh->rollback;
738     }
739     else {
740       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
741     }
742   }
743 }
744
745 sub txn_rollback {
746   my $self = shift;
747   eval { $self->dbh_do(\&__txn_rollback, $self) };
748   if ($@) {
749     my $error = $@;
750     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
751     $error =~ /$exception_class/ and $self->throw_exception($error);
752     $self->{transaction_depth} = 0;          # ensure that a failed rollback
753     $self->throw_exception($error);          # resets the transaction depth
754   }
755 }
756
757 sub _execute {
758   my ($self, $op, $extra_bind, $ident, @args) = @_;
759   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
760   unshift(@bind, @$extra_bind) if $extra_bind;
761   if ($self->debug) {
762       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
763       $self->debugobj->query_start($sql, @debug_bind);
764   }
765   my $sth = eval { $self->sth($sql,$op) };
766
767   if (!$sth || $@) {
768     $self->throw_exception(
769       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
770     );
771   }
772   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
773   my $rv;
774   if ($sth) {
775     my $time = time();
776     $rv = eval { $sth->execute(@bind) };
777
778     if ($@ || !$rv) {
779       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
780     }
781   } else {
782     $self->throw_exception("'$sql' did not generate a statement.");
783   }
784   if ($self->debug) {
785       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
786       $self->debugobj->query_end($sql, @debug_bind);
787   }
788   return (wantarray ? ($rv, $sth, @bind) : $rv);
789 }
790
791 sub insert {
792   my ($self, $ident, $to_insert) = @_;
793   $self->throw_exception(
794     "Couldn't insert ".join(', ',
795       map "$_ => $to_insert->{$_}", keys %$to_insert
796     )." into ${ident}"
797   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
798   return $to_insert;
799 }
800
801 sub update {
802   return shift->_execute('update' => [], @_);
803 }
804
805 sub delete {
806   return shift->_execute('delete' => [], @_);
807 }
808
809 sub _select {
810   my ($self, $ident, $select, $condition, $attrs) = @_;
811   my $order = $attrs->{order_by};
812   if (ref $condition eq 'SCALAR') {
813     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
814   }
815   if (exists $attrs->{group_by} || $attrs->{having}) {
816     $order = {
817       group_by => $attrs->{group_by},
818       having => $attrs->{having},
819       ($order ? (order_by => $order) : ())
820     };
821   }
822   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
823   if ($attrs->{software_limit} ||
824       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
825         $attrs->{software_limit} = 1;
826   } else {
827     $self->throw_exception("rows attribute must be positive if present")
828       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
829     push @args, $attrs->{rows}, $attrs->{offset};
830   }
831   return $self->_execute(@args);
832 }
833
834 sub select {
835   my $self = shift;
836   my ($ident, $select, $condition, $attrs) = @_;
837   return $self->cursor->new($self, \@_, $attrs);
838 }
839
840 sub select_single {
841   my $self = shift;
842   my ($rv, $sth, @bind) = $self->_select(@_);
843   my @row = $sth->fetchrow_array;
844   # Need to call finish() to work round broken DBDs
845   $sth->finish();
846   return @row;
847 }
848
849 =head2 sth
850
851 Returns a L<DBI> sth (statement handle) for the supplied SQL.
852
853 =cut
854
855 sub __sth {
856   my ($dbh, $sql) = @_;
857   # 3 is the if_active parameter which avoids active sth re-use
858   $dbh->prepare_cached($sql, {}, 3);
859 }
860
861 sub sth {
862   my ($self, $sql) = @_;
863   $self->dbh_do(\&__sth, $sql);
864 }
865
866
867 sub __columns_info_for {
868   my ($dbh, $self, $table) = @_;
869
870   if ($dbh->can('column_info')) {
871     my %result;
872     eval {
873       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
874       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
875       $sth->execute();
876       while ( my $info = $sth->fetchrow_hashref() ){
877         my %column_info;
878         $column_info{data_type}   = $info->{TYPE_NAME};
879         $column_info{size}      = $info->{COLUMN_SIZE};
880         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
881         $column_info{default_value} = $info->{COLUMN_DEF};
882         my $col_name = $info->{COLUMN_NAME};
883         $col_name =~ s/^\"(.*)\"$/$1/;
884
885         $result{$col_name} = \%column_info;
886       }
887     };
888     return \%result if !$@;
889   }
890
891   my %result;
892   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
893   $sth->execute;
894   my @columns = @{$sth->{NAME_lc}};
895   for my $i ( 0 .. $#columns ){
896     my %column_info;
897     my $type_num = $sth->{TYPE}->[$i];
898     my $type_name;
899     if(defined $type_num && $dbh->can('type_info')) {
900       my $type_info = $dbh->type_info($type_num);
901       $type_name = $type_info->{TYPE_NAME} if $type_info;
902     }
903     $column_info{data_type} = $type_name ? $type_name : $type_num;
904     $column_info{size} = $sth->{PRECISION}->[$i];
905     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
906
907     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
908       $column_info{data_type} = $1;
909       $column_info{size}    = $2;
910     }
911
912     $result{$columns[$i]} = \%column_info;
913   }
914
915   return \%result;
916 }
917
918 sub columns_info_for {
919   my ($self, $table) = @_;
920   $self->dbh_do(\&__columns_info_for, $self, $table);
921 }
922
923 =head2 last_insert_id
924
925 Return the row id of the last insert.
926
927 =cut
928
929 sub last_insert_id {
930   my ($self, $row) = @_;
931     
932   $self->dbh_do(sub { shift->func('last_insert_rowid') });
933 }
934
935 =head2 sqlt_type
936
937 Returns the database driver name.
938
939 =cut
940
941 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
942
943 =head2 create_ddl_dir (EXPERIMENTAL)
944
945 =over 4
946
947 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
948
949 =back
950
951 Creates an SQL file based on the Schema, for each of the specified
952 database types, in the given directory.
953
954 Note that this feature is currently EXPERIMENTAL and may not work correctly
955 across all databases, or fully handle complex relationships.
956
957 =cut
958
959 sub create_ddl_dir
960 {
961   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
962
963   if(!$dir || !-d $dir)
964   {
965     warn "No directory given, using ./\n";
966     $dir = "./";
967   }
968   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
969   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
970   $version ||= $schema->VERSION || '1.x';
971   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
972
973   eval "use SQL::Translator";
974   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
975
976   my $sqlt = SQL::Translator->new($sqltargs);
977   foreach my $db (@$databases)
978   {
979     $sqlt->reset();
980     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
981 #    $sqlt->parser_args({'DBIx::Class' => $schema);
982     $sqlt->data($schema);
983     $sqlt->producer($db);
984
985     my $file;
986     my $filename = $schema->ddl_filename($db, $dir, $version);
987     if(-e $filename)
988     {
989       $self->throw_exception("$filename already exists, skipping $db");
990       next;
991     }
992     open($file, ">$filename") 
993       or $self->throw_exception("Can't open $filename for writing ($!)");
994     my $output = $sqlt->translate;
995 #use Data::Dumper;
996 #    print join(":", keys %{$schema->source_registrations});
997 #    print Dumper($sqlt->schema);
998     if(!$output)
999     {
1000       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1001       next;
1002     }
1003     print $file $output;
1004     close($file);
1005   }
1006
1007 }
1008
1009 =head2 deployment_statements
1010
1011 Create the statements for L</deploy> and
1012 L<DBIx::Class::Schema/deploy>.
1013
1014 =cut
1015
1016 sub deployment_statements {
1017   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1018   # Need to be connected to get the correct sqlt_type
1019   $self->ensure_connected() unless $type;
1020   $type ||= $self->sqlt_type;
1021   $version ||= $schema->VERSION || '1.x';
1022   $dir ||= './';
1023   eval "use SQL::Translator";
1024   if(!$@)
1025   {
1026     eval "use SQL::Translator::Parser::DBIx::Class;";
1027     $self->throw_exception($@) if $@;
1028     eval "use SQL::Translator::Producer::${type};";
1029     $self->throw_exception($@) if $@;
1030     my $tr = SQL::Translator->new(%$sqltargs);
1031     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1032     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1033   }
1034
1035   my $filename = $schema->ddl_filename($type, $dir, $version);
1036   if(!-f $filename)
1037   {
1038 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1039       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1040       return;
1041   }
1042   my $file;
1043   open($file, "<$filename") 
1044       or $self->throw_exception("Can't open $filename ($!)");
1045   my @rows = <$file>;
1046   close($file);
1047
1048   return join('', @rows);
1049   
1050 }
1051
1052 sub deploy {
1053   my ($self, $schema, $type, $sqltargs) = @_;
1054   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1055     for ( split(";\n", $statement)) {
1056       next if($_ =~ /^--/);
1057       next if(!$_);
1058 #      next if($_ =~ /^DROP/m);
1059       next if($_ =~ /^BEGIN TRANSACTION/m);
1060       next if($_ =~ /^COMMIT/m);
1061       next if $_ =~ /^\s+$/; # skip whitespace only
1062       $self->debugobj->query_start($_) if $self->debug;
1063       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1064       $self->debugobj->query_end($_) if $self->debug;
1065     }
1066   }
1067 }
1068
1069 =head2 datetime_parser
1070
1071 Returns the datetime parser class
1072
1073 =cut
1074
1075 sub datetime_parser {
1076   my $self = shift;
1077   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1078 }
1079
1080 =head2 datetime_parser_type
1081
1082 Defines (returns) the datetime parser class - currently hardwired to
1083 L<DateTime::Format::MySQL>
1084
1085 =cut
1086
1087 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1088
1089 =head2 build_datetime_parser
1090
1091 See L</datetime_parser>
1092
1093 =cut
1094
1095 sub build_datetime_parser {
1096   my $self = shift;
1097   my $type = $self->datetime_parser_type(@_);
1098   eval "use ${type}";
1099   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1100   return $type;
1101 }
1102
1103 sub DESTROY {
1104   my $self = shift;
1105   return if !$self->_dbh;
1106
1107   $self->_verify_pid;
1108   $self->_dbh(undef);
1109 }
1110
1111 1;
1112
1113 =head1 SQL METHODS
1114
1115 The module defines a set of methods within the DBIC::SQL::Abstract
1116 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1117 SQL query functions.
1118
1119 The following methods are extended:-
1120
1121 =over 4
1122
1123 =item delete
1124
1125 =item insert
1126
1127 =item select
1128
1129 =item update
1130
1131 =item limit_dialect
1132
1133 See L</connect_info> for details.
1134 For setting, this method is deprecated in favor of L</connect_info>.
1135
1136 =item quote_char
1137
1138 See L</connect_info> for details.
1139 For setting, this method is deprecated in favor of L</connect_info>.
1140
1141 =item name_sep
1142
1143 See L</connect_info> for details.
1144 For setting, this method is deprecated in favor of L</connect_info>.
1145
1146 =back
1147
1148 =head1 AUTHORS
1149
1150 Matt S. Trout <mst@shadowcatsystems.co.uk>
1151
1152 Andy Grundman <andy@hybridized.org>
1153
1154 =head1 LICENSE
1155
1156 You may distribute this code under the same terms as Perl itself.
1157
1158 =cut
1159