DB2 compatibility fix, update test to add new artist column (ick)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53
54 # While we're at it, this should make LIMIT queries more efficient,
55 #  without digging into things too deeply
56 use Scalar::Util 'blessed';
57
58 sub select {
59   my ($self, $table, $fields, $where, $order, @rest) = @_;
60   if (ref $table eq 'SCALAR') {
61     $table = $$table;
62   }
63   elsif (not ref $table) {
64     $table = $self->_quote($table);
65   }
66   local $self->{rownum_hack_count} = 1
67     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
68   @rest = (-1) unless defined $rest[0];
69   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
70     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
71   local $self->{having_bind} = [];
72   my ($sql, @ret) = $self->SUPER::select(
73     $table, $self->_recurse_fields($fields), $where, $order, @rest
74   );
75   $sql .= 
76     $self->{for} ?
77     (
78       $self->{for} eq 'update' ? ' FOR UPDATE' :
79       $self->{for} eq 'shared' ? ' FOR SHARE'  :
80       ''
81     ) :
82     ''
83   ;
84   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
85 }
86
87 sub insert {
88   my $self = shift;
89   my $table = shift;
90   $table = $self->_quote($table) unless ref($table);
91   $self->SUPER::insert($table, @_);
92 }
93
94 sub update {
95   my $self = shift;
96   my $table = shift;
97   $table = $self->_quote($table) unless ref($table);
98   $self->SUPER::update($table, @_);
99 }
100
101 sub delete {
102   my $self = shift;
103   my $table = shift;
104   $table = $self->_quote($table) unless ref($table);
105   $self->SUPER::delete($table, @_);
106 }
107
108 sub _emulate_limit {
109   my $self = shift;
110   if ($_[3] == -1) {
111     return $_[1].$self->_order_by($_[2]);
112   } else {
113     return $self->SUPER::_emulate_limit(@_);
114   }
115 }
116
117 sub _recurse_fields {
118   my ($self, $fields, $params) = @_;
119   my $ref = ref $fields;
120   return $self->_quote($fields) unless $ref;
121   return $$fields if $ref eq 'SCALAR';
122
123   if ($ref eq 'ARRAY') {
124     return join(', ', map {
125       $self->_recurse_fields($_)
126         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
127           ? ' AS col'.$self->{rownum_hack_count}++
128           : '')
129       } @$fields);
130   } elsif ($ref eq 'HASH') {
131     foreach my $func (keys %$fields) {
132       return $self->_sqlcase($func)
133         .'( '.$self->_recurse_fields($fields->{$func}).' )';
134     }
135   }
136 }
137
138 sub _order_by {
139   my $self = shift;
140   my $ret = '';
141   my @extra;
142   if (ref $_[0] eq 'HASH') {
143     if (defined $_[0]->{group_by}) {
144       $ret = $self->_sqlcase(' group by ')
145         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
146     }
147     if (defined $_[0]->{having}) {
148       my $frag;
149       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
150       push(@{$self->{having_bind}}, @extra);
151       $ret .= $self->_sqlcase(' having ').$frag;
152     }
153     if (defined $_[0]->{order_by}) {
154       $ret .= $self->_order_by($_[0]->{order_by});
155     }
156     if (grep { $_ =~ /^-(desc|asc)/i } keys %{$_[0]}) {
157       return $self->SUPER::_order_by($_[0]);
158     }
159   } elsif (ref $_[0] eq 'SCALAR') {
160     $ret = $self->_sqlcase(' order by ').${ $_[0] };
161   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
162     my @order = @{+shift};
163     $ret = $self->_sqlcase(' order by ')
164           .join(', ', map {
165                         my $r = $self->_order_by($_, @_);
166                         $r =~ s/^ ?ORDER BY //i;
167                         $r;
168                       } @order);
169   } else {
170     $ret = $self->SUPER::_order_by(@_);
171   }
172   return $ret;
173 }
174
175 sub _order_directions {
176   my ($self, $order) = @_;
177   $order = $order->{order_by} if ref $order eq 'HASH';
178   return $self->SUPER::_order_directions($order);
179 }
180
181 sub _table {
182   my ($self, $from) = @_;
183   if (ref $from eq 'ARRAY') {
184     return $self->_recurse_from(@$from);
185   } elsif (ref $from eq 'HASH') {
186     return $self->_make_as($from);
187   } else {
188     return $from; # would love to quote here but _table ends up getting called
189                   # twice during an ->select without a limit clause due to
190                   # the way S::A::Limit->select works. should maybe consider
191                   # bypassing this and doing S::A::select($self, ...) in
192                   # our select method above. meantime, quoting shims have
193                   # been added to select/insert/update/delete here
194   }
195 }
196
197 sub _recurse_from {
198   my ($self, $from, @join) = @_;
199   my @sqlf;
200   push(@sqlf, $self->_make_as($from));
201   foreach my $j (@join) {
202     my ($to, $on) = @$j;
203
204     # check whether a join type exists
205     my $join_clause = '';
206     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
207     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
208       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
209     } else {
210       $join_clause = ' JOIN ';
211     }
212     push(@sqlf, $join_clause);
213
214     if (ref $to eq 'ARRAY') {
215       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
216     } else {
217       push(@sqlf, $self->_make_as($to));
218     }
219     push(@sqlf, ' ON ', $self->_join_condition($on));
220   }
221   return join('', @sqlf);
222 }
223
224 sub _make_as {
225   my ($self, $from) = @_;
226   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
227                      reverse each %{$self->_skip_options($from)});
228 }
229
230 sub _skip_options {
231   my ($self, $hash) = @_;
232   my $clean_hash = {};
233   $clean_hash->{$_} = $hash->{$_}
234     for grep {!/^-/} keys %$hash;
235   return $clean_hash;
236 }
237
238 sub _join_condition {
239   my ($self, $cond) = @_;
240   if (ref $cond eq 'HASH') {
241     my %j;
242     for (keys %$cond) {
243       my $v = $cond->{$_};
244       if (ref $v) {
245         # XXX no throw_exception() in this package and croak() fails with strange results
246         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
247             if ref($v) ne 'SCALAR';
248         $j{$_} = $v;
249       }
250       else {
251         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
252       }
253     };
254     return scalar($self->_recurse_where(\%j));
255   } elsif (ref $cond eq 'ARRAY') {
256     return join(' OR ', map { $self->_join_condition($_) } @$cond);
257   } else {
258     die "Can't handle this yet!";
259   }
260 }
261
262 sub _quote {
263   my ($self, $label) = @_;
264   return '' unless defined $label;
265   return "*" if $label eq '*';
266   return $label unless $self->{quote_char};
267   if(ref $self->{quote_char} eq "ARRAY"){
268     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
269       if !defined $self->{name_sep};
270     my $sep = $self->{name_sep};
271     return join($self->{name_sep},
272         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
273        split(/\Q$sep\E/,$label));
274   }
275   return $self->SUPER::_quote($label);
276 }
277
278 sub limit_dialect {
279     my $self = shift;
280     $self->{limit_dialect} = shift if @_;
281     return $self->{limit_dialect};
282 }
283
284 sub quote_char {
285     my $self = shift;
286     $self->{quote_char} = shift if @_;
287     return $self->{quote_char};
288 }
289
290 sub name_sep {
291     my $self = shift;
292     $self->{name_sep} = shift if @_;
293     return $self->{name_sep};
294 }
295
296 } # End of BEGIN block
297
298 =head1 NAME
299
300 DBIx::Class::Storage::DBI - DBI storage handler
301
302 =head1 SYNOPSIS
303
304   my $schema = MySchema->connect('dbi:SQLite:my.db');
305
306   $schema->storage->debug(1);
307   $schema->dbh_do("DROP TABLE authors");
308
309   $schema->resultset('Book')->search({
310      written_on => $schema->storage->datetime_parser(DateTime->now)
311   });
312
313 =head1 DESCRIPTION
314
315 This class represents the connection to an RDBMS via L<DBI>.  See
316 L<DBIx::Class::Storage> for general information.  This pod only
317 documents DBI-specific methods and behaviors.
318
319 =head1 METHODS
320
321 =cut
322
323 sub new {
324   my $new = shift->next::method(@_);
325
326   $new->transaction_depth(0);
327   $new->_sql_maker_opts({});
328   $new->{savepoints} = [];
329   $new->{_in_dbh_do} = 0;
330   $new->{_dbh_gen} = 0;
331
332   $new;
333 }
334
335 =head2 connect_info
336
337 This method is normally called by L<DBIx::Class::Schema/connection>, which
338 encapsulates its argument list in an arrayref before passing them here.
339
340 The argument list may contain:
341
342 =over
343
344 =item *
345
346 The same 4-element argument set one would normally pass to
347 L<DBI/connect>, optionally followed by
348 L<extra attributes|/DBIx::Class specific connection attributes>
349 recognized by DBIx::Class:
350
351   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
352
353 =item *
354
355 A single code reference which returns a connected 
356 L<DBI database handle|DBI/connect> optionally followed by 
357 L<extra attributes|/DBIx::Class specific connection attributes> recognized
358 by DBIx::Class:
359
360   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
361
362 =item *
363
364 A single hashref with all the attributes and the dsn/user/password
365 mixed together:
366
367   $connect_info_args = [{
368     dsn => $dsn,
369     user => $user,
370     password => $pass,
371     %dbi_attributes,
372     %extra_attributes,
373   }];
374
375 This is particularly useful for L<Catalyst> based applications, allowing the 
376 following config (L<Config::General> style):
377
378   <Model::DB>
379     schema_class   App::DB
380     <connect_info>
381       dsn          dbi:mysql:database=test
382       user         testuser
383       password     TestPass
384       AutoCommit   1
385     </connect_info>
386   </Model::DB>
387
388 =back
389
390 Please note that the L<DBI> docs recommend that you always explicitly
391 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
392 recommends that it be set to I<1>, and that you perform transactions
393 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
394 to I<1> if you do not do explicitly set it to zero.  This is the default 
395 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
396
397 =head3 DBIx::Class specific connection attributes
398
399 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
400 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
401 the following connection options. These options can be mixed in with your other
402 L<DBI> connection attributes, or placed in a seperate hashref
403 (C<\%extra_attributes>) as shown above.
404
405 Every time C<connect_info> is invoked, any previous settings for
406 these options will be cleared before setting the new ones, regardless of
407 whether any options are specified in the new C<connect_info>.
408
409
410 =over
411
412 =item on_connect_do
413
414 Specifies things to do immediately after connecting or re-connecting to
415 the database.  Its value may contain:
416
417 =over
418
419 =item an array reference
420
421 This contains SQL statements to execute in order.  Each element contains
422 a string or a code reference that returns a string.
423
424 =item a code reference
425
426 This contains some code to execute.  Unlike code references within an
427 array reference, its return value is ignored.
428
429 =back
430
431 =item on_disconnect_do
432
433 Takes arguments in the same form as L</on_connect_do> and executes them
434 immediately before disconnecting from the database.
435
436 Note, this only runs if you explicitly call L</disconnect> on the
437 storage object.
438
439 =item disable_sth_caching
440
441 If set to a true value, this option will disable the caching of
442 statement handles via L<DBI/prepare_cached>.
443
444 =item limit_dialect 
445
446 Sets the limit dialect. This is useful for JDBC-bridge among others
447 where the remote SQL-dialect cannot be determined by the name of the
448 driver alone. See also L<SQL::Abstract::Limit>.
449
450 =item quote_char
451
452 Specifies what characters to use to quote table and column names. If 
453 you use this you will want to specify L</name_sep> as well.
454
455 C<quote_char> expects either a single character, in which case is it
456 is placed on either side of the table/column name, or an arrayref of length
457 2 in which case the table/column name is placed between the elements.
458
459 For example under MySQL you should use C<< quote_char => '`' >>, and for
460 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
461
462 =item name_sep
463
464 This only needs to be used in conjunction with C<quote_char>, and is used to 
465 specify the charecter that seperates elements (schemas, tables, columns) from 
466 each other. In most cases this is simply a C<.>.
467
468 The consequences of not supplying this value is that L<SQL::Abstract>
469 will assume DBIx::Class' uses of aliases to be complete column
470 names. The output will look like I<"me.name"> when it should actually
471 be I<"me"."name">.
472
473 =item unsafe
474
475 This Storage driver normally installs its own C<HandleError>, sets
476 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
477 all database handles, including those supplied by a coderef.  It does this
478 so that it can have consistent and useful error behavior.
479
480 If you set this option to a true value, Storage will not do its usual
481 modifications to the database handle's attributes, and instead relies on
482 the settings in your connect_info DBI options (or the values you set in
483 your connection coderef, in the case that you are connecting via coderef).
484
485 Note that your custom settings can cause Storage to malfunction,
486 especially if you set a C<HandleError> handler that suppresses exceptions
487 and/or disable C<RaiseError>.
488
489 =item auto_savepoint
490
491 If this option is true, L<DBIx::Class> will use savepoints when nesting
492 transactions, making it possible to recover from failure in the inner
493 transaction without having to abort all outer transactions.
494
495 =item cursor_class
496
497 Use this argument to supply a cursor class other than the default
498 L<DBIx::Class::Storage::DBI::Cursor>.
499
500 =back
501
502 Some real-life examples of arguments to L</connect_info> and
503 L<DBIx::Class::Schema/connect>
504
505   # Simple SQLite connection
506   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
507
508   # Connect via subref
509   ->connect_info([ sub { DBI->connect(...) } ]);
510
511   # A bit more complicated
512   ->connect_info(
513     [
514       'dbi:Pg:dbname=foo',
515       'postgres',
516       'my_pg_password',
517       { AutoCommit => 1 },
518       { quote_char => q{"}, name_sep => q{.} },
519     ]
520   );
521
522   # Equivalent to the previous example
523   ->connect_info(
524     [
525       'dbi:Pg:dbname=foo',
526       'postgres',
527       'my_pg_password',
528       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
529     ]
530   );
531
532   # Same, but with hashref as argument
533   # See parse_connect_info for explanation
534   ->connect_info(
535     [{
536       dsn         => 'dbi:Pg:dbname=foo',
537       user        => 'postgres',
538       password    => 'my_pg_password',
539       AutoCommit  => 1,
540       quote_char  => q{"},
541       name_sep    => q{.},
542     }]
543   );
544
545   # Subref + DBIx::Class-specific connection options
546   ->connect_info(
547     [
548       sub { DBI->connect(...) },
549       {
550           quote_char => q{`},
551           name_sep => q{@},
552           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
553           disable_sth_caching => 1,
554       },
555     ]
556   );
557
558
559
560 =cut
561
562 sub connect_info {
563   my ($self, $info_arg) = @_;
564
565   return $self->_connect_info if !$info_arg;
566
567   my @args = @$info_arg;  # take a shallow copy for further mutilation
568   $self->_connect_info([@args]); # copy for _connect_info
569
570
571   # combine/pre-parse arguments depending on invocation style
572
573   my %attrs;
574   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
575     %attrs = %{ $args[1] || {} };
576     @args = $args[0];
577   }
578   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
579     %attrs = %{$args[0]};
580     @args = ();
581     for (qw/password user dsn/) {
582       unshift @args, delete $attrs{$_};
583     }
584   }
585   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
586     %attrs = (
587       % { $args[3] || {} },
588       % { $args[4] || {} },
589     );
590     @args = @args[0,1,2];
591   }
592
593   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
594   #  the new set of options
595   $self->_sql_maker(undef);
596   $self->_sql_maker_opts({});
597
598   if(keys %attrs) {
599     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
600       if(my $value = delete $attrs{$storage_opt}) {
601         $self->$storage_opt($value);
602       }
603     }
604     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
605       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
606         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
607       }
608     }
609   }
610
611   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
612
613   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
614   $self->_connect_info;
615 }
616
617 =head2 on_connect_do
618
619 This method is deprecated in favour of setting via L</connect_info>.
620
621
622 =head2 dbh_do
623
624 Arguments: ($subref | $method_name), @extra_coderef_args?
625
626 Execute the given $subref or $method_name using the new exception-based
627 connection management.
628
629 The first two arguments will be the storage object that C<dbh_do> was called
630 on and a database handle to use.  Any additional arguments will be passed
631 verbatim to the called subref as arguments 2 and onwards.
632
633 Using this (instead of $self->_dbh or $self->dbh) ensures correct
634 exception handling and reconnection (or failover in future subclasses).
635
636 Your subref should have no side-effects outside of the database, as
637 there is the potential for your subref to be partially double-executed
638 if the database connection was stale/dysfunctional.
639
640 Example:
641
642   my @stuff = $schema->storage->dbh_do(
643     sub {
644       my ($storage, $dbh, @cols) = @_;
645       my $cols = join(q{, }, @cols);
646       $dbh->selectrow_array("SELECT $cols FROM foo");
647     },
648     @column_list
649   );
650
651 =cut
652
653 sub dbh_do {
654   my $self = shift;
655   my $code = shift;
656
657   my $dbh = $self->_dbh;
658
659   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
660       || $self->{transaction_depth};
661
662   local $self->{_in_dbh_do} = 1;
663
664   my @result;
665   my $want_array = wantarray;
666
667   eval {
668     $self->_verify_pid if $dbh;
669     if(!$self->_dbh) {
670         $self->_populate_dbh;
671         $dbh = $self->_dbh;
672     }
673
674     if($want_array) {
675         @result = $self->$code($dbh, @_);
676     }
677     elsif(defined $want_array) {
678         $result[0] = $self->$code($dbh, @_);
679     }
680     else {
681         $self->$code($dbh, @_);
682     }
683   };
684
685   my $exception = $@;
686   if(!$exception) { return $want_array ? @result : $result[0] }
687
688   $self->throw_exception($exception) if $self->connected;
689
690   # We were not connected - reconnect and retry, but let any
691   #  exception fall right through this time
692   $self->_populate_dbh;
693   $self->$code($self->_dbh, @_);
694 }
695
696 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
697 # It also informs dbh_do to bypass itself while under the direction of txn_do,
698 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
699 sub txn_do {
700   my $self = shift;
701   my $coderef = shift;
702
703   ref $coderef eq 'CODE' or $self->throw_exception
704     ('$coderef must be a CODE reference');
705
706   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
707
708   local $self->{_in_dbh_do} = 1;
709
710   my @result;
711   my $want_array = wantarray;
712
713   my $tried = 0;
714   while(1) {
715     eval {
716       $self->_verify_pid if $self->_dbh;
717       $self->_populate_dbh if !$self->_dbh;
718
719       $self->txn_begin;
720       if($want_array) {
721           @result = $coderef->(@_);
722       }
723       elsif(defined $want_array) {
724           $result[0] = $coderef->(@_);
725       }
726       else {
727           $coderef->(@_);
728       }
729       $self->txn_commit;
730     };
731
732     my $exception = $@;
733     if(!$exception) { return $want_array ? @result : $result[0] }
734
735     if($tried++ > 0 || $self->connected) {
736       eval { $self->txn_rollback };
737       my $rollback_exception = $@;
738       if($rollback_exception) {
739         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
740         $self->throw_exception($exception)  # propagate nested rollback
741           if $rollback_exception =~ /$exception_class/;
742
743         $self->throw_exception(
744           "Transaction aborted: ${exception}. "
745           . "Rollback failed: ${rollback_exception}"
746         );
747       }
748       $self->throw_exception($exception)
749     }
750
751     # We were not connected, and was first try - reconnect and retry
752     # via the while loop
753     $self->_populate_dbh;
754   }
755 }
756
757 =head2 disconnect
758
759 Our C<disconnect> method also performs a rollback first if the
760 database is not in C<AutoCommit> mode.
761
762 =cut
763
764 sub disconnect {
765   my ($self) = @_;
766
767   if( $self->connected ) {
768     my $connection_do = $self->on_disconnect_do;
769     $self->_do_connection_actions($connection_do) if ref($connection_do);
770
771     $self->_dbh->rollback unless $self->_dbh_autocommit;
772     $self->_dbh->disconnect;
773     $self->_dbh(undef);
774     $self->{_dbh_gen}++;
775   }
776 }
777
778 =head2 with_deferred_fk_checks
779
780 =over 4
781
782 =item Arguments: C<$coderef>
783
784 =item Return Value: The return value of $coderef
785
786 =back
787
788 Storage specific method to run the code ref with FK checks deferred or
789 in MySQL's case disabled entirely.
790
791 =cut
792
793 # Storage subclasses should override this
794 sub with_deferred_fk_checks {
795   my ($self, $sub) = @_;
796
797   $sub->();
798 }
799
800 sub connected {
801   my ($self) = @_;
802
803   if(my $dbh = $self->_dbh) {
804       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
805           $self->_dbh(undef);
806           $self->{_dbh_gen}++;
807           return;
808       }
809       else {
810           $self->_verify_pid;
811           return 0 if !$self->_dbh;
812       }
813       return ($dbh->FETCH('Active') && $dbh->ping);
814   }
815
816   return 0;
817 }
818
819 # handle pid changes correctly
820 #  NOTE: assumes $self->_dbh is a valid $dbh
821 sub _verify_pid {
822   my ($self) = @_;
823
824   return if defined $self->_conn_pid && $self->_conn_pid == $$;
825
826   $self->_dbh->{InactiveDestroy} = 1;
827   $self->_dbh(undef);
828   $self->{_dbh_gen}++;
829
830   return;
831 }
832
833 sub ensure_connected {
834   my ($self) = @_;
835
836   unless ($self->connected) {
837     $self->_populate_dbh;
838   }
839 }
840
841 =head2 dbh
842
843 Returns the dbh - a data base handle of class L<DBI>.
844
845 =cut
846
847 sub dbh {
848   my ($self) = @_;
849
850   $self->ensure_connected;
851   return $self->_dbh;
852 }
853
854 sub _sql_maker_args {
855     my ($self) = @_;
856     
857     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
858 }
859
860 sub sql_maker {
861   my ($self) = @_;
862   unless ($self->_sql_maker) {
863     my $sql_maker_class = $self->sql_maker_class;
864     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
865   }
866   return $self->_sql_maker;
867 }
868
869 sub _rebless {}
870
871 sub _populate_dbh {
872   my ($self) = @_;
873   my @info = @{$self->_dbi_connect_info || []};
874   $self->_dbh($self->_connect(@info));
875
876   # Always set the transaction depth on connect, since
877   #  there is no transaction in progress by definition
878   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
879
880   if(ref $self eq 'DBIx::Class::Storage::DBI') {
881     my $driver = $self->_dbh->{Driver}->{Name};
882     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
883       bless $self, "DBIx::Class::Storage::DBI::${driver}";
884       $self->_rebless();
885     }
886   }
887
888   $self->_conn_pid($$);
889   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
890
891   my $connection_do = $self->on_connect_do;
892   $self->_do_connection_actions($connection_do) if ref($connection_do);
893 }
894
895 sub _do_connection_actions {
896   my $self = shift;
897   my $connection_do = shift;
898
899   if (ref $connection_do eq 'ARRAY') {
900     $self->_do_query($_) foreach @$connection_do;
901   }
902   elsif (ref $connection_do eq 'CODE') {
903     $connection_do->($self);
904   }
905
906   return $self;
907 }
908
909 sub _do_query {
910   my ($self, $action) = @_;
911
912   if (ref $action eq 'CODE') {
913     $action = $action->($self);
914     $self->_do_query($_) foreach @$action;
915   }
916   else {
917     my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
918     $self->_query_start(@to_run);
919     $self->_dbh->do(@to_run);
920     $self->_query_end(@to_run);
921   }
922
923   return $self;
924 }
925
926 sub _connect {
927   my ($self, @info) = @_;
928
929   $self->throw_exception("You failed to provide any connection info")
930     if !@info;
931
932   my ($old_connect_via, $dbh);
933
934   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
935     $old_connect_via = $DBI::connect_via;
936     $DBI::connect_via = 'connect';
937   }
938
939   eval {
940     if(ref $info[0] eq 'CODE') {
941        $dbh = &{$info[0]}
942     }
943     else {
944        $dbh = DBI->connect(@info);
945     }
946
947     if($dbh && !$self->unsafe) {
948       my $weak_self = $self;
949       weaken($weak_self);
950       $dbh->{HandleError} = sub {
951           if ($weak_self) {
952             $weak_self->throw_exception("DBI Exception: $_[0]");
953           }
954           else {
955             croak ("DBI Exception: $_[0]");
956           }
957       };
958       $dbh->{ShowErrorStatement} = 1;
959       $dbh->{RaiseError} = 1;
960       $dbh->{PrintError} = 0;
961     }
962   };
963
964   $DBI::connect_via = $old_connect_via if $old_connect_via;
965
966   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
967     if !$dbh || $@;
968
969   $self->_dbh_autocommit($dbh->{AutoCommit});
970
971   $dbh;
972 }
973
974 sub svp_begin {
975   my ($self, $name) = @_;
976
977   $name = $self->_svp_generate_name
978     unless defined $name;
979
980   $self->throw_exception ("You can't use savepoints outside a transaction")
981     if $self->{transaction_depth} == 0;
982
983   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
984     unless $self->can('_svp_begin');
985   
986   push @{ $self->{savepoints} }, $name;
987
988   $self->debugobj->svp_begin($name) if $self->debug;
989   
990   return $self->_svp_begin($name);
991 }
992
993 sub svp_release {
994   my ($self, $name) = @_;
995
996   $self->throw_exception ("You can't use savepoints outside a transaction")
997     if $self->{transaction_depth} == 0;
998
999   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1000     unless $self->can('_svp_release');
1001
1002   if (defined $name) {
1003     $self->throw_exception ("Savepoint '$name' does not exist")
1004       unless grep { $_ eq $name } @{ $self->{savepoints} };
1005
1006     # Dig through the stack until we find the one we are releasing.  This keeps
1007     # the stack up to date.
1008     my $svp;
1009
1010     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1011   } else {
1012     $name = pop @{ $self->{savepoints} };
1013   }
1014
1015   $self->debugobj->svp_release($name) if $self->debug;
1016
1017   return $self->_svp_release($name);
1018 }
1019
1020 sub svp_rollback {
1021   my ($self, $name) = @_;
1022
1023   $self->throw_exception ("You can't use savepoints outside a transaction")
1024     if $self->{transaction_depth} == 0;
1025
1026   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1027     unless $self->can('_svp_rollback');
1028
1029   if (defined $name) {
1030       # If they passed us a name, verify that it exists in the stack
1031       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1032           $self->throw_exception("Savepoint '$name' does not exist!");
1033       }
1034
1035       # Dig through the stack until we find the one we are releasing.  This keeps
1036       # the stack up to date.
1037       while(my $s = pop(@{ $self->{savepoints} })) {
1038           last if($s eq $name);
1039       }
1040       # Add the savepoint back to the stack, as a rollback doesn't remove the
1041       # named savepoint, only everything after it.
1042       push(@{ $self->{savepoints} }, $name);
1043   } else {
1044       # We'll assume they want to rollback to the last savepoint
1045       $name = $self->{savepoints}->[-1];
1046   }
1047
1048   $self->debugobj->svp_rollback($name) if $self->debug;
1049   
1050   return $self->_svp_rollback($name);
1051 }
1052
1053 sub _svp_generate_name {
1054     my ($self) = @_;
1055
1056     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1057 }
1058
1059 sub txn_begin {
1060   my $self = shift;
1061   $self->ensure_connected();
1062   if($self->{transaction_depth} == 0) {
1063     $self->debugobj->txn_begin()
1064       if $self->debug;
1065     # this isn't ->_dbh-> because
1066     #  we should reconnect on begin_work
1067     #  for AutoCommit users
1068     $self->dbh->begin_work;
1069   } elsif ($self->auto_savepoint) {
1070     $self->svp_begin;
1071   }
1072   $self->{transaction_depth}++;
1073 }
1074
1075 sub txn_commit {
1076   my $self = shift;
1077   if ($self->{transaction_depth} == 1) {
1078     my $dbh = $self->_dbh;
1079     $self->debugobj->txn_commit()
1080       if ($self->debug);
1081     $dbh->commit;
1082     $self->{transaction_depth} = 0
1083       if $self->_dbh_autocommit;
1084   }
1085   elsif($self->{transaction_depth} > 1) {
1086     $self->{transaction_depth}--;
1087     $self->svp_release
1088       if $self->auto_savepoint;
1089   }
1090 }
1091
1092 sub txn_rollback {
1093   my $self = shift;
1094   my $dbh = $self->_dbh;
1095   eval {
1096     if ($self->{transaction_depth} == 1) {
1097       $self->debugobj->txn_rollback()
1098         if ($self->debug);
1099       $self->{transaction_depth} = 0
1100         if $self->_dbh_autocommit;
1101       $dbh->rollback;
1102     }
1103     elsif($self->{transaction_depth} > 1) {
1104       $self->{transaction_depth}--;
1105       if ($self->auto_savepoint) {
1106         $self->svp_rollback;
1107         $self->svp_release;
1108       }
1109     }
1110     else {
1111       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1112     }
1113   };
1114   if ($@) {
1115     my $error = $@;
1116     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1117     $error =~ /$exception_class/ and $self->throw_exception($error);
1118     # ensure that a failed rollback resets the transaction depth
1119     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1120     $self->throw_exception($error);
1121   }
1122 }
1123
1124 # This used to be the top-half of _execute.  It was split out to make it
1125 #  easier to override in NoBindVars without duping the rest.  It takes up
1126 #  all of _execute's args, and emits $sql, @bind.
1127 sub _prep_for_execute {
1128   my ($self, $op, $extra_bind, $ident, $args) = @_;
1129
1130   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1131   unshift(@bind,
1132     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1133       if $extra_bind;
1134
1135   return ($sql, \@bind);
1136 }
1137
1138 sub _fix_bind_params {
1139     my ($self, @bind) = @_;
1140
1141     ### Turn @bind from something like this:
1142     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1143     ### to this:
1144     ###   ( "'1'", "'1'", "'3'" )
1145     return
1146         map {
1147             if ( defined( $_ && $_->[1] ) ) {
1148                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1149             }
1150             else { q{'NULL'}; }
1151         } @bind;
1152 }
1153
1154 sub _query_start {
1155     my ( $self, $sql, @bind ) = @_;
1156
1157     if ( $self->debug ) {
1158         @bind = $self->_fix_bind_params(@bind);
1159         
1160         $self->debugobj->query_start( $sql, @bind );
1161     }
1162 }
1163
1164 sub _query_end {
1165     my ( $self, $sql, @bind ) = @_;
1166
1167     if ( $self->debug ) {
1168         @bind = $self->_fix_bind_params(@bind);
1169         $self->debugobj->query_end( $sql, @bind );
1170     }
1171 }
1172
1173 sub _dbh_execute {
1174   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1175   
1176   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1177     $ident = $ident->from();
1178   }
1179
1180   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1181
1182   $self->_query_start( $sql, @$bind );
1183
1184   my $sth = $self->sth($sql,$op);
1185
1186   my $placeholder_index = 1; 
1187
1188   foreach my $bound (@$bind) {
1189     my $attributes = {};
1190     my($column_name, @data) = @$bound;
1191
1192     if ($bind_attributes) {
1193       $attributes = $bind_attributes->{$column_name}
1194       if defined $bind_attributes->{$column_name};
1195     }
1196
1197     foreach my $data (@data) {
1198       my $ref = ref $data;
1199       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1200
1201       $sth->bind_param($placeholder_index, $data, $attributes);
1202       $placeholder_index++;
1203     }
1204   }
1205
1206   # Can this fail without throwing an exception anyways???
1207   my $rv = $sth->execute();
1208   $self->throw_exception($sth->errstr) if !$rv;
1209
1210   $self->_query_end( $sql, @$bind );
1211
1212   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1213 }
1214
1215 sub _execute {
1216     my $self = shift;
1217     $self->dbh_do('_dbh_execute', @_)
1218 }
1219
1220 sub insert {
1221   my ($self, $source, $to_insert) = @_;
1222   
1223   my $ident = $source->from; 
1224   my $bind_attributes = $self->source_bind_attributes($source);
1225
1226   $self->ensure_connected;
1227   foreach my $col ( $source->columns ) {
1228     if ( !defined $to_insert->{$col} ) {
1229       my $col_info = $source->column_info($col);
1230
1231       if ( $col_info->{auto_nextval} ) {
1232         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1233       }
1234     }
1235   }
1236
1237   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1238
1239   return $to_insert;
1240 }
1241
1242 ## Still not quite perfect, and EXPERIMENTAL
1243 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1244 ## scalar refs, or at least, all the same type as the first set, the statement is
1245 ## only prepped once.
1246 sub insert_bulk {
1247   my ($self, $source, $cols, $data) = @_;
1248   my %colvalues;
1249   my $table = $source->from;
1250   @colvalues{@$cols} = (0..$#$cols);
1251   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1252   
1253   $self->_query_start( $sql, @bind );
1254   my $sth = $self->sth($sql);
1255
1256 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1257
1258   ## This must be an arrayref, else nothing works!
1259   
1260   my $tuple_status = [];
1261   
1262   ##use Data::Dumper;
1263   ##print STDERR Dumper( $data, $sql, [@bind] );
1264
1265   my $time = time();
1266
1267   ## Get the bind_attributes, if any exist
1268   my $bind_attributes = $self->source_bind_attributes($source);
1269
1270   ## Bind the values and execute
1271   my $placeholder_index = 1; 
1272
1273   foreach my $bound (@bind) {
1274
1275     my $attributes = {};
1276     my ($column_name, $data_index) = @$bound;
1277
1278     if( $bind_attributes ) {
1279       $attributes = $bind_attributes->{$column_name}
1280       if defined $bind_attributes->{$column_name};
1281     }
1282
1283     my @data = map { $_->[$data_index] } @$data;
1284
1285     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1286     $placeholder_index++;
1287   }
1288   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1289   $self->throw_exception($sth->errstr) if !$rv;
1290
1291   $self->_query_end( $sql, @bind );
1292   return (wantarray ? ($rv, $sth, @bind) : $rv);
1293 }
1294
1295 sub update {
1296   my $self = shift @_;
1297   my $source = shift @_;
1298   my $bind_attributes = $self->source_bind_attributes($source);
1299   
1300   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1301 }
1302
1303
1304 sub delete {
1305   my $self = shift @_;
1306   my $source = shift @_;
1307   
1308   my $bind_attrs = {}; ## If ever it's needed...
1309   
1310   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1311 }
1312
1313 sub _select {
1314   my ($self, $ident, $select, $condition, $attrs) = @_;
1315   my $order = $attrs->{order_by};
1316
1317   if (ref $condition eq 'SCALAR') {
1318     my $unwrap = ${$condition};
1319     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1320       $order = $1;
1321       $condition = \$unwrap;
1322     }
1323   }
1324
1325   my $for = delete $attrs->{for};
1326   my $sql_maker = $self->sql_maker;
1327   local $sql_maker->{for} = $for;
1328
1329   if (exists $attrs->{group_by} || $attrs->{having}) {
1330     $order = {
1331       group_by => $attrs->{group_by},
1332       having => $attrs->{having},
1333       ($order ? (order_by => $order) : ())
1334     };
1335   }
1336   my $bind_attrs = {}; ## Future support
1337   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1338   if ($attrs->{software_limit} ||
1339       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1340         $attrs->{software_limit} = 1;
1341   } else {
1342     $self->throw_exception("rows attribute must be positive if present")
1343       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1344
1345     # MySQL actually recommends this approach.  I cringe.
1346     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1347     push @args, $attrs->{rows}, $attrs->{offset};
1348   }
1349
1350   return $self->_execute(@args);
1351 }
1352
1353 sub source_bind_attributes {
1354   my ($self, $source) = @_;
1355   
1356   my $bind_attributes;
1357   foreach my $column ($source->columns) {
1358   
1359     my $data_type = $source->column_info($column)->{data_type} || '';
1360     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1361      if $data_type;
1362   }
1363
1364   return $bind_attributes;
1365 }
1366
1367 =head2 select
1368
1369 =over 4
1370
1371 =item Arguments: $ident, $select, $condition, $attrs
1372
1373 =back
1374
1375 Handle a SQL select statement.
1376
1377 =cut
1378
1379 sub select {
1380   my $self = shift;
1381   my ($ident, $select, $condition, $attrs) = @_;
1382   return $self->cursor_class->new($self, \@_, $attrs);
1383 }
1384
1385 sub select_single {
1386   my $self = shift;
1387   my ($rv, $sth, @bind) = $self->_select(@_);
1388   my @row = $sth->fetchrow_array;
1389   my @nextrow = $sth->fetchrow_array if @row;
1390   if(@row && @nextrow) {
1391     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1392   }
1393   # Need to call finish() to work round broken DBDs
1394   $sth->finish();
1395   return @row;
1396 }
1397
1398 =head2 sth
1399
1400 =over 4
1401
1402 =item Arguments: $sql
1403
1404 =back
1405
1406 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1407
1408 =cut
1409
1410 sub _dbh_sth {
1411   my ($self, $dbh, $sql) = @_;
1412
1413   # 3 is the if_active parameter which avoids active sth re-use
1414   my $sth = $self->disable_sth_caching
1415     ? $dbh->prepare($sql)
1416     : $dbh->prepare_cached($sql, {}, 3);
1417
1418   # XXX You would think RaiseError would make this impossible,
1419   #  but apparently that's not true :(
1420   $self->throw_exception($dbh->errstr) if !$sth;
1421
1422   $sth;
1423 }
1424
1425 sub sth {
1426   my ($self, $sql) = @_;
1427   $self->dbh_do('_dbh_sth', $sql);
1428 }
1429
1430 sub _dbh_columns_info_for {
1431   my ($self, $dbh, $table) = @_;
1432
1433   if ($dbh->can('column_info')) {
1434     my %result;
1435     eval {
1436       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1437       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1438       $sth->execute();
1439       while ( my $info = $sth->fetchrow_hashref() ){
1440         my %column_info;
1441         $column_info{data_type}   = $info->{TYPE_NAME};
1442         $column_info{size}      = $info->{COLUMN_SIZE};
1443         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1444         $column_info{default_value} = $info->{COLUMN_DEF};
1445         my $col_name = $info->{COLUMN_NAME};
1446         $col_name =~ s/^\"(.*)\"$/$1/;
1447
1448         $result{$col_name} = \%column_info;
1449       }
1450     };
1451     return \%result if !$@ && scalar keys %result;
1452   }
1453
1454   my %result;
1455   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1456   $sth->execute;
1457   my @columns = @{$sth->{NAME_lc}};
1458   for my $i ( 0 .. $#columns ){
1459     my %column_info;
1460     $column_info{data_type} = $sth->{TYPE}->[$i];
1461     $column_info{size} = $sth->{PRECISION}->[$i];
1462     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1463
1464     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1465       $column_info{data_type} = $1;
1466       $column_info{size}    = $2;
1467     }
1468
1469     $result{$columns[$i]} = \%column_info;
1470   }
1471   $sth->finish;
1472
1473   foreach my $col (keys %result) {
1474     my $colinfo = $result{$col};
1475     my $type_num = $colinfo->{data_type};
1476     my $type_name;
1477     if(defined $type_num && $dbh->can('type_info')) {
1478       my $type_info = $dbh->type_info($type_num);
1479       $type_name = $type_info->{TYPE_NAME} if $type_info;
1480       $colinfo->{data_type} = $type_name if $type_name;
1481     }
1482   }
1483
1484   return \%result;
1485 }
1486
1487 sub columns_info_for {
1488   my ($self, $table) = @_;
1489   $self->dbh_do('_dbh_columns_info_for', $table);
1490 }
1491
1492 =head2 last_insert_id
1493
1494 Return the row id of the last insert.
1495
1496 =cut
1497
1498 sub _dbh_last_insert_id {
1499     my ($self, $dbh, $source, $col) = @_;
1500     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1501     $dbh->func('last_insert_rowid');
1502 }
1503
1504 sub last_insert_id {
1505   my $self = shift;
1506   $self->dbh_do('_dbh_last_insert_id', @_);
1507 }
1508
1509 =head2 sqlt_type
1510
1511 Returns the database driver name.
1512
1513 =cut
1514
1515 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1516
1517 =head2 bind_attribute_by_data_type
1518
1519 Given a datatype from column info, returns a database specific bind
1520 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1521 let the database planner just handle it.
1522
1523 Generally only needed for special case column types, like bytea in postgres.
1524
1525 =cut
1526
1527 sub bind_attribute_by_data_type {
1528     return;
1529 }
1530
1531 =head2 create_ddl_dir
1532
1533 =over 4
1534
1535 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1536
1537 =back
1538
1539 Creates a SQL file based on the Schema, for each of the specified
1540 database types, in the given directory.
1541
1542 By default, C<\%sqlt_args> will have
1543
1544  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1545
1546 merged with the hash passed in. To disable any of those features, pass in a 
1547 hashref like the following
1548
1549  { ignore_constraint_names => 0, # ... other options }
1550
1551 =cut
1552
1553 sub create_ddl_dir {
1554   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1555
1556   if(!$dir || !-d $dir) {
1557     warn "No directory given, using ./\n";
1558     $dir = "./";
1559   }
1560   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1561   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1562
1563   my $schema_version = $schema->schema_version || '1.x';
1564   $version ||= $schema_version;
1565
1566   $sqltargs = {
1567     add_drop_table => 1, 
1568     ignore_constraint_names => 1,
1569     ignore_index_names => 1,
1570     %{$sqltargs || {}}
1571   };
1572
1573   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
1574       . $self->_check_sqlt_message . q{'})
1575           if !$self->_check_sqlt_version;
1576
1577   my $sqlt = SQL::Translator->new( $sqltargs );
1578
1579   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1580   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1581
1582   foreach my $db (@$databases) {
1583     $sqlt->reset();
1584     $sqlt = $self->configure_sqlt($sqlt, $db);
1585     $sqlt->{schema} = $sqlt_schema;
1586     $sqlt->producer($db);
1587
1588     my $file;
1589     my $filename = $schema->ddl_filename($db, $version, $dir);
1590     if (-e $filename && ($version eq $schema_version )) {
1591       # if we are dumping the current version, overwrite the DDL
1592       warn "Overwriting existing DDL file - $filename";
1593       unlink($filename);
1594     }
1595
1596     my $output = $sqlt->translate;
1597     if(!$output) {
1598       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1599       next;
1600     }
1601     if(!open($file, ">$filename")) {
1602       $self->throw_exception("Can't open $filename for writing ($!)");
1603       next;
1604     }
1605     print $file $output;
1606     close($file);
1607   
1608     next unless ($preversion);
1609
1610     require SQL::Translator::Diff;
1611
1612     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1613     if(!-e $prefilename) {
1614       warn("No previous schema file found ($prefilename)");
1615       next;
1616     }
1617
1618     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1619     if(-e $difffile) {
1620       warn("Overwriting existing diff file - $difffile");
1621       unlink($difffile);
1622     }
1623     
1624     my $source_schema;
1625     {
1626       my $t = SQL::Translator->new($sqltargs);
1627       $t->debug( 0 );
1628       $t->trace( 0 );
1629       $t->parser( $db )                       or die $t->error;
1630       $t = $self->configure_sqlt($t, $db);
1631       my $out = $t->translate( $prefilename ) or die $t->error;
1632       $source_schema = $t->schema;
1633       unless ( $source_schema->name ) {
1634         $source_schema->name( $prefilename );
1635       }
1636     }
1637
1638     # The "new" style of producers have sane normalization and can support 
1639     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1640     # And we have to diff parsed SQL against parsed SQL.
1641     my $dest_schema = $sqlt_schema;
1642     
1643     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1644       my $t = SQL::Translator->new($sqltargs);
1645       $t->debug( 0 );
1646       $t->trace( 0 );
1647       $t->parser( $db )                    or die $t->error;
1648       $t = $self->configure_sqlt($t, $db);
1649       my $out = $t->translate( $filename ) or die $t->error;
1650       $dest_schema = $t->schema;
1651       $dest_schema->name( $filename )
1652         unless $dest_schema->name;
1653     }
1654     
1655     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1656                                                   $dest_schema,   $db,
1657                                                   $sqltargs
1658                                                  );
1659     if(!open $file, ">$difffile") { 
1660       $self->throw_exception("Can't write to $difffile ($!)");
1661       next;
1662     }
1663     print $file $diff;
1664     close($file);
1665   }
1666 }
1667
1668 sub configure_sqlt() {
1669   my $self = shift;
1670   my $tr = shift;
1671   my $db = shift || $self->sqlt_type;
1672   if ($db eq 'PostgreSQL') {
1673     $tr->quote_table_names(0);
1674     $tr->quote_field_names(0);
1675   }
1676   return $tr;
1677 }
1678
1679 =head2 deployment_statements
1680
1681 =over 4
1682
1683 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1684
1685 =back
1686
1687 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1688 The database driver name is given by C<$type>, though the value from
1689 L</sqlt_type> is used if it is not specified.
1690
1691 C<$directory> is used to return statements from files in a previously created
1692 L</create_ddl_dir> directory and is optional. The filenames are constructed
1693 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1694
1695 If no C<$directory> is specified then the statements are constructed on the
1696 fly using L<SQL::Translator> and C<$version> is ignored.
1697
1698 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1699
1700 =cut
1701
1702 sub deployment_statements {
1703   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1704   # Need to be connected to get the correct sqlt_type
1705   $self->ensure_connected() unless $type;
1706   $type ||= $self->sqlt_type;
1707   $version ||= $schema->schema_version || '1.x';
1708   $dir ||= './';
1709   my $filename = $schema->ddl_filename($type, $dir, $version);
1710   if(-f $filename)
1711   {
1712       my $file;
1713       open($file, "<$filename") 
1714         or $self->throw_exception("Can't open $filename ($!)");
1715       my @rows = <$file>;
1716       close($file);
1717       return join('', @rows);
1718   }
1719
1720   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
1721       . $self->_check_sqlt_message . q{'})
1722           if !$self->_check_sqlt_version;
1723
1724   require SQL::Translator::Parser::DBIx::Class;
1725   eval qq{use SQL::Translator::Producer::${type}};
1726   $self->throw_exception($@) if $@;
1727
1728   # sources needs to be a parser arg, but for simplicty allow at top level 
1729   # coming in
1730   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1731       if exists $sqltargs->{sources};
1732
1733   my $tr = SQL::Translator->new(%$sqltargs);
1734   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1735   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1736 }
1737
1738 sub deploy {
1739   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1740   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1741     foreach my $line ( split(";\n", $statement)) {
1742       next if($line =~ /^--/);
1743       next if(!$line);
1744 #      next if($line =~ /^DROP/m);
1745       next if($line =~ /^BEGIN TRANSACTION/m);
1746       next if($line =~ /^COMMIT/m);
1747       next if $line =~ /^\s+$/; # skip whitespace only
1748       $self->_query_start($line);
1749       eval {
1750         $self->dbh->do($line); # shouldn't be using ->dbh ?
1751       };
1752       if ($@) {
1753         warn qq{$@ (running "${line}")};
1754       }
1755       $self->_query_end($line);
1756     }
1757   }
1758 }
1759
1760 =head2 datetime_parser
1761
1762 Returns the datetime parser class
1763
1764 =cut
1765
1766 sub datetime_parser {
1767   my $self = shift;
1768   return $self->{datetime_parser} ||= do {
1769     $self->ensure_connected;
1770     $self->build_datetime_parser(@_);
1771   };
1772 }
1773
1774 =head2 datetime_parser_type
1775
1776 Defines (returns) the datetime parser class - currently hardwired to
1777 L<DateTime::Format::MySQL>
1778
1779 =cut
1780
1781 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1782
1783 =head2 build_datetime_parser
1784
1785 See L</datetime_parser>
1786
1787 =cut
1788
1789 sub build_datetime_parser {
1790   my $self = shift;
1791   my $type = $self->datetime_parser_type(@_);
1792   eval "use ${type}";
1793   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1794   return $type;
1795 }
1796
1797 {
1798     my $_check_sqlt_version; # private
1799     my $_check_sqlt_message; # private
1800     sub _check_sqlt_version {
1801         return $_check_sqlt_version if defined $_check_sqlt_version;
1802         eval 'use SQL::Translator "0.09"';
1803         $_check_sqlt_message = $@ || '';
1804         $_check_sqlt_version = !$@;
1805     }
1806
1807     sub _check_sqlt_message {
1808         _check_sqlt_version if !defined $_check_sqlt_message;
1809         $_check_sqlt_message;
1810     }
1811 }
1812
1813 =head2 is_replicating
1814
1815 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1816 replicate from a master database.  Default is undef, which is the result
1817 returned by databases that don't support replication.
1818
1819 =cut
1820
1821 sub is_replicating {
1822     return;
1823     
1824 }
1825
1826 =head2 lag_behind_master
1827
1828 Returns a number that represents a certain amount of lag behind a master db
1829 when a given storage is replicating.  The number is database dependent, but
1830 starts at zero and increases with the amount of lag. Default in undef
1831
1832 =cut
1833
1834 sub lag_behind_master {
1835     return;
1836 }
1837
1838 sub DESTROY {
1839   my $self = shift;
1840   return if !$self->_dbh;
1841   $self->_verify_pid;
1842   $self->_dbh(undef);
1843 }
1844
1845 1;
1846
1847 =head1 USAGE NOTES
1848
1849 =head2 DBIx::Class and AutoCommit
1850
1851 DBIx::Class can do some wonderful magic with handling exceptions,
1852 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1853 combined with C<txn_do> for transaction support.
1854
1855 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1856 in an assumed transaction between commits, and you're telling us you'd
1857 like to manage that manually.  A lot of the magic protections offered by
1858 this module will go away.  We can't protect you from exceptions due to database
1859 disconnects because we don't know anything about how to restart your
1860 transactions.  You're on your own for handling all sorts of exceptional
1861 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1862 be with raw DBI.
1863
1864
1865 =head1 SQL METHODS
1866
1867 The module defines a set of methods within the DBIC::SQL::Abstract
1868 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1869 SQL query functions.
1870
1871 The following methods are extended:-
1872
1873 =over 4
1874
1875 =item delete
1876
1877 =item insert
1878
1879 =item select
1880
1881 =item update
1882
1883 =item limit_dialect
1884
1885 See L</connect_info> for details.
1886
1887 =item quote_char
1888
1889 See L</connect_info> for details.
1890
1891 =item name_sep
1892
1893 See L</connect_info> for details.
1894
1895 =back
1896
1897 =head1 AUTHORS
1898
1899 Matt S. Trout <mst@shadowcatsystems.co.uk>
1900
1901 Andy Grundman <andy@hybridized.org>
1902
1903 =head1 LICENSE
1904
1905 You may distribute this code under the same terms as Perl itself.
1906
1907 =cut