a4abf35d6da2af236ed8a150fe3d6da93993f529
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 sub _find_syntax {
62   my ($self, $syntax) = @_;
63   my $dbhname = ref $syntax eq 'HASH' ? $syntax->{Driver}{Name} : '';
64   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
65     return 'RowNumberOver';
66   }
67
68   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
69 }
70
71 sub select {
72   my ($self, $table, $fields, $where, $order, @rest) = @_;
73   $table = $self->_quote($table) unless ref($table);
74   local $self->{rownum_hack_count} = 1
75     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
76   @rest = (-1) unless defined $rest[0];
77   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
78     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
79   local $self->{having_bind} = [];
80   my ($sql, @ret) = $self->SUPER::select(
81     $table, $self->_recurse_fields($fields), $where, $order, @rest
82   );
83   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
84 }
85
86 sub insert {
87   my $self = shift;
88   my $table = shift;
89   $table = $self->_quote($table) unless ref($table);
90   $self->SUPER::insert($table, @_);
91 }
92
93 sub update {
94   my $self = shift;
95   my $table = shift;
96   $table = $self->_quote($table) unless ref($table);
97   $self->SUPER::update($table, @_);
98 }
99
100 sub delete {
101   my $self = shift;
102   my $table = shift;
103   $table = $self->_quote($table) unless ref($table);
104   $self->SUPER::delete($table, @_);
105 }
106
107 sub _emulate_limit {
108   my $self = shift;
109   if ($_[3] == -1) {
110     return $_[1].$self->_order_by($_[2]);
111   } else {
112     return $self->SUPER::_emulate_limit(@_);
113   }
114 }
115
116 sub _recurse_fields {
117   my ($self, $fields) = @_;
118   my $ref = ref $fields;
119   return $self->_quote($fields) unless $ref;
120   return $$fields if $ref eq 'SCALAR';
121
122   if ($ref eq 'ARRAY') {
123     return join(', ', map {
124       $self->_recurse_fields($_)
125       .(exists $self->{rownum_hack_count}
126          ? ' AS col'.$self->{rownum_hack_count}++
127          : '')
128      } @$fields);
129   } elsif ($ref eq 'HASH') {
130     foreach my $func (keys %$fields) {
131       return $self->_sqlcase($func)
132         .'( '.$self->_recurse_fields($fields->{$func}).' )';
133     }
134   }
135 }
136
137 sub _order_by {
138   my $self = shift;
139   my $ret = '';
140   my @extra;
141   if (ref $_[0] eq 'HASH') {
142     if (defined $_[0]->{group_by}) {
143       $ret = $self->_sqlcase(' group by ')
144                .$self->_recurse_fields($_[0]->{group_by});
145     }
146     if (defined $_[0]->{having}) {
147       my $frag;
148       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
149       push(@{$self->{having_bind}}, @extra);
150       $ret .= $self->_sqlcase(' having ').$frag;
151     }
152     if (defined $_[0]->{order_by}) {
153       $ret .= $self->_order_by($_[0]->{order_by});
154     }
155   } elsif (ref $_[0] eq 'SCALAR') {
156     $ret = $self->_sqlcase(' order by ').${ $_[0] };
157   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
158     my @order = @{+shift};
159     $ret = $self->_sqlcase(' order by ')
160           .join(', ', map {
161                         my $r = $self->_order_by($_, @_);
162                         $r =~ s/^ ?ORDER BY //i;
163                         $r;
164                       } @order);
165   } else {
166     $ret = $self->SUPER::_order_by(@_);
167   }
168   return $ret;
169 }
170
171 sub _order_directions {
172   my ($self, $order) = @_;
173   $order = $order->{order_by} if ref $order eq 'HASH';
174   return $self->SUPER::_order_directions($order);
175 }
176
177 sub _table {
178   my ($self, $from) = @_;
179   if (ref $from eq 'ARRAY') {
180     return $self->_recurse_from(@$from);
181   } elsif (ref $from eq 'HASH') {
182     return $self->_make_as($from);
183   } else {
184     return $from; # would love to quote here but _table ends up getting called
185                   # twice during an ->select without a limit clause due to
186                   # the way S::A::Limit->select works. should maybe consider
187                   # bypassing this and doing S::A::select($self, ...) in
188                   # our select method above. meantime, quoting shims have
189                   # been added to select/insert/update/delete here
190   }
191 }
192
193 sub _recurse_from {
194   my ($self, $from, @join) = @_;
195   my @sqlf;
196   push(@sqlf, $self->_make_as($from));
197   foreach my $j (@join) {
198     my ($to, $on) = @$j;
199
200     # check whether a join type exists
201     my $join_clause = '';
202     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
203     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
204       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
205     } else {
206       $join_clause = ' JOIN ';
207     }
208     push(@sqlf, $join_clause);
209
210     if (ref $to eq 'ARRAY') {
211       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
212     } else {
213       push(@sqlf, $self->_make_as($to));
214     }
215     push(@sqlf, ' ON ', $self->_join_condition($on));
216   }
217   return join('', @sqlf);
218 }
219
220 sub _make_as {
221   my ($self, $from) = @_;
222   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
223                      reverse each %{$self->_skip_options($from)});
224 }
225
226 sub _skip_options {
227   my ($self, $hash) = @_;
228   my $clean_hash = {};
229   $clean_hash->{$_} = $hash->{$_}
230     for grep {!/^-/} keys %$hash;
231   return $clean_hash;
232 }
233
234 sub _join_condition {
235   my ($self, $cond) = @_;
236   if (ref $cond eq 'HASH') {
237     my %j;
238     for (keys %$cond) {
239       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
240     };
241     return $self->_recurse_where(\%j);
242   } elsif (ref $cond eq 'ARRAY') {
243     return join(' OR ', map { $self->_join_condition($_) } @$cond);
244   } else {
245     die "Can't handle this yet!";
246   }
247 }
248
249 sub _quote {
250   my ($self, $label) = @_;
251   return '' unless defined $label;
252   return "*" if $label eq '*';
253   return $label unless $self->{quote_char};
254   if(ref $self->{quote_char} eq "ARRAY"){
255     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
256       if !defined $self->{name_sep};
257     my $sep = $self->{name_sep};
258     return join($self->{name_sep},
259         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
260        split(/\Q$sep\E/,$label));
261   }
262   return $self->SUPER::_quote($label);
263 }
264
265 sub limit_dialect {
266     my $self = shift;
267     $self->{limit_dialect} = shift if @_;
268     return $self->{limit_dialect};
269 }
270
271 sub quote_char {
272     my $self = shift;
273     $self->{quote_char} = shift if @_;
274     return $self->{quote_char};
275 }
276
277 sub name_sep {
278     my $self = shift;
279     $self->{name_sep} = shift if @_;
280     return $self->{name_sep};
281 }
282
283 } # End of BEGIN block
284
285 =head1 NAME
286
287 DBIx::Class::Storage::DBI - DBI storage handler
288
289 =head1 SYNOPSIS
290
291 =head1 DESCRIPTION
292
293 This class represents the connection to an RDBMS via L<DBI>.  See
294 L<DBIx::Class::Storage> for general information.  This pod only
295 documents DBI-specific methods and behaviors.
296
297 =head1 METHODS
298
299 =cut
300
301 sub new {
302   my $new = shift->next::method(@_);
303
304   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
305   $new->transaction_depth(0);
306   $new->_sql_maker_opts({});
307   $new->{_in_dbh_do} = 0;
308
309   $new;
310 }
311
312 =head2 connect_info
313
314 The arguments of C<connect_info> are always a single array reference.
315
316 This is normally accessed via L<DBIx::Class::Schema/connection>, which
317 encapsulates its argument list in an arrayref before calling
318 C<connect_info> here.
319
320 The arrayref can either contain the same set of arguments one would
321 normally pass to L<DBI/connect>, or a lone code reference which returns
322 a connected database handle.
323
324 In either case, if the final argument in your connect_info happens
325 to be a hashref, C<connect_info> will look there for several
326 connection-specific options:
327
328 =over 4
329
330 =item on_connect_do
331
332 This can be set to an arrayref of literal sql statements, which will
333 be executed immediately after making the connection to the database
334 every time we [re-]connect.
335
336 =item limit_dialect 
337
338 Sets the limit dialect. This is useful for JDBC-bridge among others
339 where the remote SQL-dialect cannot be determined by the name of the
340 driver alone.
341
342 =item quote_char
343
344 Specifies what characters to use to quote table and column names. If 
345 you use this you will want to specify L<name_sep> as well.
346
347 quote_char expects either a single character, in which case is it is placed
348 on either side of the table/column, or an arrayref of length 2 in which case the
349 table/column name is placed between the elements.
350
351 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
352 use C<quote_char =E<gt> [qw/[ ]/]>.
353
354 =item name_sep
355
356 This only needs to be used in conjunction with L<quote_char>, and is used to 
357 specify the charecter that seperates elements (schemas, tables, columns) from 
358 each other. In most cases this is simply a C<.>.
359
360 =back
361
362 These options can be mixed in with your other L<DBI> connection attributes,
363 or placed in a seperate hashref after all other normal L<DBI> connection
364 arguments.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370 Important note:  DBIC expects the returned database handle provided by 
371 a subref argument to have RaiseError set on it.  If it doesn't, things
372 might not work very well, YMMV.  If you don't use a subref, DBIC will
373 force this setting for you anyways.  Setting HandleError to anything
374 other than simple exception object wrapper might cause problems too.
375
376 Examples:
377
378   # Simple SQLite connection
379   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
380
381   # Connect via subref
382   ->connect_info([ sub { DBI->connect(...) } ]);
383
384   # A bit more complicated
385   ->connect_info(
386     [
387       'dbi:Pg:dbname=foo',
388       'postgres',
389       'my_pg_password',
390       { AutoCommit => 0 },
391       { quote_char => q{"}, name_sep => q{.} },
392     ]
393   );
394
395   # Equivalent to the previous example
396   ->connect_info(
397     [
398       'dbi:Pg:dbname=foo',
399       'postgres',
400       'my_pg_password',
401       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
402     ]
403   );
404
405   # Subref + DBIC-specific connection options
406   ->connect_info(
407     [
408       sub { DBI->connect(...) },
409       {
410           quote_char => q{`},
411           name_sep => q{@},
412           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
413       },
414     ]
415   );
416
417 =cut
418
419 sub connect_info {
420   my ($self, $info_arg) = @_;
421
422   return $self->_connect_info if !$info_arg;
423
424   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
425   #  the new set of options
426   $self->_sql_maker(undef);
427   $self->_sql_maker_opts({});
428
429   my $info = [ @$info_arg ]; # copy because we can alter it
430   my $last_info = $info->[-1];
431   if(ref $last_info eq 'HASH') {
432     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
433       $self->on_connect_do($on_connect_do);
434     }
435     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
436       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
437         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
438       }
439     }
440
441     # Get rid of any trailing empty hashref
442     pop(@$info) if !keys %$last_info;
443   }
444
445   $self->_connect_info($info);
446 }
447
448 =head2 on_connect_do
449
450 This method is deprecated in favor of setting via L</connect_info>.
451
452 =head2 dbh_do
453
454 Arguments: $subref, @extra_coderef_args?
455
456 Execute the given subref using the new exception-based connection management.
457
458 The first two arguments will be the storage object that C<dbh_do> was called
459 on and a database handle to use.  Any additional arguments will be passed
460 verbatim to the called subref as arguments 2 and onwards.
461
462 Using this (instead of $self->_dbh or $self->dbh) ensures correct
463 exception handling and reconnection (or failover in future subclasses).
464
465 Your subref should have no side-effects outside of the database, as
466 there is the potential for your subref to be partially double-executed
467 if the database connection was stale/dysfunctional.
468
469 Example:
470
471   my @stuff = $schema->storage->dbh_do(
472     sub {
473       my ($storage, $dbh, @cols) = @_;
474       my $cols = join(q{, }, @cols);
475       $dbh->selectrow_array("SELECT $cols FROM foo");
476     },
477     @column_list
478   );
479
480 =cut
481
482 sub dbh_do {
483   my $self = shift;
484   my $coderef = shift;
485
486   ref $coderef eq 'CODE' or $self->throw_exception
487     ('$coderef must be a CODE reference');
488
489   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
490   local $self->{_in_dbh_do} = 1;
491
492   my @result;
493   my $want_array = wantarray;
494
495   eval {
496     $self->_verify_pid if $self->_dbh;
497     $self->_populate_dbh if !$self->_dbh;
498     if($want_array) {
499         @result = $coderef->($self, $self->_dbh, @_);
500     }
501     elsif(defined $want_array) {
502         $result[0] = $coderef->($self, $self->_dbh, @_);
503     }
504     else {
505         $coderef->($self, $self->_dbh, @_);
506     }
507   };
508
509   my $exception = $@;
510   if(!$exception) { return $want_array ? @result : $result[0] }
511
512   $self->throw_exception($exception) if $self->connected;
513
514   # We were not connected - reconnect and retry, but let any
515   #  exception fall right through this time
516   $self->_populate_dbh;
517   $coderef->($self, $self->_dbh, @_);
518 }
519
520 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
521 # It also informs dbh_do to bypass itself while under the direction of txn_do,
522 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
523 sub txn_do {
524   my $self = shift;
525   my $coderef = shift;
526
527   ref $coderef eq 'CODE' or $self->throw_exception
528     ('$coderef must be a CODE reference');
529
530   local $self->{_in_dbh_do} = 1;
531
532   my @result;
533   my $want_array = wantarray;
534
535   my $tried = 0;
536   while(1) {
537     eval {
538       $self->_verify_pid if $self->_dbh;
539       $self->_populate_dbh if !$self->_dbh;
540
541       $self->txn_begin;
542       if($want_array) {
543           @result = $coderef->(@_);
544       }
545       elsif(defined $want_array) {
546           $result[0] = $coderef->(@_);
547       }
548       else {
549           $coderef->(@_);
550       }
551       $self->txn_commit;
552     };
553
554     my $exception = $@;
555     if(!$exception) { return $want_array ? @result : $result[0] }
556
557     if($tried++ > 0 || $self->connected) {
558       eval { $self->txn_rollback };
559       my $rollback_exception = $@;
560       if($rollback_exception) {
561         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
562         $self->throw_exception($exception)  # propagate nested rollback
563           if $rollback_exception =~ /$exception_class/;
564
565         $self->throw_exception(
566           "Transaction aborted: ${exception}. "
567           . "Rollback failed: ${rollback_exception}"
568         );
569       }
570       $self->throw_exception($exception)
571     }
572
573     # We were not connected, and was first try - reconnect and retry
574     # via the while loop
575     $self->_populate_dbh;
576   }
577 }
578
579 =head2 disconnect
580
581 Our C<disconnect> method also performs a rollback first if the
582 database is not in C<AutoCommit> mode.
583
584 =cut
585
586 sub disconnect {
587   my ($self) = @_;
588
589   if( $self->connected ) {
590     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
591     $self->_dbh->disconnect;
592     $self->_dbh(undef);
593   }
594 }
595
596 sub connected {
597   my ($self) = @_;
598
599   if(my $dbh = $self->_dbh) {
600       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
601           return $self->_dbh(undef);
602       }
603       else {
604           $self->_verify_pid;
605       }
606       return ($dbh->FETCH('Active') && $dbh->ping);
607   }
608
609   return 0;
610 }
611
612 # handle pid changes correctly
613 #  NOTE: assumes $self->_dbh is a valid $dbh
614 sub _verify_pid {
615   my ($self) = @_;
616
617   return if $self->_conn_pid == $$;
618
619   $self->_dbh->{InactiveDestroy} = 1;
620   $self->_dbh(undef);
621
622   return;
623 }
624
625 sub ensure_connected {
626   my ($self) = @_;
627
628   unless ($self->connected) {
629     $self->_populate_dbh;
630   }
631 }
632
633 =head2 dbh
634
635 Returns the dbh - a data base handle of class L<DBI>.
636
637 =cut
638
639 sub dbh {
640   my ($self) = @_;
641
642   $self->ensure_connected;
643   return $self->_dbh;
644 }
645
646 sub _sql_maker_args {
647     my ($self) = @_;
648     
649     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
650 }
651
652 sub sql_maker {
653   my ($self) = @_;
654   unless ($self->_sql_maker) {
655     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
656   }
657   return $self->_sql_maker;
658 }
659
660 sub _populate_dbh {
661   my ($self) = @_;
662   my @info = @{$self->_connect_info || []};
663   $self->_dbh($self->_connect(@info));
664
665   if(ref $self eq 'DBIx::Class::Storage::DBI') {
666     my $driver = $self->_dbh->{Driver}->{Name};
667     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
668       bless $self, "DBIx::Class::Storage::DBI::${driver}";
669       $self->_rebless() if $self->can('_rebless');
670     }
671   }
672
673   # if on-connect sql statements are given execute them
674   foreach my $sql_statement (@{$self->on_connect_do || []}) {
675     $self->debugobj->query_start($sql_statement) if $self->debug();
676     $self->_dbh->do($sql_statement);
677     $self->debugobj->query_end($sql_statement) if $self->debug();
678   }
679
680   $self->_conn_pid($$);
681   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
682 }
683
684 sub _connect {
685   my ($self, @info) = @_;
686
687   $self->throw_exception("You failed to provide any connection info")
688       if !@info;
689
690   my ($old_connect_via, $dbh);
691
692   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
693       $old_connect_via = $DBI::connect_via;
694       $DBI::connect_via = 'connect';
695   }
696
697   eval {
698     if(ref $info[0] eq 'CODE') {
699        $dbh = &{$info[0]}
700     }
701     else {
702        $dbh = DBI->connect(@info);
703        $dbh->{RaiseError} = 1;
704        $dbh->{PrintError} = 0;
705        $dbh->{PrintWarn} = 0;
706     }
707   };
708
709   $DBI::connect_via = $old_connect_via if $old_connect_via;
710
711   if (!$dbh || $@) {
712     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
713   }
714
715   $dbh;
716 }
717
718 sub _dbh_txn_begin {
719   my ($self, $dbh) = @_;
720   if ($dbh->{AutoCommit}) {
721     $self->debugobj->txn_begin()
722       if ($self->debug);
723     $dbh->begin_work;
724   }
725 }
726
727 sub txn_begin {
728   my $self = shift;
729   $self->dbh_do($self->can('_dbh_txn_begin'))
730     if $self->{transaction_depth}++ == 0;
731 }
732
733 sub _dbh_txn_commit {
734   my ($self, $dbh) = @_;
735   if ($self->{transaction_depth} == 0) {
736     unless ($dbh->{AutoCommit}) {
737       $self->debugobj->txn_commit()
738         if ($self->debug);
739       $dbh->commit;
740     }
741   }
742   else {
743     if (--$self->{transaction_depth} == 0) {
744       $self->debugobj->txn_commit()
745         if ($self->debug);
746       $dbh->commit;
747     }
748   }
749 }
750
751 sub txn_commit {
752   my $self = shift;
753   $self->dbh_do($self->can('_dbh_txn_commit'));
754 }
755
756 sub _dbh_txn_rollback {
757   my ($self, $dbh) = @_;
758   if ($self->{transaction_depth} == 0) {
759     unless ($dbh->{AutoCommit}) {
760       $self->debugobj->txn_rollback()
761         if ($self->debug);
762       $dbh->rollback;
763     }
764   }
765   else {
766     if (--$self->{transaction_depth} == 0) {
767       $self->debugobj->txn_rollback()
768         if ($self->debug);
769       $dbh->rollback;
770     }
771     else {
772       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
773     }
774   }
775 }
776
777 sub txn_rollback {
778   my $self = shift;
779
780   eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
781   if ($@) {
782     my $error = $@;
783     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
784     $error =~ /$exception_class/ and $self->throw_exception($error);
785     $self->{transaction_depth} = 0;          # ensure that a failed rollback
786     $self->throw_exception($error);          # resets the transaction depth
787   }
788 }
789
790 # This used to be the top-half of _execute.  It was split out to make it
791 #  easier to override in NoBindVars without duping the rest.  It takes up
792 #  all of _execute's args, and emits $sql, @bind.
793 sub _prep_for_execute {
794   my ($self, $op, $extra_bind, $ident, @args) = @_;
795
796   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
797   unshift(@bind, @$extra_bind) if $extra_bind;
798   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
799
800   return ($sql, @bind);
801 }
802
803 sub _execute {
804   my $self = shift;
805
806   my ($sql, @bind) = $self->_prep_for_execute(@_);
807
808   if ($self->debug) {
809       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
810       $self->debugobj->query_start($sql, @debug_bind);
811   }
812
813   my $sth = $self->sth($sql);
814
815   my $rv;
816   if ($sth) {
817     my $time = time();
818     $rv = eval { $sth->execute(@bind) };
819
820     if ($@ || !$rv) {
821       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
822     }
823   } else {
824     $self->throw_exception("'$sql' did not generate a statement.");
825   }
826   if ($self->debug) {
827       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
828       $self->debugobj->query_end($sql, @debug_bind);
829   }
830   return (wantarray ? ($rv, $sth, @bind) : $rv);
831 }
832
833 sub insert {
834   my ($self, $ident, $to_insert) = @_;
835   $self->throw_exception(
836     "Couldn't insert ".join(', ',
837       map "$_ => $to_insert->{$_}", keys %$to_insert
838     )." into ${ident}"
839   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
840   return $to_insert;
841 }
842
843 sub update {
844   return shift->_execute('update' => [], @_);
845 }
846
847 sub delete {
848   return shift->_execute('delete' => [], @_);
849 }
850
851 sub _select {
852   my ($self, $ident, $select, $condition, $attrs) = @_;
853   my $order = $attrs->{order_by};
854   if (ref $condition eq 'SCALAR') {
855     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
856   }
857   if (exists $attrs->{group_by} || $attrs->{having}) {
858     $order = {
859       group_by => $attrs->{group_by},
860       having => $attrs->{having},
861       ($order ? (order_by => $order) : ())
862     };
863   }
864   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
865   if ($attrs->{software_limit} ||
866       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
867         $attrs->{software_limit} = 1;
868   } else {
869     $self->throw_exception("rows attribute must be positive if present")
870       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
871     push @args, $attrs->{rows}, $attrs->{offset};
872   }
873   return $self->_execute(@args);
874 }
875
876 =head2 select
877
878 =over 4
879
880 =item Arguments: $ident, $select, $condition, $attrs
881
882 =back
883
884 Handle a SQL select statement.
885
886 =cut
887
888 sub select {
889   my $self = shift;
890   my ($ident, $select, $condition, $attrs) = @_;
891   return $self->cursor->new($self, \@_, $attrs);
892 }
893
894 sub select_single {
895   my $self = shift;
896   my ($rv, $sth, @bind) = $self->_select(@_);
897   my @row = $sth->fetchrow_array;
898   # Need to call finish() to work round broken DBDs
899   $sth->finish();
900   return @row;
901 }
902
903 =head2 sth
904
905 =over 4
906
907 =item Arguments: $sql
908
909 =back
910
911 Returns a L<DBI> sth (statement handle) for the supplied SQL.
912
913 =cut
914
915 sub _dbh_sth {
916   my ($self, $dbh, $sql) = @_;
917   # 3 is the if_active parameter which avoids active sth re-use
918   $dbh->prepare_cached($sql, {}, 3) or
919     $self->throw_exception(
920       'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
921     );
922 }
923
924 sub sth {
925   my ($self, $sql) = @_;
926   $self->dbh_do($self->can('_dbh_sth'), $sql);
927 }
928
929 sub _dbh_columns_info_for {
930   my ($self, $dbh, $table) = @_;
931
932   if ($dbh->can('column_info')) {
933     my %result;
934     eval {
935       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
936       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
937       $sth->execute();
938       while ( my $info = $sth->fetchrow_hashref() ){
939         my %column_info;
940         $column_info{data_type}   = $info->{TYPE_NAME};
941         $column_info{size}      = $info->{COLUMN_SIZE};
942         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
943         $column_info{default_value} = $info->{COLUMN_DEF};
944         my $col_name = $info->{COLUMN_NAME};
945         $col_name =~ s/^\"(.*)\"$/$1/;
946
947         $result{$col_name} = \%column_info;
948       }
949     };
950     return \%result if !$@ && scalar keys %result;
951   }
952
953   my %result;
954   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
955   $sth->execute;
956   my @columns = @{$sth->{NAME_lc}};
957   for my $i ( 0 .. $#columns ){
958     my %column_info;
959     my $type_num = $sth->{TYPE}->[$i];
960     my $type_name;
961     if(defined $type_num && $dbh->can('type_info')) {
962       my $type_info = $dbh->type_info($type_num);
963       $type_name = $type_info->{TYPE_NAME} if $type_info;
964     }
965     $column_info{data_type} = $type_name ? $type_name : $type_num;
966     $column_info{size} = $sth->{PRECISION}->[$i];
967     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
968
969     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
970       $column_info{data_type} = $1;
971       $column_info{size}    = $2;
972     }
973
974     $result{$columns[$i]} = \%column_info;
975   }
976
977   return \%result;
978 }
979
980 sub columns_info_for {
981   my ($self, $table) = @_;
982   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
983 }
984
985 =head2 last_insert_id
986
987 Return the row id of the last insert.
988
989 =cut
990
991 sub _dbh_last_insert_id {
992     my ($self, $dbh, $source, $col) = @_;
993     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
994     $dbh->func('last_insert_rowid');
995 }
996
997 sub last_insert_id {
998   my $self = shift;
999   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1000 }
1001
1002 =head2 sqlt_type
1003
1004 Returns the database driver name.
1005
1006 =cut
1007
1008 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1009
1010 =head2 create_ddl_dir (EXPERIMENTAL)
1011
1012 =over 4
1013
1014 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1015
1016 =back
1017
1018 Creates a SQL file based on the Schema, for each of the specified
1019 database types, in the given directory.
1020
1021 Note that this feature is currently EXPERIMENTAL and may not work correctly
1022 across all databases, or fully handle complex relationships.
1023
1024 =cut
1025
1026 sub create_ddl_dir
1027 {
1028   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1029
1030   if(!$dir || !-d $dir)
1031   {
1032     warn "No directory given, using ./\n";
1033     $dir = "./";
1034   }
1035   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1036   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1037   $version ||= $schema->VERSION || '1.x';
1038   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1039
1040   eval "use SQL::Translator";
1041   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1042
1043   my $sqlt = SQL::Translator->new($sqltargs);
1044   foreach my $db (@$databases)
1045   {
1046     $sqlt->reset();
1047     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1048 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1049     $sqlt->data($schema);
1050     $sqlt->producer($db);
1051
1052     my $file;
1053     my $filename = $schema->ddl_filename($db, $dir, $version);
1054     if(-e $filename)
1055     {
1056       $self->throw_exception("$filename already exists, skipping $db");
1057       next;
1058     }
1059     open($file, ">$filename") 
1060       or $self->throw_exception("Can't open $filename for writing ($!)");
1061     my $output = $sqlt->translate;
1062 #use Data::Dumper;
1063 #    print join(":", keys %{$schema->source_registrations});
1064 #    print Dumper($sqlt->schema);
1065     if(!$output)
1066     {
1067       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1068       next;
1069     }
1070     print $file $output;
1071     close($file);
1072   }
1073
1074 }
1075
1076 =head2 deployment_statements
1077
1078 =over 4
1079
1080 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1081
1082 =back
1083
1084 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1085 The database driver name is given by C<$type>, though the value from
1086 L</sqlt_type> is used if it is not specified.
1087
1088 C<$directory> is used to return statements from files in a previously created
1089 L</create_ddl_dir> directory and is optional. The filenames are constructed
1090 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1091
1092 If no C<$directory> is specified then the statements are constructed on the
1093 fly using L<SQL::Translator> and C<$version> is ignored.
1094
1095 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1096
1097 =cut
1098
1099 sub deployment_statements {
1100   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1101   # Need to be connected to get the correct sqlt_type
1102   $self->ensure_connected() unless $type;
1103   $type ||= $self->sqlt_type;
1104   $version ||= $schema->VERSION || '1.x';
1105   $dir ||= './';
1106   eval "use SQL::Translator";
1107   if(!$@)
1108   {
1109     eval "use SQL::Translator::Parser::DBIx::Class;";
1110     $self->throw_exception($@) if $@;
1111     eval "use SQL::Translator::Producer::${type};";
1112     $self->throw_exception($@) if $@;
1113     my $tr = SQL::Translator->new(%$sqltargs);
1114     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1115     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1116   }
1117
1118   my $filename = $schema->ddl_filename($type, $dir, $version);
1119   if(!-f $filename)
1120   {
1121 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1122       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1123       return;
1124   }
1125   my $file;
1126   open($file, "<$filename") 
1127       or $self->throw_exception("Can't open $filename ($!)");
1128   my @rows = <$file>;
1129   close($file);
1130
1131   return join('', @rows);
1132   
1133 }
1134
1135 sub deploy {
1136   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1137   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1138     for ( split(";\n", $statement)) {
1139       next if($_ =~ /^--/);
1140       next if(!$_);
1141 #      next if($_ =~ /^DROP/m);
1142       next if($_ =~ /^BEGIN TRANSACTION/m);
1143       next if($_ =~ /^COMMIT/m);
1144       next if $_ =~ /^\s+$/; # skip whitespace only
1145       $self->debugobj->query_start($_) if $self->debug;
1146       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1147       $self->debugobj->query_end($_) if $self->debug;
1148     }
1149   }
1150 }
1151
1152 =head2 datetime_parser
1153
1154 Returns the datetime parser class
1155
1156 =cut
1157
1158 sub datetime_parser {
1159   my $self = shift;
1160   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1161 }
1162
1163 =head2 datetime_parser_type
1164
1165 Defines (returns) the datetime parser class - currently hardwired to
1166 L<DateTime::Format::MySQL>
1167
1168 =cut
1169
1170 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1171
1172 =head2 build_datetime_parser
1173
1174 See L</datetime_parser>
1175
1176 =cut
1177
1178 sub build_datetime_parser {
1179   my $self = shift;
1180   my $type = $self->datetime_parser_type(@_);
1181   eval "use ${type}";
1182   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1183   return $type;
1184 }
1185
1186 sub DESTROY {
1187   my $self = shift;
1188   return if !$self->_dbh;
1189   $self->_verify_pid;
1190   $self->_dbh(undef);
1191 }
1192
1193 1;
1194
1195 =head1 SQL METHODS
1196
1197 The module defines a set of methods within the DBIC::SQL::Abstract
1198 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1199 SQL query functions.
1200
1201 The following methods are extended:-
1202
1203 =over 4
1204
1205 =item delete
1206
1207 =item insert
1208
1209 =item select
1210
1211 =item update
1212
1213 =item limit_dialect
1214
1215 See L</connect_info> for details.
1216 For setting, this method is deprecated in favor of L</connect_info>.
1217
1218 =item quote_char
1219
1220 See L</connect_info> for details.
1221 For setting, this method is deprecated in favor of L</connect_info>.
1222
1223 =item name_sep
1224
1225 See L</connect_info> for details.
1226 For setting, this method is deprecated in favor of L</connect_info>.
1227
1228 =back
1229
1230 =head1 AUTHORS
1231
1232 Matt S. Trout <mst@shadowcatsystems.co.uk>
1233
1234 Andy Grundman <andy@hybridized.org>
1235
1236 =head1 LICENSE
1237
1238 You may distribute this code under the same terms as Perl itself.
1239
1240 =cut