Fixed to actually insert using column names, thanks claco!
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item limit_dialect 
340
341 Sets the limit dialect. This is useful for JDBC-bridge among others
342 where the remote SQL-dialect cannot be determined by the name of the
343 driver alone.
344
345 =item quote_char
346
347 Specifies what characters to use to quote table and column names. If 
348 you use this you will want to specify L<name_sep> as well.
349
350 quote_char expects either a single character, in which case is it is placed
351 on either side of the table/column, or an arrayref of length 2 in which case the
352 table/column name is placed between the elements.
353
354 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
355 use C<quote_char =E<gt> [qw/[ ]/]>.
356
357 =item name_sep
358
359 This only needs to be used in conjunction with L<quote_char>, and is used to 
360 specify the charecter that seperates elements (schemas, tables, columns) from 
361 each other. In most cases this is simply a C<.>.
362
363 =back
364
365 These options can be mixed in with your other L<DBI> connection attributes,
366 or placed in a seperate hashref after all other normal L<DBI> connection
367 arguments.
368
369 Every time C<connect_info> is invoked, any previous settings for
370 these options will be cleared before setting the new ones, regardless of
371 whether any options are specified in the new C<connect_info>.
372
373 Important note:  DBIC expects the returned database handle provided by 
374 a subref argument to have RaiseError set on it.  If it doesn't, things
375 might not work very well, YMMV.  If you don't use a subref, DBIC will
376 force this setting for you anyways.  Setting HandleError to anything
377 other than simple exception object wrapper might cause problems too.
378
379 Examples:
380
381   # Simple SQLite connection
382   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
383
384   # Connect via subref
385   ->connect_info([ sub { DBI->connect(...) } ]);
386
387   # A bit more complicated
388   ->connect_info(
389     [
390       'dbi:Pg:dbname=foo',
391       'postgres',
392       'my_pg_password',
393       { AutoCommit => 0 },
394       { quote_char => q{"}, name_sep => q{.} },
395     ]
396   );
397
398   # Equivalent to the previous example
399   ->connect_info(
400     [
401       'dbi:Pg:dbname=foo',
402       'postgres',
403       'my_pg_password',
404       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
405     ]
406   );
407
408   # Subref + DBIC-specific connection options
409   ->connect_info(
410     [
411       sub { DBI->connect(...) },
412       {
413           quote_char => q{`},
414           name_sep => q{@},
415           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
416       },
417     ]
418   );
419
420 =cut
421
422 sub connect_info {
423   my ($self, $info_arg) = @_;
424
425   return $self->_connect_info if !$info_arg;
426
427   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
428   #  the new set of options
429   $self->_sql_maker(undef);
430   $self->_sql_maker_opts({});
431
432   my $info = [ @$info_arg ]; # copy because we can alter it
433   my $last_info = $info->[-1];
434   if(ref $last_info eq 'HASH') {
435     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
436       $self->on_connect_do($on_connect_do);
437     }
438     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
439       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
440         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
441       }
442     }
443
444     # Get rid of any trailing empty hashref
445     pop(@$info) if !keys %$last_info;
446   }
447
448   $self->_connect_info($info);
449 }
450
451 =head2 on_connect_do
452
453 This method is deprecated in favor of setting via L</connect_info>.
454
455 =head2 dbh_do
456
457 Arguments: $subref, @extra_coderef_args?
458
459 Execute the given subref using the new exception-based connection management.
460
461 The first two arguments will be the storage object that C<dbh_do> was called
462 on and a database handle to use.  Any additional arguments will be passed
463 verbatim to the called subref as arguments 2 and onwards.
464
465 Using this (instead of $self->_dbh or $self->dbh) ensures correct
466 exception handling and reconnection (or failover in future subclasses).
467
468 Your subref should have no side-effects outside of the database, as
469 there is the potential for your subref to be partially double-executed
470 if the database connection was stale/dysfunctional.
471
472 Example:
473
474   my @stuff = $schema->storage->dbh_do(
475     sub {
476       my ($storage, $dbh, @cols) = @_;
477       my $cols = join(q{, }, @cols);
478       $dbh->selectrow_array("SELECT $cols FROM foo");
479     },
480     @column_list
481   );
482
483 =cut
484
485 sub dbh_do {
486   my $self = shift;
487   my $coderef = shift;
488
489   ref $coderef eq 'CODE' or $self->throw_exception
490     ('$coderef must be a CODE reference');
491
492   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
493   local $self->{_in_dbh_do} = 1;
494
495   my @result;
496   my $want_array = wantarray;
497
498   eval {
499     $self->_verify_pid if $self->_dbh;
500     $self->_populate_dbh if !$self->_dbh;
501     if($want_array) {
502         @result = $coderef->($self, $self->_dbh, @_);
503     }
504     elsif(defined $want_array) {
505         $result[0] = $coderef->($self, $self->_dbh, @_);
506     }
507     else {
508         $coderef->($self, $self->_dbh, @_);
509     }
510   };
511
512   my $exception = $@;
513   if(!$exception) { return $want_array ? @result : $result[0] }
514
515   $self->throw_exception($exception) if $self->connected;
516
517   # We were not connected - reconnect and retry, but let any
518   #  exception fall right through this time
519   $self->_populate_dbh;
520   $coderef->($self, $self->_dbh, @_);
521 }
522
523 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
524 # It also informs dbh_do to bypass itself while under the direction of txn_do,
525 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
526 sub txn_do {
527   my $self = shift;
528   my $coderef = shift;
529
530   ref $coderef eq 'CODE' or $self->throw_exception
531     ('$coderef must be a CODE reference');
532
533   local $self->{_in_dbh_do} = 1;
534
535   my @result;
536   my $want_array = wantarray;
537
538   my $tried = 0;
539   while(1) {
540     eval {
541       $self->_verify_pid if $self->_dbh;
542       $self->_populate_dbh if !$self->_dbh;
543
544       $self->txn_begin;
545       if($want_array) {
546           @result = $coderef->(@_);
547       }
548       elsif(defined $want_array) {
549           $result[0] = $coderef->(@_);
550       }
551       else {
552           $coderef->(@_);
553       }
554       $self->txn_commit;
555     };
556
557     my $exception = $@;
558     if(!$exception) { return $want_array ? @result : $result[0] }
559
560     if($tried++ > 0 || $self->connected) {
561       eval { $self->txn_rollback };
562       my $rollback_exception = $@;
563       if($rollback_exception) {
564         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
565         $self->throw_exception($exception)  # propagate nested rollback
566           if $rollback_exception =~ /$exception_class/;
567
568         $self->throw_exception(
569           "Transaction aborted: ${exception}. "
570           . "Rollback failed: ${rollback_exception}"
571         );
572       }
573       $self->throw_exception($exception)
574     }
575
576     # We were not connected, and was first try - reconnect and retry
577     # via the while loop
578     $self->_populate_dbh;
579   }
580 }
581
582 =head2 disconnect
583
584 Our C<disconnect> method also performs a rollback first if the
585 database is not in C<AutoCommit> mode.
586
587 =cut
588
589 sub disconnect {
590   my ($self) = @_;
591
592   if( $self->connected ) {
593     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
594     $self->_dbh->disconnect;
595     $self->_dbh(undef);
596     $self->{_dbh_gen}++;
597   }
598 }
599
600 sub connected {
601   my ($self) = @_;
602
603   if(my $dbh = $self->_dbh) {
604       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
605           $self->_dbh(undef);
606           $self->{_dbh_gen}++;
607           return;
608       }
609       else {
610           $self->_verify_pid;
611       }
612       return ($dbh->FETCH('Active') && $dbh->ping);
613   }
614
615   return 0;
616 }
617
618 # handle pid changes correctly
619 #  NOTE: assumes $self->_dbh is a valid $dbh
620 sub _verify_pid {
621   my ($self) = @_;
622
623   return if $self->_conn_pid == $$;
624
625   $self->_dbh->{InactiveDestroy} = 1;
626   $self->_dbh(undef);
627   $self->{_dbh_gen}++;
628
629   return;
630 }
631
632 sub ensure_connected {
633   my ($self) = @_;
634
635   unless ($self->connected) {
636     $self->_populate_dbh;
637   }
638 }
639
640 =head2 dbh
641
642 Returns the dbh - a data base handle of class L<DBI>.
643
644 =cut
645
646 sub dbh {
647   my ($self) = @_;
648
649   $self->ensure_connected;
650   return $self->_dbh;
651 }
652
653 sub _sql_maker_args {
654     my ($self) = @_;
655     
656     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
657 }
658
659 sub sql_maker {
660   my ($self) = @_;
661   unless ($self->_sql_maker) {
662     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
663   }
664   return $self->_sql_maker;
665 }
666
667 sub _populate_dbh {
668   my ($self) = @_;
669   my @info = @{$self->_connect_info || []};
670   $self->_dbh($self->_connect(@info));
671
672   if(ref $self eq 'DBIx::Class::Storage::DBI') {
673     my $driver = $self->_dbh->{Driver}->{Name};
674     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
675       bless $self, "DBIx::Class::Storage::DBI::${driver}";
676       $self->_rebless() if $self->can('_rebless');
677     }
678   }
679
680   # if on-connect sql statements are given execute them
681   foreach my $sql_statement (@{$self->on_connect_do || []}) {
682     $self->debugobj->query_start($sql_statement) if $self->debug();
683     $self->_dbh->do($sql_statement);
684     $self->debugobj->query_end($sql_statement) if $self->debug();
685   }
686
687   $self->_conn_pid($$);
688   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
689 }
690
691 sub _connect {
692   my ($self, @info) = @_;
693
694   $self->throw_exception("You failed to provide any connection info")
695       if !@info;
696
697   my ($old_connect_via, $dbh);
698
699   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
700       $old_connect_via = $DBI::connect_via;
701       $DBI::connect_via = 'connect';
702   }
703
704   eval {
705     if(ref $info[0] eq 'CODE') {
706        $dbh = &{$info[0]}
707     }
708     else {
709        $dbh = DBI->connect(@info);
710        $dbh->{RaiseError} = 1;
711        $dbh->{PrintError} = 0;
712        $dbh->{PrintWarn} = 0;
713     }
714   };
715
716   $DBI::connect_via = $old_connect_via if $old_connect_via;
717
718   if (!$dbh || $@) {
719     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
720   }
721
722   $dbh;
723 }
724
725 sub _dbh_txn_begin {
726   my ($self, $dbh) = @_;
727   if ($dbh->{AutoCommit}) {
728     $self->debugobj->txn_begin()
729       if ($self->debug);
730     $dbh->begin_work;
731   }
732 }
733
734 sub txn_begin {
735   my $self = shift;
736   $self->dbh_do($self->can('_dbh_txn_begin'))
737     if $self->{transaction_depth}++ == 0;
738 }
739
740 sub _dbh_txn_commit {
741   my ($self, $dbh) = @_;
742   if ($self->{transaction_depth} == 0) {
743     unless ($dbh->{AutoCommit}) {
744       $self->debugobj->txn_commit()
745         if ($self->debug);
746       $dbh->commit;
747     }
748   }
749   else {
750     if (--$self->{transaction_depth} == 0) {
751       $self->debugobj->txn_commit()
752         if ($self->debug);
753       $dbh->commit;
754     }
755   }
756 }
757
758 sub txn_commit {
759   my $self = shift;
760   $self->dbh_do($self->can('_dbh_txn_commit'));
761 }
762
763 sub _dbh_txn_rollback {
764   my ($self, $dbh) = @_;
765   if ($self->{transaction_depth} == 0) {
766     unless ($dbh->{AutoCommit}) {
767       $self->debugobj->txn_rollback()
768         if ($self->debug);
769       $dbh->rollback;
770     }
771   }
772   else {
773     if (--$self->{transaction_depth} == 0) {
774       $self->debugobj->txn_rollback()
775         if ($self->debug);
776       $dbh->rollback;
777     }
778     else {
779       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
780     }
781   }
782 }
783
784 sub txn_rollback {
785   my $self = shift;
786
787   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
788   if ($@) {
789     my $error = $@;
790     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
791     $error =~ /$exception_class/ and $self->throw_exception($error);
792     $self->{transaction_depth} = 0;          # ensure that a failed rollback
793     $self->throw_exception($error);          # resets the transaction depth
794   }
795 }
796
797 # This used to be the top-half of _execute.  It was split out to make it
798 #  easier to override in NoBindVars without duping the rest.  It takes up
799 #  all of _execute's args, and emits $sql, @bind.
800 sub _prep_for_execute {
801   my ($self, $op, $extra_bind, $ident, @args) = @_;
802
803   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
804   unshift(@bind, @$extra_bind) if $extra_bind;
805   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
806
807   return ($sql, @bind);
808 }
809
810 sub _execute {
811   my $self = shift;
812
813   my ($sql, @bind) = $self->_prep_for_execute(@_);
814
815   if ($self->debug) {
816       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
817       $self->debugobj->query_start($sql, @debug_bind);
818   }
819
820   my $sth = $self->sth($sql);
821
822   my $rv;
823   if ($sth) {
824     my $time = time();
825     $rv = eval { $sth->execute(@bind) };
826
827     if ($@ || !$rv) {
828       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
829     }
830   } else {
831     $self->throw_exception("'$sql' did not generate a statement.");
832   }
833   if ($self->debug) {
834       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
835       $self->debugobj->query_end($sql, @debug_bind);
836   }
837   return (wantarray ? ($rv, $sth, @bind) : $rv);
838 }
839
840 sub insert {
841   my ($self, $ident, $to_insert) = @_;
842   $self->throw_exception(
843     "Couldn't insert ".join(', ',
844       map "$_ => $to_insert->{$_}", keys %$to_insert
845     )." into ${ident}"
846   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
847   return $to_insert;
848 }
849
850 ## Still not quite perfect, and EXPERIMENTAL
851 ## Currently it is assumed that all values passed will be "normal", i.e. not 
852 ## scalar refs, or at least, all the same type as the first set, the statement is
853 ## only prepped once.
854 sub insert_bulk {
855   my ($self, $table, $cols, $data) = @_;
856   my %colvalues;
857   @colvalues{@$cols} = (0..$#$cols);
858   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
859 # print STDERR "BIND".Dumper(\@bind);
860
861   if ($self->debug) {
862       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
863       $self->debugobj->query_start($sql, @debug_bind);
864   }
865   my $sth = eval { $self->sth($sql,'insert') };
866
867   if (!$sth || $@) {
868     $self->throw_exception(
869       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
870     );
871   }
872 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
873
874   my $rv;
875   ## This must be an arrayref, else nothing works!
876   my $tuple_status = [];
877 #  use Data::Dumper;
878 #  print STDERR Dumper($data);
879   if ($sth) {
880     my $time = time();
881     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data;  return if !$values; return [ @{$values}[@bind] ]},
882                                        ArrayTupleStatus => $tuple_status }) };
883 # print STDERR Dumper($tuple_status);
884 # print STDERR "RV: $rv\n";
885     if ($@ || !defined $rv) {
886       my $errors = '';
887       foreach my $tuple (@$tuple_status)
888       {
889           $errors .= "\n" . $tuple->[1] if(ref $tuple);
890       }
891       $self->throw_exception("Error executing '$sql': ".($@ || $errors));
892     }
893   } else {
894     $self->throw_exception("'$sql' did not generate a statement.");
895   }
896   if ($self->debug) {
897       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
898       $self->debugobj->query_end($sql, @debug_bind);
899   }
900   return (wantarray ? ($rv, $sth, @bind) : $rv);
901 }
902
903 sub update {
904   return shift->_execute('update' => [], @_);
905 }
906
907 sub delete {
908   return shift->_execute('delete' => [], @_);
909 }
910
911 sub _select {
912   my ($self, $ident, $select, $condition, $attrs) = @_;
913   my $order = $attrs->{order_by};
914   if (ref $condition eq 'SCALAR') {
915     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
916   }
917   if (exists $attrs->{group_by} || $attrs->{having}) {
918     $order = {
919       group_by => $attrs->{group_by},
920       having => $attrs->{having},
921       ($order ? (order_by => $order) : ())
922     };
923   }
924   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
925   if ($attrs->{software_limit} ||
926       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
927         $attrs->{software_limit} = 1;
928   } else {
929     $self->throw_exception("rows attribute must be positive if present")
930       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
931     push @args, $attrs->{rows}, $attrs->{offset};
932   }
933   return $self->_execute(@args);
934 }
935
936 =head2 select
937
938 =over 4
939
940 =item Arguments: $ident, $select, $condition, $attrs
941
942 =back
943
944 Handle a SQL select statement.
945
946 =cut
947
948 sub select {
949   my $self = shift;
950   my ($ident, $select, $condition, $attrs) = @_;
951   return $self->cursor->new($self, \@_, $attrs);
952 }
953
954 sub select_single {
955   my $self = shift;
956   my ($rv, $sth, @bind) = $self->_select(@_);
957   my @row = $sth->fetchrow_array;
958   # Need to call finish() to work round broken DBDs
959   $sth->finish();
960   return @row;
961 }
962
963 =head2 sth
964
965 =over 4
966
967 =item Arguments: $sql
968
969 =back
970
971 Returns a L<DBI> sth (statement handle) for the supplied SQL.
972
973 =cut
974
975 sub _dbh_sth {
976   my ($self, $dbh, $sql) = @_;
977   # 3 is the if_active parameter which avoids active sth re-use
978   $dbh->prepare_cached($sql, {}, 3) or
979     $self->throw_exception(
980       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
981     );
982 }
983
984 sub sth {
985   my ($self, $sql) = @_;
986   $self->dbh_do($self->can('_dbh_sth'), $sql);
987 }
988
989 sub _dbh_columns_info_for {
990   my ($self, $dbh, $table) = @_;
991
992   if ($dbh->can('column_info')) {
993     my %result;
994     eval {
995       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
996       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
997       $sth->execute();
998       while ( my $info = $sth->fetchrow_hashref() ){
999         my %column_info;
1000         $column_info{data_type}   = $info->{TYPE_NAME};
1001         $column_info{size}      = $info->{COLUMN_SIZE};
1002         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1003         $column_info{default_value} = $info->{COLUMN_DEF};
1004         my $col_name = $info->{COLUMN_NAME};
1005         $col_name =~ s/^\"(.*)\"$/$1/;
1006
1007         $result{$col_name} = \%column_info;
1008       }
1009     };
1010     return \%result if !$@ && scalar keys %result;
1011   }
1012
1013   my %result;
1014   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1015   $sth->execute;
1016   my @columns = @{$sth->{NAME_lc}};
1017   for my $i ( 0 .. $#columns ){
1018     my %column_info;
1019     my $type_num = $sth->{TYPE}->[$i];
1020     my $type_name;
1021     if(defined $type_num && $dbh->can('type_info')) {
1022       my $type_info = $dbh->type_info($type_num);
1023       $type_name = $type_info->{TYPE_NAME} if $type_info;
1024     }
1025     $column_info{data_type} = $type_name ? $type_name : $type_num;
1026     $column_info{size} = $sth->{PRECISION}->[$i];
1027     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1028
1029     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1030       $column_info{data_type} = $1;
1031       $column_info{size}    = $2;
1032     }
1033
1034     $result{$columns[$i]} = \%column_info;
1035   }
1036
1037   return \%result;
1038 }
1039
1040 sub columns_info_for {
1041   my ($self, $table) = @_;
1042   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1043 }
1044
1045 =head2 last_insert_id
1046
1047 Return the row id of the last insert.
1048
1049 =cut
1050
1051 sub _dbh_last_insert_id {
1052     my ($self, $dbh, $source, $col) = @_;
1053     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1054     $dbh->func('last_insert_rowid');
1055 }
1056
1057 sub last_insert_id {
1058   my $self = shift;
1059   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1060 }
1061
1062 =head2 sqlt_type
1063
1064 Returns the database driver name.
1065
1066 =cut
1067
1068 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1069
1070 =head2 create_ddl_dir (EXPERIMENTAL)
1071
1072 =over 4
1073
1074 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1075
1076 =back
1077
1078 Creates a SQL file based on the Schema, for each of the specified
1079 database types, in the given directory.
1080
1081 Note that this feature is currently EXPERIMENTAL and may not work correctly
1082 across all databases, or fully handle complex relationships.
1083
1084 =cut
1085
1086 sub create_ddl_dir
1087 {
1088   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1089
1090   if(!$dir || !-d $dir)
1091   {
1092     warn "No directory given, using ./\n";
1093     $dir = "./";
1094   }
1095   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1096   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1097   $version ||= $schema->VERSION || '1.x';
1098   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1099
1100   eval "use SQL::Translator";
1101   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1102
1103   my $sqlt = SQL::Translator->new($sqltargs);
1104   foreach my $db (@$databases)
1105   {
1106     $sqlt->reset();
1107     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1108 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1109     $sqlt->data($schema);
1110     $sqlt->producer($db);
1111
1112     my $file;
1113     my $filename = $schema->ddl_filename($db, $dir, $version);
1114     if(-e $filename)
1115     {
1116       $self->throw_exception("$filename already exists, skipping $db");
1117       next;
1118     }
1119     open($file, ">$filename") 
1120       or $self->throw_exception("Can't open $filename for writing ($!)");
1121     my $output = $sqlt->translate;
1122 #use Data::Dumper;
1123 #    print join(":", keys %{$schema->source_registrations});
1124 #    print Dumper($sqlt->schema);
1125     if(!$output)
1126     {
1127       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1128       next;
1129     }
1130     print $file $output;
1131     close($file);
1132   }
1133
1134 }
1135
1136 =head2 deployment_statements
1137
1138 =over 4
1139
1140 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1141
1142 =back
1143
1144 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1145 The database driver name is given by C<$type>, though the value from
1146 L</sqlt_type> is used if it is not specified.
1147
1148 C<$directory> is used to return statements from files in a previously created
1149 L</create_ddl_dir> directory and is optional. The filenames are constructed
1150 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1151
1152 If no C<$directory> is specified then the statements are constructed on the
1153 fly using L<SQL::Translator> and C<$version> is ignored.
1154
1155 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1156
1157 =cut
1158
1159 sub deployment_statements {
1160   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1161   # Need to be connected to get the correct sqlt_type
1162   $self->ensure_connected() unless $type;
1163   $type ||= $self->sqlt_type;
1164   $version ||= $schema->VERSION || '1.x';
1165   $dir ||= './';
1166   eval "use SQL::Translator";
1167   if(!$@)
1168   {
1169     eval "use SQL::Translator::Parser::DBIx::Class;";
1170     $self->throw_exception($@) if $@;
1171     eval "use SQL::Translator::Producer::${type};";
1172     $self->throw_exception($@) if $@;
1173     my $tr = SQL::Translator->new(%$sqltargs);
1174     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1175     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1176   }
1177
1178   my $filename = $schema->ddl_filename($type, $dir, $version);
1179   if(!-f $filename)
1180   {
1181 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1182       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1183       return;
1184   }
1185   my $file;
1186   open($file, "<$filename") 
1187       or $self->throw_exception("Can't open $filename ($!)");
1188   my @rows = <$file>;
1189   close($file);
1190
1191   return join('', @rows);
1192   
1193 }
1194
1195 sub deploy {
1196   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1197   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1198     for ( split(";\n", $statement)) {
1199       next if($_ =~ /^--/);
1200       next if(!$_);
1201 #      next if($_ =~ /^DROP/m);
1202       next if($_ =~ /^BEGIN TRANSACTION/m);
1203       next if($_ =~ /^COMMIT/m);
1204       next if $_ =~ /^\s+$/; # skip whitespace only
1205       $self->debugobj->query_start($_) if $self->debug;
1206       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1207       $self->debugobj->query_end($_) if $self->debug;
1208     }
1209   }
1210 }
1211
1212 =head2 datetime_parser
1213
1214 Returns the datetime parser class
1215
1216 =cut
1217
1218 sub datetime_parser {
1219   my $self = shift;
1220   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1221 }
1222
1223 =head2 datetime_parser_type
1224
1225 Defines (returns) the datetime parser class - currently hardwired to
1226 L<DateTime::Format::MySQL>
1227
1228 =cut
1229
1230 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1231
1232 =head2 build_datetime_parser
1233
1234 See L</datetime_parser>
1235
1236 =cut
1237
1238 sub build_datetime_parser {
1239   my $self = shift;
1240   my $type = $self->datetime_parser_type(@_);
1241   eval "use ${type}";
1242   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1243   return $type;
1244 }
1245
1246 sub DESTROY {
1247   my $self = shift;
1248   return if !$self->_dbh;
1249   $self->_verify_pid;
1250   $self->_dbh(undef);
1251 }
1252
1253 1;
1254
1255 =head1 SQL METHODS
1256
1257 The module defines a set of methods within the DBIC::SQL::Abstract
1258 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1259 SQL query functions.
1260
1261 The following methods are extended:-
1262
1263 =over 4
1264
1265 =item delete
1266
1267 =item insert
1268
1269 =item select
1270
1271 =item update
1272
1273 =item limit_dialect
1274
1275 See L</connect_info> for details.
1276 For setting, this method is deprecated in favor of L</connect_info>.
1277
1278 =item quote_char
1279
1280 See L</connect_info> for details.
1281 For setting, this method is deprecated in favor of L</connect_info>.
1282
1283 =item name_sep
1284
1285 See L</connect_info> for details.
1286 For setting, this method is deprecated in favor of L</connect_info>.
1287
1288 =back
1289
1290 =head1 AUTHORS
1291
1292 Matt S. Trout <mst@shadowcatsystems.co.uk>
1293
1294 Andy Grundman <andy@hybridized.org>
1295
1296 =head1 LICENSE
1297
1298 You may distribute this code under the same terms as Perl itself.
1299
1300 =cut