26fcbbb9314cf51049d416f0d37facc41b5eb7f5
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util();
13 use List::Util();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit _on_connect_do
18        _on_disconnect_do __on_connect_do __on_disconnect_do savepoints/
19 );
20
21 # the values for these accessors are picked out (and deleted) from
22 # the attribute hashref passed to connect_info
23 my @storage_options = qw/
24   on_connect_call on_disconnect_call disable_sth_caching unsafe auto_savepoint
25 /;
26 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
27
28
29 # default cursor class, overridable in connect_info attributes
30 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
31
32 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
33 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
34
35
36 =head1 NAME
37
38 DBIx::Class::Storage::DBI - DBI storage handler
39
40 =head1 SYNOPSIS
41
42   my $schema = MySchema->connect('dbi:SQLite:my.db');
43
44   $schema->storage->debug(1);
45   $schema->dbh_do("DROP TABLE authors");
46
47   $schema->resultset('Book')->search({
48      written_on => $schema->storage->datetime_parser(DateTime->now)
49   });
50
51 =head1 DESCRIPTION
52
53 This class represents the connection to an RDBMS via L<DBI>.  See
54 L<DBIx::Class::Storage> for general information.  This pod only
55 documents DBI-specific methods and behaviors.
56
57 =head1 METHODS
58
59 =cut
60
61 sub new {
62   my $new = shift->next::method(@_);
63
64   $new->transaction_depth(0);
65   $new->_sql_maker_opts({});
66   $new->{savepoints} = [];
67   $new->{_in_dbh_do} = 0;
68   $new->{_dbh_gen} = 0;
69
70   $new;
71 }
72
73 =head2 connect_info
74
75 This method is normally called by L<DBIx::Class::Schema/connection>, which
76 encapsulates its argument list in an arrayref before passing them here.
77
78 The argument list may contain:
79
80 =over
81
82 =item *
83
84 The same 4-element argument set one would normally pass to
85 L<DBI/connect>, optionally followed by
86 L<extra attributes|/DBIx::Class specific connection attributes>
87 recognized by DBIx::Class:
88
89   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
90
91 =item *
92
93 A single code reference which returns a connected 
94 L<DBI database handle|DBI/connect> optionally followed by 
95 L<extra attributes|/DBIx::Class specific connection attributes> recognized
96 by DBIx::Class:
97
98   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
99
100 =item *
101
102 A single hashref with all the attributes and the dsn/user/password
103 mixed together:
104
105   $connect_info_args = [{
106     dsn => $dsn,
107     user => $user,
108     password => $pass,
109     %dbi_attributes,
110     %extra_attributes,
111   }];
112
113 This is particularly useful for L<Catalyst> based applications, allowing the 
114 following config (L<Config::General> style):
115
116   <Model::DB>
117     schema_class   App::DB
118     <connect_info>
119       dsn          dbi:mysql:database=test
120       user         testuser
121       password     TestPass
122       AutoCommit   1
123     </connect_info>
124   </Model::DB>
125
126 =back
127
128 Please note that the L<DBI> docs recommend that you always explicitly
129 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
130 recommends that it be set to I<1>, and that you perform transactions
131 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
132 to I<1> if you do not do explicitly set it to zero.  This is the default 
133 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
134
135 =head3 DBIx::Class specific connection attributes
136
137 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
138 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
139 the following connection options. These options can be mixed in with your other
140 L<DBI> connection attributes, or placed in a seperate hashref
141 (C<\%extra_attributes>) as shown above.
142
143 Every time C<connect_info> is invoked, any previous settings for
144 these options will be cleared before setting the new ones, regardless of
145 whether any options are specified in the new C<connect_info>.
146
147
148 =over
149
150 =item on_connect_do
151
152 Specifies things to do immediately after connecting or re-connecting to
153 the database.  Its value may contain:
154
155 =over
156
157 =item a scalar
158
159 This contains one SQL statement to execute.
160
161 =item an array reference
162
163 This contains SQL statements to execute in order.  Each element contains
164 a string or a code reference that returns a string.
165
166 =item a code reference
167
168 This contains some code to execute.  Unlike code references within an
169 array reference, its return value is ignored.
170
171 =back
172
173 =item on_disconnect_do
174
175 Takes arguments in the same form as L</on_connect_do> and executes them
176 immediately before disconnecting from the database.
177
178 Note, this only runs if you explicitly call L</disconnect> on the
179 storage object.
180
181 =item on_connect_call
182
183 A more generalized form of L</on_connect_do> that calls the specified
184 C<connect_call_METHOD> methods in your storage driver.
185
186   on_connect_do => 'select 1'
187
188 is equivalent to:
189
190   on_connect_call => [ [ do_sql => 'select 1' ] ]
191
192 Its values may contain:
193
194 =over
195
196 =item a scalar
197
198 Will call the C<connect_call_METHOD> method.
199
200 =item a code reference
201
202 Will execute C<< $code->($storage) >>
203
204 =item an array reference
205
206 Each value can be a method name or code reference.
207
208 =item an array of arrays
209
210 For each array, the first item is taken to be the C<connect_call_> method name
211 or code reference, and the rest are parameters to it.
212
213 =back
214
215 Some predefined storage methods you may use:
216
217 =over
218
219 =item do_sql
220
221 Executes a SQL string or a code reference that returns a SQL string. This is
222 what L</on_connect_do> and L</on_disconnect_do> use.
223
224 It can take:
225
226 =over
227
228 =item a scalar
229
230 Will execute the scalar as SQL.
231
232 =item an arrayref
233
234 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
235 attributes hashref and bind values.
236
237 =item a code reference
238
239 Will execute C<< $code->($storage) >> and execute the return array refs as
240 above.
241
242 =back
243
244 =item datetime_setup
245
246 Execute any statements necessary to initialize the database session to return
247 and accept datetime/timestamp values used with
248 L<DBIx::Class::InflateColumn::DateTime>.
249
250 Only necessary for some databases, see your specific storage driver for
251 implementation details.
252
253 =back
254
255 =item on_disconnect_call
256
257 Takes arguments in the same form as L</on_connect_call> and executes them
258 immediately before disconnecting from the database.
259
260 Calls the C<disconnect_call_METHOD> methods as opposed to the
261 C<connect_call_METHOD> methods called by L</on_connect_call>.
262
263 Note, this only runs if you explicitly call L</disconnect> on the
264 storage object.
265
266 =item disable_sth_caching
267
268 If set to a true value, this option will disable the caching of
269 statement handles via L<DBI/prepare_cached>.
270
271 =item limit_dialect 
272
273 Sets the limit dialect. This is useful for JDBC-bridge among others
274 where the remote SQL-dialect cannot be determined by the name of the
275 driver alone. See also L<SQL::Abstract::Limit>.
276
277 =item quote_char
278
279 Specifies what characters to use to quote table and column names. If 
280 you use this you will want to specify L</name_sep> as well.
281
282 C<quote_char> expects either a single character, in which case is it
283 is placed on either side of the table/column name, or an arrayref of length
284 2 in which case the table/column name is placed between the elements.
285
286 For example under MySQL you should use C<< quote_char => '`' >>, and for
287 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
288
289 =item name_sep
290
291 This only needs to be used in conjunction with C<quote_char>, and is used to 
292 specify the charecter that seperates elements (schemas, tables, columns) from 
293 each other. In most cases this is simply a C<.>.
294
295 The consequences of not supplying this value is that L<SQL::Abstract>
296 will assume DBIx::Class' uses of aliases to be complete column
297 names. The output will look like I<"me.name"> when it should actually
298 be I<"me"."name">.
299
300 =item unsafe
301
302 This Storage driver normally installs its own C<HandleError>, sets
303 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
304 all database handles, including those supplied by a coderef.  It does this
305 so that it can have consistent and useful error behavior.
306
307 If you set this option to a true value, Storage will not do its usual
308 modifications to the database handle's attributes, and instead relies on
309 the settings in your connect_info DBI options (or the values you set in
310 your connection coderef, in the case that you are connecting via coderef).
311
312 Note that your custom settings can cause Storage to malfunction,
313 especially if you set a C<HandleError> handler that suppresses exceptions
314 and/or disable C<RaiseError>.
315
316 =item auto_savepoint
317
318 If this option is true, L<DBIx::Class> will use savepoints when nesting
319 transactions, making it possible to recover from failure in the inner
320 transaction without having to abort all outer transactions.
321
322 =item cursor_class
323
324 Use this argument to supply a cursor class other than the default
325 L<DBIx::Class::Storage::DBI::Cursor>.
326
327 =back
328
329 Some real-life examples of arguments to L</connect_info> and
330 L<DBIx::Class::Schema/connect>
331
332   # Simple SQLite connection
333   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
334
335   # Connect via subref
336   ->connect_info([ sub { DBI->connect(...) } ]);
337
338   # A bit more complicated
339   ->connect_info(
340     [
341       'dbi:Pg:dbname=foo',
342       'postgres',
343       'my_pg_password',
344       { AutoCommit => 1 },
345       { quote_char => q{"}, name_sep => q{.} },
346     ]
347   );
348
349   # Equivalent to the previous example
350   ->connect_info(
351     [
352       'dbi:Pg:dbname=foo',
353       'postgres',
354       'my_pg_password',
355       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
356     ]
357   );
358
359   # Same, but with hashref as argument
360   # See parse_connect_info for explanation
361   ->connect_info(
362     [{
363       dsn         => 'dbi:Pg:dbname=foo',
364       user        => 'postgres',
365       password    => 'my_pg_password',
366       AutoCommit  => 1,
367       quote_char  => q{"},
368       name_sep    => q{.},
369     }]
370   );
371
372   # Subref + DBIx::Class-specific connection options
373   ->connect_info(
374     [
375       sub { DBI->connect(...) },
376       {
377           quote_char => q{`},
378           name_sep => q{@},
379           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
380           disable_sth_caching => 1,
381       },
382     ]
383   );
384
385
386
387 =cut
388
389 sub connect_info {
390   my ($self, $info_arg) = @_;
391
392   return $self->_connect_info if !$info_arg;
393
394   my @args = @$info_arg;  # take a shallow copy for further mutilation
395   $self->_connect_info([@args]); # copy for _connect_info
396
397
398   # combine/pre-parse arguments depending on invocation style
399
400   my %attrs;
401   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
402     %attrs = %{ $args[1] || {} };
403     @args = $args[0];
404   }
405   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
406     %attrs = %{$args[0]};
407     @args = ();
408     for (qw/password user dsn/) {
409       unshift @args, delete $attrs{$_};
410     }
411   }
412   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
413     %attrs = (
414       % { $args[3] || {} },
415       % { $args[4] || {} },
416     );
417     @args = @args[0,1,2];
418   }
419
420   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
421   #  the new set of options
422   $self->_sql_maker(undef);
423   $self->_sql_maker_opts({});
424
425   if(keys %attrs) {
426     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
427       if(my $value = delete $attrs{$storage_opt}) {
428         $self->$storage_opt($value);
429       }
430     }
431     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
432       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
433         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
434       }
435     }
436     for my $connect_do_opt (qw/on_connect_do on_disconnect_do/) {
437       if(my $opt_val = delete $attrs{$connect_do_opt}) {
438         $self->$connect_do_opt($opt_val);
439       }
440     }
441   }
442
443   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
444
445   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
446   $self->_connect_info;
447 }
448
449 =head2 on_connect_do
450
451 This method is deprecated in favour of setting via L</connect_info>.
452
453 =cut
454
455 sub on_connect_do {
456   my $self = shift;
457   $self->_setup_connect_do(on_connect_do => @_);
458 }
459
460 =head2 on_disconnect_do
461
462 This method is deprecated in favour of setting via L</connect_info>.
463
464 =cut
465
466 sub on_disconnect_do {
467   my $self = shift;
468   $self->_setup_connect_do(on_disconnect_do => @_);
469 }
470
471 sub _setup_connect_do {
472   my ($self, $opt) = (shift, shift);
473
474   my $accessor = "_$opt";
475   my $store    = "__$opt";
476
477   return $self->$accessor if not @_;
478
479   my $val = shift;
480
481   if (not defined $val) {
482     $self->$accessor(undef);
483     $self->$store(undef);
484     return;
485   }
486
487   my @store;
488
489   if (not ref($val)) {
490     push @store, [ 'do_sql', $val ];
491   } elsif (ref($val) eq 'CODE') {
492     push @store, $val;
493   } elsif (ref($val) eq 'ARRAY') {
494     push @store, map [ 'do_sql', $_ ], @$val;
495   } else {
496     $self->throw_exception("Invalid type for $opt ".ref($val));
497   }
498
499   $self->$store(\@store);
500   $self->$accessor($val);
501 }
502
503 =head2 dbh_do
504
505 Arguments: ($subref | $method_name), @extra_coderef_args?
506
507 Execute the given $subref or $method_name using the new exception-based
508 connection management.
509
510 The first two arguments will be the storage object that C<dbh_do> was called
511 on and a database handle to use.  Any additional arguments will be passed
512 verbatim to the called subref as arguments 2 and onwards.
513
514 Using this (instead of $self->_dbh or $self->dbh) ensures correct
515 exception handling and reconnection (or failover in future subclasses).
516
517 Your subref should have no side-effects outside of the database, as
518 there is the potential for your subref to be partially double-executed
519 if the database connection was stale/dysfunctional.
520
521 Example:
522
523   my @stuff = $schema->storage->dbh_do(
524     sub {
525       my ($storage, $dbh, @cols) = @_;
526       my $cols = join(q{, }, @cols);
527       $dbh->selectrow_array("SELECT $cols FROM foo");
528     },
529     @column_list
530   );
531
532 =cut
533
534 sub dbh_do {
535   my $self = shift;
536   my $code = shift;
537
538   my $dbh = $self->_dbh;
539
540   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
541       || $self->{transaction_depth};
542
543   local $self->{_in_dbh_do} = 1;
544
545   my @result;
546   my $want_array = wantarray;
547
548   eval {
549     $self->_verify_pid if $dbh;
550     if(!$self->_dbh) {
551         $self->_populate_dbh;
552         $dbh = $self->_dbh;
553     }
554
555     if($want_array) {
556         @result = $self->$code($dbh, @_);
557     }
558     elsif(defined $want_array) {
559         $result[0] = $self->$code($dbh, @_);
560     }
561     else {
562         $self->$code($dbh, @_);
563     }
564   };
565
566   my $exception = $@;
567   if(!$exception) { return $want_array ? @result : $result[0] }
568
569   $self->throw_exception($exception) if $self->connected;
570
571   # We were not connected - reconnect and retry, but let any
572   #  exception fall right through this time
573   $self->_populate_dbh;
574   $self->$code($self->_dbh, @_);
575 }
576
577 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
578 # It also informs dbh_do to bypass itself while under the direction of txn_do,
579 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
580 sub txn_do {
581   my $self = shift;
582   my $coderef = shift;
583
584   ref $coderef eq 'CODE' or $self->throw_exception
585     ('$coderef must be a CODE reference');
586
587   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
588
589   local $self->{_in_dbh_do} = 1;
590
591   my @result;
592   my $want_array = wantarray;
593
594   my $tried = 0;
595   while(1) {
596     eval {
597       $self->_verify_pid if $self->_dbh;
598       $self->_populate_dbh if !$self->_dbh;
599
600       $self->txn_begin;
601       if($want_array) {
602           @result = $coderef->(@_);
603       }
604       elsif(defined $want_array) {
605           $result[0] = $coderef->(@_);
606       }
607       else {
608           $coderef->(@_);
609       }
610       $self->txn_commit;
611     };
612
613     my $exception = $@;
614     if(!$exception) { return $want_array ? @result : $result[0] }
615
616     if($tried++ > 0 || $self->connected) {
617       eval { $self->txn_rollback };
618       my $rollback_exception = $@;
619       if($rollback_exception) {
620         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
621         $self->throw_exception($exception)  # propagate nested rollback
622           if $rollback_exception =~ /$exception_class/;
623
624         $self->throw_exception(
625           "Transaction aborted: ${exception}. "
626           . "Rollback failed: ${rollback_exception}"
627         );
628       }
629       $self->throw_exception($exception)
630     }
631
632     # We were not connected, and was first try - reconnect and retry
633     # via the while loop
634     $self->_populate_dbh;
635   }
636 }
637
638 =head2 disconnect
639
640 Our C<disconnect> method also performs a rollback first if the
641 database is not in C<AutoCommit> mode.
642
643 =cut
644
645 sub disconnect {
646   my ($self) = @_;
647
648   if( $self->connected ) {
649     if (my $connection_call = $self->on_disconnect_call) {
650       $self->_do_connection_actions(disconnect_call_ => $connection_call)
651     }
652     if (my $connection_do   = $self->__on_disconnect_do) {
653       $self->_do_connection_actions(disconnect_call_ => $connection_do)
654     }
655
656     $self->_dbh->rollback unless $self->_dbh_autocommit;
657     $self->_dbh->disconnect;
658     $self->_dbh(undef);
659     $self->{_dbh_gen}++;
660   }
661 }
662
663 =head2 with_deferred_fk_checks
664
665 =over 4
666
667 =item Arguments: C<$coderef>
668
669 =item Return Value: The return value of $coderef
670
671 =back
672
673 Storage specific method to run the code ref with FK checks deferred or
674 in MySQL's case disabled entirely.
675
676 =cut
677
678 # Storage subclasses should override this
679 sub with_deferred_fk_checks {
680   my ($self, $sub) = @_;
681
682   $sub->();
683 }
684
685 sub connected {
686   my ($self) = @_;
687
688   if(my $dbh = $self->_dbh) {
689       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
690           $self->_dbh(undef);
691           $self->{_dbh_gen}++;
692           return;
693       }
694       else {
695           $self->_verify_pid;
696           return 0 if !$self->_dbh;
697       }
698       return ($dbh->FETCH('Active') && $dbh->ping);
699   }
700
701   return 0;
702 }
703
704 # handle pid changes correctly
705 #  NOTE: assumes $self->_dbh is a valid $dbh
706 sub _verify_pid {
707   my ($self) = @_;
708
709   return if defined $self->_conn_pid && $self->_conn_pid == $$;
710
711   $self->_dbh->{InactiveDestroy} = 1;
712   $self->_dbh(undef);
713   $self->{_dbh_gen}++;
714
715   return;
716 }
717
718 sub ensure_connected {
719   my ($self) = @_;
720
721   unless ($self->connected) {
722     $self->_populate_dbh;
723   }
724 }
725
726 =head2 dbh
727
728 Returns the dbh - a data base handle of class L<DBI>.
729
730 =cut
731
732 sub dbh {
733   my ($self) = @_;
734
735   $self->ensure_connected;
736   return $self->_dbh;
737 }
738
739 sub _sql_maker_args {
740     my ($self) = @_;
741     
742     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
743 }
744
745 sub sql_maker {
746   my ($self) = @_;
747   unless ($self->_sql_maker) {
748     my $sql_maker_class = $self->sql_maker_class;
749     $self->ensure_class_loaded ($sql_maker_class);
750     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
751   }
752   return $self->_sql_maker;
753 }
754
755 sub _rebless {}
756
757 sub _populate_dbh {
758   my ($self) = @_;
759   my @info = @{$self->_dbi_connect_info || []};
760   $self->_dbh($self->_connect(@info));
761
762   $self->_conn_pid($$);
763   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
764
765   $self->_determine_driver;
766
767   # Always set the transaction depth on connect, since
768   #  there is no transaction in progress by definition
769   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
770
771   if (my $connection_call = $self->on_connect_call) {
772     $self->_do_connection_actions(connect_call_ => $connection_call)
773   }
774   if (my $connection_do = $self->__on_connect_do) {
775     $self->_do_connection_actions(connect_call_ => $connection_do)
776   }
777 }
778
779 sub _determine_driver {
780   my ($self) = @_;
781
782   if (ref $self eq 'DBIx::Class::Storage::DBI') {
783     my $driver;
784
785     if ($self->_dbh) { # we are connected
786       $driver = $self->_dbh->{Driver}{Name};
787     } else {
788       # try to use dsn to not require being connected, the driver may still
789       # force a connection in _rebless to determine version
790       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
791     }
792
793     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
794       bless $self, "DBIx::Class::Storage::DBI::${driver}";
795       $self->_rebless();
796     }
797   }
798 }
799
800 sub _do_connection_actions {
801   my $self          = shift;
802   my $method_prefix = shift;
803   my $call          = shift;
804
805   if (not ref($call)) {
806     my $method = $method_prefix . $call;
807     $self->$method(@_);
808   } elsif (ref($call) eq 'CODE') {
809     $self->$call(@_);
810   } elsif (ref($call) eq 'ARRAY') {
811     if (ref($call->[0]) ne 'ARRAY') {
812       $self->_do_connection_actions($method_prefix, $_) for @$call;
813     } else {
814       $self->_do_connection_actions($method_prefix, @$_) for @$call;
815     }
816   } else {
817     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
818   }
819
820   return $self;
821 }
822
823 sub connect_call_do_sql {
824   my $self = shift;
825   $self->_do_query(@_);
826 }
827
828 sub disconnect_call_do_sql {
829   my $self = shift;
830   $self->_do_query(@_);
831 }
832
833 # override in db-specific backend when necessary
834 sub connect_call_datetime_setup { 1 }
835
836 sub _do_query {
837   my ($self, $action) = @_;
838
839   if (ref $action eq 'CODE') {
840     $action = $action->($self);
841     $self->_do_query($_) foreach @$action;
842   }
843   else {
844     # Most debuggers expect ($sql, @bind), so we need to exclude
845     # the attribute hash which is the second argument to $dbh->do
846     # furthermore the bind values are usually to be presented
847     # as named arrayref pairs, so wrap those here too
848     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
849     my $sql = shift @do_args;
850     my $attrs = shift @do_args;
851     my @bind = map { [ undef, $_ ] } @do_args;
852
853     $self->_query_start($sql, @bind);
854     $self->_dbh->do($sql, $attrs, @do_args);
855     $self->_query_end($sql, @bind);
856   }
857
858   return $self;
859 }
860
861 sub _connect {
862   my ($self, @info) = @_;
863
864   $self->throw_exception("You failed to provide any connection info")
865     if !@info;
866
867   my ($old_connect_via, $dbh);
868
869   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
870     $old_connect_via = $DBI::connect_via;
871     $DBI::connect_via = 'connect';
872   }
873
874   eval {
875     if(ref $info[0] eq 'CODE') {
876        $dbh = &{$info[0]}
877     }
878     else {
879        $dbh = DBI->connect(@info);
880     }
881
882     if($dbh && !$self->unsafe) {
883       my $weak_self = $self;
884       Scalar::Util::weaken($weak_self);
885       $dbh->{HandleError} = sub {
886           if ($weak_self) {
887             $weak_self->throw_exception("DBI Exception: $_[0]");
888           }
889           else {
890             croak ("DBI Exception: $_[0]");
891           }
892       };
893       $dbh->{ShowErrorStatement} = 1;
894       $dbh->{RaiseError} = 1;
895       $dbh->{PrintError} = 0;
896     }
897   };
898
899   $DBI::connect_via = $old_connect_via if $old_connect_via;
900
901   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
902     if !$dbh || $@;
903
904   $self->_dbh_autocommit($dbh->{AutoCommit});
905
906   $dbh;
907 }
908
909 sub svp_begin {
910   my ($self, $name) = @_;
911
912   $name = $self->_svp_generate_name
913     unless defined $name;
914
915   $self->throw_exception ("You can't use savepoints outside a transaction")
916     if $self->{transaction_depth} == 0;
917
918   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
919     unless $self->can('_svp_begin');
920   
921   push @{ $self->{savepoints} }, $name;
922
923   $self->debugobj->svp_begin($name) if $self->debug;
924   
925   return $self->_svp_begin($name);
926 }
927
928 sub svp_release {
929   my ($self, $name) = @_;
930
931   $self->throw_exception ("You can't use savepoints outside a transaction")
932     if $self->{transaction_depth} == 0;
933
934   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
935     unless $self->can('_svp_release');
936
937   if (defined $name) {
938     $self->throw_exception ("Savepoint '$name' does not exist")
939       unless grep { $_ eq $name } @{ $self->{savepoints} };
940
941     # Dig through the stack until we find the one we are releasing.  This keeps
942     # the stack up to date.
943     my $svp;
944
945     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
946   } else {
947     $name = pop @{ $self->{savepoints} };
948   }
949
950   $self->debugobj->svp_release($name) if $self->debug;
951
952   return $self->_svp_release($name);
953 }
954
955 sub svp_rollback {
956   my ($self, $name) = @_;
957
958   $self->throw_exception ("You can't use savepoints outside a transaction")
959     if $self->{transaction_depth} == 0;
960
961   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
962     unless $self->can('_svp_rollback');
963
964   if (defined $name) {
965       # If they passed us a name, verify that it exists in the stack
966       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
967           $self->throw_exception("Savepoint '$name' does not exist!");
968       }
969
970       # Dig through the stack until we find the one we are releasing.  This keeps
971       # the stack up to date.
972       while(my $s = pop(@{ $self->{savepoints} })) {
973           last if($s eq $name);
974       }
975       # Add the savepoint back to the stack, as a rollback doesn't remove the
976       # named savepoint, only everything after it.
977       push(@{ $self->{savepoints} }, $name);
978   } else {
979       # We'll assume they want to rollback to the last savepoint
980       $name = $self->{savepoints}->[-1];
981   }
982
983   $self->debugobj->svp_rollback($name) if $self->debug;
984   
985   return $self->_svp_rollback($name);
986 }
987
988 sub _svp_generate_name {
989     my ($self) = @_;
990
991     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
992 }
993
994 sub txn_begin {
995   my $self = shift;
996   $self->ensure_connected();
997   if($self->{transaction_depth} == 0) {
998     $self->debugobj->txn_begin()
999       if $self->debug;
1000     # this isn't ->_dbh-> because
1001     #  we should reconnect on begin_work
1002     #  for AutoCommit users
1003     $self->dbh->begin_work;
1004   } elsif ($self->auto_savepoint) {
1005     $self->svp_begin;
1006   }
1007   $self->{transaction_depth}++;
1008 }
1009
1010 sub txn_commit {
1011   my $self = shift;
1012   if ($self->{transaction_depth} == 1) {
1013     my $dbh = $self->_dbh;
1014     $self->debugobj->txn_commit()
1015       if ($self->debug);
1016     $dbh->commit;
1017     $self->{transaction_depth} = 0
1018       if $self->_dbh_autocommit;
1019   }
1020   elsif($self->{transaction_depth} > 1) {
1021     $self->{transaction_depth}--;
1022     $self->svp_release
1023       if $self->auto_savepoint;
1024   }
1025 }
1026
1027 sub txn_rollback {
1028   my $self = shift;
1029   my $dbh = $self->_dbh;
1030   eval {
1031     if ($self->{transaction_depth} == 1) {
1032       $self->debugobj->txn_rollback()
1033         if ($self->debug);
1034       $self->{transaction_depth} = 0
1035         if $self->_dbh_autocommit;
1036       $dbh->rollback;
1037     }
1038     elsif($self->{transaction_depth} > 1) {
1039       $self->{transaction_depth}--;
1040       if ($self->auto_savepoint) {
1041         $self->svp_rollback;
1042         $self->svp_release;
1043       }
1044     }
1045     else {
1046       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1047     }
1048   };
1049   if ($@) {
1050     my $error = $@;
1051     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1052     $error =~ /$exception_class/ and $self->throw_exception($error);
1053     # ensure that a failed rollback resets the transaction depth
1054     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1055     $self->throw_exception($error);
1056   }
1057 }
1058
1059 # This used to be the top-half of _execute.  It was split out to make it
1060 #  easier to override in NoBindVars without duping the rest.  It takes up
1061 #  all of _execute's args, and emits $sql, @bind.
1062 sub _prep_for_execute {
1063   my ($self, $op, $extra_bind, $ident, $args) = @_;
1064
1065   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1066     $ident = $ident->from();
1067   }
1068
1069   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1070
1071   unshift(@bind,
1072     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1073       if $extra_bind;
1074   return ($sql, \@bind);
1075 }
1076
1077
1078 sub _fix_bind_params {
1079     my ($self, @bind) = @_;
1080
1081     ### Turn @bind from something like this:
1082     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1083     ### to this:
1084     ###   ( "'1'", "'1'", "'3'" )
1085     return
1086         map {
1087             if ( defined( $_ && $_->[1] ) ) {
1088                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1089             }
1090             else { q{'NULL'}; }
1091         } @bind;
1092 }
1093
1094 sub _query_start {
1095     my ( $self, $sql, @bind ) = @_;
1096
1097     if ( $self->debug ) {
1098         @bind = $self->_fix_bind_params(@bind);
1099
1100         $self->debugobj->query_start( $sql, @bind );
1101     }
1102 }
1103
1104 sub _query_end {
1105     my ( $self, $sql, @bind ) = @_;
1106
1107     if ( $self->debug ) {
1108         @bind = $self->_fix_bind_params(@bind);
1109         $self->debugobj->query_end( $sql, @bind );
1110     }
1111 }
1112
1113 sub _dbh_execute {
1114   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1115
1116   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1117
1118   $self->_query_start( $sql, @$bind );
1119
1120   my $sth = $self->sth($sql,$op);
1121
1122   my $placeholder_index = 1; 
1123
1124   foreach my $bound (@$bind) {
1125     my $attributes = {};
1126     my($column_name, @data) = @$bound;
1127
1128     if ($bind_attributes) {
1129       $attributes = $bind_attributes->{$column_name}
1130       if defined $bind_attributes->{$column_name};
1131     }
1132
1133     foreach my $data (@data) {
1134       my $ref = ref $data;
1135       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1136
1137       $sth->bind_param($placeholder_index, $data, $attributes);
1138       $placeholder_index++;
1139     }
1140   }
1141
1142   # Can this fail without throwing an exception anyways???
1143   my $rv = $sth->execute();
1144   $self->throw_exception($sth->errstr) if !$rv;
1145
1146   $self->_query_end( $sql, @$bind );
1147
1148   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1149 }
1150
1151 sub _execute {
1152     my $self = shift;
1153     $self->dbh_do('_dbh_execute', @_)
1154 }
1155
1156 sub insert {
1157   my ($self, $source, $to_insert) = @_;
1158
1159   my $ident = $source->from;
1160   my $bind_attributes = $self->source_bind_attributes($source);
1161
1162   my $updated_cols = {};
1163
1164   $self->ensure_connected;
1165   foreach my $col ( $source->columns ) {
1166     if ( !defined $to_insert->{$col} ) {
1167       my $col_info = $source->column_info($col);
1168
1169       if ( $col_info->{auto_nextval} ) {
1170         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1171       }
1172     }
1173   }
1174
1175   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1176
1177   return $updated_cols;
1178 }
1179
1180 ## Still not quite perfect, and EXPERIMENTAL
1181 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1182 ## scalar refs, or at least, all the same type as the first set, the statement is
1183 ## only prepped once.
1184 sub insert_bulk {
1185   my ($self, $source, $cols, $data) = @_;
1186   my %colvalues;
1187   my $table = $source->from;
1188   @colvalues{@$cols} = (0..$#$cols);
1189   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1190   
1191   $self->_query_start( $sql, @bind );
1192   my $sth = $self->sth($sql);
1193
1194 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1195
1196   ## This must be an arrayref, else nothing works!
1197   my $tuple_status = [];
1198
1199   ## Get the bind_attributes, if any exist
1200   my $bind_attributes = $self->source_bind_attributes($source);
1201
1202   ## Bind the values and execute
1203   my $placeholder_index = 1; 
1204
1205   foreach my $bound (@bind) {
1206
1207     my $attributes = {};
1208     my ($column_name, $data_index) = @$bound;
1209
1210     if( $bind_attributes ) {
1211       $attributes = $bind_attributes->{$column_name}
1212       if defined $bind_attributes->{$column_name};
1213     }
1214
1215     my @data = map { $_->[$data_index] } @$data;
1216
1217     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1218     $placeholder_index++;
1219   }
1220   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1221   if (my $err = $@) {
1222     my $i = 0;
1223     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1224
1225     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1226       if ($i > $#$tuple_status);
1227
1228     require Data::Dumper;
1229     local $Data::Dumper::Terse = 1;
1230     local $Data::Dumper::Indent = 1;
1231     local $Data::Dumper::Useqq = 1;
1232     local $Data::Dumper::Quotekeys = 0;
1233
1234     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1235       $tuple_status->[$i][1],
1236       Data::Dumper::Dumper(
1237         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1238       ),
1239     );
1240   }
1241   $self->throw_exception($sth->errstr) if !$rv;
1242
1243   $self->_query_end( $sql, @bind );
1244   return (wantarray ? ($rv, $sth, @bind) : $rv);
1245 }
1246
1247 sub update {
1248   my $self = shift @_;
1249   my $source = shift @_;
1250   my $bind_attributes = $self->source_bind_attributes($source);
1251   
1252   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1253 }
1254
1255
1256 sub delete {
1257   my $self = shift @_;
1258   my $source = shift @_;
1259   
1260   my $bind_attrs = $self->source_bind_attributes($source);
1261   
1262   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1263 }
1264
1265 # We were sent here because the $rs contains a complex search
1266 # which will require a subquery to select the correct rows
1267 # (i.e. joined or limited resultsets)
1268 #
1269 # Genarating a single PK column subquery is trivial and supported
1270 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1271 # Look at _multipk_update_delete()
1272 sub _subq_update_delete {
1273   my $self = shift;
1274   my ($rs, $op, $values) = @_;
1275
1276   my $rsrc = $rs->result_source;
1277
1278   # we already check this, but double check naively just in case. Should be removed soon
1279   my $sel = $rs->_resolved_attrs->{select};
1280   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1281   my @pcols = $rsrc->primary_columns;
1282   if (@$sel != @pcols) {
1283     $self->throw_exception (
1284       'Subquery update/delete can not be called on resultsets selecting a'
1285      .' number of columns different than the number of primary keys'
1286     );
1287   }
1288
1289   if (@pcols == 1) {
1290     return $self->$op (
1291       $rsrc,
1292       $op eq 'update' ? $values : (),
1293       { $pcols[0] => { -in => $rs->as_query } },
1294     );
1295   }
1296
1297   else {
1298     return $self->_multipk_update_delete (@_);
1299   }
1300 }
1301
1302 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1303 # resultset update/delete involving subqueries. So by default resort
1304 # to simple (and inefficient) delete_all style per-row opearations,
1305 # while allowing specific storages to override this with a faster
1306 # implementation.
1307 #
1308 sub _multipk_update_delete {
1309   return shift->_per_row_update_delete (@_);
1310 }
1311
1312 # This is the default loop used to delete/update rows for multi PK
1313 # resultsets, and used by mysql exclusively (because it can't do anything
1314 # else).
1315 #
1316 # We do not use $row->$op style queries, because resultset update/delete
1317 # is not expected to cascade (this is what delete_all/update_all is for).
1318 #
1319 # There should be no race conditions as the entire operation is rolled
1320 # in a transaction.
1321 #
1322 sub _per_row_update_delete {
1323   my $self = shift;
1324   my ($rs, $op, $values) = @_;
1325
1326   my $rsrc = $rs->result_source;
1327   my @pcols = $rsrc->primary_columns;
1328
1329   my $guard = $self->txn_scope_guard;
1330
1331   # emulate the return value of $sth->execute for non-selects
1332   my $row_cnt = '0E0';
1333
1334   my $subrs_cur = $rs->cursor;
1335   while (my @pks = $subrs_cur->next) {
1336
1337     my $cond;
1338     for my $i (0.. $#pcols) {
1339       $cond->{$pcols[$i]} = $pks[$i];
1340     }
1341
1342     $self->$op (
1343       $rsrc,
1344       $op eq 'update' ? $values : (),
1345       $cond,
1346     );
1347
1348     $row_cnt++;
1349   }
1350
1351   $guard->commit;
1352
1353   return $row_cnt;
1354 }
1355
1356 sub _select {
1357   my $self = shift;
1358   my $sql_maker = $self->sql_maker;
1359   local $sql_maker->{for};
1360   return $self->_execute($self->_select_args(@_));
1361 }
1362
1363 sub _select_args_to_query {
1364   my $self = shift;
1365
1366   my $sql_maker = $self->sql_maker;
1367   local $sql_maker->{for};
1368
1369   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset) 
1370   #  = $self->_select_args($ident, $select, $cond, $attrs);
1371   my ($op, $bind, $ident, $bind_attrs, @args) =
1372     $self->_select_args(@_);
1373
1374   # my ($sql, $bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1375   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1376
1377   return \[ "($sql)", @{ $prepared_bind || [] }];
1378 }
1379
1380 sub _select_args {
1381   my ($self, $ident, $select, $condition, $attrs) = @_;
1382
1383   my $for = delete $attrs->{for};
1384   my $sql_maker = $self->sql_maker;
1385   $sql_maker->{for} = $for;
1386
1387   my $order = { map
1388     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1389     (qw/order_by group_by having _virtual_order_by/ )
1390   };
1391
1392
1393   my $bind_attrs = {};
1394
1395   my $alias2source = $self->_resolve_ident_sources ($ident);
1396
1397   for my $alias (keys %$alias2source) {
1398     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1399     for my $col (keys %$bindtypes) {
1400
1401       my $fqcn = join ('.', $alias, $col);
1402       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1403
1404       # so that unqualified searches can be bound too
1405       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
1406     }
1407   }
1408
1409   # This would be the point to deflate anything found in $condition
1410   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1411   # expect a row object. And all we have is a resultsource (it is trivial
1412   # to extract deflator coderefs via $alias2source above).
1413   #
1414   # I don't see a way forward other than changing the way deflators are
1415   # invoked, and that's just bad...
1416
1417   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1418   if ($attrs->{software_limit} ||
1419       $sql_maker->_default_limit_syntax eq "GenericSubQ") {
1420         $attrs->{software_limit} = 1;
1421   } else {
1422     $self->throw_exception("rows attribute must be positive if present")
1423       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1424
1425     # MySQL actually recommends this approach.  I cringe.
1426     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1427     push @args, $attrs->{rows}, $attrs->{offset};
1428   }
1429   return @args;
1430 }
1431
1432 sub _resolve_ident_sources {
1433   my ($self, $ident) = @_;
1434
1435   my $alias2source = {};
1436
1437   # the reason this is so contrived is that $ident may be a {from}
1438   # structure, specifying multiple tables to join
1439   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1440     # this is compat mode for insert/update/delete which do not deal with aliases
1441     $alias2source->{me} = $ident;
1442   }
1443   elsif (ref $ident eq 'ARRAY') {
1444
1445     for (@$ident) {
1446       my $tabinfo;
1447       if (ref $_ eq 'HASH') {
1448         $tabinfo = $_;
1449       }
1450       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1451         $tabinfo = $_->[0];
1452       }
1453
1454       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-result_source}
1455         if ($tabinfo->{-result_source});
1456     }
1457   }
1458
1459   return $alias2source;
1460 }
1461
1462 sub count {
1463   my ($self, $source, $attrs) = @_;
1464
1465   my $tmp_attrs = { %$attrs };
1466
1467   # take off any pagers, record_filter is cdbi, and no point of ordering a count
1468   delete $tmp_attrs->{$_} for (qw/select as rows offset page order_by record_filter/);
1469
1470   # overwrite the selector
1471   $tmp_attrs->{select} = { count => '*' };
1472
1473   my $tmp_rs = $source->resultset_class->new($source, $tmp_attrs);
1474   my ($count) = $tmp_rs->cursor->next;
1475
1476   # if the offset/rows attributes are still present, we did not use
1477   # a subquery, so we need to make the calculations in software
1478   $count -= $attrs->{offset} if $attrs->{offset};
1479   $count = $attrs->{rows} if $attrs->{rows} and $attrs->{rows} < $count;
1480   $count = 0 if ($count < 0);
1481
1482   return $count;
1483 }
1484
1485 sub count_grouped {
1486   my ($self, $source, $attrs) = @_;
1487
1488   # copy for the subquery, we need to do some adjustments to it too
1489   my $sub_attrs = { %$attrs };
1490
1491   # these can not go in the subquery, and there is no point of ordering it
1492   delete $sub_attrs->{$_} for qw/prefetch collapse select as order_by/;
1493
1494   # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
1495   # simply deleting group_by suffices, as the code below will re-fill it
1496   # Note: we check $attrs, as $sub_attrs has collapse deleted
1497   if (ref $attrs->{collapse} and keys %{$attrs->{collapse}} ) {
1498     delete $sub_attrs->{group_by};
1499   }
1500
1501   $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($source->primary_columns) ];
1502   $sub_attrs->{select} = $self->_grouped_count_select ($source, $sub_attrs);
1503
1504   $attrs->{from} = [{
1505     count_subq => $source->resultset_class->new ($source, $sub_attrs )->as_query
1506   }];
1507
1508   # the subquery replaces this
1509   delete $attrs->{$_} for qw/where bind prefetch collapse group_by having having_bind rows offset page pager/;
1510
1511   return $self->count ($source, $attrs);
1512 }
1513
1514 #
1515 # Returns a SELECT to go with a supplied GROUP BY
1516 # (caled by count_grouped so a group_by is present)
1517 # Most databases expect them to match, but some
1518 # choke in various ways.
1519 #
1520 sub _grouped_count_select {
1521   my ($self, $source, $rs_args) = @_;
1522   return $rs_args->{group_by};
1523 }
1524
1525 sub source_bind_attributes {
1526   my ($self, $source) = @_;
1527   
1528   my $bind_attributes;
1529   foreach my $column ($source->columns) {
1530   
1531     my $data_type = $source->column_info($column)->{data_type} || '';
1532     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1533      if $data_type;
1534   }
1535
1536   return $bind_attributes;
1537 }
1538
1539 =head2 select
1540
1541 =over 4
1542
1543 =item Arguments: $ident, $select, $condition, $attrs
1544
1545 =back
1546
1547 Handle a SQL select statement.
1548
1549 =cut
1550
1551 sub select {
1552   my $self = shift;
1553   my ($ident, $select, $condition, $attrs) = @_;
1554   return $self->cursor_class->new($self, \@_, $attrs);
1555 }
1556
1557 sub select_single {
1558   my $self = shift;
1559   my ($rv, $sth, @bind) = $self->_select(@_);
1560   my @row = $sth->fetchrow_array;
1561   my @nextrow = $sth->fetchrow_array if @row;
1562   if(@row && @nextrow) {
1563     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1564   }
1565   # Need to call finish() to work round broken DBDs
1566   $sth->finish();
1567   return @row;
1568 }
1569
1570 =head2 sth
1571
1572 =over 4
1573
1574 =item Arguments: $sql
1575
1576 =back
1577
1578 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1579
1580 =cut
1581
1582 sub _dbh_sth {
1583   my ($self, $dbh, $sql) = @_;
1584
1585   # 3 is the if_active parameter which avoids active sth re-use
1586   my $sth = $self->disable_sth_caching
1587     ? $dbh->prepare($sql)
1588     : $dbh->prepare_cached($sql, {}, 3);
1589
1590   # XXX You would think RaiseError would make this impossible,
1591   #  but apparently that's not true :(
1592   $self->throw_exception($dbh->errstr) if !$sth;
1593
1594   $sth;
1595 }
1596
1597 sub sth {
1598   my ($self, $sql) = @_;
1599   $self->dbh_do('_dbh_sth', $sql);
1600 }
1601
1602 sub _dbh_columns_info_for {
1603   my ($self, $dbh, $table) = @_;
1604
1605   if ($dbh->can('column_info')) {
1606     my %result;
1607     eval {
1608       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1609       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1610       $sth->execute();
1611       while ( my $info = $sth->fetchrow_hashref() ){
1612         my %column_info;
1613         $column_info{data_type}   = $info->{TYPE_NAME};
1614         $column_info{size}      = $info->{COLUMN_SIZE};
1615         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1616         $column_info{default_value} = $info->{COLUMN_DEF};
1617         my $col_name = $info->{COLUMN_NAME};
1618         $col_name =~ s/^\"(.*)\"$/$1/;
1619
1620         $result{$col_name} = \%column_info;
1621       }
1622     };
1623     return \%result if !$@ && scalar keys %result;
1624   }
1625
1626   my %result;
1627   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1628   $sth->execute;
1629   my @columns = @{$sth->{NAME_lc}};
1630   for my $i ( 0 .. $#columns ){
1631     my %column_info;
1632     $column_info{data_type} = $sth->{TYPE}->[$i];
1633     $column_info{size} = $sth->{PRECISION}->[$i];
1634     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1635
1636     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1637       $column_info{data_type} = $1;
1638       $column_info{size}    = $2;
1639     }
1640
1641     $result{$columns[$i]} = \%column_info;
1642   }
1643   $sth->finish;
1644
1645   foreach my $col (keys %result) {
1646     my $colinfo = $result{$col};
1647     my $type_num = $colinfo->{data_type};
1648     my $type_name;
1649     if(defined $type_num && $dbh->can('type_info')) {
1650       my $type_info = $dbh->type_info($type_num);
1651       $type_name = $type_info->{TYPE_NAME} if $type_info;
1652       $colinfo->{data_type} = $type_name if $type_name;
1653     }
1654   }
1655
1656   return \%result;
1657 }
1658
1659 sub columns_info_for {
1660   my ($self, $table) = @_;
1661   $self->dbh_do('_dbh_columns_info_for', $table);
1662 }
1663
1664 =head2 last_insert_id
1665
1666 Return the row id of the last insert.
1667
1668 =cut
1669
1670 sub _dbh_last_insert_id {
1671     # All Storage's need to register their own _dbh_last_insert_id
1672     # the old SQLite-based method was highly inappropriate
1673
1674     my $self = shift;
1675     my $class = ref $self;
1676     $self->throw_exception (<<EOE);
1677
1678 No _dbh_last_insert_id() method found in $class.
1679 Since the method of obtaining the autoincrement id of the last insert
1680 operation varies greatly between different databases, this method must be
1681 individually implemented for every storage class.
1682 EOE
1683 }
1684
1685 sub last_insert_id {
1686   my $self = shift;
1687   $self->dbh_do('_dbh_last_insert_id', @_);
1688 }
1689
1690 =head2 sqlt_type
1691
1692 Returns the database driver name.
1693
1694 =cut
1695
1696 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1697
1698 =head2 bind_attribute_by_data_type
1699
1700 Given a datatype from column info, returns a database specific bind
1701 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1702 let the database planner just handle it.
1703
1704 Generally only needed for special case column types, like bytea in postgres.
1705
1706 =cut
1707
1708 sub bind_attribute_by_data_type {
1709     return;
1710 }
1711
1712 =head2 is_datatype_numeric
1713
1714 Given a datatype from column_info, returns a boolean value indicating if
1715 the current RDBMS considers it a numeric value. This controls how
1716 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1717 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1718 be performed instead of the usual C<eq>.
1719
1720 =cut
1721
1722 sub is_datatype_numeric {
1723   my ($self, $dt) = @_;
1724
1725   return 0 unless $dt;
1726
1727   return $dt =~ /^ (?:
1728     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1729   ) $/ix;
1730 }
1731
1732
1733 =head2 create_ddl_dir (EXPERIMENTAL)
1734
1735 =over 4
1736
1737 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1738
1739 =back
1740
1741 Creates a SQL file based on the Schema, for each of the specified
1742 database engines in C<\@databases> in the given directory.
1743 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1744
1745 Given a previous version number, this will also create a file containing
1746 the ALTER TABLE statements to transform the previous schema into the
1747 current one. Note that these statements may contain C<DROP TABLE> or
1748 C<DROP COLUMN> statements that can potentially destroy data.
1749
1750 The file names are created using the C<ddl_filename> method below, please
1751 override this method in your schema if you would like a different file
1752 name format. For the ALTER file, the same format is used, replacing
1753 $version in the name with "$preversion-$version".
1754
1755 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1756 The most common value for this would be C<< { add_drop_table => 1 } >>
1757 to have the SQL produced include a C<DROP TABLE> statement for each table
1758 created. For quoting purposes supply C<quote_table_names> and
1759 C<quote_field_names>.
1760
1761 If no arguments are passed, then the following default values are assumed:
1762
1763 =over 4
1764
1765 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
1766
1767 =item version    - $schema->schema_version
1768
1769 =item directory  - './'
1770
1771 =item preversion - <none>
1772
1773 =back
1774
1775 By default, C<\%sqlt_args> will have
1776
1777  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1778
1779 merged with the hash passed in. To disable any of those features, pass in a 
1780 hashref like the following
1781
1782  { ignore_constraint_names => 0, # ... other options }
1783
1784
1785 Note that this feature is currently EXPERIMENTAL and may not work correctly 
1786 across all databases, or fully handle complex relationships.
1787
1788 WARNING: Please check all SQL files created, before applying them.
1789
1790 =cut
1791
1792 sub create_ddl_dir {
1793   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1794
1795   if(!$dir || !-d $dir) {
1796     carp "No directory given, using ./\n";
1797     $dir = "./";
1798   }
1799   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1800   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1801
1802   my $schema_version = $schema->schema_version || '1.x';
1803   $version ||= $schema_version;
1804
1805   $sqltargs = {
1806     add_drop_table => 1, 
1807     ignore_constraint_names => 1,
1808     ignore_index_names => 1,
1809     %{$sqltargs || {}}
1810   };
1811
1812   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1813       . $self->_check_sqlt_message . q{'})
1814           if !$self->_check_sqlt_version;
1815
1816   my $sqlt = SQL::Translator->new( $sqltargs );
1817
1818   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1819   my $sqlt_schema = $sqlt->translate({ data => $schema })
1820     or $self->throw_exception ($sqlt->error);
1821
1822   foreach my $db (@$databases) {
1823     $sqlt->reset();
1824     $sqlt->{schema} = $sqlt_schema;
1825     $sqlt->producer($db);
1826
1827     my $file;
1828     my $filename = $schema->ddl_filename($db, $version, $dir);
1829     if (-e $filename && ($version eq $schema_version )) {
1830       # if we are dumping the current version, overwrite the DDL
1831       carp "Overwriting existing DDL file - $filename";
1832       unlink($filename);
1833     }
1834
1835     my $output = $sqlt->translate;
1836     if(!$output) {
1837       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1838       next;
1839     }
1840     if(!open($file, ">$filename")) {
1841       $self->throw_exception("Can't open $filename for writing ($!)");
1842       next;
1843     }
1844     print $file $output;
1845     close($file);
1846   
1847     next unless ($preversion);
1848
1849     require SQL::Translator::Diff;
1850
1851     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1852     if(!-e $prefilename) {
1853       carp("No previous schema file found ($prefilename)");
1854       next;
1855     }
1856
1857     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1858     if(-e $difffile) {
1859       carp("Overwriting existing diff file - $difffile");
1860       unlink($difffile);
1861     }
1862     
1863     my $source_schema;
1864     {
1865       my $t = SQL::Translator->new($sqltargs);
1866       $t->debug( 0 );
1867       $t->trace( 0 );
1868
1869       $t->parser( $db )
1870         or $self->throw_exception ($t->error);
1871
1872       my $out = $t->translate( $prefilename )
1873         or $self->throw_exception ($t->error);
1874
1875       $source_schema = $t->schema;
1876
1877       $source_schema->name( $prefilename )
1878         unless ( $source_schema->name );
1879     }
1880
1881     # The "new" style of producers have sane normalization and can support 
1882     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1883     # And we have to diff parsed SQL against parsed SQL.
1884     my $dest_schema = $sqlt_schema;
1885
1886     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1887       my $t = SQL::Translator->new($sqltargs);
1888       $t->debug( 0 );
1889       $t->trace( 0 );
1890
1891       $t->parser( $db )
1892         or $self->throw_exception ($t->error);
1893
1894       my $out = $t->translate( $filename )
1895         or $self->throw_exception ($t->error);
1896
1897       $dest_schema = $t->schema;
1898
1899       $dest_schema->name( $filename )
1900         unless $dest_schema->name;
1901     }
1902     
1903     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1904                                                   $dest_schema,   $db,
1905                                                   $sqltargs
1906                                                  );
1907     if(!open $file, ">$difffile") { 
1908       $self->throw_exception("Can't write to $difffile ($!)");
1909       next;
1910     }
1911     print $file $diff;
1912     close($file);
1913   }
1914 }
1915
1916 =head2 deployment_statements
1917
1918 =over 4
1919
1920 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1921
1922 =back
1923
1924 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1925
1926 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
1927 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
1928
1929 C<$directory> is used to return statements from files in a previously created
1930 L</create_ddl_dir> directory and is optional. The filenames are constructed
1931 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1932
1933 If no C<$directory> is specified then the statements are constructed on the
1934 fly using L<SQL::Translator> and C<$version> is ignored.
1935
1936 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1937
1938 =cut
1939
1940 sub deployment_statements {
1941   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1942   # Need to be connected to get the correct sqlt_type
1943   $self->ensure_connected() unless $type;
1944   $type ||= $self->sqlt_type;
1945   $version ||= $schema->schema_version || '1.x';
1946   $dir ||= './';
1947   my $filename = $schema->ddl_filename($type, $version, $dir);
1948   if(-f $filename)
1949   {
1950       my $file;
1951       open($file, "<$filename") 
1952         or $self->throw_exception("Can't open $filename ($!)");
1953       my @rows = <$file>;
1954       close($file);
1955       return join('', @rows);
1956   }
1957
1958   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1959       . $self->_check_sqlt_message . q{'})
1960           if !$self->_check_sqlt_version;
1961
1962   require SQL::Translator::Parser::DBIx::Class;
1963   eval qq{use SQL::Translator::Producer::${type}};
1964   $self->throw_exception($@) if $@;
1965
1966   # sources needs to be a parser arg, but for simplicty allow at top level 
1967   # coming in
1968   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1969       if exists $sqltargs->{sources};
1970
1971   my $tr = SQL::Translator->new(%$sqltargs);
1972   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1973   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1974 }
1975
1976 sub deploy {
1977   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1978   my $deploy = sub {
1979     my $line = shift;
1980     return if($line =~ /^--/);
1981     return if(!$line);
1982     # next if($line =~ /^DROP/m);
1983     return if($line =~ /^BEGIN TRANSACTION/m);
1984     return if($line =~ /^COMMIT/m);
1985     return if $line =~ /^\s+$/; # skip whitespace only
1986     $self->_query_start($line);
1987     eval {
1988       $self->dbh->do($line); # shouldn't be using ->dbh ?
1989     };
1990     if ($@) {
1991       carp qq{$@ (running "${line}")};
1992     }
1993     $self->_query_end($line);
1994   };
1995   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
1996   if (@statements > 1) {
1997     foreach my $statement (@statements) {
1998       $deploy->( $statement );
1999     }
2000   }
2001   elsif (@statements == 1) {
2002     foreach my $line ( split(";\n", $statements[0])) {
2003       $deploy->( $line );
2004     }
2005   }
2006 }
2007
2008 =head2 datetime_parser
2009
2010 Returns the datetime parser class
2011
2012 =cut
2013
2014 sub datetime_parser {
2015   my $self = shift;
2016   return $self->{datetime_parser} ||= do {
2017     $self->ensure_connected;
2018     $self->build_datetime_parser(@_);
2019   };
2020 }
2021
2022 =head2 datetime_parser_type
2023
2024 Defines (returns) the datetime parser class - currently hardwired to
2025 L<DateTime::Format::MySQL>
2026
2027 =cut
2028
2029 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2030
2031 =head2 build_datetime_parser
2032
2033 See L</datetime_parser>
2034
2035 =cut
2036
2037 sub build_datetime_parser {
2038   my $self = shift;
2039   my $type = $self->datetime_parser_type(@_);
2040   eval "use ${type}";
2041   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2042   return $type;
2043 }
2044
2045 {
2046     my $_check_sqlt_version; # private
2047     my $_check_sqlt_message; # private
2048     sub _check_sqlt_version {
2049         return $_check_sqlt_version if defined $_check_sqlt_version;
2050         eval 'use SQL::Translator "0.09003"';
2051         $_check_sqlt_message = $@ || '';
2052         $_check_sqlt_version = !$@;
2053     }
2054
2055     sub _check_sqlt_message {
2056         _check_sqlt_version if !defined $_check_sqlt_message;
2057         $_check_sqlt_message;
2058     }
2059 }
2060
2061 =head2 is_replicating
2062
2063 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2064 replicate from a master database.  Default is undef, which is the result
2065 returned by databases that don't support replication.
2066
2067 =cut
2068
2069 sub is_replicating {
2070     return;
2071     
2072 }
2073
2074 =head2 lag_behind_master
2075
2076 Returns a number that represents a certain amount of lag behind a master db
2077 when a given storage is replicating.  The number is database dependent, but
2078 starts at zero and increases with the amount of lag. Default in undef
2079
2080 =cut
2081
2082 sub lag_behind_master {
2083     return;
2084 }
2085
2086 sub DESTROY {
2087   my $self = shift;
2088   return if !$self->_dbh;
2089   $self->_verify_pid;
2090   $self->_dbh(undef);
2091 }
2092
2093 1;
2094
2095 =head1 USAGE NOTES
2096
2097 =head2 DBIx::Class and AutoCommit
2098
2099 DBIx::Class can do some wonderful magic with handling exceptions,
2100 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2101 combined with C<txn_do> for transaction support.
2102
2103 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2104 in an assumed transaction between commits, and you're telling us you'd
2105 like to manage that manually.  A lot of the magic protections offered by
2106 this module will go away.  We can't protect you from exceptions due to database
2107 disconnects because we don't know anything about how to restart your
2108 transactions.  You're on your own for handling all sorts of exceptional
2109 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2110 be with raw DBI.
2111
2112
2113
2114 =head1 AUTHORS
2115
2116 Matt S. Trout <mst@shadowcatsystems.co.uk>
2117
2118 Andy Grundman <andy@hybridized.org>
2119
2120 =head1 LICENSE
2121
2122 You may distribute this code under the same terms as Perl itself.
2123
2124 =cut