Documentation fixups, still needs standardising though
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use SQL::Abstract::Limit;
11 use DBIx::Class::Storage::DBI::Cursor;
12 use DBIx::Class::Storage::Statistics;
13 use Scalar::Util qw/blessed weaken/;
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
18 );
19
20 # the values for these accessors are picked out (and deleted) from
21 # the attribute hashref passed to connect_info
22 my @storage_options = qw/
23   on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24 /;
25 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28 # default cursor class, overridable in connect_info attributes
29 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
31 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32 __PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
34 BEGIN {
35
36 package # Hide from PAUSE
37   DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
38
39 use base qw/SQL::Abstract::Limit/;
40
41 # This prevents the caching of $dbh in S::A::L, I believe
42 sub new {
43   my $self = shift->SUPER::new(@_);
44
45   # If limit_dialect is a ref (like a $dbh), go ahead and replace
46   #   it with what it resolves to:
47   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48     if ref $self->{limit_dialect};
49
50   $self;
51 }
52
53 sub _RowNumberOver {
54   my ($self, $sql, $order, $rows, $offset ) = @_;
55
56   $offset += 1;
57   my $last = $rows + $offset;
58   my ( $order_by ) = $self->_order_by( $order );
59
60   $sql = <<"";
61 SELECT * FROM
62 (
63    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
64       $sql
65       $order_by
66    ) Q1
67 ) Q2
68 WHERE ROW_NUM BETWEEN $offset AND $last
69
70   return $sql;
71 }
72
73
74 # While we're at it, this should make LIMIT queries more efficient,
75 #  without digging into things too deeply
76 use Scalar::Util 'blessed';
77 sub _find_syntax {
78   my ($self, $syntax) = @_;
79   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
80   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
81     return 'RowNumberOver';
82   }
83
84   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
85 }
86
87 sub select {
88   my ($self, $table, $fields, $where, $order, @rest) = @_;
89   $table = $self->_quote($table) unless ref($table);
90   local $self->{rownum_hack_count} = 1
91     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
92   @rest = (-1) unless defined $rest[0];
93   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
94     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
95   local $self->{having_bind} = [];
96   my ($sql, @ret) = $self->SUPER::select(
97     $table, $self->_recurse_fields($fields), $where, $order, @rest
98   );
99   $sql .= 
100     $self->{for} ?
101     (
102       $self->{for} eq 'update' ? ' FOR UPDATE' :
103       $self->{for} eq 'shared' ? ' FOR SHARE'  :
104       ''
105     ) :
106     ''
107   ;
108   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
109 }
110
111 sub insert {
112   my $self = shift;
113   my $table = shift;
114   $table = $self->_quote($table) unless ref($table);
115   $self->SUPER::insert($table, @_);
116 }
117
118 sub update {
119   my $self = shift;
120   my $table = shift;
121   $table = $self->_quote($table) unless ref($table);
122   $self->SUPER::update($table, @_);
123 }
124
125 sub delete {
126   my $self = shift;
127   my $table = shift;
128   $table = $self->_quote($table) unless ref($table);
129   $self->SUPER::delete($table, @_);
130 }
131
132 sub _emulate_limit {
133   my $self = shift;
134   if ($_[3] == -1) {
135     return $_[1].$self->_order_by($_[2]);
136   } else {
137     return $self->SUPER::_emulate_limit(@_);
138   }
139 }
140
141 sub _recurse_fields {
142   my ($self, $fields, $params) = @_;
143   my $ref = ref $fields;
144   return $self->_quote($fields) unless $ref;
145   return $$fields if $ref eq 'SCALAR';
146
147   if ($ref eq 'ARRAY') {
148     return join(', ', map {
149       $self->_recurse_fields($_)
150         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
151           ? ' AS col'.$self->{rownum_hack_count}++
152           : '')
153       } @$fields);
154   } elsif ($ref eq 'HASH') {
155     foreach my $func (keys %$fields) {
156       return $self->_sqlcase($func)
157         .'( '.$self->_recurse_fields($fields->{$func}).' )';
158     }
159   }
160 }
161
162 sub _order_by {
163   my $self = shift;
164   my $ret = '';
165   my @extra;
166   if (ref $_[0] eq 'HASH') {
167     if (defined $_[0]->{group_by}) {
168       $ret = $self->_sqlcase(' group by ')
169         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
170     }
171     if (defined $_[0]->{having}) {
172       my $frag;
173       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
174       push(@{$self->{having_bind}}, @extra);
175       $ret .= $self->_sqlcase(' having ').$frag;
176     }
177     if (defined $_[0]->{order_by}) {
178       $ret .= $self->_order_by($_[0]->{order_by});
179     }
180   } elsif (ref $_[0] eq 'SCALAR') {
181     $ret = $self->_sqlcase(' order by ').${ $_[0] };
182   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
183     my @order = @{+shift};
184     $ret = $self->_sqlcase(' order by ')
185           .join(', ', map {
186                         my $r = $self->_order_by($_, @_);
187                         $r =~ s/^ ?ORDER BY //i;
188                         $r;
189                       } @order);
190   } else {
191     $ret = $self->SUPER::_order_by(@_);
192   }
193   return $ret;
194 }
195
196 sub _order_directions {
197   my ($self, $order) = @_;
198   $order = $order->{order_by} if ref $order eq 'HASH';
199   return $self->SUPER::_order_directions($order);
200 }
201
202 sub _table {
203   my ($self, $from) = @_;
204   if (ref $from eq 'ARRAY') {
205     return $self->_recurse_from(@$from);
206   } elsif (ref $from eq 'HASH') {
207     return $self->_make_as($from);
208   } else {
209     return $from; # would love to quote here but _table ends up getting called
210                   # twice during an ->select without a limit clause due to
211                   # the way S::A::Limit->select works. should maybe consider
212                   # bypassing this and doing S::A::select($self, ...) in
213                   # our select method above. meantime, quoting shims have
214                   # been added to select/insert/update/delete here
215   }
216 }
217
218 sub _recurse_from {
219   my ($self, $from, @join) = @_;
220   my @sqlf;
221   push(@sqlf, $self->_make_as($from));
222   foreach my $j (@join) {
223     my ($to, $on) = @$j;
224
225     # check whether a join type exists
226     my $join_clause = '';
227     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
228     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
229       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
230     } else {
231       $join_clause = ' JOIN ';
232     }
233     push(@sqlf, $join_clause);
234
235     if (ref $to eq 'ARRAY') {
236       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
237     } else {
238       push(@sqlf, $self->_make_as($to));
239     }
240     push(@sqlf, ' ON ', $self->_join_condition($on));
241   }
242   return join('', @sqlf);
243 }
244
245 sub _make_as {
246   my ($self, $from) = @_;
247   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
248                      reverse each %{$self->_skip_options($from)});
249 }
250
251 sub _skip_options {
252   my ($self, $hash) = @_;
253   my $clean_hash = {};
254   $clean_hash->{$_} = $hash->{$_}
255     for grep {!/^-/} keys %$hash;
256   return $clean_hash;
257 }
258
259 sub _join_condition {
260   my ($self, $cond) = @_;
261   if (ref $cond eq 'HASH') {
262     my %j;
263     for (keys %$cond) {
264       my $v = $cond->{$_};
265       if (ref $v) {
266         # XXX no throw_exception() in this package and croak() fails with strange results
267         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
268             if ref($v) ne 'SCALAR';
269         $j{$_} = $v;
270       }
271       else {
272         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
273       }
274     };
275     return scalar($self->_recurse_where(\%j));
276   } elsif (ref $cond eq 'ARRAY') {
277     return join(' OR ', map { $self->_join_condition($_) } @$cond);
278   } else {
279     die "Can't handle this yet!";
280   }
281 }
282
283 sub _quote {
284   my ($self, $label) = @_;
285   return '' unless defined $label;
286   return "*" if $label eq '*';
287   return $label unless $self->{quote_char};
288   if(ref $self->{quote_char} eq "ARRAY"){
289     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
290       if !defined $self->{name_sep};
291     my $sep = $self->{name_sep};
292     return join($self->{name_sep},
293         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
294        split(/\Q$sep\E/,$label));
295   }
296   return $self->SUPER::_quote($label);
297 }
298
299 sub limit_dialect {
300     my $self = shift;
301     $self->{limit_dialect} = shift if @_;
302     return $self->{limit_dialect};
303 }
304
305 sub quote_char {
306     my $self = shift;
307     $self->{quote_char} = shift if @_;
308     return $self->{quote_char};
309 }
310
311 sub name_sep {
312     my $self = shift;
313     $self->{name_sep} = shift if @_;
314     return $self->{name_sep};
315 }
316
317 } # End of BEGIN block
318
319 =head1 NAME
320
321 DBIx::Class::Storage::DBI - DBI storage handler
322
323 =head1 SYNOPSIS
324
325   my $schema = MySchema->connect('dbi:SQLite:my.db');
326
327   $schema->storage->debug(1);
328   $schema->dbh_do("DROP TABLE authors");
329
330   $schema->resultset('Book')->search({
331      written_on => $schema->storage->datetime_parser(DateTime->now)
332   });
333
334 =head1 DESCRIPTION
335
336 This class represents the connection to an RDBMS via L<DBI>.  See
337 L<DBIx::Class::Storage> for general information.  This pod only
338 documents DBI-specific methods and behaviors.
339
340 =head1 METHODS
341
342 =cut
343
344 sub new {
345   my $new = shift->next::method(@_);
346
347   $new->transaction_depth(0);
348   $new->_sql_maker_opts({});
349   $new->{savepoints} = [];
350   $new->{_in_dbh_do} = 0;
351   $new->{_dbh_gen} = 0;
352
353   $new;
354 }
355
356 =head2 connect_info
357
358 This method is normally called by L<DBIx::Class::Schema/connection>, which
359 encapsulates its argument list in an arrayref before passing them here.
360
361 The argument list may contain:
362
363 =over
364
365 =item *
366
367 The same 4-element argument set one would normally pass to
368 L<DBI/connect>, optionally followed by L<extra attributes|/DBIx::Class
369 specific connection attributes> recognized by DBIx::Class:
370
371   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
372
373 =item *
374
375 A single code reference which returns a connected L<DBI database
376 handle|DBI/connect> optionally followed by L<extra
377 attributes|/DBIx::Class specific connection attributes> recognized by
378 DBIx::Class:
379
380   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
381
382 =item *
383
384 A single hashref with all the attributes and the dsn/user/password
385 mixed together:
386
387   $connect_info_args = [{
388     dsn => $dsn,
389     user => $user,
390     password => $pass,
391     %dbi_attributes,
392     %extra_attributes,
393   }];
394
395 This is particularly useful for L<Catalyst> based applications, allowing the 
396 following config (in L<Config::General> style):
397
398   <Model::DB>
399     schema_class   App::DB
400     <connect_info>
401       dsn          dbi:mysql:database=test
402       user         testuser
403       password     TestPass
404       AutoCommit   1
405     </connect_info>
406   </Model::DB>
407
408 =back
409
410 Please note that the L<DBI> docs recommend that you always explicitly
411 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
412 recommends that it be set to I<1>, and that you perform transactions
413 via our L</txn_do> method.  L<DBIx::Class> will set it to I<1> if you
414 do not do explicitly set it to zero.  This is the default for most
415 DBDs. See L</DBIx::Class and AutoCommit> for details.
416
417 =head3 DBIx::Class specific connection attributes
418
419 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
420 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
421 the following connection options. These options can be mixed in with your other
422 L<DBI> connection attributes, or placed in a seperate hashref
423 (C<\%extra_attributes>) as shown above.
424
425 Every time C<connect_info> is invoked, any previous settings for
426 these options will be cleared before setting the new ones, regardless of
427 whether any options are specified in the new C<connect_info>.
428
429
430 =over
431
432 =item on_connect_do
433
434 Specifies things to do immediately after connecting or re-connecting to
435 the database.  Its value may contain:
436
437 =over
438
439 =item an array reference
440
441 This contains SQL statements to execute in order.  Each element contains
442 a string or a code reference that returns a string.
443
444 =item a code reference
445
446 This contains some code to execute.  Unlike code references within an
447 array reference, its return value is ignored.
448
449 =back
450
451 =item on_disconnect_do
452
453 Takes arguments in the same form as L</on_connect_do> and executes them
454 immediately before disconnecting from the database.
455
456 Note, this only runs if you explicitly call L</disconnect> on the
457 storage object.
458
459 =item disable_sth_caching
460
461 If set to a true value, this option will disable the caching of
462 statement handles via L<DBI/prepare_cached>.
463
464 =item limit_dialect 
465
466 Sets the limit dialect. This is useful for JDBC-bridge among others
467 where the remote SQL-dialect cannot be determined by the name of the
468 driver alone. See also L<SQL::Abstract::Limit>.
469
470 =item quote_char
471
472 Specifies what characters to use to quote table and column names. If 
473 you use this you will want to specify L</name_sep> as well.
474
475 C<quote_char> expects either a single character, in which case is it
476 is placed on either side of the table/column name, or an arrayref of length
477 2 in which case the table/column name is placed between the elements.
478
479 For example under MySQL you should use C<< quote_char => '`' >>, and for
480 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
481
482 =item name_sep
483
484 This only needs to be used in conjunction with L<quote_char>, and is used to 
485 specify the charecter that seperates elements (schemas, tables, columns) from 
486 each other. In most cases this is simply a C<.>.
487
488 The consequences of not supplying this value is that L<SQL::Abstract>
489 will assume DBIx::Class' uses of aliases to be complete column
490 names. The output will look like I<"me.name"> when it should actually
491 be I<"me"."name">.
492
493 =item unsafe
494
495 This Storage driver normally installs its own C<HandleError>, sets
496 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
497 all database handles, including those supplied by a coderef.  It does this
498 so that it can have consistent and useful error behavior.
499
500 If you set this option to a true value, Storage will not do its usual
501 modifications to the database handle's attributes, and instead relies on
502 the settings in your connect_info DBI options (or the values you set in
503 your connection coderef, in the case that you are connecting via coderef).
504
505 Note that your custom settings can cause Storage to malfunction,
506 especially if you set a C<HandleError> handler that suppresses exceptions
507 and/or disable C<RaiseError>.
508
509 =item auto_savepoint
510
511 If this option is true, L<DBIx::Class> will use savepoints when nesting
512 transactions, making it possible to recover from failure in the inner
513 transaction without having to abort all outer transactions.
514
515 =item cursor_class
516
517 Use this argument to supply a cursor class other than the default
518 L<DBIx::Class::Storage::DBI::Cursor>.
519
520 =back
521
522 Some real-life examples of arguments to L</connect_info> and
523 L<DBIx::Class::Schema/connect>
524
525   # Simple SQLite connection
526   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
527
528   # Connect via subref
529   ->connect_info([ sub { DBI->connect(...) } ]);
530
531   # A bit more complicated
532   ->connect_info(
533     [
534       'dbi:Pg:dbname=foo',
535       'postgres',
536       'my_pg_password',
537       { AutoCommit => 1 },
538       { quote_char => q{"}, name_sep => q{.} },
539     ]
540   );
541
542   # Equivalent to the previous example
543   ->connect_info(
544     [
545       'dbi:Pg:dbname=foo',
546       'postgres',
547       'my_pg_password',
548       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
549     ]
550   );
551
552   # Same, but with hashref as argument
553   # See parse_connect_info for explanation
554   ->connect_info(
555     [{
556       dsn         => 'dbi:Pg:dbname=foo',
557       user        => 'postgres',
558       password    => 'my_pg_password',
559       AutoCommit  => 1,
560       quote_char  => q{"},
561       name_sep    => q{.},
562     }]
563   );
564
565   # Subref + DBIx::Class-specific connection options
566   ->connect_info(
567     [
568       sub { DBI->connect(...) },
569       {
570           quote_char => q{`},
571           name_sep => q{@},
572           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
573           disable_sth_caching => 1,
574       },
575     ]
576   );
577
578
579
580 =cut
581
582 sub connect_info {
583   my ($self, $info_arg) = @_;
584
585   return $self->_connect_info if !$info_arg;
586
587   my @args = @$info_arg;  # take a shallow copy for further mutilation
588   $self->_connect_info([@args]); # copy for _connect_info
589
590
591   # combine/pre-parse arguments depending on invocation style
592
593   my %attrs;
594   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
595     %attrs = %{ $args[1] || {} };
596     @args = $args[0];
597   }
598   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
599     %attrs = %{$args[0]};
600     @args = ();
601     for (qw/password user dsn/) {
602       unshift @args, delete $attrs{$_};
603     }
604   }
605   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
606     %attrs = (
607       % { $args[3] || {} },
608       % { $args[4] || {} },
609     );
610     @args = @args[0,1,2];
611   }
612
613   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
614   #  the new set of options
615   $self->_sql_maker(undef);
616   $self->_sql_maker_opts({});
617
618   if(keys %attrs) {
619     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
620       if(my $value = delete $attrs{$storage_opt}) {
621         $self->$storage_opt($value);
622       }
623     }
624     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
625       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
626         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
627       }
628     }
629   }
630
631   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
632
633   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
634   $self->_connect_info;
635 }
636
637 =head2 on_connect_do
638
639 This method is deprecated in favour of setting via L</connect_info>.
640
641
642 =head2 dbh_do
643
644 Arguments: ($subref | $method_name), @extra_coderef_args?
645
646 Execute the given $subref or $method_name using the new exception-based
647 connection management.
648
649 The first two arguments will be the storage object that C<dbh_do> was called
650 on and a database handle to use.  Any additional arguments will be passed
651 verbatim to the called subref as arguments 2 and onwards.
652
653 Using this (instead of $self->_dbh or $self->dbh) ensures correct
654 exception handling and reconnection (or failover in future subclasses).
655
656 Your subref should have no side-effects outside of the database, as
657 there is the potential for your subref to be partially double-executed
658 if the database connection was stale/dysfunctional.
659
660 Example:
661
662   my @stuff = $schema->storage->dbh_do(
663     sub {
664       my ($storage, $dbh, @cols) = @_;
665       my $cols = join(q{, }, @cols);
666       $dbh->selectrow_array("SELECT $cols FROM foo");
667     },
668     @column_list
669   );
670
671 =cut
672
673 sub dbh_do {
674   my $self = shift;
675   my $code = shift;
676
677   my $dbh = $self->_dbh;
678
679   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
680       || $self->{transaction_depth};
681
682   local $self->{_in_dbh_do} = 1;
683
684   my @result;
685   my $want_array = wantarray;
686
687   eval {
688     $self->_verify_pid if $dbh;
689     if(!$self->_dbh) {
690         $self->_populate_dbh;
691         $dbh = $self->_dbh;
692     }
693
694     if($want_array) {
695         @result = $self->$code($dbh, @_);
696     }
697     elsif(defined $want_array) {
698         $result[0] = $self->$code($dbh, @_);
699     }
700     else {
701         $self->$code($dbh, @_);
702     }
703   };
704
705   my $exception = $@;
706   if(!$exception) { return $want_array ? @result : $result[0] }
707
708   $self->throw_exception($exception) if $self->connected;
709
710   # We were not connected - reconnect and retry, but let any
711   #  exception fall right through this time
712   $self->_populate_dbh;
713   $self->$code($self->_dbh, @_);
714 }
715
716 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
717 # It also informs dbh_do to bypass itself while under the direction of txn_do,
718 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
719 sub txn_do {
720   my $self = shift;
721   my $coderef = shift;
722
723   ref $coderef eq 'CODE' or $self->throw_exception
724     ('$coderef must be a CODE reference');
725
726   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
727
728   local $self->{_in_dbh_do} = 1;
729
730   my @result;
731   my $want_array = wantarray;
732
733   my $tried = 0;
734   while(1) {
735     eval {
736       $self->_verify_pid if $self->_dbh;
737       $self->_populate_dbh if !$self->_dbh;
738
739       $self->txn_begin;
740       if($want_array) {
741           @result = $coderef->(@_);
742       }
743       elsif(defined $want_array) {
744           $result[0] = $coderef->(@_);
745       }
746       else {
747           $coderef->(@_);
748       }
749       $self->txn_commit;
750     };
751
752     my $exception = $@;
753     if(!$exception) { return $want_array ? @result : $result[0] }
754
755     if($tried++ > 0 || $self->connected) {
756       eval { $self->txn_rollback };
757       my $rollback_exception = $@;
758       if($rollback_exception) {
759         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
760         $self->throw_exception($exception)  # propagate nested rollback
761           if $rollback_exception =~ /$exception_class/;
762
763         $self->throw_exception(
764           "Transaction aborted: ${exception}. "
765           . "Rollback failed: ${rollback_exception}"
766         );
767       }
768       $self->throw_exception($exception)
769     }
770
771     # We were not connected, and was first try - reconnect and retry
772     # via the while loop
773     $self->_populate_dbh;
774   }
775 }
776
777 =head2 disconnect
778
779 Our C<disconnect> method also performs a rollback first if the
780 database is not in C<AutoCommit> mode.
781
782 =cut
783
784 sub disconnect {
785   my ($self) = @_;
786
787   if( $self->connected ) {
788     my $connection_do = $self->on_disconnect_do;
789     $self->_do_connection_actions($connection_do) if ref($connection_do);
790
791     $self->_dbh->rollback unless $self->_dbh_autocommit;
792     $self->_dbh->disconnect;
793     $self->_dbh(undef);
794     $self->{_dbh_gen}++;
795   }
796 }
797
798 =head2 with_deferred_fk_checks
799
800 =over 4
801
802 =item Arguments: C<$coderef>
803
804 =item Return Value: The return value of $coderef
805
806 =back
807
808 Storage specific method to run the code ref with FK checks deferred or
809 in MySQL's case disabled entirely.
810
811 =cut
812
813 # Storage subclasses should override this
814 sub with_deferred_fk_checks {
815   my ($self, $sub) = @_;
816
817   $sub->();
818 }
819
820 sub connected {
821   my ($self) = @_;
822
823   if(my $dbh = $self->_dbh) {
824       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
825           $self->_dbh(undef);
826           $self->{_dbh_gen}++;
827           return;
828       }
829       else {
830           $self->_verify_pid;
831           return 0 if !$self->_dbh;
832       }
833       return ($dbh->FETCH('Active') && $dbh->ping);
834   }
835
836   return 0;
837 }
838
839 # handle pid changes correctly
840 #  NOTE: assumes $self->_dbh is a valid $dbh
841 sub _verify_pid {
842   my ($self) = @_;
843
844   return if defined $self->_conn_pid && $self->_conn_pid == $$;
845
846   $self->_dbh->{InactiveDestroy} = 1;
847   $self->_dbh(undef);
848   $self->{_dbh_gen}++;
849
850   return;
851 }
852
853 sub ensure_connected {
854   my ($self) = @_;
855
856   unless ($self->connected) {
857     $self->_populate_dbh;
858   }
859 }
860
861 =head2 dbh
862
863 Returns the dbh - a data base handle of class L<DBI>.
864
865 =cut
866
867 sub dbh {
868   my ($self) = @_;
869
870   $self->ensure_connected;
871   return $self->_dbh;
872 }
873
874 sub _sql_maker_args {
875     my ($self) = @_;
876     
877     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
878 }
879
880 sub sql_maker {
881   my ($self) = @_;
882   unless ($self->_sql_maker) {
883     my $sql_maker_class = $self->sql_maker_class;
884     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
885   }
886   return $self->_sql_maker;
887 }
888
889 sub _rebless {}
890
891 sub _populate_dbh {
892   my ($self) = @_;
893   my @info = @{$self->_dbi_connect_info || []};
894   $self->_dbh($self->_connect(@info));
895
896   # Always set the transaction depth on connect, since
897   #  there is no transaction in progress by definition
898   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
899
900   if(ref $self eq 'DBIx::Class::Storage::DBI') {
901     my $driver = $self->_dbh->{Driver}->{Name};
902     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
903       bless $self, "DBIx::Class::Storage::DBI::${driver}";
904       $self->_rebless();
905     }
906   }
907
908   my $connection_do = $self->on_connect_do;
909   $self->_do_connection_actions($connection_do) if ref($connection_do);
910
911   $self->_conn_pid($$);
912   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
913 }
914
915 sub _do_connection_actions {
916   my $self = shift;
917   my $connection_do = shift;
918
919   if (ref $connection_do eq 'ARRAY') {
920     $self->_do_query($_) foreach @$connection_do;
921   }
922   elsif (ref $connection_do eq 'CODE') {
923     $connection_do->();
924   }
925
926   return $self;
927 }
928
929 sub _do_query {
930   my ($self, $action) = @_;
931
932   if (ref $action eq 'CODE') {
933     $action = $action->($self);
934     $self->_do_query($_) foreach @$action;
935   }
936   else {
937     my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
938     $self->_query_start(@to_run);
939     $self->_dbh->do(@to_run);
940     $self->_query_end(@to_run);
941   }
942
943   return $self;
944 }
945
946 sub _connect {
947   my ($self, @info) = @_;
948
949   $self->throw_exception("You failed to provide any connection info")
950     if !@info;
951
952   my ($old_connect_via, $dbh);
953
954   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
955     $old_connect_via = $DBI::connect_via;
956     $DBI::connect_via = 'connect';
957   }
958
959   eval {
960     if(ref $info[0] eq 'CODE') {
961        $dbh = &{$info[0]}
962     }
963     else {
964        $dbh = DBI->connect(@info);
965     }
966
967     if($dbh && !$self->unsafe) {
968       my $weak_self = $self;
969       weaken($weak_self);
970       $dbh->{HandleError} = sub {
971           if ($weak_self) {
972             $weak_self->throw_exception("DBI Exception: $_[0]");
973           }
974           else {
975             croak ("DBI Exception: $_[0]");
976           }
977       };
978       $dbh->{ShowErrorStatement} = 1;
979       $dbh->{RaiseError} = 1;
980       $dbh->{PrintError} = 0;
981     }
982   };
983
984   $DBI::connect_via = $old_connect_via if $old_connect_via;
985
986   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
987     if !$dbh || $@;
988
989   $self->_dbh_autocommit($dbh->{AutoCommit});
990
991   $dbh;
992 }
993
994 sub svp_begin {
995   my ($self, $name) = @_;
996
997   $name = $self->_svp_generate_name
998     unless defined $name;
999
1000   $self->throw_exception ("You can't use savepoints outside a transaction")
1001     if $self->{transaction_depth} == 0;
1002
1003   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1004     unless $self->can('_svp_begin');
1005   
1006   push @{ $self->{savepoints} }, $name;
1007
1008   $self->debugobj->svp_begin($name) if $self->debug;
1009   
1010   return $self->_svp_begin($name);
1011 }
1012
1013 sub svp_release {
1014   my ($self, $name) = @_;
1015
1016   $self->throw_exception ("You can't use savepoints outside a transaction")
1017     if $self->{transaction_depth} == 0;
1018
1019   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1020     unless $self->can('_svp_release');
1021
1022   if (defined $name) {
1023     $self->throw_exception ("Savepoint '$name' does not exist")
1024       unless grep { $_ eq $name } @{ $self->{savepoints} };
1025
1026     # Dig through the stack until we find the one we are releasing.  This keeps
1027     # the stack up to date.
1028     my $svp;
1029
1030     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1031   } else {
1032     $name = pop @{ $self->{savepoints} };
1033   }
1034
1035   $self->debugobj->svp_release($name) if $self->debug;
1036
1037   return $self->_svp_release($name);
1038 }
1039
1040 sub svp_rollback {
1041   my ($self, $name) = @_;
1042
1043   $self->throw_exception ("You can't use savepoints outside a transaction")
1044     if $self->{transaction_depth} == 0;
1045
1046   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1047     unless $self->can('_svp_rollback');
1048
1049   if (defined $name) {
1050       # If they passed us a name, verify that it exists in the stack
1051       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1052           $self->throw_exception("Savepoint '$name' does not exist!");
1053       }
1054
1055       # Dig through the stack until we find the one we are releasing.  This keeps
1056       # the stack up to date.
1057       while(my $s = pop(@{ $self->{savepoints} })) {
1058           last if($s eq $name);
1059       }
1060       # Add the savepoint back to the stack, as a rollback doesn't remove the
1061       # named savepoint, only everything after it.
1062       push(@{ $self->{savepoints} }, $name);
1063   } else {
1064       # We'll assume they want to rollback to the last savepoint
1065       $name = $self->{savepoints}->[-1];
1066   }
1067
1068   $self->debugobj->svp_rollback($name) if $self->debug;
1069   
1070   return $self->_svp_rollback($name);
1071 }
1072
1073 sub _svp_generate_name {
1074     my ($self) = @_;
1075
1076     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1077 }
1078
1079 sub txn_begin {
1080   my $self = shift;
1081   $self->ensure_connected();
1082   if($self->{transaction_depth} == 0) {
1083     $self->debugobj->txn_begin()
1084       if $self->debug;
1085     # this isn't ->_dbh-> because
1086     #  we should reconnect on begin_work
1087     #  for AutoCommit users
1088     $self->dbh->begin_work;
1089   } elsif ($self->auto_savepoint) {
1090     $self->svp_begin;
1091   }
1092   $self->{transaction_depth}++;
1093 }
1094
1095 sub txn_commit {
1096   my $self = shift;
1097   if ($self->{transaction_depth} == 1) {
1098     my $dbh = $self->_dbh;
1099     $self->debugobj->txn_commit()
1100       if ($self->debug);
1101     $dbh->commit;
1102     $self->{transaction_depth} = 0
1103       if $self->_dbh_autocommit;
1104   }
1105   elsif($self->{transaction_depth} > 1) {
1106     $self->{transaction_depth}--;
1107     $self->svp_release
1108       if $self->auto_savepoint;
1109   }
1110 }
1111
1112 sub txn_rollback {
1113   my $self = shift;
1114   my $dbh = $self->_dbh;
1115   eval {
1116     if ($self->{transaction_depth} == 1) {
1117       $self->debugobj->txn_rollback()
1118         if ($self->debug);
1119       $self->{transaction_depth} = 0
1120         if $self->_dbh_autocommit;
1121       $dbh->rollback;
1122     }
1123     elsif($self->{transaction_depth} > 1) {
1124       $self->{transaction_depth}--;
1125       if ($self->auto_savepoint) {
1126         $self->svp_rollback;
1127         $self->svp_release;
1128       }
1129     }
1130     else {
1131       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1132     }
1133   };
1134   if ($@) {
1135     my $error = $@;
1136     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1137     $error =~ /$exception_class/ and $self->throw_exception($error);
1138     # ensure that a failed rollback resets the transaction depth
1139     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1140     $self->throw_exception($error);
1141   }
1142 }
1143
1144 # This used to be the top-half of _execute.  It was split out to make it
1145 #  easier to override in NoBindVars without duping the rest.  It takes up
1146 #  all of _execute's args, and emits $sql, @bind.
1147 sub _prep_for_execute {
1148   my ($self, $op, $extra_bind, $ident, $args) = @_;
1149
1150   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1151   unshift(@bind,
1152     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1153       if $extra_bind;
1154
1155   return ($sql, \@bind);
1156 }
1157
1158 sub _fix_bind_params {
1159     my ($self, @bind) = @_;
1160
1161     ### Turn @bind from something like this:
1162     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1163     ### to this:
1164     ###   ( "'1'", "'1'", "'3'" )
1165     return
1166         map {
1167             if ( defined( $_ && $_->[1] ) ) {
1168                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1169             }
1170             else { q{'NULL'}; }
1171         } @bind;
1172 }
1173
1174 sub _query_start {
1175     my ( $self, $sql, @bind ) = @_;
1176
1177     if ( $self->debug ) {
1178         @bind = $self->_fix_bind_params(@bind);
1179         
1180         $self->debugobj->query_start( $sql, @bind );
1181     }
1182 }
1183
1184 sub _query_end {
1185     my ( $self, $sql, @bind ) = @_;
1186
1187     if ( $self->debug ) {
1188         @bind = $self->_fix_bind_params(@bind);
1189         $self->debugobj->query_end( $sql, @bind );
1190     }
1191 }
1192
1193 sub _dbh_execute {
1194   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1195   
1196   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1197     $ident = $ident->from();
1198   }
1199
1200   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1201
1202   $self->_query_start( $sql, @$bind );
1203
1204   my $sth = $self->sth($sql,$op);
1205
1206   my $placeholder_index = 1; 
1207
1208   foreach my $bound (@$bind) {
1209     my $attributes = {};
1210     my($column_name, @data) = @$bound;
1211
1212     if ($bind_attributes) {
1213       $attributes = $bind_attributes->{$column_name}
1214       if defined $bind_attributes->{$column_name};
1215     }
1216
1217     foreach my $data (@data) {
1218       $data = ref $data ? ''.$data : $data; # stringify args
1219
1220       $sth->bind_param($placeholder_index, $data, $attributes);
1221       $placeholder_index++;
1222     }
1223   }
1224
1225   # Can this fail without throwing an exception anyways???
1226   my $rv = $sth->execute();
1227   $self->throw_exception($sth->errstr) if !$rv;
1228
1229   $self->_query_end( $sql, @$bind );
1230
1231   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1232 }
1233
1234 sub _execute {
1235     my $self = shift;
1236     $self->dbh_do('_dbh_execute', @_)
1237 }
1238
1239 sub insert {
1240   my ($self, $source, $to_insert) = @_;
1241   
1242   my $ident = $source->from; 
1243   my $bind_attributes = $self->source_bind_attributes($source);
1244
1245   $self->ensure_connected;
1246   foreach my $col ( $source->columns ) {
1247     if ( !defined $to_insert->{$col} ) {
1248       my $col_info = $source->column_info($col);
1249
1250       if ( $col_info->{auto_nextval} ) {
1251         $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1252       }
1253     }
1254   }
1255
1256   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1257
1258   return $to_insert;
1259 }
1260
1261 ## Still not quite perfect, and EXPERIMENTAL
1262 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1263 ## scalar refs, or at least, all the same type as the first set, the statement is
1264 ## only prepped once.
1265 sub insert_bulk {
1266   my ($self, $source, $cols, $data) = @_;
1267   my %colvalues;
1268   my $table = $source->from;
1269   @colvalues{@$cols} = (0..$#$cols);
1270   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1271   
1272   $self->_query_start( $sql, @bind );
1273   my $sth = $self->sth($sql);
1274
1275 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1276
1277   ## This must be an arrayref, else nothing works!
1278   
1279   my $tuple_status = [];
1280   
1281   ##use Data::Dumper;
1282   ##print STDERR Dumper( $data, $sql, [@bind] );
1283
1284   my $time = time();
1285
1286   ## Get the bind_attributes, if any exist
1287   my $bind_attributes = $self->source_bind_attributes($source);
1288
1289   ## Bind the values and execute
1290   my $placeholder_index = 1; 
1291
1292   foreach my $bound (@bind) {
1293
1294     my $attributes = {};
1295     my ($column_name, $data_index) = @$bound;
1296
1297     if( $bind_attributes ) {
1298       $attributes = $bind_attributes->{$column_name}
1299       if defined $bind_attributes->{$column_name};
1300     }
1301
1302     my @data = map { $_->[$data_index] } @$data;
1303
1304     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1305     $placeholder_index++;
1306   }
1307   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1308   $self->throw_exception($sth->errstr) if !$rv;
1309
1310   $self->_query_end( $sql, @bind );
1311   return (wantarray ? ($rv, $sth, @bind) : $rv);
1312 }
1313
1314 sub update {
1315   my $self = shift @_;
1316   my $source = shift @_;
1317   my $bind_attributes = $self->source_bind_attributes($source);
1318   
1319   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1320 }
1321
1322
1323 sub delete {
1324   my $self = shift @_;
1325   my $source = shift @_;
1326   
1327   my $bind_attrs = {}; ## If ever it's needed...
1328   
1329   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1330 }
1331
1332 sub _select {
1333   my ($self, $ident, $select, $condition, $attrs) = @_;
1334   my $order = $attrs->{order_by};
1335
1336   if (ref $condition eq 'SCALAR') {
1337     my $unwrap = ${$condition};
1338     if ($unwrap =~ s/ORDER BY (.*)$//i) {
1339       $order = $1;
1340       $condition = \$unwrap;
1341     }
1342   }
1343
1344   my $for = delete $attrs->{for};
1345   my $sql_maker = $self->sql_maker;
1346   local $sql_maker->{for} = $for;
1347
1348   if (exists $attrs->{group_by} || $attrs->{having}) {
1349     $order = {
1350       group_by => $attrs->{group_by},
1351       having => $attrs->{having},
1352       ($order ? (order_by => $order) : ())
1353     };
1354   }
1355   my $bind_attrs = {}; ## Future support
1356   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1357   if ($attrs->{software_limit} ||
1358       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1359         $attrs->{software_limit} = 1;
1360   } else {
1361     $self->throw_exception("rows attribute must be positive if present")
1362       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1363
1364     # MySQL actually recommends this approach.  I cringe.
1365     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1366     push @args, $attrs->{rows}, $attrs->{offset};
1367   }
1368
1369   return $self->_execute(@args);
1370 }
1371
1372 sub source_bind_attributes {
1373   my ($self, $source) = @_;
1374   
1375   my $bind_attributes;
1376   foreach my $column ($source->columns) {
1377   
1378     my $data_type = $source->column_info($column)->{data_type} || '';
1379     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1380      if $data_type;
1381   }
1382
1383   return $bind_attributes;
1384 }
1385
1386 =head2 select
1387
1388 =over 4
1389
1390 =item Arguments: $ident, $select, $condition, $attrs
1391
1392 =back
1393
1394 Handle a SQL select statement.
1395
1396 =cut
1397
1398 sub select {
1399   my $self = shift;
1400   my ($ident, $select, $condition, $attrs) = @_;
1401   return $self->cursor_class->new($self, \@_, $attrs);
1402 }
1403
1404 sub select_single {
1405   my $self = shift;
1406   my ($rv, $sth, @bind) = $self->_select(@_);
1407   my @row = $sth->fetchrow_array;
1408   if(@row && $sth->fetchrow_array) {
1409     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1410   }
1411   # Need to call finish() to work round broken DBDs
1412   $sth->finish();
1413   return @row;
1414 }
1415
1416 =head2 sth
1417
1418 =over 4
1419
1420 =item Arguments: $sql
1421
1422 =back
1423
1424 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1425
1426 =cut
1427
1428 sub _dbh_sth {
1429   my ($self, $dbh, $sql) = @_;
1430
1431   # 3 is the if_active parameter which avoids active sth re-use
1432   my $sth = $self->disable_sth_caching
1433     ? $dbh->prepare($sql)
1434     : $dbh->prepare_cached($sql, {}, 3);
1435
1436   # XXX You would think RaiseError would make this impossible,
1437   #  but apparently that's not true :(
1438   $self->throw_exception($dbh->errstr) if !$sth;
1439
1440   $sth;
1441 }
1442
1443 sub sth {
1444   my ($self, $sql) = @_;
1445   $self->dbh_do('_dbh_sth', $sql);
1446 }
1447
1448 sub _dbh_columns_info_for {
1449   my ($self, $dbh, $table) = @_;
1450
1451   if ($dbh->can('column_info')) {
1452     my %result;
1453     eval {
1454       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1455       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1456       $sth->execute();
1457       while ( my $info = $sth->fetchrow_hashref() ){
1458         my %column_info;
1459         $column_info{data_type}   = $info->{TYPE_NAME};
1460         $column_info{size}      = $info->{COLUMN_SIZE};
1461         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1462         $column_info{default_value} = $info->{COLUMN_DEF};
1463         my $col_name = $info->{COLUMN_NAME};
1464         $col_name =~ s/^\"(.*)\"$/$1/;
1465
1466         $result{$col_name} = \%column_info;
1467       }
1468     };
1469     return \%result if !$@ && scalar keys %result;
1470   }
1471
1472   my %result;
1473   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1474   $sth->execute;
1475   my @columns = @{$sth->{NAME_lc}};
1476   for my $i ( 0 .. $#columns ){
1477     my %column_info;
1478     $column_info{data_type} = $sth->{TYPE}->[$i];
1479     $column_info{size} = $sth->{PRECISION}->[$i];
1480     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1481
1482     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1483       $column_info{data_type} = $1;
1484       $column_info{size}    = $2;
1485     }
1486
1487     $result{$columns[$i]} = \%column_info;
1488   }
1489   $sth->finish;
1490
1491   foreach my $col (keys %result) {
1492     my $colinfo = $result{$col};
1493     my $type_num = $colinfo->{data_type};
1494     my $type_name;
1495     if(defined $type_num && $dbh->can('type_info')) {
1496       my $type_info = $dbh->type_info($type_num);
1497       $type_name = $type_info->{TYPE_NAME} if $type_info;
1498       $colinfo->{data_type} = $type_name if $type_name;
1499     }
1500   }
1501
1502   return \%result;
1503 }
1504
1505 sub columns_info_for {
1506   my ($self, $table) = @_;
1507   $self->dbh_do('_dbh_columns_info_for', $table);
1508 }
1509
1510 =head2 last_insert_id
1511
1512 Return the row id of the last insert.
1513
1514 =cut
1515
1516 sub _dbh_last_insert_id {
1517     my ($self, $dbh, $source, $col) = @_;
1518     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1519     $dbh->func('last_insert_rowid');
1520 }
1521
1522 sub last_insert_id {
1523   my $self = shift;
1524   $self->dbh_do('_dbh_last_insert_id', @_);
1525 }
1526
1527 =head2 sqlt_type
1528
1529 Returns the database driver name.
1530
1531 =cut
1532
1533 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1534
1535 =head2 bind_attribute_by_data_type
1536
1537 Given a datatype from column info, returns a database specific bind
1538 attribute for $dbh->bind_param($val,$attribute) or nothing if we will
1539 let the database planner just handle it.
1540
1541 Generally only needed for special case column types, like bytea in postgres.
1542
1543 =cut
1544
1545 sub bind_attribute_by_data_type {
1546     return;
1547 }
1548
1549 =head2 create_ddl_dir
1550
1551 =over 4
1552
1553 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1554
1555 =back
1556
1557 Creates a SQL file based on the Schema, for each of the specified
1558 database types, in the given directory.
1559
1560 By default, C<\%sqlt_args> will have
1561
1562  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1563
1564 merged with the hash passed in. To disable any of those features, pass in a 
1565 hashref like the following
1566
1567  { ignore_constraint_names => 0, # ... other options }
1568
1569 =cut
1570
1571 sub create_ddl_dir {
1572   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1573
1574   if(!$dir || !-d $dir) {
1575     warn "No directory given, using ./\n";
1576     $dir = "./";
1577   }
1578   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1579   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1580   $version ||= $schema->VERSION || '1.x';
1581   $sqltargs = {
1582     add_drop_table => 1, 
1583     ignore_constraint_names => 1,
1584     ignore_index_names => 1,
1585     %{$sqltargs || {}}
1586   };
1587
1588   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
1589       . $self->_check_sqlt_message . q{'})
1590           if !$self->_check_sqlt_version;
1591
1592   my $sqlt = SQL::Translator->new( $sqltargs );
1593
1594   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1595   my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1596
1597   foreach my $db (@$databases) {
1598     $sqlt->reset();
1599     $sqlt = $self->configure_sqlt($sqlt, $db);
1600     $sqlt->{schema} = $sqlt_schema;
1601     $sqlt->producer($db);
1602
1603     my $file;
1604     my $filename = $schema->ddl_filename($db, $version, $dir);
1605     if (-e $filename && (!$version || ($version == $schema->schema_version()))) {
1606       # if we are dumping the current version, overwrite the DDL
1607       warn "Overwriting existing DDL file - $filename";
1608       unlink($filename);
1609     }
1610
1611     my $output = $sqlt->translate;
1612     if(!$output) {
1613       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1614       next;
1615     }
1616     if(!open($file, ">$filename")) {
1617       $self->throw_exception("Can't open $filename for writing ($!)");
1618       next;
1619     }
1620     print $file $output;
1621     close($file);
1622   
1623     next unless ($preversion);
1624
1625     require SQL::Translator::Diff;
1626
1627     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1628     if(!-e $prefilename) {
1629       warn("No previous schema file found ($prefilename)");
1630       next;
1631     }
1632
1633     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1634     if(-e $difffile) {
1635       warn("Overwriting existing diff file - $difffile");
1636       unlink($difffile);
1637     }
1638     
1639     my $source_schema;
1640     {
1641       my $t = SQL::Translator->new($sqltargs);
1642       $t->debug( 0 );
1643       $t->trace( 0 );
1644       $t->parser( $db )                       or die $t->error;
1645       $t = $self->configure_sqlt($t, $db);
1646       my $out = $t->translate( $prefilename ) or die $t->error;
1647       $source_schema = $t->schema;
1648       unless ( $source_schema->name ) {
1649         $source_schema->name( $prefilename );
1650       }
1651     }
1652
1653     # The "new" style of producers have sane normalization and can support 
1654     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1655     # And we have to diff parsed SQL against parsed SQL.
1656     my $dest_schema = $sqlt_schema;
1657     
1658     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1659       my $t = SQL::Translator->new($sqltargs);
1660       $t->debug( 0 );
1661       $t->trace( 0 );
1662       $t->parser( $db )                    or die $t->error;
1663       $t = $self->configure_sqlt($t, $db);
1664       my $out = $t->translate( $filename ) or die $t->error;
1665       $dest_schema = $t->schema;
1666       $dest_schema->name( $filename )
1667         unless $dest_schema->name;
1668     }
1669     
1670     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1671                                                   $dest_schema,   $db,
1672                                                   $sqltargs
1673                                                  );
1674     if(!open $file, ">$difffile") { 
1675       $self->throw_exception("Can't write to $difffile ($!)");
1676       next;
1677     }
1678     print $file $diff;
1679     close($file);
1680   }
1681 }
1682
1683 sub configure_sqlt() {
1684   my $self = shift;
1685   my $tr = shift;
1686   my $db = shift || $self->sqlt_type;
1687   if ($db eq 'PostgreSQL') {
1688     $tr->quote_table_names(0);
1689     $tr->quote_field_names(0);
1690   }
1691   return $tr;
1692 }
1693
1694 =head2 deployment_statements
1695
1696 =over 4
1697
1698 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1699
1700 =back
1701
1702 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1703 The database driver name is given by C<$type>, though the value from
1704 L</sqlt_type> is used if it is not specified.
1705
1706 C<$directory> is used to return statements from files in a previously created
1707 L</create_ddl_dir> directory and is optional. The filenames are constructed
1708 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1709
1710 If no C<$directory> is specified then the statements are constructed on the
1711 fly using L<SQL::Translator> and C<$version> is ignored.
1712
1713 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1714
1715 =cut
1716
1717 sub deployment_statements {
1718   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1719   # Need to be connected to get the correct sqlt_type
1720   $self->ensure_connected() unless $type;
1721   $type ||= $self->sqlt_type;
1722   $version ||= $schema->VERSION || '1.x';
1723   $dir ||= './';
1724   my $filename = $schema->ddl_filename($type, $dir, $version);
1725   if(-f $filename)
1726   {
1727       my $file;
1728       open($file, "<$filename") 
1729         or $self->throw_exception("Can't open $filename ($!)");
1730       my @rows = <$file>;
1731       close($file);
1732       return join('', @rows);
1733   }
1734
1735   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
1736       . $self->_check_sqlt_message . q{'})
1737           if !$self->_check_sqlt_version;
1738
1739   require SQL::Translator::Parser::DBIx::Class;
1740   eval qq{use SQL::Translator::Producer::${type}};
1741   $self->throw_exception($@) if $@;
1742
1743   # sources needs to be a parser arg, but for simplicty allow at top level 
1744   # coming in
1745   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1746       if exists $sqltargs->{sources};
1747
1748   my $tr = SQL::Translator->new(%$sqltargs);
1749   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1750   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1751 }
1752
1753 sub deploy {
1754   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1755   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1756     foreach my $line ( split(";\n", $statement)) {
1757       next if($line =~ /^--/);
1758       next if(!$line);
1759 #      next if($line =~ /^DROP/m);
1760       next if($line =~ /^BEGIN TRANSACTION/m);
1761       next if($line =~ /^COMMIT/m);
1762       next if $line =~ /^\s+$/; # skip whitespace only
1763       $self->_query_start($line);
1764       eval {
1765         $self->dbh->do($line); # shouldn't be using ->dbh ?
1766       };
1767       if ($@) {
1768         warn qq{$@ (running "${line}")};
1769       }
1770       $self->_query_end($line);
1771     }
1772   }
1773 }
1774
1775 =head2 datetime_parser
1776
1777 Returns the datetime parser class
1778
1779 =cut
1780
1781 sub datetime_parser {
1782   my $self = shift;
1783   return $self->{datetime_parser} ||= do {
1784     $self->ensure_connected;
1785     $self->build_datetime_parser(@_);
1786   };
1787 }
1788
1789 =head2 datetime_parser_type
1790
1791 Defines (returns) the datetime parser class - currently hardwired to
1792 L<DateTime::Format::MySQL>
1793
1794 =cut
1795
1796 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1797
1798 =head2 build_datetime_parser
1799
1800 See L</datetime_parser>
1801
1802 =cut
1803
1804 sub build_datetime_parser {
1805   my $self = shift;
1806   my $type = $self->datetime_parser_type(@_);
1807   eval "use ${type}";
1808   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1809   return $type;
1810 }
1811
1812 {
1813     my $_check_sqlt_version; # private
1814     my $_check_sqlt_message; # private
1815     sub _check_sqlt_version {
1816         return $_check_sqlt_version if defined $_check_sqlt_version;
1817         eval 'use SQL::Translator "0.09"';
1818         $_check_sqlt_message = $@ || '';
1819         $_check_sqlt_version = !$@;
1820     }
1821
1822     sub _check_sqlt_message {
1823         _check_sqlt_version if !defined $_check_sqlt_message;
1824         $_check_sqlt_message;
1825     }
1826 }
1827
1828 =head2 is_replicating
1829
1830 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1831 replicate from a master database.  Default is undef, which is the result
1832 returned by databases that don't support replication.
1833
1834 =cut
1835
1836 sub is_replicating {
1837     return;
1838     
1839 }
1840
1841 =head2 lag_behind_master
1842
1843 Returns a number that represents a certain amount of lag behind a master db
1844 when a given storage is replicating.  The number is database dependent, but
1845 starts at zero and increases with the amount of lag. Default in undef
1846
1847 =cut
1848
1849 sub lag_behind_master {
1850     return;
1851 }
1852
1853 sub DESTROY {
1854   my $self = shift;
1855   return if !$self->_dbh;
1856   $self->_verify_pid;
1857   $self->_dbh(undef);
1858 }
1859
1860 1;
1861
1862 =head1 USAGE NOTES
1863
1864 =head2 DBIx::Class and AutoCommit
1865
1866 DBIx::Class can do some wonderful magic with handling exceptions,
1867 disconnections, and transactions when you use C<< AutoCommit => 1 >>
1868 combined with C<txn_do> for transaction support.
1869
1870 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1871 in an assumed transaction between commits, and you're telling us you'd
1872 like to manage that manually.  A lot of the magic protections offered by
1873 this module will go away.  We can't protect you from exceptions due to database
1874 disconnects because we don't know anything about how to restart your
1875 transactions.  You're on your own for handling all sorts of exceptional
1876 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1877 be with raw DBI.
1878
1879
1880 =head1 SQL METHODS
1881
1882 The module defines a set of methods within the DBIC::SQL::Abstract
1883 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1884 SQL query functions.
1885
1886 The following methods are extended:-
1887
1888 =over 4
1889
1890 =item delete
1891
1892 =item insert
1893
1894 =item select
1895
1896 =item update
1897
1898 =item limit_dialect
1899
1900 See L</connect_info> for details.
1901 For setting, this method is deprecated in favor of L</connect_info>.
1902
1903 =item quote_char
1904
1905 See L</connect_info> for details.
1906 For setting, this method is deprecated in favor of L</connect_info>.
1907
1908 =item name_sep
1909
1910 See L</connect_info> for details.
1911 For setting, this method is deprecated in favor of L</connect_info>.
1912
1913 =back
1914
1915 =head1 AUTHORS
1916
1917 Matt S. Trout <mst@shadowcatsystems.co.uk>
1918
1919 Andy Grundman <andy@hybridized.org>
1920
1921 =head1 LICENSE
1922
1923 You may distribute this code under the same terms as Perl itself.
1924
1925 =cut