reverse r4290 since we -do- -not- currently want these namespaces indexed
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid disable_sth_caching on_connect_do
18        on_disconnect_do transaction_depth unsafe _dbh_autocommit
19        auto_savepoint savepoints/
20 );
21
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
25 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
26
27 BEGIN {
28
29 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
30
31 use base qw/SQL::Abstract::Limit/;
32
33 # This prevents the caching of $dbh in S::A::L, I believe
34 sub new {
35   my $self = shift->SUPER::new(@_);
36
37   # If limit_dialect is a ref (like a $dbh), go ahead and replace
38   #   it with what it resolves to:
39   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
40     if ref $self->{limit_dialect};
41
42   $self;
43 }
44
45 sub _RowNumberOver {
46   my ($self, $sql, $order, $rows, $offset ) = @_;
47
48   $offset += 1;
49   my $last = $rows + $offset;
50   my ( $order_by ) = $self->_order_by( $order );
51
52   $sql = <<"";
53 SELECT * FROM
54 (
55    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
56       $sql
57       $order_by
58    ) Q1
59 ) Q2
60 WHERE ROW_NUM BETWEEN $offset AND $last
61
62   return $sql;
63 }
64
65
66 # While we're at it, this should make LIMIT queries more efficient,
67 #  without digging into things too deeply
68 use Scalar::Util 'blessed';
69 sub _find_syntax {
70   my ($self, $syntax) = @_;
71   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
72   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
73     return 'RowNumberOver';
74   }
75
76   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
77 }
78
79 sub select {
80   my ($self, $table, $fields, $where, $order, @rest) = @_;
81   $table = $self->_quote($table) unless ref($table);
82   local $self->{rownum_hack_count} = 1
83     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
84   @rest = (-1) unless defined $rest[0];
85   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
86     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
87   local $self->{having_bind} = [];
88   my ($sql, @ret) = $self->SUPER::select(
89     $table, $self->_recurse_fields($fields), $where, $order, @rest
90   );
91   $sql .= 
92     $self->{for} ?
93     (
94       $self->{for} eq 'update' ? ' FOR UPDATE' :
95       $self->{for} eq 'shared' ? ' FOR SHARE'  :
96       ''
97     ) :
98     ''
99   ;
100   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
101 }
102
103 sub insert {
104   my $self = shift;
105   my $table = shift;
106   $table = $self->_quote($table) unless ref($table);
107   $self->SUPER::insert($table, @_);
108 }
109
110 sub update {
111   my $self = shift;
112   my $table = shift;
113   $table = $self->_quote($table) unless ref($table);
114   $self->SUPER::update($table, @_);
115 }
116
117 sub delete {
118   my $self = shift;
119   my $table = shift;
120   $table = $self->_quote($table) unless ref($table);
121   $self->SUPER::delete($table, @_);
122 }
123
124 sub _emulate_limit {
125   my $self = shift;
126   if ($_[3] == -1) {
127     return $_[1].$self->_order_by($_[2]);
128   } else {
129     return $self->SUPER::_emulate_limit(@_);
130   }
131 }
132
133 sub _recurse_fields {
134   my ($self, $fields, $params) = @_;
135   my $ref = ref $fields;
136   return $self->_quote($fields) unless $ref;
137   return $$fields if $ref eq 'SCALAR';
138
139   if ($ref eq 'ARRAY') {
140     return join(', ', map {
141       $self->_recurse_fields($_)
142         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
143           ? ' AS col'.$self->{rownum_hack_count}++
144           : '')
145       } @$fields);
146   } elsif ($ref eq 'HASH') {
147     foreach my $func (keys %$fields) {
148       return $self->_sqlcase($func)
149         .'( '.$self->_recurse_fields($fields->{$func}).' )';
150     }
151   }
152 }
153
154 sub _order_by {
155   my $self = shift;
156   my $ret = '';
157   my @extra;
158   if (ref $_[0] eq 'HASH') {
159     if (defined $_[0]->{group_by}) {
160       $ret = $self->_sqlcase(' group by ')
161         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
162     }
163     if (defined $_[0]->{having}) {
164       my $frag;
165       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
166       push(@{$self->{having_bind}}, @extra);
167       $ret .= $self->_sqlcase(' having ').$frag;
168     }
169     if (defined $_[0]->{order_by}) {
170       $ret .= $self->_order_by($_[0]->{order_by});
171     }
172   } elsif (ref $_[0] eq 'SCALAR') {
173     $ret = $self->_sqlcase(' order by ').${ $_[0] };
174   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
175     my @order = @{+shift};
176     $ret = $self->_sqlcase(' order by ')
177           .join(', ', map {
178                         my $r = $self->_order_by($_, @_);
179                         $r =~ s/^ ?ORDER BY //i;
180                         $r;
181                       } @order);
182   } else {
183     $ret = $self->SUPER::_order_by(@_);
184   }
185   return $ret;
186 }
187
188 sub _order_directions {
189   my ($self, $order) = @_;
190   $order = $order->{order_by} if ref $order eq 'HASH';
191   return $self->SUPER::_order_directions($order);
192 }
193
194 sub _table {
195   my ($self, $from) = @_;
196   if (ref $from eq 'ARRAY') {
197     return $self->_recurse_from(@$from);
198   } elsif (ref $from eq 'HASH') {
199     return $self->_make_as($from);
200   } else {
201     return $from; # would love to quote here but _table ends up getting called
202                   # twice during an ->select without a limit clause due to
203                   # the way S::A::Limit->select works. should maybe consider
204                   # bypassing this and doing S::A::select($self, ...) in
205                   # our select method above. meantime, quoting shims have
206                   # been added to select/insert/update/delete here
207   }
208 }
209
210 sub _recurse_from {
211   my ($self, $from, @join) = @_;
212   my @sqlf;
213   push(@sqlf, $self->_make_as($from));
214   foreach my $j (@join) {
215     my ($to, $on) = @$j;
216
217     # check whether a join type exists
218     my $join_clause = '';
219     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
220     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
221       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
222     } else {
223       $join_clause = ' JOIN ';
224     }
225     push(@sqlf, $join_clause);
226
227     if (ref $to eq 'ARRAY') {
228       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
229     } else {
230       push(@sqlf, $self->_make_as($to));
231     }
232     push(@sqlf, ' ON ', $self->_join_condition($on));
233   }
234   return join('', @sqlf);
235 }
236
237 sub _make_as {
238   my ($self, $from) = @_;
239   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
240                      reverse each %{$self->_skip_options($from)});
241 }
242
243 sub _skip_options {
244   my ($self, $hash) = @_;
245   my $clean_hash = {};
246   $clean_hash->{$_} = $hash->{$_}
247     for grep {!/^-/} keys %$hash;
248   return $clean_hash;
249 }
250
251 sub _join_condition {
252   my ($self, $cond) = @_;
253   if (ref $cond eq 'HASH') {
254     my %j;
255     for (keys %$cond) {
256       my $v = $cond->{$_};
257       if (ref $v) {
258         # XXX no throw_exception() in this package and croak() fails with strange results
259         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
260             if ref($v) ne 'SCALAR';
261         $j{$_} = $v;
262       }
263       else {
264         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
265       }
266     };
267     return scalar($self->_recurse_where(\%j));
268   } elsif (ref $cond eq 'ARRAY') {
269     return join(' OR ', map { $self->_join_condition($_) } @$cond);
270   } else {
271     die "Can't handle this yet!";
272   }
273 }
274
275 sub _quote {
276   my ($self, $label) = @_;
277   return '' unless defined $label;
278   return "*" if $label eq '*';
279   return $label unless $self->{quote_char};
280   if(ref $self->{quote_char} eq "ARRAY"){
281     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
282       if !defined $self->{name_sep};
283     my $sep = $self->{name_sep};
284     return join($self->{name_sep},
285         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
286        split(/\Q$sep\E/,$label));
287   }
288   return $self->SUPER::_quote($label);
289 }
290
291 sub limit_dialect {
292     my $self = shift;
293     $self->{limit_dialect} = shift if @_;
294     return $self->{limit_dialect};
295 }
296
297 sub quote_char {
298     my $self = shift;
299     $self->{quote_char} = shift if @_;
300     return $self->{quote_char};
301 }
302
303 sub name_sep {
304     my $self = shift;
305     $self->{name_sep} = shift if @_;
306     return $self->{name_sep};
307 }
308
309 } # End of BEGIN block
310
311 =head1 NAME
312
313 DBIx::Class::Storage::DBI - DBI storage handler
314
315 =head1 SYNOPSIS
316
317 =head1 DESCRIPTION
318
319 This class represents the connection to an RDBMS via L<DBI>.  See
320 L<DBIx::Class::Storage> for general information.  This pod only
321 documents DBI-specific methods and behaviors.
322
323 =head1 METHODS
324
325 =cut
326
327 sub new {
328   my $new = shift->next::method(@_);
329
330   $new->transaction_depth(0);
331   $new->_sql_maker_opts({});
332   $new->{savepoints} = [];
333   $new->{_in_dbh_do} = 0;
334   $new->{_dbh_gen} = 0;
335
336   $new;
337 }
338
339 =head2 connect_info
340
341 The arguments of C<connect_info> are always a single array reference.
342
343 This is normally accessed via L<DBIx::Class::Schema/connection>, which
344 encapsulates its argument list in an arrayref before calling
345 C<connect_info> here.
346
347 The arrayref can either contain the same set of arguments one would
348 normally pass to L<DBI/connect>, or a lone code reference which returns
349 a connected database handle.  Please note that the L<DBI> docs
350 recommend that you always explicitly set C<AutoCommit> to either
351 C<0> or C<1>.   L<DBIx::Class> further recommends that it be set
352 to C<1>, and that you perform transactions via our L</txn_do>
353 method.  L<DBIx::Class> will set it to C<1> if you do not do explicitly
354 set it to zero.  This is the default for most DBDs.  See below for more
355 details.
356
357 In either case, if the final argument in your connect_info happens
358 to be a hashref, C<connect_info> will look there for several
359 connection-specific options:
360
361 =over 4
362
363 =item on_connect_do
364
365 Specifies things to do immediately after connecting or re-connecting to
366 the database.  Its value may contain:
367
368 =over
369
370 =item an array reference
371
372 This contains SQL statements to execute in order.  Each element contains
373 a string or a code reference that returns a string.
374
375 =item a code reference
376
377 This contains some code to execute.  Unlike code references within an
378 array reference, its return value is ignored.
379
380 =back
381
382 =item on_disconnect_do
383
384 Takes arguments in the same form as L<on_connect_do> and executes them
385 immediately before disconnecting from the database.
386
387 Note, this only runs if you explicitly call L<disconnect> on the
388 storage object.
389
390 =item disable_sth_caching
391
392 If set to a true value, this option will disable the caching of
393 statement handles via L<DBI/prepare_cached>.
394
395 =item limit_dialect 
396
397 Sets the limit dialect. This is useful for JDBC-bridge among others
398 where the remote SQL-dialect cannot be determined by the name of the
399 driver alone.
400
401 =item quote_char
402
403 Specifies what characters to use to quote table and column names. If 
404 you use this you will want to specify L<name_sep> as well.
405
406 quote_char expects either a single character, in which case is it is placed
407 on either side of the table/column, or an arrayref of length 2 in which case the
408 table/column name is placed between the elements.
409
410 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
411 use C<quote_char =E<gt> [qw/[ ]/]>.
412
413 =item name_sep
414
415 This only needs to be used in conjunction with L<quote_char>, and is used to 
416 specify the charecter that seperates elements (schemas, tables, columns) from 
417 each other. In most cases this is simply a C<.>.
418
419 =item unsafe
420
421 This Storage driver normally installs its own C<HandleError>, sets
422 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
423 all database handles, including those supplied by a coderef.  It does this
424 so that it can have consistent and useful error behavior.
425
426 If you set this option to a true value, Storage will not do its usual
427 modifications to the database handle's attributes, and instead relies on
428 the settings in your connect_info DBI options (or the values you set in
429 your connection coderef, in the case that you are connecting via coderef).
430
431 Note that your custom settings can cause Storage to malfunction,
432 especially if you set a C<HandleError> handler that suppresses exceptions
433 and/or disable C<RaiseError>.
434
435 =item auto_savepoint
436
437 If this option is true, L<DBIx::Class> will use savepoints when nesting
438 transactions, making it possible to recover from failure in the inner
439 transaction without having to abort all outer transactions.
440
441 =back
442
443 These options can be mixed in with your other L<DBI> connection attributes,
444 or placed in a seperate hashref after all other normal L<DBI> connection
445 arguments.
446
447 Every time C<connect_info> is invoked, any previous settings for
448 these options will be cleared before setting the new ones, regardless of
449 whether any options are specified in the new C<connect_info>.
450
451 Another Important Note:
452
453 DBIC can do some wonderful magic with handling exceptions,
454 disconnections, and transactions when you use C<< AutoCommit => 1 >>
455 combined with C<txn_do> for transaction support.
456
457 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
458 in an assumed transaction between commits, and you're telling us you'd
459 like to manage that manually.  A lot of DBIC's magic protections
460 go away.  We can't protect you from exceptions due to database
461 disconnects because we don't know anything about how to restart your
462 transactions.  You're on your own for handling all sorts of exceptional
463 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
464 be with raw DBI.
465
466 Examples:
467
468   # Simple SQLite connection
469   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
470
471   # Connect via subref
472   ->connect_info([ sub { DBI->connect(...) } ]);
473
474   # A bit more complicated
475   ->connect_info(
476     [
477       'dbi:Pg:dbname=foo',
478       'postgres',
479       'my_pg_password',
480       { AutoCommit => 1 },
481       { quote_char => q{"}, name_sep => q{.} },
482     ]
483   );
484
485   # Equivalent to the previous example
486   ->connect_info(
487     [
488       'dbi:Pg:dbname=foo',
489       'postgres',
490       'my_pg_password',
491       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
492     ]
493   );
494
495   # Subref + DBIC-specific connection options
496   ->connect_info(
497     [
498       sub { DBI->connect(...) },
499       {
500           quote_char => q{`},
501           name_sep => q{@},
502           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
503           disable_sth_caching => 1,
504       },
505     ]
506   );
507
508 =cut
509
510 sub connect_info {
511   my ($self, $info_arg) = @_;
512
513   return $self->_connect_info if !$info_arg;
514
515   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
516   #  the new set of options
517   $self->_sql_maker(undef);
518   $self->_sql_maker_opts({});
519   $self->_connect_info([@$info_arg]); # copy for _connect_info
520
521   my $dbi_info = [@$info_arg]; # copy for _dbi_connect_info
522
523   my $last_info = $dbi_info->[-1];
524   if(ref $last_info eq 'HASH') {
525     $last_info = { %$last_info }; # so delete is non-destructive
526     my @storage_option = qw(
527       on_connect_do on_disconnect_do disable_sth_caching unsafe cursor_class
528       auto_savepoint
529     );
530     for my $storage_opt (@storage_option) {
531       if(my $value = delete $last_info->{$storage_opt}) {
532         $self->$storage_opt($value);
533       }
534     }
535     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
536       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
537         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
538       }
539     }
540     # re-insert modified hashref
541     $dbi_info->[-1] = $last_info;
542
543     # Get rid of any trailing empty hashref
544     pop(@$dbi_info) if !keys %$last_info;
545   }
546   $self->_dbi_connect_info($dbi_info);
547
548   $self->_connect_info;
549 }
550
551 =head2 on_connect_do
552
553 This method is deprecated in favor of setting via L</connect_info>.
554
555 =head2 dbh_do
556
557 Arguments: ($subref | $method_name), @extra_coderef_args?
558
559 Execute the given $subref or $method_name using the new exception-based
560 connection management.
561
562 The first two arguments will be the storage object that C<dbh_do> was called
563 on and a database handle to use.  Any additional arguments will be passed
564 verbatim to the called subref as arguments 2 and onwards.
565
566 Using this (instead of $self->_dbh or $self->dbh) ensures correct
567 exception handling and reconnection (or failover in future subclasses).
568
569 Your subref should have no side-effects outside of the database, as
570 there is the potential for your subref to be partially double-executed
571 if the database connection was stale/dysfunctional.
572
573 Example:
574
575   my @stuff = $schema->storage->dbh_do(
576     sub {
577       my ($storage, $dbh, @cols) = @_;
578       my $cols = join(q{, }, @cols);
579       $dbh->selectrow_array("SELECT $cols FROM foo");
580     },
581     @column_list
582   );
583
584 =cut
585
586 sub dbh_do {
587   my $self = shift;
588   my $code = shift;
589
590   my $dbh = $self->_dbh;
591
592   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
593       || $self->{transaction_depth};
594
595   local $self->{_in_dbh_do} = 1;
596
597   my @result;
598   my $want_array = wantarray;
599
600   eval {
601     $self->_verify_pid if $dbh;
602     if( !$dbh ) {
603         $self->_populate_dbh;
604         $dbh = $self->_dbh;
605     }
606
607     if($want_array) {
608         @result = $self->$code($dbh, @_);
609     }
610     elsif(defined $want_array) {
611         $result[0] = $self->$code($dbh, @_);
612     }
613     else {
614         $self->$code($dbh, @_);
615     }
616   };
617
618   my $exception = $@;
619   if(!$exception) { return $want_array ? @result : $result[0] }
620
621   $self->throw_exception($exception) if $self->connected;
622
623   # We were not connected - reconnect and retry, but let any
624   #  exception fall right through this time
625   $self->_populate_dbh;
626   $self->$code($self->_dbh, @_);
627 }
628
629 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
630 # It also informs dbh_do to bypass itself while under the direction of txn_do,
631 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
632 sub txn_do {
633   my $self = shift;
634   my $coderef = shift;
635
636   ref $coderef eq 'CODE' or $self->throw_exception
637     ('$coderef must be a CODE reference');
638
639   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
640
641   local $self->{_in_dbh_do} = 1;
642
643   my @result;
644   my $want_array = wantarray;
645
646   my $tried = 0;
647   while(1) {
648     eval {
649       $self->_verify_pid if $self->_dbh;
650       $self->_populate_dbh if !$self->_dbh;
651
652       $self->txn_begin;
653       if($want_array) {
654           @result = $coderef->(@_);
655       }
656       elsif(defined $want_array) {
657           $result[0] = $coderef->(@_);
658       }
659       else {
660           $coderef->(@_);
661       }
662       $self->txn_commit;
663     };
664
665     my $exception = $@;
666     if(!$exception) { return $want_array ? @result : $result[0] }
667
668     if($tried++ > 0 || $self->connected) {
669       eval { $self->txn_rollback };
670       my $rollback_exception = $@;
671       if($rollback_exception) {
672         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
673         $self->throw_exception($exception)  # propagate nested rollback
674           if $rollback_exception =~ /$exception_class/;
675
676         $self->throw_exception(
677           "Transaction aborted: ${exception}. "
678           . "Rollback failed: ${rollback_exception}"
679         );
680       }
681       $self->throw_exception($exception)
682     }
683
684     # We were not connected, and was first try - reconnect and retry
685     # via the while loop
686     $self->_populate_dbh;
687   }
688 }
689
690 =head2 disconnect
691
692 Our C<disconnect> method also performs a rollback first if the
693 database is not in C<AutoCommit> mode.
694
695 =cut
696
697 sub disconnect {
698   my ($self) = @_;
699
700   if( $self->connected ) {
701     my $connection_do = $self->on_disconnect_do;
702     $self->_do_connection_actions($connection_do) if ref($connection_do);
703
704     $self->_dbh->rollback unless $self->_dbh_autocommit;
705     $self->_dbh->disconnect;
706     $self->_dbh(undef);
707     $self->{_dbh_gen}++;
708   }
709 }
710
711 sub connected {
712   my ($self) = @_;
713
714   if(my $dbh = $self->_dbh) {
715       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
716           $self->_dbh(undef);
717           $self->{_dbh_gen}++;
718           return;
719       }
720       else {
721           $self->_verify_pid;
722           return 0 if !$self->_dbh;
723       }
724       return ($dbh->FETCH('Active') && $dbh->ping);
725   }
726
727   return 0;
728 }
729
730 # handle pid changes correctly
731 #  NOTE: assumes $self->_dbh is a valid $dbh
732 sub _verify_pid {
733   my ($self) = @_;
734
735   return if defined $self->_conn_pid && $self->_conn_pid == $$;
736
737   $self->_dbh->{InactiveDestroy} = 1;
738   $self->_dbh(undef);
739   $self->{_dbh_gen}++;
740
741   return;
742 }
743
744 sub ensure_connected {
745   my ($self) = @_;
746
747   unless ($self->connected) {
748     $self->_populate_dbh;
749   }
750 }
751
752 =head2 dbh
753
754 Returns the dbh - a data base handle of class L<DBI>.
755
756 =cut
757
758 sub dbh {
759   my ($self) = @_;
760
761   $self->ensure_connected;
762   return $self->_dbh;
763 }
764
765 sub _sql_maker_args {
766     my ($self) = @_;
767     
768     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
769 }
770
771 sub sql_maker {
772   my ($self) = @_;
773   unless ($self->_sql_maker) {
774     my $sql_maker_class = $self->sql_maker_class;
775     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
776   }
777   return $self->_sql_maker;
778 }
779
780 sub _rebless {}
781
782 sub _populate_dbh {
783   my ($self) = @_;
784   my @info = @{$self->_dbi_connect_info || []};
785   $self->_dbh($self->_connect(@info));
786
787   # Always set the transaction depth on connect, since
788   #  there is no transaction in progress by definition
789   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
790
791   if(ref $self eq 'DBIx::Class::Storage::DBI') {
792     my $driver = $self->_dbh->{Driver}->{Name};
793     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
794       bless $self, "DBIx::Class::Storage::DBI::${driver}";
795       $self->_rebless();
796     }
797   }
798
799   my $connection_do = $self->on_connect_do;
800   $self->_do_connection_actions($connection_do) if ref($connection_do);
801
802   $self->_conn_pid($$);
803   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
804 }
805
806 sub _do_connection_actions {
807   my $self = shift;
808   my $connection_do = shift;
809
810   if (ref $connection_do eq 'ARRAY') {
811     $self->_do_query($_) foreach @$connection_do;
812   }
813   elsif (ref $connection_do eq 'CODE') {
814     $connection_do->();
815   }
816
817   return $self;
818 }
819
820 sub _do_query {
821   my ($self, $action) = @_;
822
823   if (ref $action eq 'CODE') {
824     $action = $action->($self);
825     $self->_do_query($_) foreach @$action;
826   }
827   else {
828     my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
829     $self->_query_start(@to_run);
830     $self->_dbh->do(@to_run);
831     $self->_query_end(@to_run);
832   }
833
834   return $self;
835 }
836
837 sub _connect {
838   my ($self, @info) = @_;
839
840   $self->throw_exception("You failed to provide any connection info")
841     if !@info;
842
843   my ($old_connect_via, $dbh);
844
845   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
846     $old_connect_via = $DBI::connect_via;
847     $DBI::connect_via = 'connect';
848   }
849
850   eval {
851     if(ref $info[0] eq 'CODE') {
852        $dbh = &{$info[0]}
853     }
854     else {
855        $dbh = DBI->connect(@info);
856     }
857
858     if($dbh && !$self->unsafe) {
859       my $weak_self = $self;
860       weaken($weak_self);
861       $dbh->{HandleError} = sub {
862           $weak_self->throw_exception("DBI Exception: $_[0]")
863       };
864       $dbh->{ShowErrorStatement} = 1;
865       $dbh->{RaiseError} = 1;
866       $dbh->{PrintError} = 0;
867     }
868   };
869
870   $DBI::connect_via = $old_connect_via if $old_connect_via;
871
872   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
873     if !$dbh || $@;
874
875   $self->_dbh_autocommit($dbh->{AutoCommit});
876
877   $dbh;
878 }
879
880 sub svp_begin {
881   my ($self, $name) = @_;
882
883   $name = $self->_svp_generate_name
884     unless defined $name;
885
886   $self->throw_exception ("You can't use savepoints outside a transaction")
887     if $self->{transaction_depth} == 0;
888
889   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
890     unless $self->can('_svp_begin');
891   
892   push @{ $self->{savepoints} }, $name;
893
894   $self->debugobj->svp_begin($name) if $self->debug;
895   
896   return $self->_svp_begin($name);
897 }
898
899 sub svp_release {
900   my ($self, $name) = @_;
901
902   $self->throw_exception ("You can't use savepoints outside a transaction")
903     if $self->{transaction_depth} == 0;
904
905   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
906     unless $self->can('_svp_release');
907
908   if (defined $name) {
909     $self->throw_exception ("Savepoint '$name' does not exist")
910       unless grep { $_ eq $name } @{ $self->{savepoints} };
911
912     # Dig through the stack until we find the one we are releasing.  This keeps
913     # the stack up to date.
914     my $svp;
915
916     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
917   } else {
918     $name = pop @{ $self->{savepoints} };
919   }
920
921   $self->debugobj->svp_release($name) if $self->debug;
922
923   return $self->_svp_release($name);
924 }
925
926 sub svp_rollback {
927   my ($self, $name) = @_;
928
929   $self->throw_exception ("You can't use savepoints outside a transaction")
930     if $self->{transaction_depth} == 0;
931
932   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
933     unless $self->can('_svp_rollback');
934
935   if (defined $name) {
936       # If they passed us a name, verify that it exists in the stack
937       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
938           $self->throw_exception("Savepoint '$name' does not exist!");
939       }
940
941       # Dig through the stack until we find the one we are releasing.  This keeps
942       # the stack up to date.
943       while(my $s = pop(@{ $self->{savepoints} })) {
944           last if($s eq $name);
945       }
946       # Add the savepoint back to the stack, as a rollback doesn't remove the
947       # named savepoint, only everything after it.
948       push(@{ $self->{savepoints} }, $name);
949   } else {
950       # We'll assume they want to rollback to the last savepoint
951       $name = $self->{savepoints}->[-1];
952   }
953
954   $self->debugobj->svp_rollback($name) if $self->debug;
955   
956   return $self->_svp_rollback($name);
957 }
958
959 sub _svp_generate_name {
960     my ($self) = @_;
961
962     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
963 }
964
965 sub txn_begin {
966   my $self = shift;
967   $self->ensure_connected();
968   if($self->{transaction_depth} == 0) {
969     $self->debugobj->txn_begin()
970       if $self->debug;
971     # this isn't ->_dbh-> because
972     #  we should reconnect on begin_work
973     #  for AutoCommit users
974     $self->dbh->begin_work;
975   } elsif ($self->auto_savepoint) {
976     $self->svp_begin;
977   }
978   $self->{transaction_depth}++;
979 }
980
981 sub txn_commit {
982   my $self = shift;
983   if ($self->{transaction_depth} == 1) {
984     my $dbh = $self->_dbh;
985     $self->debugobj->txn_commit()
986       if ($self->debug);
987     $dbh->commit;
988     $self->{transaction_depth} = 0
989       if $self->_dbh_autocommit;
990   }
991   elsif($self->{transaction_depth} > 1) {
992     $self->{transaction_depth}--;
993     $self->svp_release
994       if $self->auto_savepoint;
995   }
996 }
997
998 sub txn_rollback {
999   my $self = shift;
1000   my $dbh = $self->_dbh;
1001   eval {
1002     if ($self->{transaction_depth} == 1) {
1003       $self->debugobj->txn_rollback()
1004         if ($self->debug);
1005       $self->{transaction_depth} = 0
1006         if $self->_dbh_autocommit;
1007       $dbh->rollback;
1008     }
1009     elsif($self->{transaction_depth} > 1) {
1010       $self->{transaction_depth}--;
1011       if ($self->auto_savepoint) {
1012         $self->svp_rollback;
1013         $self->svp_release;
1014       }
1015     }
1016     else {
1017       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1018     }
1019   };
1020   if ($@) {
1021     my $error = $@;
1022     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1023     $error =~ /$exception_class/ and $self->throw_exception($error);
1024     # ensure that a failed rollback resets the transaction depth
1025     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1026     $self->throw_exception($error);
1027   }
1028 }
1029
1030 # This used to be the top-half of _execute.  It was split out to make it
1031 #  easier to override in NoBindVars without duping the rest.  It takes up
1032 #  all of _execute's args, and emits $sql, @bind.
1033 sub _prep_for_execute {
1034   my ($self, $op, $extra_bind, $ident, $args) = @_;
1035
1036   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1037   unshift(@bind,
1038     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1039       if $extra_bind;
1040
1041   return ($sql, \@bind);
1042 }
1043
1044 sub _fix_bind_params {
1045     my ($self, @bind) = @_;
1046
1047     ### Turn @bind from something like this:
1048     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1049     ### to this:
1050     ###   ( "'1'", "'1'", "'3'" )
1051     return
1052         map {
1053             if ( defined( $_ && $_->[1] ) ) {
1054                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1055             }
1056             else { q{'NULL'}; }
1057         } @bind;
1058 }
1059
1060 sub _query_start {
1061     my ( $self, $sql, @bind ) = @_;
1062
1063     if ( $self->debug ) {
1064         @bind = $self->_fix_bind_params(@bind);
1065         $self->debugobj->query_start( $sql, @bind );
1066     }
1067 }
1068
1069 sub _query_end {
1070     my ( $self, $sql, @bind ) = @_;
1071
1072     if ( $self->debug ) {
1073         @bind = $self->_fix_bind_params(@bind);
1074         $self->debugobj->query_end( $sql, @bind );
1075     }
1076 }
1077
1078 sub _dbh_execute {
1079   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1080   
1081   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1082     $ident = $ident->from();
1083   }
1084
1085   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1086
1087   $self->_query_start( $sql, @$bind );
1088
1089   my $sth = $self->sth($sql,$op);
1090
1091   my $placeholder_index = 1; 
1092
1093   foreach my $bound (@$bind) {
1094     my $attributes = {};
1095     my($column_name, @data) = @$bound;
1096
1097     if ($bind_attributes) {
1098       $attributes = $bind_attributes->{$column_name}
1099       if defined $bind_attributes->{$column_name};
1100     }
1101
1102     foreach my $data (@data) {
1103       $data = ref $data ? ''.$data : $data; # stringify args
1104
1105       $sth->bind_param($placeholder_index, $data, $attributes);
1106       $placeholder_index++;
1107     }
1108   }
1109
1110   # Can this fail without throwing an exception anyways???
1111   my $rv = $sth->execute();
1112   $self->throw_exception($sth->errstr) if !$rv;
1113
1114   $self->_query_end( $sql, @$bind );
1115
1116   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1117 }
1118
1119 sub _execute {
1120     my $self = shift;
1121     $self->dbh_do('_dbh_execute', @_)
1122 }
1123
1124 sub insert {
1125   my ($self, $source, $to_insert) = @_;
1126   
1127   my $ident = $source->from; 
1128   my $bind_attributes = $self->source_bind_attributes($source);
1129
1130   foreach my $col ( $source->columns ) {
1131     if ( !defined $to_insert->{$col} ) {
1132       my $col_info = $source->column_info($col);
1133
1134       if ( $col_info->{auto_nextval} ) {
1135         $self->ensure_connected; 
1136         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1137       }
1138     }
1139   }
1140
1141   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1142
1143   return $to_insert;
1144 }
1145
1146 ## Still not quite perfect, and EXPERIMENTAL
1147 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1148 ## scalar refs, or at least, all the same type as the first set, the statement is
1149 ## only prepped once.
1150 sub insert_bulk {
1151   my ($self, $source, $cols, $data) = @_;
1152   my %colvalues;
1153   my $table = $source->from;
1154   @colvalues{@$cols} = (0..$#$cols);
1155   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1156   
1157   $self->_query_start( $sql, @bind );
1158   my $sth = $self->sth($sql);
1159
1160 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1161
1162   ## This must be an arrayref, else nothing works!
1163   
1164   my $tuple_status = [];
1165   
1166   ##use Data::Dumper;
1167   ##print STDERR Dumper( $data, $sql, [@bind] );
1168
1169   my $time = time();
1170
1171   ## Get the bind_attributes, if any exist
1172   my $bind_attributes = $self->source_bind_attributes($source);
1173
1174   ## Bind the values and execute
1175   my $placeholder_index = 1; 
1176
1177   foreach my $bound (@bind) {
1178
1179     my $attributes = {};
1180     my ($column_name, $data_index) = @$bound;
1181
1182     if( $bind_attributes ) {
1183       $attributes = $bind_attributes->{$column_name}
1184       if defined $bind_attributes->{$column_name};
1185     }
1186
1187     my @data = map { $_->[$data_index] } @$data;
1188
1189     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1190     $placeholder_index++;
1191   }
1192   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1193   $self->throw_exception($sth->errstr) if !$rv;
1194
1195   $self->_query_end( $sql, @bind );
1196   return (wantarray ? ($rv, $sth, @bind) : $rv);
1197 }
1198
1199 sub update {
1200   my $self = shift @_;
1201   my $source = shift @_;
1202   my $bind_attributes = $self->source_bind_attributes($source);
1203   
1204   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1205 }
1206
1207
1208 sub delete {
1209   my $self = shift @_;
1210   my $source = shift @_;
1211   
1212   my $bind_attrs = {}; ## If ever it's needed...
1213   
1214   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1215 }
1216
1217 sub _select {
1218   my ($self, $ident, $select, $condition, $attrs) = @_;
1219   my $order = $attrs->{order_by};
1220
1221   if (ref $condition eq 'SCALAR') {
1222     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
1223   }
1224
1225   my $for = delete $attrs->{for};
1226   my $sql_maker = $self->sql_maker;
1227   local $sql_maker->{for} = $for;
1228
1229   if (exists $attrs->{group_by} || $attrs->{having}) {
1230     $order = {
1231       group_by => $attrs->{group_by},
1232       having => $attrs->{having},
1233       ($order ? (order_by => $order) : ())
1234     };
1235   }
1236   my $bind_attrs = {}; ## Future support
1237   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1238   if ($attrs->{software_limit} ||
1239       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1240         $attrs->{software_limit} = 1;
1241   } else {
1242     $self->throw_exception("rows attribute must be positive if present")
1243       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1244
1245     # MySQL actually recommends this approach.  I cringe.
1246     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1247     push @args, $attrs->{rows}, $attrs->{offset};
1248   }
1249
1250   return $self->_execute(@args);
1251 }
1252
1253 sub source_bind_attributes {
1254   my ($self, $source) = @_;
1255   
1256   my $bind_attributes;
1257   foreach my $column ($source->columns) {
1258   
1259     my $data_type = $source->column_info($column)->{data_type} || '';
1260     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1261      if $data_type;
1262   }
1263
1264   return $bind_attributes;
1265 }
1266
1267 =head2 select
1268
1269 =over 4
1270
1271 =item Arguments: $ident, $select, $condition, $attrs
1272
1273 =back
1274
1275 Handle a SQL select statement.
1276
1277 =cut
1278
1279 sub select {
1280   my $self = shift;
1281   my ($ident, $select, $condition, $attrs) = @_;
1282   return $self->cursor_class->new($self, \@_, $attrs);
1283 }
1284
1285 sub select_single {
1286   my $self = shift;
1287   my ($rv, $sth, @bind) = $self->_select(@_);
1288   my @row = $sth->fetchrow_array;
1289   carp "Query returned more than one row" if $sth->fetchrow_array;
1290   # Need to call finish() to work round broken DBDs
1291   $sth->finish();
1292   return @row;
1293 }
1294
1295 =head2 sth
1296
1297 =over 4
1298
1299 =item Arguments: $sql
1300
1301 =back
1302
1303 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1304
1305 =cut
1306
1307 sub _dbh_sth {
1308   my ($self, $dbh, $sql) = @_;
1309
1310   # 3 is the if_active parameter which avoids active sth re-use
1311   my $sth = $self->disable_sth_caching
1312     ? $dbh->prepare($sql)
1313     : $dbh->prepare_cached($sql, {}, 3);
1314
1315   # XXX You would think RaiseError would make this impossible,
1316   #  but apparently that's not true :(
1317   $self->throw_exception($dbh->errstr) if !$sth;
1318
1319   $sth;
1320 }
1321
1322 sub sth {
1323   my ($self, $sql) = @_;
1324   $self->dbh_do('_dbh_sth', $sql);
1325 }
1326
1327 sub _dbh_columns_info_for {
1328   my ($self, $dbh, $table) = @_;
1329
1330   if ($dbh->can('column_info')) {
1331     my %result;
1332     eval {
1333       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1334       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1335       $sth->execute();
1336       while ( my $info = $sth->fetchrow_hashref() ){
1337         my %column_info;
1338         $column_info{data_type}   = $info->{TYPE_NAME};
1339         $column_info{size}      = $info->{COLUMN_SIZE};
1340         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1341         $column_info{default_value} = $info->{COLUMN_DEF};
1342         my $col_name = $info->{COLUMN_NAME};
1343         $col_name =~ s/^\"(.*)\"$/$1/;
1344
1345         $result{$col_name} = \%column_info;
1346       }
1347     };
1348     return \%result if !$@ && scalar keys %result;
1349   }
1350
1351   my %result;
1352   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1353   $sth->execute;
1354   my @columns = @{$sth->{NAME_lc}};
1355   for my $i ( 0 .. $#columns ){
1356     my %column_info;
1357     $column_info{data_type} = $sth->{TYPE}->[$i];
1358     $column_info{size} = $sth->{PRECISION}->[$i];
1359     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1360
1361     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1362       $column_info{data_type} = $1;
1363       $column_info{size}    = $2;
1364     }
1365
1366     $result{$columns[$i]} = \%column_info;
1367   }
1368   $sth->finish;
1369
1370   foreach my $col (keys %result) {
1371     my $colinfo = $result{$col};
1372     my $type_num = $colinfo->{data_type};
1373     my $type_name;
1374     if(defined $type_num && $dbh->can('type_info')) {
1375       my $type_info = $dbh->type_info($type_num);
1376       $type_name = $type_info->{TYPE_NAME} if $type_info;
1377       $colinfo->{data_type} = $type_name if $type_name;
1378     }
1379   }
1380
1381   return \%result;
1382 }
1383
1384 sub columns_info_for {
1385   my ($self, $table) = @_;
1386   $self->dbh_do('_dbh_columns_info_for', $table);
1387 }
1388
1389 =head2 last_insert_id
1390
1391 Return the row id of the last insert.
1392
1393 =cut
1394
1395 sub _dbh_last_insert_id {
1396     my ($self, $dbh, $source, $col) = @_;
1397     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1398     $dbh->func('last_insert_rowid');
1399 }
1400
1401 sub last_insert_id {
1402   my $self = shift;
1403   $self->dbh_do('_dbh_last_insert_id', @_);
1404 }
1405
1406 =head2 sqlt_type
1407
1408 Returns the database driver name.
1409
1410 =cut
1411
1412 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1413
1414 =head2 bind_attribute_by_data_type
1415
1416 Given a datatype from column info, returns a database specific bind attribute for
1417 $dbh->bind_param($val,$attribute) or nothing if we will let the database planner
1418 just handle it.
1419
1420 Generally only needed for special case column types, like bytea in postgres.
1421
1422 =cut
1423
1424 sub bind_attribute_by_data_type {
1425     return;
1426 }
1427
1428 =head2 create_ddl_dir
1429
1430 =over 4
1431
1432 =item Arguments: $schema \@databases, $version, $directory, $preversion, $sqlt_args
1433
1434 =back
1435
1436 Creates a SQL file based on the Schema, for each of the specified
1437 database types, in the given directory.
1438
1439 =cut
1440
1441 sub create_ddl_dir
1442 {
1443   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1444
1445   if(!$dir || !-d $dir)
1446   {
1447     warn "No directory given, using ./\n";
1448     $dir = "./";
1449   }
1450   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1451   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1452   $version ||= $schema->VERSION || '1.x';
1453   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1454
1455   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
1456       . $self->_check_sqlt_message . q{'})
1457           if !$self->_check_sqlt_version;
1458
1459   my $sqlt = SQL::Translator->new( $sqltargs );
1460
1461   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1462   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1463
1464   foreach my $db (@$databases)
1465   {
1466     $sqlt->reset();
1467     $sqlt = $self->configure_sqlt($sqlt, $db);
1468     $sqlt->{schema} = $sqlt_schema;
1469     $sqlt->producer($db);
1470
1471     my $file;
1472     my $filename = $schema->ddl_filename($db, $dir, $version);
1473     if(-e $filename)
1474     {
1475       warn("$filename already exists, skipping $db");
1476       next unless ($preversion);
1477     } else {
1478       my $output = $sqlt->translate;
1479       if(!$output)
1480       {
1481         warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1482         next;
1483       }
1484       if(!open($file, ">$filename"))
1485       {
1486           $self->throw_exception("Can't open $filename for writing ($!)");
1487           next;
1488       }
1489       print $file $output;
1490       close($file);
1491     } 
1492     if($preversion)
1493     {
1494       require SQL::Translator::Diff;
1495
1496       my $prefilename = $schema->ddl_filename($db, $dir, $preversion);
1497 #      print "Previous version $prefilename\n";
1498       if(!-e $prefilename)
1499       {
1500         warn("No previous schema file found ($prefilename)");
1501         next;
1502       }
1503
1504       my $difffile = $schema->ddl_filename($db, $dir, $version, $preversion);
1505       print STDERR "Diff: $difffile: $db, $dir, $version, $preversion \n";
1506       if(-e $difffile)
1507       {
1508         warn("$difffile already exists, skipping");
1509         next;
1510       }
1511
1512       my $source_schema;
1513       {
1514         my $t = SQL::Translator->new($sqltargs);
1515         $t->debug( 0 );
1516         $t->trace( 0 );
1517         $t->parser( $db )                       or die $t->error;
1518         $t = $self->configure_sqlt($t, $db);
1519         my $out = $t->translate( $prefilename ) or die $t->error;
1520         $source_schema = $t->schema;
1521         unless ( $source_schema->name ) {
1522           $source_schema->name( $prefilename );
1523         }
1524       }
1525
1526       # The "new" style of producers have sane normalization and can support 
1527       # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1528       # And we have to diff parsed SQL against parsed SQL.
1529       my $dest_schema = $sqlt_schema;
1530
1531       unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1532         my $t = SQL::Translator->new($sqltargs);
1533         $t->debug( 0 );
1534         $t->trace( 0 );
1535         $t->parser( $db )                    or die $t->error;
1536         $t = $self->configure_sqlt($t, $db);
1537         my $out = $t->translate( $filename ) or die $t->error;
1538         $dest_schema = $t->schema;
1539         $dest_schema->name( $filename )
1540           unless $dest_schema->name;
1541       }
1542
1543       $DB::single = 1;
1544       my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1545                                                     $dest_schema,   $db,
1546                                                     $sqltargs
1547                                                    );
1548       if(!open $file, ">$difffile")
1549       { 
1550         $self->throw_exception("Can't write to $difffile ($!)");
1551         next;
1552       }
1553       print $file $diff;
1554       close($file);
1555     }
1556   }
1557 }
1558
1559 sub configure_sqlt() {
1560   my $self = shift;
1561   my $tr = shift;
1562   my $db = shift || $self->sqlt_type;
1563   if ($db eq 'PostgreSQL') {
1564     $tr->quote_table_names(0);
1565     $tr->quote_field_names(0);
1566   }
1567   return $tr;
1568 }
1569
1570 =head2 deployment_statements
1571
1572 =over 4
1573
1574 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1575
1576 =back
1577
1578 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1579 The database driver name is given by C<$type>, though the value from
1580 L</sqlt_type> is used if it is not specified.
1581
1582 C<$directory> is used to return statements from files in a previously created
1583 L</create_ddl_dir> directory and is optional. The filenames are constructed
1584 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1585
1586 If no C<$directory> is specified then the statements are constructed on the
1587 fly using L<SQL::Translator> and C<$version> is ignored.
1588
1589 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1590
1591 =cut
1592
1593 sub deployment_statements {
1594   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1595   # Need to be connected to get the correct sqlt_type
1596   $self->ensure_connected() unless $type;
1597   $type ||= $self->sqlt_type;
1598   $version ||= $schema->VERSION || '1.x';
1599   $dir ||= './';
1600   my $filename = $schema->ddl_filename($type, $dir, $version);
1601   if(-f $filename)
1602   {
1603       my $file;
1604       open($file, "<$filename") 
1605         or $self->throw_exception("Can't open $filename ($!)");
1606       my @rows = <$file>;
1607       close($file);
1608       return join('', @rows);
1609   }
1610
1611   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
1612       . $self->_check_sqlt_message . q{'})
1613           if !$self->_check_sqlt_version;
1614
1615   require SQL::Translator::Parser::DBIx::Class;
1616   eval qq{use SQL::Translator::Producer::${type}};
1617   $self->throw_exception($@) if $@;
1618
1619   # sources needs to be a parser arg, but for simplicty allow at top level 
1620   # coming in
1621   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1622       if exists $sqltargs->{sources};
1623
1624   my $tr = SQL::Translator->new(%$sqltargs);
1625   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1626   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1627
1628   return;
1629
1630 }
1631
1632 sub deploy {
1633   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1634   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1635     foreach my $line ( split(";\n", $statement)) {
1636       next if($line =~ /^--/);
1637       next if(!$line);
1638 #      next if($line =~ /^DROP/m);
1639       next if($line =~ /^BEGIN TRANSACTION/m);
1640       next if($line =~ /^COMMIT/m);
1641       next if $line =~ /^\s+$/; # skip whitespace only
1642       $self->_query_start($line);
1643       eval {
1644         $self->dbh->do($line); # shouldn't be using ->dbh ?
1645       };
1646       if ($@) {
1647         warn qq{$@ (running "${line}")};
1648       }
1649       $self->_query_end($line);
1650     }
1651   }
1652 }
1653
1654 =head2 datetime_parser
1655
1656 Returns the datetime parser class
1657
1658 =cut
1659
1660 sub datetime_parser {
1661   my $self = shift;
1662   return $self->{datetime_parser} ||= do {
1663     $self->ensure_connected;
1664     $self->build_datetime_parser(@_);
1665   };
1666 }
1667
1668 =head2 datetime_parser_type
1669
1670 Defines (returns) the datetime parser class - currently hardwired to
1671 L<DateTime::Format::MySQL>
1672
1673 =cut
1674
1675 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1676
1677 =head2 build_datetime_parser
1678
1679 See L</datetime_parser>
1680
1681 =cut
1682
1683 sub build_datetime_parser {
1684   my $self = shift;
1685   my $type = $self->datetime_parser_type(@_);
1686   eval "use ${type}";
1687   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1688   return $type;
1689 }
1690
1691 {
1692     my $_check_sqlt_version; # private
1693     my $_check_sqlt_message; # private
1694     sub _check_sqlt_version {
1695         return $_check_sqlt_version if defined $_check_sqlt_version;
1696         eval 'use SQL::Translator "0.09"';
1697         $_check_sqlt_message = $@ || '';
1698         $_check_sqlt_version = !$@;
1699     }
1700
1701     sub _check_sqlt_message {
1702         _check_sqlt_version if !defined $_check_sqlt_message;
1703         $_check_sqlt_message;
1704     }
1705 }
1706
1707 sub DESTROY {
1708   my $self = shift;
1709   return if !$self->_dbh;
1710   $self->_verify_pid;
1711   $self->_dbh(undef);
1712 }
1713
1714 1;
1715
1716 =head1 SQL METHODS
1717
1718 The module defines a set of methods within the DBIC::SQL::Abstract
1719 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1720 SQL query functions.
1721
1722 The following methods are extended:-
1723
1724 =over 4
1725
1726 =item delete
1727
1728 =item insert
1729
1730 =item select
1731
1732 =item update
1733
1734 =item limit_dialect
1735
1736 See L</connect_info> for details.
1737 For setting, this method is deprecated in favor of L</connect_info>.
1738
1739 =item quote_char
1740
1741 See L</connect_info> for details.
1742 For setting, this method is deprecated in favor of L</connect_info>.
1743
1744 =item name_sep
1745
1746 See L</connect_info> for details.
1747 For setting, this method is deprecated in favor of L</connect_info>.
1748
1749 =back
1750
1751 =head1 AUTHORS
1752
1753 Matt S. Trout <mst@shadowcatsystems.co.uk>
1754
1755 Andy Grundman <andy@hybridized.org>
1756
1757 =head1 LICENSE
1758
1759 You may distribute this code under the same terms as Perl itself.
1760
1761 =cut