* Removed dead code (sub _RowNumberOver).
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 # While we're at it, this should make LIMIT queries more efficient,
54 #  without digging into things too deeply
55 use Scalar::Util 'blessed';
56 sub _find_syntax {
57   my ($self, $syntax) = @_;
58   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
59   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
60     return 'RowNumberOver';
61   }
62
63   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
64 }
65
66 sub select {
67   my ($self, $table, $fields, $where, $order, @rest) = @_;
68   $table = $self->_quote($table) unless ref($table);
69   local $self->{rownum_hack_count} = 1
70     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
71   @rest = (-1) unless defined $rest[0];
72   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
73     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
74   local $self->{having_bind} = [];
75   my ($sql, @ret) = $self->SUPER::select(
76     $table, $self->_recurse_fields($fields), $where, $order, @rest
77   );
78   $sql .= 
79     $self->{for} ?
80     (
81       $self->{for} eq 'update' ? ' FOR UPDATE' :
82       $self->{for} eq 'shared' ? ' FOR SHARE'  :
83       ''
84     ) :
85     ''
86   ;
87   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
88 }
89
90 sub insert {
91   my $self = shift;
92   my $table = shift;
93   $table = $self->_quote($table) unless ref($table);
94   $self->SUPER::insert($table, @_);
95 }
96
97 sub update {
98   my $self = shift;
99   my $table = shift;
100   $table = $self->_quote($table) unless ref($table);
101   $self->SUPER::update($table, @_);
102 }
103
104 sub delete {
105   my $self = shift;
106   my $table = shift;
107   $table = $self->_quote($table) unless ref($table);
108   $self->SUPER::delete($table, @_);
109 }
110
111 sub _emulate_limit {
112   my $self = shift;
113   if ($_[3] == -1) {
114     return $_[1].$self->_order_by($_[2]);
115   } else {
116     return $self->SUPER::_emulate_limit(@_);
117   }
118 }
119
120 sub _recurse_fields {
121   my ($self, $fields, $params) = @_;
122   my $ref = ref $fields;
123   return $self->_quote($fields) unless $ref;
124   return $$fields if $ref eq 'SCALAR';
125
126   if ($ref eq 'ARRAY') {
127     return join(', ', map {
128       $self->_recurse_fields($_)
129         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
130           ? ' AS col'.$self->{rownum_hack_count}++
131           : '')
132       } @$fields);
133   } elsif ($ref eq 'HASH') {
134     foreach my $func (keys %$fields) {
135       return $self->_sqlcase($func)
136         .'( '.$self->_recurse_fields($fields->{$func}).' )';
137     }
138   }
139 }
140
141 sub _order_by {
142   my $self = shift;
143   my $ret = '';
144   my @extra;
145   if (ref $_[0] eq 'HASH') {
146     if (defined $_[0]->{group_by}) {
147       $ret = $self->_sqlcase(' group by ')
148         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
149     }
150     if (defined $_[0]->{having}) {
151       my $frag;
152       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
153       push(@{$self->{having_bind}}, @extra);
154       $ret .= $self->_sqlcase(' having ').$frag;
155     }
156     if (defined $_[0]->{order_by}) {
157       $ret .= $self->_order_by($_[0]->{order_by});
158     }
159   } elsif (ref $_[0] eq 'SCALAR') {
160     $ret = $self->_sqlcase(' order by ').${ $_[0] };
161   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
162     my @order = @{+shift};
163     $ret = $self->_sqlcase(' order by ')
164           .join(', ', map {
165                         my $r = $self->_order_by($_, @_);
166                         $r =~ s/^ ?ORDER BY //i;
167                         $r;
168                       } @order);
169   } else {
170     $ret = $self->SUPER::_order_by(@_);
171   }
172   return $ret;
173 }
174
175 sub _order_directions {
176   my ($self, $order) = @_;
177   $order = $order->{order_by} if ref $order eq 'HASH';
178   return $self->SUPER::_order_directions($order);
179 }
180
181 sub _table {
182   my ($self, $from) = @_;
183   if (ref $from eq 'ARRAY') {
184     return $self->_recurse_from(@$from);
185   } elsif (ref $from eq 'HASH') {
186     return $self->_make_as($from);
187   } else {
188     return $from; # would love to quote here but _table ends up getting called
189                   # twice during an ->select without a limit clause due to
190                   # the way S::A::Limit->select works. should maybe consider
191                   # bypassing this and doing S::A::select($self, ...) in
192                   # our select method above. meantime, quoting shims have
193                   # been added to select/insert/update/delete here
194   }
195 }
196
197 sub _recurse_from {
198   my ($self, $from, @join) = @_;
199   my @sqlf;
200   push(@sqlf, $self->_make_as($from));
201   foreach my $j (@join) {
202     my ($to, $on) = @$j;
203
204     # check whether a join type exists
205     my $join_clause = '';
206     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
207     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
208       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
209     } else {
210       $join_clause = ' JOIN ';
211     }
212     push(@sqlf, $join_clause);
213
214     if (ref $to eq 'ARRAY') {
215       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
216     } else {
217       push(@sqlf, $self->_make_as($to));
218     }
219     push(@sqlf, ' ON (', $self->_join_condition($on), ')');
220   }
221   return join('', @sqlf);
222 }
223
224 sub _make_as {
225   my ($self, $from) = @_;
226   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
227                      reverse each %{$self->_skip_options($from)});
228 }
229
230 sub _skip_options {
231   my ($self, $hash) = @_;
232   my $clean_hash = {};
233   $clean_hash->{$_} = $hash->{$_}
234     for grep {!/^-/} keys %$hash;
235   return $clean_hash;
236 }
237
238 sub _join_condition {
239   my ($self, $cond) = @_;
240   if (ref $cond eq 'HASH') {
241     my %j;
242     for (keys %$cond) {
243       my $v = $cond->{$_};
244       if (ref $v) {
245         # XXX no throw_exception() in this package and croak() fails with strange results
246         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
247             if ref($v) ne 'SCALAR';
248         $j{$_} = $v;
249       }
250       else {
251         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
252       }
253     };
254     return scalar($self->_recurse_where(\%j));
255   } elsif (ref $cond eq 'ARRAY') {
256     return join(' OR ', map { $self->_join_condition($_) } @$cond);
257   } else {
258     die "Can't handle this yet!";
259   }
260 }
261
262 sub _quote {
263   my ($self, $label) = @_;
264   return '' unless defined $label;
265   return "*" if $label eq '*';
266   return $label unless $self->{quote_char};
267   if(ref $self->{quote_char} eq "ARRAY"){
268     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
269       if !defined $self->{name_sep};
270     my $sep = $self->{name_sep};
271     return join($self->{name_sep},
272         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
273        split(/\Q$sep\E/,$label));
274   }
275   return $self->SUPER::_quote($label);
276 }
277
278 sub limit_dialect {
279     my $self = shift;
280     $self->{limit_dialect} = shift if @_;
281     return $self->{limit_dialect};
282 }
283
284 sub quote_char {
285     my $self = shift;
286     $self->{quote_char} = shift if @_;
287     return $self->{quote_char};
288 }
289
290 sub name_sep {
291     my $self = shift;
292     $self->{name_sep} = shift if @_;
293     return $self->{name_sep};
294 }
295
296 } # End of BEGIN block
297
298 =head1 NAME
299
300 DBIx::Class::Storage::DBI - DBI storage handler
301
302 =head1 SYNOPSIS
303
304   my $schema = MySchema->connect('dbi:SQLite:my.db');
305
306   $schema->storage->debug(1);
307   $schema->dbh_do("DROP TABLE authors");
308
309   $schema->resultset('Book')->search({
310      written_on => $schema->storage->datetime_parser(DateTime->now)
311   });
312
313 =head1 DESCRIPTION
314
315 This class represents the connection to an RDBMS via L<DBI>.  See
316 L<DBIx::Class::Storage> for general information.  This pod only
317 documents DBI-specific methods and behaviors.
318
319 =head1 METHODS
320
321 =cut
322
323 sub new {
324   my $new = shift->next::method(@_);
325
326   $new->transaction_depth(0);
327   $new->_sql_maker_opts({});
328   $new->{savepoints} = [];
329   $new->{_in_dbh_do} = 0;
330   $new->{_dbh_gen} = 0;
331
332   $new;
333 }
334
335 =head2 connect_info
336
337 This method is normally called by L<DBIx::Class::Schema/connection>, which
338 encapsulates its argument list in an arrayref before passing them here.
339
340 The argument list may contain:
341
342 =over
343
344 =item *
345
346 The same 4-element argument set one would normally pass to
347 L<DBI/connect>, optionally followed by
348 L<extra attributes|/DBIx::Class specific connection attributes>
349 recognized by DBIx::Class:
350
351   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
352
353 =item *
354
355 A single code reference which returns a connected 
356 L<DBI database handle|DBI/connect> optionally followed by 
357 L<extra attributes|/DBIx::Class specific connection attributes> recognized
358 by DBIx::Class:
359
360   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
361
362 =item *
363
364 A single hashref with all the attributes and the dsn/user/password
365 mixed together:
366
367   $connect_info_args = [{
368     dsn => $dsn,
369     user => $user,
370     password => $pass,
371     %dbi_attributes,
372     %extra_attributes,
373   }];
374
375 This is particularly useful for L<Catalyst> based applications, allowing the 
376 following config (L<Config::General> style):
377
378   <Model::DB>
379     schema_class   App::DB
380     <connect_info>
381       dsn          dbi:mysql:database=test
382       user         testuser
383       password     TestPass
384       AutoCommit   1
385     </connect_info>
386   </Model::DB>
387
388 =back
389
390 Please note that the L<DBI> docs recommend that you always explicitly
391 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
392 recommends that it be set to I<1>, and that you perform transactions
393 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
394 to I<1> if you do not do explicitly set it to zero.  This is the default 
395 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
396
397 =head3 DBIx::Class specific connection attributes
398
399 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
400 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
401 the following connection options. These options can be mixed in with your other
402 L<DBI> connection attributes, or placed in a seperate hashref
403 (C<\%extra_attributes>) as shown above.
404
405 Every time C<connect_info> is invoked, any previous settings for
406 these options will be cleared before setting the new ones, regardless of
407 whether any options are specified in the new C<connect_info>.
408
409
410 =over
411
412 =item on_connect_do
413
414 Specifies things to do immediately after connecting or re-connecting to
415 the database.  Its value may contain:
416
417 =over
418
419 =item an array reference
420
421 This contains SQL statements to execute in order.  Each element contains
422 a string or a code reference that returns a string.
423
424 =item a code reference
425
426 This contains some code to execute.  Unlike code references within an
427 array reference, its return value is ignored.
428
429 =back
430
431 =item on_disconnect_do
432
433 Takes arguments in the same form as L</on_connect_do> and executes them
434 immediately before disconnecting from the database.
435
436 Note, this only runs if you explicitly call L</disconnect> on the
437 storage object.
438
439 =item disable_sth_caching
440
441 If set to a true value, this option will disable the caching of
442 statement handles via L<DBI/prepare_cached>.
443
444 =item limit_dialect 
445
446 Sets the limit dialect. This is useful for JDBC-bridge among others
447 where the remote SQL-dialect cannot be determined by the name of the
448 driver alone. See also L<SQL::Abstract::Limit>.
449
450 =item quote_char
451
452 Specifies what characters to use to quote table and column names. If 
453 you use this you will want to specify L</name_sep> as well.
454
455 C<quote_char> expects either a single character, in which case is it
456 is placed on either side of the table/column name, or an arrayref of length
457 2 in which case the table/column name is placed between the elements.
458
459 For example under MySQL you should use C<< quote_char => '`' >>, and for
460 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
461
462 =item name_sep
463
464 This only needs to be used in conjunction with C<quote_char>, and is used to 
465 specify the charecter that seperates elements (schemas, tables, columns) from 
466 each other. In most cases this is simply a C<.>.
467
468 The consequences of not supplying this value is that L<SQL::Abstract>
469 will assume DBIx::Class' uses of aliases to be complete column
470 names. The output will look like I<"me.name"> when it should actually
471 be I<"me"."name">.
472
473 =item unsafe
474
475 This Storage driver normally installs its own C<HandleError>, sets
476 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
477 all database handles, including those supplied by a coderef.  It does this
478 so that it can have consistent and useful error behavior.
479
480 If you set this option to a true value, Storage will not do its usual
481 modifications to the database handle's attributes, and instead relies on
482 the settings in your connect_info DBI options (or the values you set in
483 your connection coderef, in the case that you are connecting via coderef).
484
485 Note that your custom settings can cause Storage to malfunction,
486 especially if you set a C<HandleError> handler that suppresses exceptions
487 and/or disable C<RaiseError>.
488
489 =item auto_savepoint
490
491 If this option is true, L<DBIx::Class> will use savepoints when nesting
492 transactions, making it possible to recover from failure in the inner
493 transaction without having to abort all outer transactions.
494
495 =item cursor_class
496
497 Use this argument to supply a cursor class other than the default
498 L<DBIx::Class::Storage::DBI::Cursor>.
499
500 =back
501
502 Some real-life examples of arguments to L</connect_info> and
503 L<DBIx::Class::Schema/connect>
504
505   # Simple SQLite connection
506   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
507
508   # Connect via subref
509   ->connect_info([ sub { DBI->connect(...) } ]);
510
511   # A bit more complicated
512   ->connect_info(
513     [
514       'dbi:Pg:dbname=foo',
515       'postgres',
516       'my_pg_password',
517       { AutoCommit => 1 },
518       { quote_char => q{"}, name_sep => q{.} },
519     ]
520   );
521
522   # Equivalent to the previous example
523   ->connect_info(
524     [
525       'dbi:Pg:dbname=foo',
526       'postgres',
527       'my_pg_password',
528       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
529     ]
530   );
531
532   # Same, but with hashref as argument
533   # See parse_connect_info for explanation
534   ->connect_info(
535     [{
536       dsn         => 'dbi:Pg:dbname=foo',
537       user        => 'postgres',
538       password    => 'my_pg_password',
539       AutoCommit  => 1,
540       quote_char  => q{"},
541       name_sep    => q{.},
542     }]
543   );
544
545   # Subref + DBIx::Class-specific connection options
546   ->connect_info(
547     [
548       sub { DBI->connect(...) },
549       {
550           quote_char => q{`},
551           name_sep => q{@},
552           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
553           disable_sth_caching => 1,
554       },
555     ]
556   );
557
558
559
560 =cut
561
562 sub connect_info {
563   my ($self, $info_arg) = @_;
564
565   return $self->_connect_info if !$info_arg;
566
567   my @args = @$info_arg;  # take a shallow copy for further mutilation
568   $self->_connect_info([@args]); # copy for _connect_info
569
570
571   # combine/pre-parse arguments depending on invocation style
572
573   my %attrs;
574   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
575     %attrs = %{ $args[1] || {} };
576     @args = $args[0];
577   }
578   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
579     %attrs = %{$args[0]};
580     @args = ();
581     for (qw/password user dsn/) {
582       unshift @args, delete $attrs{$_};
583     }
584   }
585   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
586     %attrs = (
587       % { $args[3] || {} },
588       % { $args[4] || {} },
589     );
590     @args = @args[0,1,2];
591   }
592
593   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
594   #  the new set of options
595   $self->_sql_maker(undef);
596   $self->_sql_maker_opts({});
597
598   if(keys %attrs) {
599     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
600       if(my $value = delete $attrs{$storage_opt}) {
601         $self->$storage_opt($value);
602       }
603     }
604     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
605       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
606         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
607       }
608     }
609   }
610
611   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
612
613   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
614   $self->_connect_info;
615 }
616
617 =head2 on_connect_do
618
619 This method is deprecated in favour of setting via L</connect_info>.
620
621
622 =head2 dbh_do
623
624 Arguments: ($subref | $method_name), @extra_coderef_args?
625
626 Execute the given $subref or $method_name using the new exception-based
627 connection management.
628
629 The first two arguments will be the storage object that C<dbh_do> was called
630 on and a database handle to use.  Any additional arguments will be passed
631 verbatim to the called subref as arguments 2 and onwards.
632
633 Using this (instead of $self->_dbh or $self->dbh) ensures correct
634 exception handling and reconnection (or failover in future subclasses).
635
636 Your subref should have no side-effects outside of the database, as
637 there is the potential for your subref to be partially double-executed
638 if the database connection was stale/dysfunctional.
639
640 Example:
641
642   my @stuff = $schema->storage->dbh_do(
643     sub {
644       my ($storage, $dbh, @cols) = @_;
645       my $cols = join(q{, }, @cols);
646       $dbh->selectrow_array("SELECT $cols FROM foo");
647     },
648     @column_list
649   );
650
651 =cut
652
653 sub dbh_do {
654   my $self = shift;
655   my $code = shift;
656
657   my $dbh = $self->_dbh;
658
659   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
660       || $self->{transaction_depth};
661
662   local $self->{_in_dbh_do} = 1;
663
664   my @result;
665   my $want_array = wantarray;
666
667   eval {
668     $self->_verify_pid if $dbh;
669     if(!$self->_dbh) {
670         $self->_populate_dbh;
671         $dbh = $self->_dbh;
672     }
673
674     if($want_array) {
675         @result = $self->$code($dbh, @_);
676     }
677     elsif(defined $want_array) {
678         $result[0] = $self->$code($dbh, @_);
679     }
680     else {
681         $self->$code($dbh, @_);
682     }
683   };
684
685   my $exception = $@;
686   if(!$exception) { return $want_array ? @result : $result[0] }
687
688   $self->throw_exception($exception) if $self->connected;
689
690   # We were not connected - reconnect and retry, but let any
691   #  exception fall right through this time
692   $self->_populate_dbh;
693   $self->$code($self->_dbh, @_);
694 }
695
696 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
697 # It also informs dbh_do to bypass itself while under the direction of txn_do,
698 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
699 sub txn_do {
700   my $self = shift;
701   my $coderef = shift;
702
703   ref $coderef eq 'CODE' or $self->throw_exception
704     ('$coderef must be a CODE reference');
705
706   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
707
708   local $self->{_in_dbh_do} = 1;
709
710   my @result;
711   my $want_array = wantarray;
712
713   my $tried = 0;
714   while(1) {
715     eval {
716       $self->_verify_pid if $self->_dbh;
717       $self->_populate_dbh if !$self->_dbh;
718
719       $self->txn_begin;
720       if($want_array) {
721           @result = $coderef->(@_);
722       }
723       elsif(defined $want_array) {
724           $result[0] = $coderef->(@_);
725       }
726       else {
727           $coderef->(@_);
728       }
729       $self->txn_commit;
730     };
731
732     my $exception = $@;
733     if(!$exception) { return $want_array ? @result : $result[0] }
734
735     if($tried++ > 0 || $self->connected) {
736       eval { $self->txn_rollback };
737       my $rollback_exception = $@;
738       if($rollback_exception) {
739         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
740         $self->throw_exception($exception)  # propagate nested rollback
741           if $rollback_exception =~ /$exception_class/;
742
743         $self->throw_exception(
744           "Transaction aborted: ${exception}. "
745           . "Rollback failed: ${rollback_exception}"
746         );
747       }
748       $self->throw_exception($exception)
749     }
750
751     # We were not connected, and was first try - reconnect and retry
752     # via the while loop
753     $self->_populate_dbh;
754   }
755 }
756
757 =head2 disconnect
758
759 Our C<disconnect> method also performs a rollback first if the
760 database is not in C<AutoCommit> mode.
761
762 =cut
763
764 sub disconnect {
765   my ($self) = @_;
766
767   if( $self->connected ) {
768     my $connection_do = $self->on_disconnect_do;
769     $self->_do_connection_actions($connection_do) if ref($connection_do);
770
771     $self->_dbh->rollback unless $self->_dbh_autocommit;
772     $self->_dbh->disconnect;
773     $self->_dbh(undef);
774     $self->{_dbh_gen}++;
775   }
776 }
777
778 =head2 with_deferred_fk_checks
779
780 =over 4
781
782 =item Arguments: C<$coderef>
783
784 =item Return Value: The return value of $coderef
785
786 =back
787
788 Storage specific method to run the code ref with FK checks deferred or
789 in MySQL's case disabled entirely.
790
791 =cut
792
793 # Storage subclasses should override this
794 sub with_deferred_fk_checks {
795   my ($self, $sub) = @_;
796
797   $sub->();
798 }
799
800 sub connected {
801   my ($self) = @_;
802
803   if(my $dbh = $self->_dbh) {
804       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
805           $self->_dbh(undef);
806           $self->{_dbh_gen}++;
807           return;
808       }
809       else {
810           $self->_verify_pid;
811           return 0 if !$self->_dbh;
812       }
813       return ($dbh->FETCH('Active') && $dbh->ping);
814   }
815
816   return 0;
817 }
818
819 # handle pid changes correctly
820 #  NOTE: assumes $self->_dbh is a valid $dbh
821 sub _verify_pid {
822   my ($self) = @_;
823
824   return if defined $self->_conn_pid && $self->_conn_pid == $$;
825
826   $self->_dbh->{InactiveDestroy} = 1;
827   $self->_dbh(undef);
828   $self->{_dbh_gen}++;
829
830   return;
831 }
832
833 sub ensure_connected {
834   my ($self) = @_;
835
836   unless ($self->connected) {
837     $self->_populate_dbh;
838   }
839 }
840
841 =head2 dbh
842
843 Returns the dbh - a data base handle of class L<DBI>.
844
845 =cut
846
847 sub dbh {
848   my ($self) = @_;
849
850   $self->ensure_connected;
851   return $self->_dbh;
852 }
853
854 sub _sql_maker_args {
855     my ($self) = @_;
856     
857     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
858 }
859
860 sub sql_maker {
861   my ($self) = @_;
862   unless ($self->_sql_maker) {
863     my $sql_maker_class = $self->sql_maker_class;
864     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
865   }
866   return $self->_sql_maker;
867 }
868
869 sub _rebless {}
870
871 sub _populate_dbh {
872   my ($self) = @_;
873   my @info = @{$self->_dbi_connect_info || []};
874   $self->_dbh($self->_connect(@info));
875
876   # Always set the transaction depth on connect, since
877   #  there is no transaction in progress by definition
878   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
879
880   if(ref $self eq 'DBIx::Class::Storage::DBI') {
881     my $driver = $self->_dbh->{Driver}->{Name};
882     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
883       bless $self, "DBIx::Class::Storage::DBI::${driver}";
884       $self->_rebless();
885     }
886   }
887
888   my $connection_do = $self->on_connect_do;
889   $self->_do_connection_actions($connection_do) if ref($connection_do);
890
891   $self->_conn_pid($$);
892   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
893 }
894
895 sub _do_connection_actions {
896   my $self = shift;
897   my $connection_do = shift;
898
899   if (ref $connection_do eq 'ARRAY') {
900     $self->_do_query($_) foreach @$connection_do;
901   }
902   elsif (ref $connection_do eq 'CODE') {
903     $connection_do->();
904   }
905
906   return $self;
907 }
908
909 sub _do_query {
910   my ($self, $action) = @_;
911
912   if (ref $action eq 'CODE') {
913     $action = $action->($self);
914     $self->_do_query($_) foreach @$action;
915   }
916   else {
917     my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
918     $self->_query_start(@to_run);
919     $self->_dbh->do(@to_run);
920     $self->_query_end(@to_run);
921   }
922
923   return $self;
924 }
925
926 sub _connect {
927   my ($self, @info) = @_;
928
929   $self->throw_exception("You failed to provide any connection info")
930     if !@info;
931
932   my ($old_connect_via, $dbh);
933
934   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
935     $old_connect_via = $DBI::connect_via;
936     $DBI::connect_via = 'connect';
937   }
938
939   eval {
940     if(ref $info[0] eq 'CODE') {
941        $dbh = &{$info[0]}
942     }
943     else {
944        $dbh = DBI->connect(@info);
945     }
946
947     if($dbh && !$self->unsafe) {
948       my $weak_self = $self;
949       weaken($weak_self);
950       $dbh->{HandleError} = sub {
951           if ($weak_self) {
952             $weak_self->throw_exception("DBI Exception: $_[0]");
953           }
954           else {
955             croak ("DBI Exception: $_[0]");
956           }
957       };
958       $dbh->{ShowErrorStatement} = 1;
959       $dbh->{RaiseError} = 1;
960       $dbh->{PrintError} = 0;
961     }
962   };
963
964   $DBI::connect_via = $old_connect_via if $old_connect_via;
965
966   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
967     if !$dbh || $@;
968
969   $self->_dbh_autocommit($dbh->{AutoCommit});
970
971   $dbh;
972 }
973
974 sub svp_begin {
975   my ($self, $name) = @_;
976
977   $name = $self->_svp_generate_name
978     unless defined $name;
979
980   $self->throw_exception ("You can't use savepoints outside a transaction")
981     if $self->{transaction_depth} == 0;
982
983   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
984     unless $self->can('_svp_begin');
985   
986   push @{ $self->{savepoints} }, $name;
987
988   $self->debugobj->svp_begin($name) if $self->debug;
989   
990   return $self->_svp_begin($name);
991 }
992
993 sub svp_release {
994   my ($self, $name) = @_;
995
996   $self->throw_exception ("You can't use savepoints outside a transaction")
997     if $self->{transaction_depth} == 0;
998
999   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1000     unless $self->can('_svp_release');
1001
1002   if (defined $name) {
1003     $self->throw_exception ("Savepoint '$name' does not exist")
1004       unless grep { $_ eq $name } @{ $self->{savepoints} };
1005
1006     # Dig through the stack until we find the one we are releasing.  This keeps
1007     # the stack up to date.
1008     my $svp;
1009
1010     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1011   } else {
1012     $name = pop @{ $self->{savepoints} };
1013   }
1014
1015   $self->debugobj->svp_release($name) if $self->debug;
1016
1017   return $self->_svp_release($name);
1018 }
1019
1020 sub svp_rollback {
1021   my ($self, $name) = @_;
1022
1023   $self->throw_exception ("You can't use savepoints outside a transaction")
1024     if $self->{transaction_depth} == 0;
1025
1026   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1027     unless $self->can('_svp_rollback');
1028
1029   if (defined $name) {
1030       # If they passed us a name, verify that it exists in the stack
1031       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1032           $self->throw_exception("Savepoint '$name' does not exist!");
1033       }
1034
1035       # Dig through the stack until we find the one we are releasing.  This keeps
1036       # the stack up to date.
1037       while(my $s = pop(@{ $self->{savepoints} })) {
1038           last if($s eq $name);
1039       }
1040       # Add the savepoint back to the stack, as a rollback doesn't remove the
1041       # named savepoint, only everything after it.
1042       push(@{ $self->{savepoints} }, $name);
1043   } else {
1044       # We'll assume they want to rollback to the last savepoint
1045       $name = $self->{savepoints}->[-1];
1046   }
1047
1048   $self->debugobj->svp_rollback($name) if $self->debug;
1049   
1050   return $self->_svp_rollback($name);
1051 }
1052
1053 sub _svp_generate_name {
1054     my ($self) = @_;
1055
1056     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1057 }
1058
1059 sub txn_begin {
1060   my $self = shift;
1061   $self->ensure_connected();
1062   if($self->{transaction_depth} == 0) {
1063     $self->debugobj->txn_begin()
1064       if $self->debug;
1065     # this isn't ->_dbh-> because
1066     #  we should reconnect on begin_work
1067     #  for AutoCommit users
1068     $self->dbh->begin_work;
1069   } elsif ($self->auto_savepoint) {
1070     $self->svp_begin;
1071   }
1072   $self->{transaction_depth}++;
1073 }
1074
1075 sub txn_commit {
1076   my $self = shift;
1077   if ($self->{transaction_depth} == 1) {
1078     my $dbh = $self->_dbh;
1079     $self->debugobj->txn_commit()
1080       if ($self->debug);
1081     $dbh->commit;
1082     $self->{transaction_depth} = 0
1083       if $self->_dbh_autocommit;
1084   }
1085   elsif($self->{transaction_depth} > 1) {
1086     $self->{transaction_depth}--;
1087     $self->svp_release
1088       if $self->auto_savepoint;
1089   }
1090 }
1091
1092 sub txn_rollback {
1093   my $self = shift;
1094   my $dbh = $self->_dbh;
1095   eval {
1096     if ($self->{transaction_depth} == 1) {
1097       $self->debugobj->txn_rollback()
1098         if ($self->debug);
1099       $self->{transaction_depth} = 0
1100         if $self->_dbh_autocommit;
1101       $dbh->rollback;
1102     }
1103     elsif($self->{transaction_depth} > 1) {
1104       $self->{transaction_depth}--;
1105       if ($self->auto_savepoint) {
1106         $self->svp_rollback;
1107         $self->svp_release;
1108       }
1109     }
1110     else {
1111       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1112     }
1113   };
1114   if ($@) {
1115     my $error = $@;
1116     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1117     $error =~ /$exception_class/ and $self->throw_exception($error);
1118     # ensure that a failed rollback resets the transaction depth
1119     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1120     $self->throw_exception($error);
1121   }
1122 }
1123
1124 # This used to be the top-half of _execute.  It was split out to make it
1125 #  easier to override in NoBindVars without duping the rest.  It takes up
1126 #  all of _execute's args, and emits $sql, @bind.
1127 sub _prep_for_execute {
1128   my ($self, $op, $extra_bind, $ident, $args) = @_;
1129
1130   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1131   unshift(@bind,
1132     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1133       if $extra_bind;
1134
1135   return ($sql, \@bind);
1136 }
1137
1138 sub _fix_bind_params {
1139     my ($self, @bind) = @_;
1140
1141     ### Turn @bind from something like this:
1142     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1143     ### to this:
1144     ###   ( "'1'", "'1'", "'3'" )
1145     return
1146         map {
1147             if ( defined( $_ && $_->[1] ) ) {
1148                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1149             }
1150             else { q{'NULL'}; }
1151         } @bind;
1152 }
1153
1154 sub _query_start {
1155     my ( $self, $sql, @bind ) = @_;
1156
1157     if ( $self->debug ) {
1158         @bind = $self->_fix_bind_params(@bind);
1159         
1160         $self->debugobj->query_start( $sql, @bind );
1161     }
1162 }
1163
1164 sub _query_end {
1165     my ( $self, $sql, @bind ) = @_;
1166
1167     if ( $self->debug ) {
1168         @bind = $self->_fix_bind_params(@bind);
1169         $self->debugobj->query_end( $sql, @bind );
1170     }
1171 }
1172
1173 sub _dbh_execute {
1174   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1175   
1176   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1177     $ident = $ident->from();
1178   }
1179
1180   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1181
1182   $self->_query_start( $sql, @$bind );
1183
1184   my $sth = $self->sth($sql,$op);
1185
1186   my $placeholder_index = 1; 
1187
1188   foreach my $bound (@$bind) {
1189     my $attributes = {};
1190     my($column_name, @data) = @$bound;
1191
1192     if ($bind_attributes) {
1193       $attributes = $bind_attributes->{$column_name}
1194       if defined $bind_attributes->{$column_name};
1195     }
1196
1197     foreach my $data (@data) {
1198       $data = ref $data ? ''.$data : $data; # stringify args
1199
1200       $sth->bind_param($placeholder_index, $data, $attributes);
1201       $placeholder_index++;
1202     }
1203   }
1204
1205   # Can this fail without throwing an exception anyways???
1206   my $rv = $sth->execute();
1207   $self->throw_exception($sth->errstr) if !$rv;
1208
1209   $self->_query_end( $sql, @$bind );
1210
1211   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1212 }
1213
1214 sub _execute {
1215     my $self = shift;
1216     $self->dbh_do('_dbh_execute', @_)
1217 }
1218
1219 sub insert {
1220   my ($self, $source, $to_insert) = @_;
1221   
1222   my $ident = $source->from; 
1223   my $bind_attributes = $self->source_bind_attributes($source);
1224
1225   $self->ensure_connected;
1226   foreach my $col ( $source->columns ) {
1227     if ( !defined $to_insert->{$col} ) {
1228       my $col_info = $source->column_info($col);
1229
1230       if ( $col_info->{auto_nextval} ) {
1231         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1232       }
1233     }
1234   }
1235
1236   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1237
1238   return $to_insert;
1239 }
1240
1241 ## Still not quite perfect, and EXPERIMENTAL
1242 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1243 ## scalar refs, or at least, all the same type as the first set, the statement is
1244 ## only prepped once.
1245 sub insert_bulk {
1246   my ($self, $source, $cols, $data) = @_;
1247   my %colvalues;
1248   my $table = $source->from;
1249   @colvalues{@$cols} = (0..$#$cols);
1250   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1251   
1252   $self->_query_start( $sql, @bind );
1253   my $sth = $self->sth($sql);
1254
1255 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1256
1257   ## This must be an arrayref, else nothing works!
1258   
1259   my $tuple_status = [];
1260   
1261   ##use Data::Dumper;
1262   ##print STDERR Dumper( $data, $sql, [@bind] );
1263
1264   my $time = time();
1265
1266   ## Get the bind_attributes, if any exist
1267   my $bind_attributes = $self->source_bind_attributes($source);
1268
1269   ## Bind the values and execute
1270   my $placeholder_index = 1; 
1271
1272   foreach my $bound (@bind) {
1273
1274     my $attributes = {};
1275     my ($column_name, $data_index) = @$bound;
1276
1277     if( $bind_attributes ) {
1278       $attributes = $bind_attributes->{$column_name}
1279       if defined $bind_attributes->{$column_name};
1280     }
1281
1282     my @data = map { $_->[$data_index] } @$data;
1283
1284     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1285     $placeholder_index++;
1286   }
1287   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1288   $self->throw_exception($sth->errstr) if !$rv;
1289
1290   $self->_query_end( $sql, @bind );
1291   return (wantarray ? ($rv, $sth, @bind) : $rv);
1292 }
1293
1294 sub update {
1295   my $self = shift @_;
1296   my $source = shift @_;
1297   my $bind_attributes = $self->source_bind_attributes($source);
1298   
1299   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1300 }
1301
1302
1303 sub delete {
1304   my $self = shift @_;
1305   my $source = shift @_;
1306   
1307   my $bind_attrs = {}; ## If ever it's needed...
1308   
1309   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1310 }
1311
1312 sub _select {
1313   my ($self, $ident, $select, $condition, $attrs) = @_;
1314   my $order = $attrs->{order_by};
1315
1316   if (ref $condition eq 'SCALAR') {
1317     my $unwrap = ${$condition};
1318     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1319       $order = $1;
1320       $condition = \$unwrap;
1321     }
1322   }
1323
1324   my $for = delete $attrs->{for};
1325   my $sql_maker = $self->sql_maker;
1326   local $sql_maker->{for} = $for;
1327
1328   if (exists $attrs->{group_by} || $attrs->{having}) {
1329     $order = {
1330       group_by => $attrs->{group_by},
1331       having => $attrs->{having},
1332       ($order ? (order_by => $order) : ())
1333     };
1334   }
1335   my $bind_attrs = {}; ## Future support
1336   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1337   if ($attrs->{software_limit} ||
1338       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1339         $attrs->{software_limit} = 1;
1340   } else {
1341     $self->throw_exception("rows attribute must be positive if present")
1342       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1343
1344     # MySQL actually recommends this approach.  I cringe.
1345     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1346     push @args, $attrs->{rows}, $attrs->{offset};
1347   }
1348
1349   return $self->_execute(@args);
1350 }
1351
1352 sub source_bind_attributes {
1353   my ($self, $source) = @_;
1354   
1355   my $bind_attributes;
1356   foreach my $column ($source->columns) {
1357   
1358     my $data_type = $source->column_info($column)->{data_type} || '';
1359     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1360      if $data_type;
1361   }
1362
1363   return $bind_attributes;
1364 }
1365
1366 =head2 select
1367
1368 =over 4
1369
1370 =item Arguments: $ident, $select, $condition, $attrs
1371
1372 =back
1373
1374 Handle a SQL select statement.
1375
1376 =cut
1377
1378 sub select {
1379   my $self = shift;
1380   my ($ident, $select, $condition, $attrs) = @_;
1381   return $self->cursor_class->new($self, \@_, $attrs);
1382 }
1383
1384 sub select_single {
1385   my $self = shift;
1386   my ($rv, $sth, @bind) = $self->_select(@_);
1387   my @row = $sth->fetchrow_array;
1388   my @nextrow = $sth->fetchrow_array if @row;
1389   if(@row && @nextrow) {
1390     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1391   }
1392   # Need to call finish() to work round broken DBDs
1393   $sth->finish();
1394   return @row;
1395 }
1396
1397 =head2 sth
1398
1399 =over 4
1400
1401 =item Arguments: $sql
1402
1403 =back
1404
1405 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1406
1407 =cut
1408
1409 sub _dbh_sth {
1410   my ($self, $dbh, $sql) = @_;
1411
1412   # 3 is the if_active parameter which avoids active sth re-use
1413   my $sth = $self->disable_sth_caching
1414     ? $dbh->prepare($sql)
1415     : $dbh->prepare_cached($sql, {}, 3);
1416
1417   # XXX You would think RaiseError would make this impossible,
1418   #  but apparently that's not true :(
1419   $self->throw_exception($dbh->errstr) if !$sth;
1420
1421   $sth;
1422 }
1423
1424 sub sth {
1425   my ($self, $sql) = @_;
1426   $self->dbh_do('_dbh_sth', $sql);
1427 }
1428
1429 sub _dbh_columns_info_for {
1430   my ($self, $dbh, $table) = @_;
1431
1432   if ($dbh->can('column_info')) {
1433     my %result;
1434     eval {
1435       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1436       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1437       $sth->execute();
1438       while ( my $info = $sth->fetchrow_hashref() ){
1439         my %column_info;
1440         $column_info{data_type}   = $info->{TYPE_NAME};
1441         $column_info{size}      = $info->{COLUMN_SIZE};
1442         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1443         $column_info{default_value} = $info->{COLUMN_DEF};
1444         my $col_name = $info->{COLUMN_NAME};
1445         $col_name =~ s/^\"(.*)\"$/$1/;
1446
1447         $result{$col_name} = \%column_info;
1448       }
1449     };
1450     return \%result if !$@ && scalar keys %result;
1451   }
1452
1453   my %result;
1454   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1455   $sth->execute;
1456   my @columns = @{$sth->{NAME_lc}};
1457   for my $i ( 0 .. $#columns ){
1458     my %column_info;
1459     $column_info{data_type} = $sth->{TYPE}->[$i];
1460     $column_info{size} = $sth->{PRECISION}->[$i];
1461     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1462
1463     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1464       $column_info{data_type} = $1;
1465       $column_info{size}    = $2;
1466     }
1467
1468     $result{$columns[$i]} = \%column_info;
1469   }
1470   $sth->finish;
1471
1472   foreach my $col (keys %result) {
1473     my $colinfo = $result{$col};
1474     my $type_num = $colinfo->{data_type};
1475     my $type_name;
1476     if(defined $type_num && $dbh->can('type_info')) {
1477       my $type_info = $dbh->type_info($type_num);
1478       $type_name = $type_info->{TYPE_NAME} if $type_info;
1479       $colinfo->{data_type} = $type_name if $type_name;
1480     }
1481   }
1482
1483   return \%result;
1484 }
1485
1486 sub columns_info_for {
1487   my ($self, $table) = @_;
1488   $self->dbh_do('_dbh_columns_info_for', $table);
1489 }
1490
1491 =head2 last_insert_id
1492
1493 Return the row id of the last insert.
1494
1495 =cut
1496
1497 sub _dbh_last_insert_id {
1498     my ($self, $dbh, $source, $col) = @_;
1499     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1500     $dbh->func('last_insert_rowid');
1501 }
1502
1503 sub last_insert_id {
1504   my $self = shift;
1505   $self->dbh_do('_dbh_last_insert_id', @_);
1506 }
1507
1508 =head2 sqlt_type
1509
1510 Returns the database driver name.
1511
1512 =cut
1513
1514 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1515
1516 =head2 bind_attribute_by_data_type
1517
1518 Given a datatype from column info, returns a database specific bind
1519 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1520 let the database planner just handle it.
1521
1522 Generally only needed for special case column types, like bytea in postgres.
1523
1524 =cut
1525
1526 sub bind_attribute_by_data_type {
1527     return;
1528 }
1529
1530 =head2 create_ddl_dir
1531
1532 =over 4
1533
1534 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1535
1536 =back
1537
1538 Creates a SQL file based on the Schema, for each of the specified
1539 database types, in the given directory.
1540
1541 By default, C<\%sqlt_args> will have
1542
1543  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1544
1545 merged with the hash passed in. To disable any of those features, pass in a 
1546 hashref like the following
1547
1548  { ignore_constraint_names => 0, # ... other options }
1549
1550 =cut
1551
1552 sub create_ddl_dir {
1553   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1554
1555   if(!$dir || !-d $dir) {
1556     warn "No directory given, using ./\n";
1557     $dir = "./";
1558   }
1559   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1560   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1561
1562   my $schema_version = $schema->schema_version || '1.x';
1563   $version ||= $schema_version;
1564
1565   $sqltargs = {
1566     add_drop_table => 1, 
1567     ignore_constraint_names => 1,
1568     ignore_index_names => 1,
1569     %{$sqltargs || {}}
1570   };
1571
1572   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
1573       . $self->_check_sqlt_message . q{'})
1574           if !$self->_check_sqlt_version;
1575
1576   my $sqlt = SQL::Translator->new( $sqltargs );
1577
1578   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1579   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1580
1581   foreach my $db (@$databases) {
1582     $sqlt->reset();
1583     $sqlt = $self->configure_sqlt($sqlt, $db);
1584     $sqlt->{schema} = $sqlt_schema;
1585     $sqlt->producer($db);
1586
1587     my $file;
1588     my $filename = $schema->ddl_filename($db, $version, $dir);
1589     if (-e $filename && ($version eq $schema_version )) {
1590       # if we are dumping the current version, overwrite the DDL
1591       warn "Overwriting existing DDL file - $filename";
1592       unlink($filename);
1593     }
1594
1595     my $output = $sqlt->translate;
1596     if(!$output) {
1597       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1598       next;
1599     }
1600     if(!open($file, ">$filename")) {
1601       $self->throw_exception("Can't open $filename for writing ($!)");
1602       next;
1603     }
1604     print $file $output;
1605     close($file);
1606   
1607     next unless ($preversion);
1608
1609     require SQL::Translator::Diff;
1610
1611     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1612     if(!-e $prefilename) {
1613       warn("No previous schema file found ($prefilename)");
1614       next;
1615     }
1616
1617     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1618     if(-e $difffile) {
1619       warn("Overwriting existing diff file - $difffile");
1620       unlink($difffile);
1621     }
1622     
1623     my $source_schema;
1624     {
1625       my $t = SQL::Translator->new($sqltargs);
1626       $t->debug( 0 );
1627       $t->trace( 0 );
1628       $t->parser( $db )                       or die $t->error;
1629       $t = $self->configure_sqlt($t, $db);
1630       my $out = $t->translate( $prefilename ) or die $t->error;
1631       $source_schema = $t->schema;
1632       unless ( $source_schema->name ) {
1633         $source_schema->name( $prefilename );
1634       }
1635     }
1636
1637     # The "new" style of producers have sane normalization and can support 
1638     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1639     # And we have to diff parsed SQL against parsed SQL.
1640     my $dest_schema = $sqlt_schema;
1641     
1642     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1643       my $t = SQL::Translator->new($sqltargs);
1644       $t->debug( 0 );
1645       $t->trace( 0 );
1646       $t->parser( $db )                    or die $t->error;
1647       $t = $self->configure_sqlt($t, $db);
1648       my $out = $t->translate( $filename ) or die $t->error;
1649       $dest_schema = $t->schema;
1650       $dest_schema->name( $filename )
1651         unless $dest_schema->name;
1652     }
1653     
1654     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1655                                                   $dest_schema,   $db,
1656                                                   $sqltargs
1657                                                  );
1658     if(!open $file, ">$difffile") { 
1659       $self->throw_exception("Can't write to $difffile ($!)");
1660       next;
1661     }
1662     print $file $diff;
1663     close($file);
1664   }
1665 }
1666
1667 sub configure_sqlt() {
1668   my $self = shift;
1669   my $tr = shift;
1670   my $db = shift || $self->sqlt_type;
1671   if ($db eq 'PostgreSQL') {
1672     $tr->quote_table_names(0);
1673     $tr->quote_field_names(0);
1674   }
1675   return $tr;
1676 }
1677
1678 =head2 deployment_statements
1679
1680 =over 4
1681
1682 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1683
1684 =back
1685
1686 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1687 The database driver name is given by C<$type>, though the value from
1688 L</sqlt_type> is used if it is not specified.
1689
1690 C<$directory> is used to return statements from files in a previously created
1691 L</create_ddl_dir> directory and is optional. The filenames are constructed
1692 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1693
1694 If no C<$directory> is specified then the statements are constructed on the
1695 fly using L<SQL::Translator> and C<$version> is ignored.
1696
1697 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1698
1699 =cut
1700
1701 sub deployment_statements {
1702   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1703   # Need to be connected to get the correct sqlt_type
1704   $self->ensure_connected() unless $type;
1705   $type ||= $self->sqlt_type;
1706   $version ||= $schema->schema_version || '1.x';
1707   $dir ||= './';
1708   my $filename = $schema->ddl_filename($type, $dir, $version);
1709   if(-f $filename)
1710   {
1711       my $file;
1712       open($file, "<$filename") 
1713         or $self->throw_exception("Can't open $filename ($!)");
1714       my @rows = <$file>;
1715       close($file);
1716       return join('', @rows);
1717   }
1718
1719   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
1720       . $self->_check_sqlt_message . q{'})
1721           if !$self->_check_sqlt_version;
1722
1723   require SQL::Translator::Parser::DBIx::Class;
1724   eval qq{use SQL::Translator::Producer::${type}};
1725   $self->throw_exception($@) if $@;
1726
1727   # sources needs to be a parser arg, but for simplicty allow at top level 
1728   # coming in
1729   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1730       if exists $sqltargs->{sources};
1731
1732   my $tr = SQL::Translator->new(%$sqltargs);
1733   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1734   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1735 }
1736
1737 sub deploy {
1738   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1739   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1740     foreach my $line ( split(";\n", $statement)) {
1741       next if($line =~ /^--/);
1742       next if(!$line);
1743 #      next if($line =~ /^DROP/m);
1744       next if($line =~ /^BEGIN TRANSACTION/m);
1745       next if($line =~ /^COMMIT/m);
1746       next if $line =~ /^\s+$/; # skip whitespace only
1747       $self->_query_start($line);
1748       eval {
1749         $self->dbh->do($line); # shouldn't be using ->dbh ?
1750       };
1751       if ($@) {
1752         warn qq{$@ (running "${line}")};
1753       }
1754       $self->_query_end($line);
1755     }
1756   }
1757 }
1758
1759 =head2 datetime_parser
1760
1761 Returns the datetime parser class
1762
1763 =cut
1764
1765 sub datetime_parser {
1766   my $self = shift;
1767   return $self->{datetime_parser} ||= do {
1768     $self->ensure_connected;
1769     $self->build_datetime_parser(@_);
1770   };
1771 }
1772
1773 =head2 datetime_parser_type
1774
1775 Defines (returns) the datetime parser class - currently hardwired to
1776 L<DateTime::Format::MySQL>
1777
1778 =cut
1779
1780 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1781
1782 =head2 build_datetime_parser
1783
1784 See L</datetime_parser>
1785
1786 =cut
1787
1788 sub build_datetime_parser {
1789   my $self = shift;
1790   my $type = $self->datetime_parser_type(@_);
1791   eval "use ${type}";
1792   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1793   return $type;
1794 }
1795
1796 {
1797     my $_check_sqlt_version; # private
1798     my $_check_sqlt_message; # private
1799     sub _check_sqlt_version {
1800         return $_check_sqlt_version if defined $_check_sqlt_version;
1801         eval 'use SQL::Translator "0.09"';
1802         $_check_sqlt_message = $@ || '';
1803         $_check_sqlt_version = !$@;
1804     }
1805
1806     sub _check_sqlt_message {
1807         _check_sqlt_version if !defined $_check_sqlt_message;
1808         $_check_sqlt_message;
1809     }
1810 }
1811
1812 =head2 is_replicating
1813
1814 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1815 replicate from a master database.  Default is undef, which is the result
1816 returned by databases that don't support replication.
1817
1818 =cut
1819
1820 sub is_replicating {
1821     return;
1822     
1823 }
1824
1825 =head2 lag_behind_master
1826
1827 Returns a number that represents a certain amount of lag behind a master db
1828 when a given storage is replicating.  The number is database dependent, but
1829 starts at zero and increases with the amount of lag. Default in undef
1830
1831 =cut
1832
1833 sub lag_behind_master {
1834     return;
1835 }
1836
1837 sub DESTROY {
1838   my $self = shift;
1839   return if !$self->_dbh;
1840   $self->_verify_pid;
1841   $self->_dbh(undef);
1842 }
1843
1844 1;
1845
1846 =head1 USAGE NOTES
1847
1848 =head2 DBIx::Class and AutoCommit
1849
1850 DBIx::Class can do some wonderful magic with handling exceptions,
1851 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1852 combined with C<txn_do> for transaction support.
1853
1854 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1855 in an assumed transaction between commits, and you're telling us you'd
1856 like to manage that manually.  A lot of the magic protections offered by
1857 this module will go away.  We can't protect you from exceptions due to database
1858 disconnects because we don't know anything about how to restart your
1859 transactions.  You're on your own for handling all sorts of exceptional
1860 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1861 be with raw DBI.
1862
1863
1864 =head1 SQL METHODS
1865
1866 The module defines a set of methods within the DBIC::SQL::Abstract
1867 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1868 SQL query functions.
1869
1870 The following methods are extended:-
1871
1872 =over 4
1873
1874 =item delete
1875
1876 =item insert
1877
1878 =item select
1879
1880 =item update
1881
1882 =item limit_dialect
1883
1884 See L</connect_info> for details.
1885
1886 =item quote_char
1887
1888 See L</connect_info> for details.
1889
1890 =item name_sep
1891
1892 See L</connect_info> for details.
1893
1894 =back
1895
1896 =head1 AUTHORS
1897
1898 Matt S. Trout <mst@shadowcatsystems.co.uk>
1899
1900 Andy Grundman <andy@hybridized.org>
1901
1902 =head1 LICENSE
1903
1904 You may distribute this code under the same terms as Perl itself.
1905
1906 =cut