create insert_bulk using execute_array, and make populate use it in void context
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 use Scalar::Util 'blessed';
62 sub _find_syntax {
63   my ($self, $syntax) = @_;
64   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
65 #  print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
66   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67     return 'RowNumberOver';
68   }
69
70   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71 }
72
73 sub select {
74   my ($self, $table, $fields, $where, $order, @rest) = @_;
75   $table = $self->_quote($table) unless ref($table);
76   local $self->{rownum_hack_count} = 1
77     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
78   @rest = (-1) unless defined $rest[0];
79   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
81   local $self->{having_bind} = [];
82   my ($sql, @ret) = $self->SUPER::select(
83     $table, $self->_recurse_fields($fields), $where, $order, @rest
84   );
85   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
86 }
87
88 sub insert {
89   my $self = shift;
90   my $table = shift;
91   $table = $self->_quote($table) unless ref($table);
92   $self->SUPER::insert($table, @_);
93 }
94
95 sub update {
96   my $self = shift;
97   my $table = shift;
98   $table = $self->_quote($table) unless ref($table);
99   $self->SUPER::update($table, @_);
100 }
101
102 sub delete {
103   my $self = shift;
104   my $table = shift;
105   $table = $self->_quote($table) unless ref($table);
106   $self->SUPER::delete($table, @_);
107 }
108
109 sub _emulate_limit {
110   my $self = shift;
111   if ($_[3] == -1) {
112     return $_[1].$self->_order_by($_[2]);
113   } else {
114     return $self->SUPER::_emulate_limit(@_);
115   }
116 }
117
118 sub _recurse_fields {
119   my ($self, $fields) = @_;
120   my $ref = ref $fields;
121   return $self->_quote($fields) unless $ref;
122   return $$fields if $ref eq 'SCALAR';
123
124   if ($ref eq 'ARRAY') {
125     return join(', ', map {
126       $self->_recurse_fields($_)
127       .(exists $self->{rownum_hack_count}
128          ? ' AS col'.$self->{rownum_hack_count}++
129          : '')
130      } @$fields);
131   } elsif ($ref eq 'HASH') {
132     foreach my $func (keys %$fields) {
133       return $self->_sqlcase($func)
134         .'( '.$self->_recurse_fields($fields->{$func}).' )';
135     }
136   }
137 }
138
139 sub _order_by {
140   my $self = shift;
141   my $ret = '';
142   my @extra;
143   if (ref $_[0] eq 'HASH') {
144     if (defined $_[0]->{group_by}) {
145       $ret = $self->_sqlcase(' group by ')
146                .$self->_recurse_fields($_[0]->{group_by});
147     }
148     if (defined $_[0]->{having}) {
149       my $frag;
150       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151       push(@{$self->{having_bind}}, @extra);
152       $ret .= $self->_sqlcase(' having ').$frag;
153     }
154     if (defined $_[0]->{order_by}) {
155       $ret .= $self->_order_by($_[0]->{order_by});
156     }
157   } elsif (ref $_[0] eq 'SCALAR') {
158     $ret = $self->_sqlcase(' order by ').${ $_[0] };
159   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160     my @order = @{+shift};
161     $ret = $self->_sqlcase(' order by ')
162           .join(', ', map {
163                         my $r = $self->_order_by($_, @_);
164                         $r =~ s/^ ?ORDER BY //i;
165                         $r;
166                       } @order);
167   } else {
168     $ret = $self->SUPER::_order_by(@_);
169   }
170   return $ret;
171 }
172
173 sub _order_directions {
174   my ($self, $order) = @_;
175   $order = $order->{order_by} if ref $order eq 'HASH';
176   return $self->SUPER::_order_directions($order);
177 }
178
179 sub _table {
180   my ($self, $from) = @_;
181   if (ref $from eq 'ARRAY') {
182     return $self->_recurse_from(@$from);
183   } elsif (ref $from eq 'HASH') {
184     return $self->_make_as($from);
185   } else {
186     return $from; # would love to quote here but _table ends up getting called
187                   # twice during an ->select without a limit clause due to
188                   # the way S::A::Limit->select works. should maybe consider
189                   # bypassing this and doing S::A::select($self, ...) in
190                   # our select method above. meantime, quoting shims have
191                   # been added to select/insert/update/delete here
192   }
193 }
194
195 sub _recurse_from {
196   my ($self, $from, @join) = @_;
197   my @sqlf;
198   push(@sqlf, $self->_make_as($from));
199   foreach my $j (@join) {
200     my ($to, $on) = @$j;
201
202     # check whether a join type exists
203     my $join_clause = '';
204     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
207     } else {
208       $join_clause = ' JOIN ';
209     }
210     push(@sqlf, $join_clause);
211
212     if (ref $to eq 'ARRAY') {
213       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214     } else {
215       push(@sqlf, $self->_make_as($to));
216     }
217     push(@sqlf, ' ON ', $self->_join_condition($on));
218   }
219   return join('', @sqlf);
220 }
221
222 sub _make_as {
223   my ($self, $from) = @_;
224   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
225                      reverse each %{$self->_skip_options($from)});
226 }
227
228 sub _skip_options {
229   my ($self, $hash) = @_;
230   my $clean_hash = {};
231   $clean_hash->{$_} = $hash->{$_}
232     for grep {!/^-/} keys %$hash;
233   return $clean_hash;
234 }
235
236 sub _join_condition {
237   my ($self, $cond) = @_;
238   if (ref $cond eq 'HASH') {
239     my %j;
240     for (keys %$cond) {
241       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242     };
243     return $self->_recurse_where(\%j);
244   } elsif (ref $cond eq 'ARRAY') {
245     return join(' OR ', map { $self->_join_condition($_) } @$cond);
246   } else {
247     die "Can't handle this yet!";
248   }
249 }
250
251 sub _quote {
252   my ($self, $label) = @_;
253   return '' unless defined $label;
254   return "*" if $label eq '*';
255   return $label unless $self->{quote_char};
256   if(ref $self->{quote_char} eq "ARRAY"){
257     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258       if !defined $self->{name_sep};
259     my $sep = $self->{name_sep};
260     return join($self->{name_sep},
261         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
262        split(/\Q$sep\E/,$label));
263   }
264   return $self->SUPER::_quote($label);
265 }
266
267 sub limit_dialect {
268     my $self = shift;
269     $self->{limit_dialect} = shift if @_;
270     return $self->{limit_dialect};
271 }
272
273 sub quote_char {
274     my $self = shift;
275     $self->{quote_char} = shift if @_;
276     return $self->{quote_char};
277 }
278
279 sub name_sep {
280     my $self = shift;
281     $self->{name_sep} = shift if @_;
282     return $self->{name_sep};
283 }
284
285 } # End of BEGIN block
286
287 =head1 NAME
288
289 DBIx::Class::Storage::DBI - DBI storage handler
290
291 =head1 SYNOPSIS
292
293 =head1 DESCRIPTION
294
295 This class represents the connection to an RDBMS via L<DBI>.  See
296 L<DBIx::Class::Storage> for general information.  This pod only
297 documents DBI-specific methods and behaviors.
298
299 =head1 METHODS
300
301 =cut
302
303 sub new {
304   my $new = shift->next::method(@_);
305
306   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
307   $new->transaction_depth(0);
308   $new->_sql_maker_opts({});
309   $new->{_in_dbh_do} = 0;
310   $new->{_dbh_gen} = 0;
311
312   $new;
313 }
314
315 =head2 connect_info
316
317 The arguments of C<connect_info> are always a single array reference.
318
319 This is normally accessed via L<DBIx::Class::Schema/connection>, which
320 encapsulates its argument list in an arrayref before calling
321 C<connect_info> here.
322
323 The arrayref can either contain the same set of arguments one would
324 normally pass to L<DBI/connect>, or a lone code reference which returns
325 a connected database handle.
326
327 In either case, if the final argument in your connect_info happens
328 to be a hashref, C<connect_info> will look there for several
329 connection-specific options:
330
331 =over 4
332
333 =item on_connect_do
334
335 This can be set to an arrayref of literal sql statements, which will
336 be executed immediately after making the connection to the database
337 every time we [re-]connect.
338
339 =item limit_dialect 
340
341 Sets the limit dialect. This is useful for JDBC-bridge among others
342 where the remote SQL-dialect cannot be determined by the name of the
343 driver alone.
344
345 =item quote_char
346
347 Specifies what characters to use to quote table and column names. If 
348 you use this you will want to specify L<name_sep> as well.
349
350 quote_char expects either a single character, in which case is it is placed
351 on either side of the table/column, or an arrayref of length 2 in which case the
352 table/column name is placed between the elements.
353
354 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
355 use C<quote_char =E<gt> [qw/[ ]/]>.
356
357 =item name_sep
358
359 This only needs to be used in conjunction with L<quote_char>, and is used to 
360 specify the charecter that seperates elements (schemas, tables, columns) from 
361 each other. In most cases this is simply a C<.>.
362
363 =back
364
365 These options can be mixed in with your other L<DBI> connection attributes,
366 or placed in a seperate hashref after all other normal L<DBI> connection
367 arguments.
368
369 Every time C<connect_info> is invoked, any previous settings for
370 these options will be cleared before setting the new ones, regardless of
371 whether any options are specified in the new C<connect_info>.
372
373 Important note:  DBIC expects the returned database handle provided by 
374 a subref argument to have RaiseError set on it.  If it doesn't, things
375 might not work very well, YMMV.  If you don't use a subref, DBIC will
376 force this setting for you anyways.  Setting HandleError to anything
377 other than simple exception object wrapper might cause problems too.
378
379 Examples:
380
381   # Simple SQLite connection
382   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
383
384   # Connect via subref
385   ->connect_info([ sub { DBI->connect(...) } ]);
386
387   # A bit more complicated
388   ->connect_info(
389     [
390       'dbi:Pg:dbname=foo',
391       'postgres',
392       'my_pg_password',
393       { AutoCommit => 0 },
394       { quote_char => q{"}, name_sep => q{.} },
395     ]
396   );
397
398   # Equivalent to the previous example
399   ->connect_info(
400     [
401       'dbi:Pg:dbname=foo',
402       'postgres',
403       'my_pg_password',
404       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
405     ]
406   );
407
408   # Subref + DBIC-specific connection options
409   ->connect_info(
410     [
411       sub { DBI->connect(...) },
412       {
413           quote_char => q{`},
414           name_sep => q{@},
415           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
416       },
417     ]
418   );
419
420 =cut
421
422 sub connect_info {
423   my ($self, $info_arg) = @_;
424
425   return $self->_connect_info if !$info_arg;
426
427   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
428   #  the new set of options
429   $self->_sql_maker(undef);
430   $self->_sql_maker_opts({});
431
432   my $info = [ @$info_arg ]; # copy because we can alter it
433   my $last_info = $info->[-1];
434   if(ref $last_info eq 'HASH') {
435     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
436       $self->on_connect_do($on_connect_do);
437     }
438     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
439       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
440         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
441       }
442     }
443
444     # Get rid of any trailing empty hashref
445     pop(@$info) if !keys %$last_info;
446   }
447
448   $self->_connect_info($info);
449 }
450
451 =head2 on_connect_do
452
453 This method is deprecated in favor of setting via L</connect_info>.
454
455 =head2 dbh_do
456
457 Arguments: $subref, @extra_coderef_args?
458
459 Execute the given subref using the new exception-based connection management.
460
461 The first two arguments will be the storage object that C<dbh_do> was called
462 on and a database handle to use.  Any additional arguments will be passed
463 verbatim to the called subref as arguments 2 and onwards.
464
465 Using this (instead of $self->_dbh or $self->dbh) ensures correct
466 exception handling and reconnection (or failover in future subclasses).
467
468 Your subref should have no side-effects outside of the database, as
469 there is the potential for your subref to be partially double-executed
470 if the database connection was stale/dysfunctional.
471
472 Example:
473
474   my @stuff = $schema->storage->dbh_do(
475     sub {
476       my ($storage, $dbh, @cols) = @_;
477       my $cols = join(q{, }, @cols);
478       $dbh->selectrow_array("SELECT $cols FROM foo");
479     },
480     @column_list
481   );
482
483 =cut
484
485 sub dbh_do {
486   my $self = shift;
487   my $coderef = shift;
488
489   ref $coderef eq 'CODE' or $self->throw_exception
490     ('$coderef must be a CODE reference');
491
492   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
493   local $self->{_in_dbh_do} = 1;
494
495   my @result;
496   my $want_array = wantarray;
497
498   eval {
499     $self->_verify_pid if $self->_dbh;
500     $self->_populate_dbh if !$self->_dbh;
501     if($want_array) {
502         @result = $coderef->($self, $self->_dbh, @_);
503     }
504     elsif(defined $want_array) {
505         $result[0] = $coderef->($self, $self->_dbh, @_);
506     }
507     else {
508         $coderef->($self, $self->_dbh, @_);
509     }
510   };
511
512   my $exception = $@;
513   if(!$exception) { return $want_array ? @result : $result[0] }
514
515   $self->throw_exception($exception) if $self->connected;
516
517   # We were not connected - reconnect and retry, but let any
518   #  exception fall right through this time
519   $self->_populate_dbh;
520   $coderef->($self, $self->_dbh, @_);
521 }
522
523 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
524 # It also informs dbh_do to bypass itself while under the direction of txn_do,
525 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
526 sub txn_do {
527   my $self = shift;
528   my $coderef = shift;
529
530   ref $coderef eq 'CODE' or $self->throw_exception
531     ('$coderef must be a CODE reference');
532
533   local $self->{_in_dbh_do} = 1;
534
535   my @result;
536   my $want_array = wantarray;
537
538   my $tried = 0;
539   while(1) {
540     eval {
541       $self->_verify_pid if $self->_dbh;
542       $self->_populate_dbh if !$self->_dbh;
543
544       $self->txn_begin;
545       if($want_array) {
546           @result = $coderef->(@_);
547       }
548       elsif(defined $want_array) {
549           $result[0] = $coderef->(@_);
550       }
551       else {
552           $coderef->(@_);
553       }
554       $self->txn_commit;
555     };
556
557     my $exception = $@;
558     if(!$exception) { return $want_array ? @result : $result[0] }
559
560     if($tried++ > 0 || $self->connected) {
561       eval { $self->txn_rollback };
562       my $rollback_exception = $@;
563       if($rollback_exception) {
564         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
565         $self->throw_exception($exception)  # propagate nested rollback
566           if $rollback_exception =~ /$exception_class/;
567
568         $self->throw_exception(
569           "Transaction aborted: ${exception}. "
570           . "Rollback failed: ${rollback_exception}"
571         );
572       }
573       $self->throw_exception($exception)
574     }
575
576     # We were not connected, and was first try - reconnect and retry
577     # via the while loop
578     $self->_populate_dbh;
579   }
580 }
581
582 =head2 disconnect
583
584 Our C<disconnect> method also performs a rollback first if the
585 database is not in C<AutoCommit> mode.
586
587 =cut
588
589 sub disconnect {
590   my ($self) = @_;
591
592   if( $self->connected ) {
593     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
594     $self->_dbh->disconnect;
595     $self->_dbh(undef);
596     $self->{_dbh_gen}++;
597   }
598 }
599
600 sub connected {
601   my ($self) = @_;
602
603   if(my $dbh = $self->_dbh) {
604       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
605           $self->_dbh(undef);
606           $self->{_dbh_gen}++;
607           return;
608       }
609       else {
610           $self->_verify_pid;
611       }
612       return ($dbh->FETCH('Active') && $dbh->ping);
613   }
614
615   return 0;
616 }
617
618 # handle pid changes correctly
619 #  NOTE: assumes $self->_dbh is a valid $dbh
620 sub _verify_pid {
621   my ($self) = @_;
622
623   return if $self->_conn_pid == $$;
624
625   $self->_dbh->{InactiveDestroy} = 1;
626   $self->_dbh(undef);
627   $self->{_dbh_gen}++;
628
629   return;
630 }
631
632 sub ensure_connected {
633   my ($self) = @_;
634
635   unless ($self->connected) {
636     $self->_populate_dbh;
637   }
638 }
639
640 =head2 dbh
641
642 Returns the dbh - a data base handle of class L<DBI>.
643
644 =cut
645
646 sub dbh {
647   my ($self) = @_;
648
649   $self->ensure_connected;
650   return $self->_dbh;
651 }
652
653 sub _sql_maker_args {
654     my ($self) = @_;
655     
656     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
657 }
658
659 sub sql_maker {
660   my ($self) = @_;
661   unless ($self->_sql_maker) {
662     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
663   }
664   return $self->_sql_maker;
665 }
666
667 sub _populate_dbh {
668   my ($self) = @_;
669   my @info = @{$self->_connect_info || []};
670   $self->_dbh($self->_connect(@info));
671
672   if(ref $self eq 'DBIx::Class::Storage::DBI') {
673     my $driver = $self->_dbh->{Driver}->{Name};
674     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
675       bless $self, "DBIx::Class::Storage::DBI::${driver}";
676       $self->_rebless() if $self->can('_rebless');
677     }
678   }
679
680   # if on-connect sql statements are given execute them
681   foreach my $sql_statement (@{$self->on_connect_do || []}) {
682     $self->debugobj->query_start($sql_statement) if $self->debug();
683     $self->_dbh->do($sql_statement);
684     $self->debugobj->query_end($sql_statement) if $self->debug();
685   }
686
687   $self->_conn_pid($$);
688   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
689 }
690
691 sub _connect {
692   my ($self, @info) = @_;
693
694   $self->throw_exception("You failed to provide any connection info")
695       if !@info;
696
697   my ($old_connect_via, $dbh);
698
699   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
700       $old_connect_via = $DBI::connect_via;
701       $DBI::connect_via = 'connect';
702   }
703
704   eval {
705     if(ref $info[0] eq 'CODE') {
706        $dbh = &{$info[0]}
707     }
708     else {
709        $dbh = DBI->connect(@info);
710        $dbh->{RaiseError} = 1;
711        $dbh->{PrintError} = 0;
712        $dbh->{PrintWarn} = 0;
713     }
714   };
715
716   $DBI::connect_via = $old_connect_via if $old_connect_via;
717
718   if (!$dbh || $@) {
719     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
720   }
721
722   $dbh;
723 }
724
725 sub _dbh_txn_begin {
726   my ($self, $dbh) = @_;
727   if ($dbh->{AutoCommit}) {
728     $self->debugobj->txn_begin()
729       if ($self->debug);
730     $dbh->begin_work;
731   }
732 }
733
734 sub txn_begin {
735   my $self = shift;
736   $self->dbh_do($self->can('_dbh_txn_begin'))
737     if $self->{transaction_depth}++ == 0;
738 }
739
740 sub _dbh_txn_commit {
741   my ($self, $dbh) = @_;
742   if ($self->{transaction_depth} == 0) {
743     unless ($dbh->{AutoCommit}) {
744       $self->debugobj->txn_commit()
745         if ($self->debug);
746       $dbh->commit;
747     }
748   }
749   else {
750     if (--$self->{transaction_depth} == 0) {
751       $self->debugobj->txn_commit()
752         if ($self->debug);
753       $dbh->commit;
754     }
755   }
756 }
757
758 sub txn_commit {
759   my $self = shift;
760   $self->dbh_do($self->can('_dbh_txn_commit'));
761 }
762
763 sub _dbh_txn_rollback {
764   my ($self, $dbh) = @_;
765   if ($self->{transaction_depth} == 0) {
766     unless ($dbh->{AutoCommit}) {
767       $self->debugobj->txn_rollback()
768         if ($self->debug);
769       $dbh->rollback;
770     }
771   }
772   else {
773     if (--$self->{transaction_depth} == 0) {
774       $self->debugobj->txn_rollback()
775         if ($self->debug);
776       $dbh->rollback;
777     }
778     else {
779       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
780     }
781   }
782 }
783
784 sub txn_rollback {
785   my $self = shift;
786
787   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
788   if ($@) {
789     my $error = $@;
790     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
791     $error =~ /$exception_class/ and $self->throw_exception($error);
792     $self->{transaction_depth} = 0;          # ensure that a failed rollback
793     $self->throw_exception($error);          # resets the transaction depth
794   }
795 }
796
797 # This used to be the top-half of _execute.  It was split out to make it
798 #  easier to override in NoBindVars without duping the rest.  It takes up
799 #  all of _execute's args, and emits $sql, @bind.
800 sub _prep_for_execute {
801   my ($self, $op, $extra_bind, $ident, @args) = @_;
802
803   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
804   unshift(@bind, @$extra_bind) if $extra_bind;
805   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
806
807   return ($sql, @bind);
808 }
809
810 sub _execute {
811   my $self = shift;
812
813   my ($sql, @bind) = $self->_prep_for_execute(@_);
814
815   if ($self->debug) {
816       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
817       $self->debugobj->query_start($sql, @debug_bind);
818   }
819
820   my $sth = $self->sth($sql);
821
822   my $rv;
823   if ($sth) {
824     my $time = time();
825     $rv = eval { $sth->execute(@bind) };
826
827     if ($@ || !$rv) {
828       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
829     }
830   } else {
831     $self->throw_exception("'$sql' did not generate a statement.");
832   }
833   if ($self->debug) {
834       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind; 
835       $self->debugobj->query_end($sql, @debug_bind);
836   }
837   return (wantarray ? ($rv, $sth, @bind) : $rv);
838 }
839
840 sub insert {
841   my ($self, $ident, $to_insert) = @_;
842   $self->throw_exception(
843     "Couldn't insert ".join(', ',
844       map "$_ => $to_insert->{$_}", keys %$to_insert
845     )." into ${ident}"
846   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
847   return $to_insert;
848 }
849
850 sub insert_bulk {
851   my ($self, $table, $cols, $data) = @_;
852   my ($sql, @bind) = $self->sql_maker->insert($table, @$data);
853
854   if ($self->debug) {
855       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
856       $self->debugobj->query_start($sql, @debug_bind);
857   }
858   my $sth = eval { $self->sth($sql,'insert') };
859
860   if (!$sth || $@) {
861     $self->throw_exception(
862       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
863     );
864   }
865 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
866
867   my $rv;
868   if ($sth) {
869     my $time = time();
870     $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { return shift @$data; }}) };
871
872     if ($@ || !$rv) {
873       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
874     }
875   } else {
876     $self->throw_exception("'$sql' did not generate a statement.");
877   }
878   if ($self->debug) {
879       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
880       $self->debugobj->query_end($sql, @debug_bind);
881   }
882   return (wantarray ? ($rv, $sth, @bind) : $rv);
883 }
884
885 sub update {
886   return shift->_execute('update' => [], @_);
887 }
888
889 sub delete {
890   return shift->_execute('delete' => [], @_);
891 }
892
893 sub _select {
894   my ($self, $ident, $select, $condition, $attrs) = @_;
895   my $order = $attrs->{order_by};
896   if (ref $condition eq 'SCALAR') {
897     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
898   }
899   if (exists $attrs->{group_by} || $attrs->{having}) {
900     $order = {
901       group_by => $attrs->{group_by},
902       having => $attrs->{having},
903       ($order ? (order_by => $order) : ())
904     };
905   }
906   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
907   if ($attrs->{software_limit} ||
908       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
909         $attrs->{software_limit} = 1;
910   } else {
911     $self->throw_exception("rows attribute must be positive if present")
912       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
913     push @args, $attrs->{rows}, $attrs->{offset};
914   }
915   return $self->_execute(@args);
916 }
917
918 =head2 select
919
920 =over 4
921
922 =item Arguments: $ident, $select, $condition, $attrs
923
924 =back
925
926 Handle a SQL select statement.
927
928 =cut
929
930 sub select {
931   my $self = shift;
932   my ($ident, $select, $condition, $attrs) = @_;
933   return $self->cursor->new($self, \@_, $attrs);
934 }
935
936 sub select_single {
937   my $self = shift;
938   my ($rv, $sth, @bind) = $self->_select(@_);
939   my @row = $sth->fetchrow_array;
940   # Need to call finish() to work round broken DBDs
941   $sth->finish();
942   return @row;
943 }
944
945 =head2 sth
946
947 =over 4
948
949 =item Arguments: $sql
950
951 =back
952
953 Returns a L<DBI> sth (statement handle) for the supplied SQL.
954
955 =cut
956
957 sub _dbh_sth {
958   my ($self, $dbh, $sql) = @_;
959   # 3 is the if_active parameter which avoids active sth re-use
960   $dbh->prepare_cached($sql, {}, 3) or
961     $self->throw_exception(
962       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
963     );
964 }
965
966 sub sth {
967   my ($self, $sql) = @_;
968   $self->dbh_do($self->can('_dbh_sth'), $sql);
969 }
970
971 sub _dbh_columns_info_for {
972   my ($self, $dbh, $table) = @_;
973
974   if ($dbh->can('column_info')) {
975     my %result;
976     eval {
977       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
978       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
979       $sth->execute();
980       while ( my $info = $sth->fetchrow_hashref() ){
981         my %column_info;
982         $column_info{data_type}   = $info->{TYPE_NAME};
983         $column_info{size}      = $info->{COLUMN_SIZE};
984         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
985         $column_info{default_value} = $info->{COLUMN_DEF};
986         my $col_name = $info->{COLUMN_NAME};
987         $col_name =~ s/^\"(.*)\"$/$1/;
988
989         $result{$col_name} = \%column_info;
990       }
991     };
992     return \%result if !$@ && scalar keys %result;
993   }
994
995   my %result;
996   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
997   $sth->execute;
998   my @columns = @{$sth->{NAME_lc}};
999   for my $i ( 0 .. $#columns ){
1000     my %column_info;
1001     my $type_num = $sth->{TYPE}->[$i];
1002     my $type_name;
1003     if(defined $type_num && $dbh->can('type_info')) {
1004       my $type_info = $dbh->type_info($type_num);
1005       $type_name = $type_info->{TYPE_NAME} if $type_info;
1006     }
1007     $column_info{data_type} = $type_name ? $type_name : $type_num;
1008     $column_info{size} = $sth->{PRECISION}->[$i];
1009     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1010
1011     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1012       $column_info{data_type} = $1;
1013       $column_info{size}    = $2;
1014     }
1015
1016     $result{$columns[$i]} = \%column_info;
1017   }
1018
1019   return \%result;
1020 }
1021
1022 sub columns_info_for {
1023   my ($self, $table) = @_;
1024   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1025 }
1026
1027 =head2 last_insert_id
1028
1029 Return the row id of the last insert.
1030
1031 =cut
1032
1033 sub _dbh_last_insert_id {
1034     my ($self, $dbh, $source, $col) = @_;
1035     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1036     $dbh->func('last_insert_rowid');
1037 }
1038
1039 sub last_insert_id {
1040   my $self = shift;
1041   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1042 }
1043
1044 =head2 sqlt_type
1045
1046 Returns the database driver name.
1047
1048 =cut
1049
1050 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1051
1052 =head2 create_ddl_dir (EXPERIMENTAL)
1053
1054 =over 4
1055
1056 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1057
1058 =back
1059
1060 Creates a SQL file based on the Schema, for each of the specified
1061 database types, in the given directory.
1062
1063 Note that this feature is currently EXPERIMENTAL and may not work correctly
1064 across all databases, or fully handle complex relationships.
1065
1066 =cut
1067
1068 sub create_ddl_dir
1069 {
1070   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1071
1072   if(!$dir || !-d $dir)
1073   {
1074     warn "No directory given, using ./\n";
1075     $dir = "./";
1076   }
1077   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1078   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1079   $version ||= $schema->VERSION || '1.x';
1080   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1081
1082   eval "use SQL::Translator";
1083   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1084
1085   my $sqlt = SQL::Translator->new($sqltargs);
1086   foreach my $db (@$databases)
1087   {
1088     $sqlt->reset();
1089     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1090 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1091     $sqlt->data($schema);
1092     $sqlt->producer($db);
1093
1094     my $file;
1095     my $filename = $schema->ddl_filename($db, $dir, $version);
1096     if(-e $filename)
1097     {
1098       $self->throw_exception("$filename already exists, skipping $db");
1099       next;
1100     }
1101     open($file, ">$filename") 
1102       or $self->throw_exception("Can't open $filename for writing ($!)");
1103     my $output = $sqlt->translate;
1104 #use Data::Dumper;
1105 #    print join(":", keys %{$schema->source_registrations});
1106 #    print Dumper($sqlt->schema);
1107     if(!$output)
1108     {
1109       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1110       next;
1111     }
1112     print $file $output;
1113     close($file);
1114   }
1115
1116 }
1117
1118 =head2 deployment_statements
1119
1120 =over 4
1121
1122 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1123
1124 =back
1125
1126 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1127 The database driver name is given by C<$type>, though the value from
1128 L</sqlt_type> is used if it is not specified.
1129
1130 C<$directory> is used to return statements from files in a previously created
1131 L</create_ddl_dir> directory and is optional. The filenames are constructed
1132 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1133
1134 If no C<$directory> is specified then the statements are constructed on the
1135 fly using L<SQL::Translator> and C<$version> is ignored.
1136
1137 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1138
1139 =cut
1140
1141 sub deployment_statements {
1142   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1143   # Need to be connected to get the correct sqlt_type
1144   $self->ensure_connected() unless $type;
1145   $type ||= $self->sqlt_type;
1146   $version ||= $schema->VERSION || '1.x';
1147   $dir ||= './';
1148   eval "use SQL::Translator";
1149   if(!$@)
1150   {
1151     eval "use SQL::Translator::Parser::DBIx::Class;";
1152     $self->throw_exception($@) if $@;
1153     eval "use SQL::Translator::Producer::${type};";
1154     $self->throw_exception($@) if $@;
1155     my $tr = SQL::Translator->new(%$sqltargs);
1156     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1157     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1158   }
1159
1160   my $filename = $schema->ddl_filename($type, $dir, $version);
1161   if(!-f $filename)
1162   {
1163 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1164       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1165       return;
1166   }
1167   my $file;
1168   open($file, "<$filename") 
1169       or $self->throw_exception("Can't open $filename ($!)");
1170   my @rows = <$file>;
1171   close($file);
1172
1173   return join('', @rows);
1174   
1175 }
1176
1177 sub deploy {
1178   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1179   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1180     for ( split(";\n", $statement)) {
1181       next if($_ =~ /^--/);
1182       next if(!$_);
1183 #      next if($_ =~ /^DROP/m);
1184       next if($_ =~ /^BEGIN TRANSACTION/m);
1185       next if($_ =~ /^COMMIT/m);
1186       next if $_ =~ /^\s+$/; # skip whitespace only
1187       $self->debugobj->query_start($_) if $self->debug;
1188       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1189       $self->debugobj->query_end($_) if $self->debug;
1190     }
1191   }
1192 }
1193
1194 =head2 datetime_parser
1195
1196 Returns the datetime parser class
1197
1198 =cut
1199
1200 sub datetime_parser {
1201   my $self = shift;
1202   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1203 }
1204
1205 =head2 datetime_parser_type
1206
1207 Defines (returns) the datetime parser class - currently hardwired to
1208 L<DateTime::Format::MySQL>
1209
1210 =cut
1211
1212 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1213
1214 =head2 build_datetime_parser
1215
1216 See L</datetime_parser>
1217
1218 =cut
1219
1220 sub build_datetime_parser {
1221   my $self = shift;
1222   my $type = $self->datetime_parser_type(@_);
1223   eval "use ${type}";
1224   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1225   return $type;
1226 }
1227
1228 sub DESTROY {
1229   my $self = shift;
1230   return if !$self->_dbh;
1231   $self->_verify_pid;
1232   $self->_dbh(undef);
1233 }
1234
1235 1;
1236
1237 =head1 SQL METHODS
1238
1239 The module defines a set of methods within the DBIC::SQL::Abstract
1240 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1241 SQL query functions.
1242
1243 The following methods are extended:-
1244
1245 =over 4
1246
1247 =item delete
1248
1249 =item insert
1250
1251 =item select
1252
1253 =item update
1254
1255 =item limit_dialect
1256
1257 See L</connect_info> for details.
1258 For setting, this method is deprecated in favor of L</connect_info>.
1259
1260 =item quote_char
1261
1262 See L</connect_info> for details.
1263 For setting, this method is deprecated in favor of L</connect_info>.
1264
1265 =item name_sep
1266
1267 See L</connect_info> for details.
1268 For setting, this method is deprecated in favor of L</connect_info>.
1269
1270 =back
1271
1272 =head1 AUTHORS
1273
1274 Matt S. Trout <mst@shadowcatsystems.co.uk>
1275
1276 Andy Grundman <andy@hybridized.org>
1277
1278 =head1 LICENSE
1279
1280 You may distribute this code under the same terms as Perl itself.
1281
1282 =cut