Merge 'trunk' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13
14 __PACKAGE__->mk_group_accessors(
15   'simple' =>
16     qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17        cursor on_connect_do transaction_depth/
18 );
19
20 BEGIN {
21
22 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
23
24 use base qw/SQL::Abstract::Limit/;
25
26 # This prevents the caching of $dbh in S::A::L, I believe
27 sub new {
28   my $self = shift->SUPER::new(@_);
29
30   # If limit_dialect is a ref (like a $dbh), go ahead and replace
31   #   it with what it resolves to:
32   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33     if ref $self->{limit_dialect};
34
35   $self;
36 }
37
38 sub _RowNumberOver {
39   my ($self, $sql, $order, $rows, $offset ) = @_;
40
41   $offset += 1;
42   my $last = $rows + $offset;
43   my ( $order_by ) = $self->_order_by( $order );
44
45   $sql = <<"";
46 SELECT * FROM
47 (
48    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49       $sql
50       $order_by
51    ) Q1
52 ) Q2
53 WHERE ROW_NUM BETWEEN $offset AND $last
54
55   return $sql;
56 }
57
58
59 # While we're at it, this should make LIMIT queries more efficient,
60 #  without digging into things too deeply
61 sub _find_syntax {
62   my ($self, $syntax) = @_;
63   my $dbhname = ref $syntax eq 'HASH' ? $syntax->{Driver}{Name} : '';
64   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
65     return 'RowNumberOver';
66   }
67
68   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
69 }
70
71 sub select {
72   my ($self, $table, $fields, $where, $order, @rest) = @_;
73   $table = $self->_quote($table) unless ref($table);
74   local $self->{rownum_hack_count} = 1
75     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
76   @rest = (-1) unless defined $rest[0];
77   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
78     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
79   local $self->{having_bind} = [];
80   my ($sql, @ret) = $self->SUPER::select(
81     $table, $self->_recurse_fields($fields), $where, $order, @rest
82   );
83   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
84 }
85
86 sub insert {
87   my $self = shift;
88   my $table = shift;
89   $table = $self->_quote($table) unless ref($table);
90   $self->SUPER::insert($table, @_);
91 }
92
93 sub update {
94   my $self = shift;
95   my $table = shift;
96   $table = $self->_quote($table) unless ref($table);
97   $self->SUPER::update($table, @_);
98 }
99
100 sub delete {
101   my $self = shift;
102   my $table = shift;
103   $table = $self->_quote($table) unless ref($table);
104   $self->SUPER::delete($table, @_);
105 }
106
107 sub _emulate_limit {
108   my $self = shift;
109   if ($_[3] == -1) {
110     return $_[1].$self->_order_by($_[2]);
111   } else {
112     return $self->SUPER::_emulate_limit(@_);
113   }
114 }
115
116 sub _recurse_fields {
117   my ($self, $fields) = @_;
118   my $ref = ref $fields;
119   return $self->_quote($fields) unless $ref;
120   return $$fields if $ref eq 'SCALAR';
121
122   if ($ref eq 'ARRAY') {
123     return join(', ', map {
124       $self->_recurse_fields($_)
125       .(exists $self->{rownum_hack_count}
126          ? ' AS col'.$self->{rownum_hack_count}++
127          : '')
128      } @$fields);
129   } elsif ($ref eq 'HASH') {
130     foreach my $func (keys %$fields) {
131       return $self->_sqlcase($func)
132         .'( '.$self->_recurse_fields($fields->{$func}).' )';
133     }
134   }
135 }
136
137 sub _order_by {
138   my $self = shift;
139   my $ret = '';
140   my @extra;
141   if (ref $_[0] eq 'HASH') {
142     if (defined $_[0]->{group_by}) {
143       $ret = $self->_sqlcase(' group by ')
144                .$self->_recurse_fields($_[0]->{group_by});
145     }
146     if (defined $_[0]->{having}) {
147       my $frag;
148       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
149       push(@{$self->{having_bind}}, @extra);
150       $ret .= $self->_sqlcase(' having ').$frag;
151     }
152     if (defined $_[0]->{order_by}) {
153       $ret .= $self->_order_by($_[0]->{order_by});
154     }
155   } elsif (ref $_[0] eq 'SCALAR') {
156     $ret = $self->_sqlcase(' order by ').${ $_[0] };
157   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
158     my @order = @{+shift};
159     $ret = $self->_sqlcase(' order by ')
160           .join(', ', map {
161                         my $r = $self->_order_by($_, @_);
162                         $r =~ s/^ ?ORDER BY //i;
163                         $r;
164                       } @order);
165   } else {
166     $ret = $self->SUPER::_order_by(@_);
167   }
168   return $ret;
169 }
170
171 sub _order_directions {
172   my ($self, $order) = @_;
173   $order = $order->{order_by} if ref $order eq 'HASH';
174   return $self->SUPER::_order_directions($order);
175 }
176
177 sub _table {
178   my ($self, $from) = @_;
179   if (ref $from eq 'ARRAY') {
180     return $self->_recurse_from(@$from);
181   } elsif (ref $from eq 'HASH') {
182     return $self->_make_as($from);
183   } else {
184     return $from; # would love to quote here but _table ends up getting called
185                   # twice during an ->select without a limit clause due to
186                   # the way S::A::Limit->select works. should maybe consider
187                   # bypassing this and doing S::A::select($self, ...) in
188                   # our select method above. meantime, quoting shims have
189                   # been added to select/insert/update/delete here
190   }
191 }
192
193 sub _recurse_from {
194   my ($self, $from, @join) = @_;
195   my @sqlf;
196   push(@sqlf, $self->_make_as($from));
197   foreach my $j (@join) {
198     my ($to, $on) = @$j;
199
200     # check whether a join type exists
201     my $join_clause = '';
202     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
203     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
204       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
205     } else {
206       $join_clause = ' JOIN ';
207     }
208     push(@sqlf, $join_clause);
209
210     if (ref $to eq 'ARRAY') {
211       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
212     } else {
213       push(@sqlf, $self->_make_as($to));
214     }
215     push(@sqlf, ' ON ', $self->_join_condition($on));
216   }
217   return join('', @sqlf);
218 }
219
220 sub _make_as {
221   my ($self, $from) = @_;
222   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
223                      reverse each %{$self->_skip_options($from)});
224 }
225
226 sub _skip_options {
227   my ($self, $hash) = @_;
228   my $clean_hash = {};
229   $clean_hash->{$_} = $hash->{$_}
230     for grep {!/^-/} keys %$hash;
231   return $clean_hash;
232 }
233
234 sub _join_condition {
235   my ($self, $cond) = @_;
236   if (ref $cond eq 'HASH') {
237     my %j;
238     for (keys %$cond) {
239       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
240     };
241     return $self->_recurse_where(\%j);
242   } elsif (ref $cond eq 'ARRAY') {
243     return join(' OR ', map { $self->_join_condition($_) } @$cond);
244   } else {
245     die "Can't handle this yet!";
246   }
247 }
248
249 sub _quote {
250   my ($self, $label) = @_;
251   return '' unless defined $label;
252   return "*" if $label eq '*';
253   return $label unless $self->{quote_char};
254   if(ref $self->{quote_char} eq "ARRAY"){
255     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
256       if !defined $self->{name_sep};
257     my $sep = $self->{name_sep};
258     return join($self->{name_sep},
259         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
260        split(/\Q$sep\E/,$label));
261   }
262   return $self->SUPER::_quote($label);
263 }
264
265 sub limit_dialect {
266     my $self = shift;
267     $self->{limit_dialect} = shift if @_;
268     return $self->{limit_dialect};
269 }
270
271 sub quote_char {
272     my $self = shift;
273     $self->{quote_char} = shift if @_;
274     return $self->{quote_char};
275 }
276
277 sub name_sep {
278     my $self = shift;
279     $self->{name_sep} = shift if @_;
280     return $self->{name_sep};
281 }
282
283 } # End of BEGIN block
284
285 =head1 NAME
286
287 DBIx::Class::Storage::DBI - DBI storage handler
288
289 =head1 SYNOPSIS
290
291 =head1 DESCRIPTION
292
293 This class represents the connection to an RDBMS via L<DBI>.  See
294 L<DBIx::Class::Storage> for general information.  This pod only
295 documents DBI-specific methods and behaviors.
296
297 =head1 METHODS
298
299 =cut
300
301 sub new {
302   my $new = shift->next::method(@_);
303
304   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
305   $new->transaction_depth(0);
306   $new->_sql_maker_opts({});
307
308   $new;
309 }
310
311 =head2 connect_info
312
313 The arguments of C<connect_info> are always a single array reference.
314
315 This is normally accessed via L<DBIx::Class::Schema/connection>, which
316 encapsulates its argument list in an arrayref before calling
317 C<connect_info> here.
318
319 The arrayref can either contain the same set of arguments one would
320 normally pass to L<DBI/connect>, or a lone code reference which returns
321 a connected database handle.
322
323 In either case, if the final argument in your connect_info happens
324 to be a hashref, C<connect_info> will look there for several
325 connection-specific options:
326
327 =over 4
328
329 =item on_connect_do
330
331 This can be set to an arrayref of literal sql statements, which will
332 be executed immediately after making the connection to the database
333 every time we [re-]connect.
334
335 =item limit_dialect 
336
337 Sets the limit dialect. This is useful for JDBC-bridge among others
338 where the remote SQL-dialect cannot be determined by the name of the
339 driver alone.
340
341 =item quote_char
342
343 Specifies what characters to use to quote table and column names. If 
344 you use this you will want to specify L<name_sep> as well.
345
346 quote_char expects either a single character, in which case is it is placed
347 on either side of the table/column, or an arrayref of length 2 in which case the
348 table/column name is placed between the elements.
349
350 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
351 use C<quote_char =E<gt> [qw/[ ]/]>.
352
353 =item name_sep
354
355 This only needs to be used in conjunction with L<quote_char>, and is used to 
356 specify the charecter that seperates elements (schemas, tables, columns) from 
357 each other. In most cases this is simply a C<.>.
358
359 =back
360
361 These options can be mixed in with your other L<DBI> connection attributes,
362 or placed in a seperate hashref after all other normal L<DBI> connection
363 arguments.
364
365 Every time C<connect_info> is invoked, any previous settings for
366 these options will be cleared before setting the new ones, regardless of
367 whether any options are specified in the new C<connect_info>.
368
369 Important note:  DBIC expects the returned database handle provided by 
370 a subref argument to have RaiseError set on it.  If it doesn't, things
371 might not work very well, YMMV.  If you don't use a subref, DBIC will
372 force this setting for you anyways.  Setting HandleError to anything
373 other than simple exception object wrapper might cause problems too.
374
375 Examples:
376
377   # Simple SQLite connection
378   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
379
380   # Connect via subref
381   ->connect_info([ sub { DBI->connect(...) } ]);
382
383   # A bit more complicated
384   ->connect_info(
385     [
386       'dbi:Pg:dbname=foo',
387       'postgres',
388       'my_pg_password',
389       { AutoCommit => 0 },
390       { quote_char => q{"}, name_sep => q{.} },
391     ]
392   );
393
394   # Equivalent to the previous example
395   ->connect_info(
396     [
397       'dbi:Pg:dbname=foo',
398       'postgres',
399       'my_pg_password',
400       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
401     ]
402   );
403
404   # Subref + DBIC-specific connection options
405   ->connect_info(
406     [
407       sub { DBI->connect(...) },
408       {
409           quote_char => q{`},
410           name_sep => q{@},
411           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
412       },
413     ]
414   );
415
416 =cut
417
418 sub connect_info {
419   my ($self, $info_arg) = @_;
420
421   return $self->_connect_info if !$info_arg;
422
423   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
424   #  the new set of options
425   $self->_sql_maker(undef);
426   $self->_sql_maker_opts({});
427
428   my $info = [ @$info_arg ]; # copy because we can alter it
429   my $last_info = $info->[-1];
430   if(ref $last_info eq 'HASH') {
431     if(my $on_connect_do = delete $last_info->{on_connect_do}) {
432       $self->on_connect_do($on_connect_do);
433     }
434     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
435       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
436         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
437       }
438     }
439
440     # Get rid of any trailing empty hashref
441     pop(@$info) if !keys %$last_info;
442   }
443
444   $self->_connect_info($info);
445 }
446
447 =head2 on_connect_do
448
449 This method is deprecated in favor of setting via L</connect_info>.
450
451 =head2 dbh_do
452
453 Arguments: $subref, @extra_coderef_args?
454
455 Execute the given subref with the underlying database handle as its
456 first argument, using the new exception-based connection management.
457
458 Any additional arguments will be passed verbatim to the called subref
459 as arguments 2 and onwards.
460
461 Example:
462
463   my @stuff = $schema->storage->dbh_do(
464     sub {
465       my $dbh = shift;
466       my $cols = join(q{, }, @_);
467       shift->selectrow_array("SELECT $cols FROM foo")
468     },
469     @column_list
470   );
471
472 =cut
473
474 sub dbh_do {
475   my $self = shift;
476   my $coderef = shift;
477
478   return $coderef->($self->_dbh, @_) if $self->{_in_txn_do};
479
480   ref $coderef eq 'CODE' or $self->throw_exception
481     ('$coderef must be a CODE reference');
482
483   my @result;
484   my $want_array = wantarray;
485
486   eval {
487     $self->_verify_pid if $self->_dbh;
488     $self->_populate_dbh if !$self->_dbh;
489     if($want_array) {
490         @result = $coderef->($self->_dbh, @_);
491     }
492     elsif(defined $want_array) {
493         $result[0] = $coderef->($self->_dbh, @_);
494     }
495     else {
496         $coderef->($self->_dbh, @_);
497     }
498   };
499
500   my $exception = $@;
501   if(!$exception) { return $want_array ? @result : $result[0] }
502
503   $self->throw_exception($exception) if $self->connected;
504
505   # We were not connected - reconnect and retry, but let any
506   #  exception fall right through this time
507   $self->_populate_dbh;
508   $coderef->($self->_dbh, @_);
509 }
510
511 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
512 # It also informs dbh_do to bypass itself while under the direction of txn_do,
513 #  via $self->{_in_txn_do} (this saves some redundant eval and errorcheck, etc)
514 sub txn_do {
515   my $self = shift;
516   my $coderef = shift;
517
518   ref $coderef eq 'CODE' or $self->throw_exception
519     ('$coderef must be a CODE reference');
520
521   local $self->{_in_txn_do} = 1;
522
523   my $tried = 0;
524
525   my @result;
526   my $want_array = wantarray;
527
528   START_TXN: eval {
529     $self->_verify_pid if $self->_dbh;
530     $self->_populate_dbh if !$self->_dbh;
531
532     $self->txn_begin;
533     if($want_array) {
534         @result = $coderef->(@_);
535     }
536     elsif(defined $want_array) {
537         $result[0] = $coderef->(@_);
538     }
539     else {
540         $coderef->(@_);
541     }
542     $self->txn_commit;
543   };
544
545   my $exception = $@;
546   if(!$exception) { return $want_array ? @result : $result[0] }
547
548   if($tried++ > 0 || $self->connected) {
549     eval { $self->txn_rollback };
550     my $rollback_exception = $@;
551     if($rollback_exception) {
552       my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
553       $self->throw_exception($exception)  # propagate nested rollback
554         if $rollback_exception =~ /$exception_class/;
555
556       $self->throw_exception(
557         "Transaction aborted: ${exception}. "
558         . "Rollback failed: ${rollback_exception}"
559       );
560     }
561     $self->throw_exception($exception)
562   }
563
564   # We were not connected, and was first try - reconnect and retry
565   # XXX I know, gotos are evil.  If you can find a better way
566   #  to write this that doesn't duplicate a lot of code/structure,
567   #  and behaves identically, feel free...
568
569   $self->_populate_dbh;
570   goto START_TXN;
571 }
572
573 =head2 disconnect
574
575 Our C<disconnect> method also performs a rollback first if the
576 database is not in C<AutoCommit> mode.
577
578 =cut
579
580 sub disconnect {
581   my ($self) = @_;
582
583   if( $self->connected ) {
584     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
585     $self->_dbh->disconnect;
586     $self->_dbh(undef);
587   }
588 }
589
590 sub connected {
591   my ($self) = @_;
592
593   if(my $dbh = $self->_dbh) {
594       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
595           return $self->_dbh(undef);
596       }
597       else {
598           $self->_verify_pid;
599       }
600       return ($dbh->FETCH('Active') && $dbh->ping);
601   }
602
603   return 0;
604 }
605
606 # handle pid changes correctly
607 #  NOTE: assumes $self->_dbh is a valid $dbh
608 sub _verify_pid {
609   my ($self) = @_;
610
611   return if $self->_conn_pid == $$;
612
613   $self->_dbh->{InactiveDestroy} = 1;
614   $self->_dbh(undef);
615
616   return;
617 }
618
619 sub ensure_connected {
620   my ($self) = @_;
621
622   unless ($self->connected) {
623     $self->_populate_dbh;
624   }
625 }
626
627 =head2 dbh
628
629 Returns the dbh - a data base handle of class L<DBI>.
630
631 =cut
632
633 sub dbh {
634   my ($self) = @_;
635
636   $self->ensure_connected;
637   return $self->_dbh;
638 }
639
640 sub _sql_maker_args {
641     my ($self) = @_;
642     
643     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
644 }
645
646 sub sql_maker {
647   my ($self) = @_;
648   unless ($self->_sql_maker) {
649     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
650   }
651   return $self->_sql_maker;
652 }
653
654 sub _populate_dbh {
655   my ($self) = @_;
656   my @info = @{$self->_connect_info || []};
657   $self->_dbh($self->_connect(@info));
658
659   if(ref $self eq 'DBIx::Class::Storage::DBI') {
660     my $driver = $self->_dbh->{Driver}->{Name};
661     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
662       bless $self, "DBIx::Class::Storage::DBI::${driver}";
663       $self->_rebless() if $self->can('_rebless');
664     }
665   }
666
667   # if on-connect sql statements are given execute them
668   foreach my $sql_statement (@{$self->on_connect_do || []}) {
669     $self->debugobj->query_start($sql_statement) if $self->debug();
670     $self->_dbh->do($sql_statement);
671     $self->debugobj->query_end($sql_statement) if $self->debug();
672   }
673
674   $self->_conn_pid($$);
675   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
676 }
677
678 sub _connect {
679   my ($self, @info) = @_;
680
681   $self->throw_exception("You failed to provide any connection info")
682       if !@info;
683
684   my ($old_connect_via, $dbh);
685
686   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
687       $old_connect_via = $DBI::connect_via;
688       $DBI::connect_via = 'connect';
689   }
690
691   eval {
692     if(ref $info[0] eq 'CODE') {
693        $dbh = &{$info[0]}
694     }
695     else {
696        $dbh = DBI->connect(@info);
697        $dbh->{RaiseError} = 1;
698        $dbh->{PrintError} = 0;
699     }
700   };
701
702   $DBI::connect_via = $old_connect_via if $old_connect_via;
703
704   if (!$dbh || $@) {
705     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
706   }
707
708   $dbh;
709 }
710
711 sub __txn_begin {
712   my ($dbh, $self) = @_;
713   if ($dbh->{AutoCommit}) {
714     $self->debugobj->txn_begin()
715       if ($self->debug);
716     $dbh->begin_work;
717   }
718 }
719
720 sub txn_begin {
721   my $self = shift;
722   $self->dbh_do(\&__txn_begin, $self)
723     if $self->{transaction_depth}++ == 0;
724 }
725
726 sub __txn_commit {
727   my ($dbh, $self) = @_;
728   if ($self->{transaction_depth} == 0) {
729     unless ($dbh->{AutoCommit}) {
730       $self->debugobj->txn_commit()
731         if ($self->debug);
732       $dbh->commit;
733     }
734   }
735   else {
736     if (--$self->{transaction_depth} == 0) {
737       $self->debugobj->txn_commit()
738         if ($self->debug);
739       $dbh->commit;
740     }
741   }
742 }
743
744 sub txn_commit {
745   my $self = shift;
746   $self->dbh_do(\&__txn_commit, $self);
747 }
748
749 sub __txn_rollback {
750   my ($dbh, $self) = @_;
751   if ($self->{transaction_depth} == 0) {
752     unless ($dbh->{AutoCommit}) {
753       $self->debugobj->txn_rollback()
754         if ($self->debug);
755       $dbh->rollback;
756     }
757   }
758   else {
759     if (--$self->{transaction_depth} == 0) {
760       $self->debugobj->txn_rollback()
761         if ($self->debug);
762       $dbh->rollback;
763     }
764     else {
765       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
766     }
767   }
768 }
769
770 sub txn_rollback {
771   my $self = shift;
772   eval { $self->dbh_do(\&__txn_rollback, $self) };
773   if ($@) {
774     my $error = $@;
775     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
776     $error =~ /$exception_class/ and $self->throw_exception($error);
777     $self->{transaction_depth} = 0;          # ensure that a failed rollback
778     $self->throw_exception($error);          # resets the transaction depth
779   }
780 }
781
782 sub _execute {
783   my ($self, $op, $extra_bind, $ident, @args) = @_;
784   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
785   unshift(@bind, @$extra_bind) if $extra_bind;
786   if ($self->debug) {
787       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
788       $self->debugobj->query_start($sql, @debug_bind);
789   }
790   my $sth = eval { $self->sth($sql,$op) };
791
792   if (!$sth || $@) {
793     $self->throw_exception(
794       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
795     );
796   }
797   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
798   my $rv;
799   if ($sth) {
800     my $time = time();
801     $rv = eval { $sth->execute(@bind) };
802
803     if ($@ || !$rv) {
804       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
805     }
806   } else {
807     $self->throw_exception("'$sql' did not generate a statement.");
808   }
809   if ($self->debug) {
810       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
811       $self->debugobj->query_end($sql, @debug_bind);
812   }
813   return (wantarray ? ($rv, $sth, @bind) : $rv);
814 }
815
816 sub insert {
817   my ($self, $ident, $to_insert) = @_;
818   $self->throw_exception(
819     "Couldn't insert ".join(', ',
820       map "$_ => $to_insert->{$_}", keys %$to_insert
821     )." into ${ident}"
822   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
823   return $to_insert;
824 }
825
826 sub update {
827   return shift->_execute('update' => [], @_);
828 }
829
830 sub delete {
831   return shift->_execute('delete' => [], @_);
832 }
833
834 sub _select {
835   my ($self, $ident, $select, $condition, $attrs) = @_;
836   my $order = $attrs->{order_by};
837   if (ref $condition eq 'SCALAR') {
838     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
839   }
840   if (exists $attrs->{group_by} || $attrs->{having}) {
841     $order = {
842       group_by => $attrs->{group_by},
843       having => $attrs->{having},
844       ($order ? (order_by => $order) : ())
845     };
846   }
847   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
848   if ($attrs->{software_limit} ||
849       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
850         $attrs->{software_limit} = 1;
851   } else {
852     $self->throw_exception("rows attribute must be positive if present")
853       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
854     push @args, $attrs->{rows}, $attrs->{offset};
855   }
856   return $self->_execute(@args);
857 }
858
859 =head2 select
860
861 =over 4
862
863 =item Arguments: $ident, $select, $condition, $attrs
864
865 =back
866
867 Handle a SQL select statement.
868
869 =cut
870
871 sub select {
872   my $self = shift;
873   my ($ident, $select, $condition, $attrs) = @_;
874   return $self->cursor->new($self, \@_, $attrs);
875 }
876
877 sub select_single {
878   my $self = shift;
879   my ($rv, $sth, @bind) = $self->_select(@_);
880   my @row = $sth->fetchrow_array;
881   # Need to call finish() to work round broken DBDs
882   $sth->finish();
883   return @row;
884 }
885
886 =head2 sth
887
888 =over 4
889
890 =item Arguments: $sql
891
892 =back
893
894 Returns a L<DBI> sth (statement handle) for the supplied SQL.
895
896 =cut
897
898 sub __sth {
899   my ($dbh, $sql) = @_;
900   # 3 is the if_active parameter which avoids active sth re-use
901   $dbh->prepare_cached($sql, {}, 3);
902 }
903
904 sub sth {
905   my ($self, $sql) = @_;
906   $self->dbh_do(\&__sth, $sql);
907 }
908
909
910 sub __columns_info_for {
911   my ($dbh, $self, $table) = @_;
912
913   if ($dbh->can('column_info')) {
914     my %result;
915     eval {
916       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
917       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
918       $sth->execute();
919       while ( my $info = $sth->fetchrow_hashref() ){
920         my %column_info;
921         $column_info{data_type}   = $info->{TYPE_NAME};
922         $column_info{size}      = $info->{COLUMN_SIZE};
923         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
924         $column_info{default_value} = $info->{COLUMN_DEF};
925         my $col_name = $info->{COLUMN_NAME};
926         $col_name =~ s/^\"(.*)\"$/$1/;
927
928         $result{$col_name} = \%column_info;
929       }
930     };
931     return \%result if !$@ && scalar keys %result;
932   }
933
934   my %result;
935   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
936   $sth->execute;
937   my @columns = @{$sth->{NAME_lc}};
938   for my $i ( 0 .. $#columns ){
939     my %column_info;
940     my $type_num = $sth->{TYPE}->[$i];
941     my $type_name;
942     if(defined $type_num && $dbh->can('type_info')) {
943       my $type_info = $dbh->type_info($type_num);
944       $type_name = $type_info->{TYPE_NAME} if $type_info;
945     }
946     $column_info{data_type} = $type_name ? $type_name : $type_num;
947     $column_info{size} = $sth->{PRECISION}->[$i];
948     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
949
950     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
951       $column_info{data_type} = $1;
952       $column_info{size}    = $2;
953     }
954
955     $result{$columns[$i]} = \%column_info;
956   }
957
958   return \%result;
959 }
960
961 sub columns_info_for {
962   my ($self, $table) = @_;
963   $self->dbh_do(\&__columns_info_for, $self, $table);
964 }
965
966 =head2 last_insert_id
967
968 Return the row id of the last insert.
969
970 =cut
971
972 sub last_insert_id {
973   my ($self, $row) = @_;
974     
975   $self->dbh_do(sub { shift->func('last_insert_rowid') });
976 }
977
978 =head2 sqlt_type
979
980 Returns the database driver name.
981
982 =cut
983
984 sub sqlt_type { shift->dbh_do(sub { shift->{Driver}->{Name} }) }
985
986 =head2 create_ddl_dir (EXPERIMENTAL)
987
988 =over 4
989
990 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
991
992 =back
993
994 Creates a SQL file based on the Schema, for each of the specified
995 database types, in the given directory.
996
997 Note that this feature is currently EXPERIMENTAL and may not work correctly
998 across all databases, or fully handle complex relationships.
999
1000 =cut
1001
1002 sub create_ddl_dir
1003 {
1004   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1005
1006   if(!$dir || !-d $dir)
1007   {
1008     warn "No directory given, using ./\n";
1009     $dir = "./";
1010   }
1011   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1012   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1013   $version ||= $schema->VERSION || '1.x';
1014   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1015
1016   eval "use SQL::Translator";
1017   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
1018
1019   my $sqlt = SQL::Translator->new($sqltargs);
1020   foreach my $db (@$databases)
1021   {
1022     $sqlt->reset();
1023     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1024 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1025     $sqlt->data($schema);
1026     $sqlt->producer($db);
1027
1028     my $file;
1029     my $filename = $schema->ddl_filename($db, $dir, $version);
1030     if(-e $filename)
1031     {
1032       $self->throw_exception("$filename already exists, skipping $db");
1033       next;
1034     }
1035     open($file, ">$filename") 
1036       or $self->throw_exception("Can't open $filename for writing ($!)");
1037     my $output = $sqlt->translate;
1038 #use Data::Dumper;
1039 #    print join(":", keys %{$schema->source_registrations});
1040 #    print Dumper($sqlt->schema);
1041     if(!$output)
1042     {
1043       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1044       next;
1045     }
1046     print $file $output;
1047     close($file);
1048   }
1049
1050 }
1051
1052 =head2 deployment_statements
1053
1054 =over 4
1055
1056 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1057
1058 =back
1059
1060 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1061 The database driver name is given by C<$type>, though the value from
1062 L</sqlt_type> is used if it is not specified.
1063
1064 C<$directory> is used to return statements from files in a previously created
1065 L</create_ddl_dir> directory and is optional. The filenames are constructed
1066 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1067
1068 If no C<$directory> is specified then the statements are constructed on the
1069 fly using L<SQL::Translator> and C<$version> is ignored.
1070
1071 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1072
1073 =cut
1074
1075 sub deployment_statements {
1076   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1077   # Need to be connected to get the correct sqlt_type
1078   $self->ensure_connected() unless $type;
1079   $type ||= $self->sqlt_type;
1080   $version ||= $schema->VERSION || '1.x';
1081   $dir ||= './';
1082   eval "use SQL::Translator";
1083   if(!$@)
1084   {
1085     eval "use SQL::Translator::Parser::DBIx::Class;";
1086     $self->throw_exception($@) if $@;
1087     eval "use SQL::Translator::Producer::${type};";
1088     $self->throw_exception($@) if $@;
1089     my $tr = SQL::Translator->new(%$sqltargs);
1090     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1091     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1092   }
1093
1094   my $filename = $schema->ddl_filename($type, $dir, $version);
1095   if(!-f $filename)
1096   {
1097 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1098       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1099       return;
1100   }
1101   my $file;
1102   open($file, "<$filename") 
1103       or $self->throw_exception("Can't open $filename ($!)");
1104   my @rows = <$file>;
1105   close($file);
1106
1107   return join('', @rows);
1108   
1109 }
1110
1111 sub deploy {
1112   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1113   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1114     for ( split(";\n", $statement)) {
1115       next if($_ =~ /^--/);
1116       next if(!$_);
1117 #      next if($_ =~ /^DROP/m);
1118       next if($_ =~ /^BEGIN TRANSACTION/m);
1119       next if($_ =~ /^COMMIT/m);
1120       next if $_ =~ /^\s+$/; # skip whitespace only
1121       $self->debugobj->query_start($_) if $self->debug;
1122       $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
1123       $self->debugobj->query_end($_) if $self->debug;
1124     }
1125   }
1126 }
1127
1128 =head2 datetime_parser
1129
1130 Returns the datetime parser class
1131
1132 =cut
1133
1134 sub datetime_parser {
1135   my $self = shift;
1136   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1137 }
1138
1139 =head2 datetime_parser_type
1140
1141 Defines (returns) the datetime parser class - currently hardwired to
1142 L<DateTime::Format::MySQL>
1143
1144 =cut
1145
1146 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1147
1148 =head2 build_datetime_parser
1149
1150 See L</datetime_parser>
1151
1152 =cut
1153
1154 sub build_datetime_parser {
1155   my $self = shift;
1156   my $type = $self->datetime_parser_type(@_);
1157   eval "use ${type}";
1158   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1159   return $type;
1160 }
1161
1162 sub DESTROY {
1163   my $self = shift;
1164   return if !$self->_dbh;
1165   $self->_verify_pid;
1166   $self->_dbh(undef);
1167 }
1168
1169 1;
1170
1171 =head1 SQL METHODS
1172
1173 The module defines a set of methods within the DBIC::SQL::Abstract
1174 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1175 SQL query functions.
1176
1177 The following methods are extended:-
1178
1179 =over 4
1180
1181 =item delete
1182
1183 =item insert
1184
1185 =item select
1186
1187 =item update
1188
1189 =item limit_dialect
1190
1191 See L</connect_info> for details.
1192 For setting, this method is deprecated in favor of L</connect_info>.
1193
1194 =item quote_char
1195
1196 See L</connect_info> for details.
1197 For setting, this method is deprecated in favor of L</connect_info>.
1198
1199 =item name_sep
1200
1201 See L</connect_info> for details.
1202 For setting, this method is deprecated in favor of L</connect_info>.
1203
1204 =back
1205
1206 =head1 AUTHORS
1207
1208 Matt S. Trout <mst@shadowcatsystems.co.uk>
1209
1210 Andy Grundman <andy@hybridized.org>
1211
1212 =head1 LICENSE
1213
1214 You may distribute this code under the same terms as Perl itself.
1215
1216 =cut