rename connect_do store
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util();
13 use List::Util();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit _on_connect_do
18        _on_disconnect_do _on_connect_do_store _on_disconnect_do_store
19        savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call disable_sth_caching unsafe auto_savepoint
26 /;
27 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
28
29
30 # default cursor class, overridable in connect_info attributes
31 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
32
33 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
34 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
35
36
37 =head1 NAME
38
39 DBIx::Class::Storage::DBI - DBI storage handler
40
41 =head1 SYNOPSIS
42
43   my $schema = MySchema->connect('dbi:SQLite:my.db');
44
45   $schema->storage->debug(1);
46   $schema->dbh_do("DROP TABLE authors");
47
48   $schema->resultset('Book')->search({
49      written_on => $schema->storage->datetime_parser(DateTime->now)
50   });
51
52 =head1 DESCRIPTION
53
54 This class represents the connection to an RDBMS via L<DBI>.  See
55 L<DBIx::Class::Storage> for general information.  This pod only
56 documents DBI-specific methods and behaviors.
57
58 =head1 METHODS
59
60 =cut
61
62 sub new {
63   my $new = shift->next::method(@_);
64
65   $new->transaction_depth(0);
66   $new->_sql_maker_opts({});
67   $new->{savepoints} = [];
68   $new->{_in_dbh_do} = 0;
69   $new->{_dbh_gen} = 0;
70
71   $new;
72 }
73
74 =head2 connect_info
75
76 This method is normally called by L<DBIx::Class::Schema/connection>, which
77 encapsulates its argument list in an arrayref before passing them here.
78
79 The argument list may contain:
80
81 =over
82
83 =item *
84
85 The same 4-element argument set one would normally pass to
86 L<DBI/connect>, optionally followed by
87 L<extra attributes|/DBIx::Class specific connection attributes>
88 recognized by DBIx::Class:
89
90   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
91
92 =item *
93
94 A single code reference which returns a connected 
95 L<DBI database handle|DBI/connect> optionally followed by 
96 L<extra attributes|/DBIx::Class specific connection attributes> recognized
97 by DBIx::Class:
98
99   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
100
101 =item *
102
103 A single hashref with all the attributes and the dsn/user/password
104 mixed together:
105
106   $connect_info_args = [{
107     dsn => $dsn,
108     user => $user,
109     password => $pass,
110     %dbi_attributes,
111     %extra_attributes,
112   }];
113
114 This is particularly useful for L<Catalyst> based applications, allowing the 
115 following config (L<Config::General> style):
116
117   <Model::DB>
118     schema_class   App::DB
119     <connect_info>
120       dsn          dbi:mysql:database=test
121       user         testuser
122       password     TestPass
123       AutoCommit   1
124     </connect_info>
125   </Model::DB>
126
127 =back
128
129 Please note that the L<DBI> docs recommend that you always explicitly
130 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
131 recommends that it be set to I<1>, and that you perform transactions
132 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
133 to I<1> if you do not do explicitly set it to zero.  This is the default 
134 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
135
136 =head3 DBIx::Class specific connection attributes
137
138 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
139 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
140 the following connection options. These options can be mixed in with your other
141 L<DBI> connection attributes, or placed in a seperate hashref
142 (C<\%extra_attributes>) as shown above.
143
144 Every time C<connect_info> is invoked, any previous settings for
145 these options will be cleared before setting the new ones, regardless of
146 whether any options are specified in the new C<connect_info>.
147
148
149 =over
150
151 =item on_connect_do
152
153 Specifies things to do immediately after connecting or re-connecting to
154 the database.  Its value may contain:
155
156 =over
157
158 =item a scalar
159
160 This contains one SQL statement to execute.
161
162 =item an array reference
163
164 This contains SQL statements to execute in order.  Each element contains
165 a string or a code reference that returns a string.
166
167 =item a code reference
168
169 This contains some code to execute.  Unlike code references within an
170 array reference, its return value is ignored.
171
172 =back
173
174 =item on_disconnect_do
175
176 Takes arguments in the same form as L</on_connect_do> and executes them
177 immediately before disconnecting from the database.
178
179 Note, this only runs if you explicitly call L</disconnect> on the
180 storage object.
181
182 =item on_connect_call
183
184 A more generalized form of L</on_connect_do> that calls the specified
185 C<connect_call_METHOD> methods in your storage driver.
186
187   on_connect_do => 'select 1'
188
189 is equivalent to:
190
191   on_connect_call => [ [ do_sql => 'select 1' ] ]
192
193 Its values may contain:
194
195 =over
196
197 =item a scalar
198
199 Will call the C<connect_call_METHOD> method.
200
201 =item a code reference
202
203 Will execute C<< $code->($storage) >>
204
205 =item an array reference
206
207 Each value can be a method name or code reference.
208
209 =item an array of arrays
210
211 For each array, the first item is taken to be the C<connect_call_> method name
212 or code reference, and the rest are parameters to it.
213
214 =back
215
216 Some predefined storage methods you may use:
217
218 =over
219
220 =item do_sql
221
222 Executes a SQL string or a code reference that returns a SQL string. This is
223 what L</on_connect_do> and L</on_disconnect_do> use.
224
225 It can take:
226
227 =over
228
229 =item a scalar
230
231 Will execute the scalar as SQL.
232
233 =item an arrayref
234
235 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
236 attributes hashref and bind values.
237
238 =item a code reference
239
240 Will execute C<< $code->($storage) >> and execute the return array refs as
241 above.
242
243 =back
244
245 =item datetime_setup
246
247 Execute any statements necessary to initialize the database session to return
248 and accept datetime/timestamp values used with
249 L<DBIx::Class::InflateColumn::DateTime>.
250
251 Only necessary for some databases, see your specific storage driver for
252 implementation details.
253
254 =back
255
256 =item on_disconnect_call
257
258 Takes arguments in the same form as L</on_connect_call> and executes them
259 immediately before disconnecting from the database.
260
261 Calls the C<disconnect_call_METHOD> methods as opposed to the
262 C<connect_call_METHOD> methods called by L</on_connect_call>.
263
264 Note, this only runs if you explicitly call L</disconnect> on the
265 storage object.
266
267 =item disable_sth_caching
268
269 If set to a true value, this option will disable the caching of
270 statement handles via L<DBI/prepare_cached>.
271
272 =item limit_dialect 
273
274 Sets the limit dialect. This is useful for JDBC-bridge among others
275 where the remote SQL-dialect cannot be determined by the name of the
276 driver alone. See also L<SQL::Abstract::Limit>.
277
278 =item quote_char
279
280 Specifies what characters to use to quote table and column names. If 
281 you use this you will want to specify L</name_sep> as well.
282
283 C<quote_char> expects either a single character, in which case is it
284 is placed on either side of the table/column name, or an arrayref of length
285 2 in which case the table/column name is placed between the elements.
286
287 For example under MySQL you should use C<< quote_char => '`' >>, and for
288 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
289
290 =item name_sep
291
292 This only needs to be used in conjunction with C<quote_char>, and is used to 
293 specify the charecter that seperates elements (schemas, tables, columns) from 
294 each other. In most cases this is simply a C<.>.
295
296 The consequences of not supplying this value is that L<SQL::Abstract>
297 will assume DBIx::Class' uses of aliases to be complete column
298 names. The output will look like I<"me.name"> when it should actually
299 be I<"me"."name">.
300
301 =item unsafe
302
303 This Storage driver normally installs its own C<HandleError>, sets
304 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
305 all database handles, including those supplied by a coderef.  It does this
306 so that it can have consistent and useful error behavior.
307
308 If you set this option to a true value, Storage will not do its usual
309 modifications to the database handle's attributes, and instead relies on
310 the settings in your connect_info DBI options (or the values you set in
311 your connection coderef, in the case that you are connecting via coderef).
312
313 Note that your custom settings can cause Storage to malfunction,
314 especially if you set a C<HandleError> handler that suppresses exceptions
315 and/or disable C<RaiseError>.
316
317 =item auto_savepoint
318
319 If this option is true, L<DBIx::Class> will use savepoints when nesting
320 transactions, making it possible to recover from failure in the inner
321 transaction without having to abort all outer transactions.
322
323 =item cursor_class
324
325 Use this argument to supply a cursor class other than the default
326 L<DBIx::Class::Storage::DBI::Cursor>.
327
328 =back
329
330 Some real-life examples of arguments to L</connect_info> and
331 L<DBIx::Class::Schema/connect>
332
333   # Simple SQLite connection
334   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
335
336   # Connect via subref
337   ->connect_info([ sub { DBI->connect(...) } ]);
338
339   # A bit more complicated
340   ->connect_info(
341     [
342       'dbi:Pg:dbname=foo',
343       'postgres',
344       'my_pg_password',
345       { AutoCommit => 1 },
346       { quote_char => q{"}, name_sep => q{.} },
347     ]
348   );
349
350   # Equivalent to the previous example
351   ->connect_info(
352     [
353       'dbi:Pg:dbname=foo',
354       'postgres',
355       'my_pg_password',
356       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
357     ]
358   );
359
360   # Same, but with hashref as argument
361   # See parse_connect_info for explanation
362   ->connect_info(
363     [{
364       dsn         => 'dbi:Pg:dbname=foo',
365       user        => 'postgres',
366       password    => 'my_pg_password',
367       AutoCommit  => 1,
368       quote_char  => q{"},
369       name_sep    => q{.},
370     }]
371   );
372
373   # Subref + DBIx::Class-specific connection options
374   ->connect_info(
375     [
376       sub { DBI->connect(...) },
377       {
378           quote_char => q{`},
379           name_sep => q{@},
380           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
381           disable_sth_caching => 1,
382       },
383     ]
384   );
385
386
387
388 =cut
389
390 sub connect_info {
391   my ($self, $info_arg) = @_;
392
393   return $self->_connect_info if !$info_arg;
394
395   my @args = @$info_arg;  # take a shallow copy for further mutilation
396   $self->_connect_info([@args]); # copy for _connect_info
397
398
399   # combine/pre-parse arguments depending on invocation style
400
401   my %attrs;
402   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
403     %attrs = %{ $args[1] || {} };
404     @args = $args[0];
405   }
406   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
407     %attrs = %{$args[0]};
408     @args = ();
409     for (qw/password user dsn/) {
410       unshift @args, delete $attrs{$_};
411     }
412   }
413   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
414     %attrs = (
415       % { $args[3] || {} },
416       % { $args[4] || {} },
417     );
418     @args = @args[0,1,2];
419   }
420
421   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
422   #  the new set of options
423   $self->_sql_maker(undef);
424   $self->_sql_maker_opts({});
425
426   if(keys %attrs) {
427     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
428       if(my $value = delete $attrs{$storage_opt}) {
429         $self->$storage_opt($value);
430       }
431     }
432     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
433       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
434         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
435       }
436     }
437     for my $connect_do_opt (qw/on_connect_do on_disconnect_do/) {
438       if(my $opt_val = delete $attrs{$connect_do_opt}) {
439         $self->$connect_do_opt($opt_val);
440       }
441     }
442   }
443
444   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
445
446   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
447   $self->_connect_info;
448 }
449
450 =head2 on_connect_do
451
452 This method is deprecated in favour of setting via L</connect_info>.
453
454 =cut
455
456 sub on_connect_do {
457   my $self = shift;
458   $self->_setup_connect_do(on_connect_do => @_);
459 }
460
461 =head2 on_disconnect_do
462
463 This method is deprecated in favour of setting via L</connect_info>.
464
465 =cut
466
467 sub on_disconnect_do {
468   my $self = shift;
469   $self->_setup_connect_do(on_disconnect_do => @_);
470 }
471
472 sub _setup_connect_do {
473   my ($self, $opt) = (shift, shift);
474
475   my $accessor = "_$opt";
476   my $store    = "_${opt}_store";
477
478   return $self->$accessor if not @_;
479
480   my $val = shift;
481
482   if (not defined $val) {
483     $self->$accessor(undef);
484     $self->$store(undef);
485     return;
486   }
487
488   my @store;
489
490   if (not ref($val)) {
491     push @store, [ 'do_sql', $val ];
492   } elsif (ref($val) eq 'CODE') {
493     push @store, $val;
494   } elsif (ref($val) eq 'ARRAY') {
495     push @store, map [ 'do_sql', $_ ], @$val;
496   } else {
497     $self->throw_exception("Invalid type for $opt ".ref($val));
498   }
499
500   $self->$store(\@store);
501   $self->$accessor($val);
502 }
503
504 =head2 dbh_do
505
506 Arguments: ($subref | $method_name), @extra_coderef_args?
507
508 Execute the given $subref or $method_name using the new exception-based
509 connection management.
510
511 The first two arguments will be the storage object that C<dbh_do> was called
512 on and a database handle to use.  Any additional arguments will be passed
513 verbatim to the called subref as arguments 2 and onwards.
514
515 Using this (instead of $self->_dbh or $self->dbh) ensures correct
516 exception handling and reconnection (or failover in future subclasses).
517
518 Your subref should have no side-effects outside of the database, as
519 there is the potential for your subref to be partially double-executed
520 if the database connection was stale/dysfunctional.
521
522 Example:
523
524   my @stuff = $schema->storage->dbh_do(
525     sub {
526       my ($storage, $dbh, @cols) = @_;
527       my $cols = join(q{, }, @cols);
528       $dbh->selectrow_array("SELECT $cols FROM foo");
529     },
530     @column_list
531   );
532
533 =cut
534
535 sub dbh_do {
536   my $self = shift;
537   my $code = shift;
538
539   my $dbh = $self->_dbh;
540
541   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
542       || $self->{transaction_depth};
543
544   local $self->{_in_dbh_do} = 1;
545
546   my @result;
547   my $want_array = wantarray;
548
549   eval {
550     $self->_verify_pid if $dbh;
551     if(!$self->_dbh) {
552         $self->_populate_dbh;
553         $dbh = $self->_dbh;
554     }
555
556     if($want_array) {
557         @result = $self->$code($dbh, @_);
558     }
559     elsif(defined $want_array) {
560         $result[0] = $self->$code($dbh, @_);
561     }
562     else {
563         $self->$code($dbh, @_);
564     }
565   };
566
567   my $exception = $@;
568   if(!$exception) { return $want_array ? @result : $result[0] }
569
570   $self->throw_exception($exception) if $self->connected;
571
572   # We were not connected - reconnect and retry, but let any
573   #  exception fall right through this time
574   $self->_populate_dbh;
575   $self->$code($self->_dbh, @_);
576 }
577
578 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
579 # It also informs dbh_do to bypass itself while under the direction of txn_do,
580 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
581 sub txn_do {
582   my $self = shift;
583   my $coderef = shift;
584
585   ref $coderef eq 'CODE' or $self->throw_exception
586     ('$coderef must be a CODE reference');
587
588   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
589
590   local $self->{_in_dbh_do} = 1;
591
592   my @result;
593   my $want_array = wantarray;
594
595   my $tried = 0;
596   while(1) {
597     eval {
598       $self->_verify_pid if $self->_dbh;
599       $self->_populate_dbh if !$self->_dbh;
600
601       $self->txn_begin;
602       if($want_array) {
603           @result = $coderef->(@_);
604       }
605       elsif(defined $want_array) {
606           $result[0] = $coderef->(@_);
607       }
608       else {
609           $coderef->(@_);
610       }
611       $self->txn_commit;
612     };
613
614     my $exception = $@;
615     if(!$exception) { return $want_array ? @result : $result[0] }
616
617     if($tried++ > 0 || $self->connected) {
618       eval { $self->txn_rollback };
619       my $rollback_exception = $@;
620       if($rollback_exception) {
621         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
622         $self->throw_exception($exception)  # propagate nested rollback
623           if $rollback_exception =~ /$exception_class/;
624
625         $self->throw_exception(
626           "Transaction aborted: ${exception}. "
627           . "Rollback failed: ${rollback_exception}"
628         );
629       }
630       $self->throw_exception($exception)
631     }
632
633     # We were not connected, and was first try - reconnect and retry
634     # via the while loop
635     $self->_populate_dbh;
636   }
637 }
638
639 =head2 disconnect
640
641 Our C<disconnect> method also performs a rollback first if the
642 database is not in C<AutoCommit> mode.
643
644 =cut
645
646 sub disconnect {
647   my ($self) = @_;
648
649   if( $self->connected ) {
650     if (my $connection_call = $self->on_disconnect_call) {
651       $self->_do_connection_actions(disconnect_call_ => $connection_call)
652     }
653     if (my $connection_do   = $self->_on_disconnect_do_store) {
654       $self->_do_connection_actions(disconnect_call_ => $connection_do)
655     }
656
657     $self->_dbh->rollback unless $self->_dbh_autocommit;
658     $self->_dbh->disconnect;
659     $self->_dbh(undef);
660     $self->{_dbh_gen}++;
661   }
662 }
663
664 =head2 with_deferred_fk_checks
665
666 =over 4
667
668 =item Arguments: C<$coderef>
669
670 =item Return Value: The return value of $coderef
671
672 =back
673
674 Storage specific method to run the code ref with FK checks deferred or
675 in MySQL's case disabled entirely.
676
677 =cut
678
679 # Storage subclasses should override this
680 sub with_deferred_fk_checks {
681   my ($self, $sub) = @_;
682
683   $sub->();
684 }
685
686 sub connected {
687   my ($self) = @_;
688
689   if(my $dbh = $self->_dbh) {
690       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
691           $self->_dbh(undef);
692           $self->{_dbh_gen}++;
693           return;
694       }
695       else {
696           $self->_verify_pid;
697           return 0 if !$self->_dbh;
698       }
699       return ($dbh->FETCH('Active') && $dbh->ping);
700   }
701
702   return 0;
703 }
704
705 # handle pid changes correctly
706 #  NOTE: assumes $self->_dbh is a valid $dbh
707 sub _verify_pid {
708   my ($self) = @_;
709
710   return if defined $self->_conn_pid && $self->_conn_pid == $$;
711
712   $self->_dbh->{InactiveDestroy} = 1;
713   $self->_dbh(undef);
714   $self->{_dbh_gen}++;
715
716   return;
717 }
718
719 sub ensure_connected {
720   my ($self) = @_;
721
722   unless ($self->connected) {
723     $self->_populate_dbh;
724   }
725 }
726
727 =head2 dbh
728
729 Returns the dbh - a data base handle of class L<DBI>.
730
731 =cut
732
733 sub dbh {
734   my ($self) = @_;
735
736   $self->ensure_connected;
737   return $self->_dbh;
738 }
739
740 sub _sql_maker_args {
741     my ($self) = @_;
742     
743     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
744 }
745
746 sub sql_maker {
747   my ($self) = @_;
748   unless ($self->_sql_maker) {
749     my $sql_maker_class = $self->sql_maker_class;
750     $self->ensure_class_loaded ($sql_maker_class);
751     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
752   }
753   return $self->_sql_maker;
754 }
755
756 sub _rebless {}
757
758 sub _populate_dbh {
759   my ($self) = @_;
760   my @info = @{$self->_dbi_connect_info || []};
761   $self->_dbh($self->_connect(@info));
762
763   $self->_conn_pid($$);
764   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
765
766   $self->_determine_driver;
767
768   # Always set the transaction depth on connect, since
769   #  there is no transaction in progress by definition
770   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
771
772   if (my $connection_call = $self->on_connect_call) {
773     $self->_do_connection_actions(connect_call_ => $connection_call)
774   }
775   if (my $connection_do = $self->_on_connect_do_store) {
776     $self->_do_connection_actions(connect_call_ => $connection_do)
777   }
778 }
779
780 sub _determine_driver {
781   my ($self) = @_;
782
783   if (ref $self eq 'DBIx::Class::Storage::DBI') {
784     my $driver;
785
786     if ($self->_dbh) { # we are connected
787       $driver = $self->_dbh->{Driver}{Name};
788     } else {
789       # try to use dsn to not require being connected, the driver may still
790       # force a connection in _rebless to determine version
791       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
792     }
793
794     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
795       bless $self, "DBIx::Class::Storage::DBI::${driver}";
796       $self->_rebless();
797     }
798   }
799 }
800
801 sub _do_connection_actions {
802   my $self          = shift;
803   my $method_prefix = shift;
804   my $call          = shift;
805
806   if (not ref($call)) {
807     my $method = $method_prefix . $call;
808     $self->$method(@_);
809   } elsif (ref($call) eq 'CODE') {
810     $self->$call(@_);
811   } elsif (ref($call) eq 'ARRAY') {
812     if (ref($call->[0]) ne 'ARRAY') {
813       $self->_do_connection_actions($method_prefix, $_) for @$call;
814     } else {
815       $self->_do_connection_actions($method_prefix, @$_) for @$call;
816     }
817   } else {
818     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
819   }
820
821   return $self;
822 }
823
824 sub connect_call_do_sql {
825   my $self = shift;
826   $self->_do_query(@_);
827 }
828
829 sub disconnect_call_do_sql {
830   my $self = shift;
831   $self->_do_query(@_);
832 }
833
834 # override in db-specific backend when necessary
835 sub connect_call_datetime_setup { 1 }
836
837 sub _do_query {
838   my ($self, $action) = @_;
839
840   if (ref $action eq 'CODE') {
841     $action = $action->($self);
842     $self->_do_query($_) foreach @$action;
843   }
844   else {
845     # Most debuggers expect ($sql, @bind), so we need to exclude
846     # the attribute hash which is the second argument to $dbh->do
847     # furthermore the bind values are usually to be presented
848     # as named arrayref pairs, so wrap those here too
849     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
850     my $sql = shift @do_args;
851     my $attrs = shift @do_args;
852     my @bind = map { [ undef, $_ ] } @do_args;
853
854     $self->_query_start($sql, @bind);
855     $self->_dbh->do($sql, $attrs, @do_args);
856     $self->_query_end($sql, @bind);
857   }
858
859   return $self;
860 }
861
862 sub _connect {
863   my ($self, @info) = @_;
864
865   $self->throw_exception("You failed to provide any connection info")
866     if !@info;
867
868   my ($old_connect_via, $dbh);
869
870   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
871     $old_connect_via = $DBI::connect_via;
872     $DBI::connect_via = 'connect';
873   }
874
875   eval {
876     if(ref $info[0] eq 'CODE') {
877        $dbh = &{$info[0]}
878     }
879     else {
880        $dbh = DBI->connect(@info);
881     }
882
883     if($dbh && !$self->unsafe) {
884       my $weak_self = $self;
885       Scalar::Util::weaken($weak_self);
886       $dbh->{HandleError} = sub {
887           if ($weak_self) {
888             $weak_self->throw_exception("DBI Exception: $_[0]");
889           }
890           else {
891             croak ("DBI Exception: $_[0]");
892           }
893       };
894       $dbh->{ShowErrorStatement} = 1;
895       $dbh->{RaiseError} = 1;
896       $dbh->{PrintError} = 0;
897     }
898   };
899
900   $DBI::connect_via = $old_connect_via if $old_connect_via;
901
902   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
903     if !$dbh || $@;
904
905   $self->_dbh_autocommit($dbh->{AutoCommit});
906
907   $dbh;
908 }
909
910 sub svp_begin {
911   my ($self, $name) = @_;
912
913   $name = $self->_svp_generate_name
914     unless defined $name;
915
916   $self->throw_exception ("You can't use savepoints outside a transaction")
917     if $self->{transaction_depth} == 0;
918
919   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
920     unless $self->can('_svp_begin');
921   
922   push @{ $self->{savepoints} }, $name;
923
924   $self->debugobj->svp_begin($name) if $self->debug;
925   
926   return $self->_svp_begin($name);
927 }
928
929 sub svp_release {
930   my ($self, $name) = @_;
931
932   $self->throw_exception ("You can't use savepoints outside a transaction")
933     if $self->{transaction_depth} == 0;
934
935   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
936     unless $self->can('_svp_release');
937
938   if (defined $name) {
939     $self->throw_exception ("Savepoint '$name' does not exist")
940       unless grep { $_ eq $name } @{ $self->{savepoints} };
941
942     # Dig through the stack until we find the one we are releasing.  This keeps
943     # the stack up to date.
944     my $svp;
945
946     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
947   } else {
948     $name = pop @{ $self->{savepoints} };
949   }
950
951   $self->debugobj->svp_release($name) if $self->debug;
952
953   return $self->_svp_release($name);
954 }
955
956 sub svp_rollback {
957   my ($self, $name) = @_;
958
959   $self->throw_exception ("You can't use savepoints outside a transaction")
960     if $self->{transaction_depth} == 0;
961
962   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
963     unless $self->can('_svp_rollback');
964
965   if (defined $name) {
966       # If they passed us a name, verify that it exists in the stack
967       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
968           $self->throw_exception("Savepoint '$name' does not exist!");
969       }
970
971       # Dig through the stack until we find the one we are releasing.  This keeps
972       # the stack up to date.
973       while(my $s = pop(@{ $self->{savepoints} })) {
974           last if($s eq $name);
975       }
976       # Add the savepoint back to the stack, as a rollback doesn't remove the
977       # named savepoint, only everything after it.
978       push(@{ $self->{savepoints} }, $name);
979   } else {
980       # We'll assume they want to rollback to the last savepoint
981       $name = $self->{savepoints}->[-1];
982   }
983
984   $self->debugobj->svp_rollback($name) if $self->debug;
985   
986   return $self->_svp_rollback($name);
987 }
988
989 sub _svp_generate_name {
990     my ($self) = @_;
991
992     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
993 }
994
995 sub txn_begin {
996   my $self = shift;
997   $self->ensure_connected();
998   if($self->{transaction_depth} == 0) {
999     $self->debugobj->txn_begin()
1000       if $self->debug;
1001     # this isn't ->_dbh-> because
1002     #  we should reconnect on begin_work
1003     #  for AutoCommit users
1004     $self->dbh->begin_work;
1005   } elsif ($self->auto_savepoint) {
1006     $self->svp_begin;
1007   }
1008   $self->{transaction_depth}++;
1009 }
1010
1011 sub txn_commit {
1012   my $self = shift;
1013   if ($self->{transaction_depth} == 1) {
1014     my $dbh = $self->_dbh;
1015     $self->debugobj->txn_commit()
1016       if ($self->debug);
1017     $dbh->commit;
1018     $self->{transaction_depth} = 0
1019       if $self->_dbh_autocommit;
1020   }
1021   elsif($self->{transaction_depth} > 1) {
1022     $self->{transaction_depth}--;
1023     $self->svp_release
1024       if $self->auto_savepoint;
1025   }
1026 }
1027
1028 sub txn_rollback {
1029   my $self = shift;
1030   my $dbh = $self->_dbh;
1031   eval {
1032     if ($self->{transaction_depth} == 1) {
1033       $self->debugobj->txn_rollback()
1034         if ($self->debug);
1035       $self->{transaction_depth} = 0
1036         if $self->_dbh_autocommit;
1037       $dbh->rollback;
1038     }
1039     elsif($self->{transaction_depth} > 1) {
1040       $self->{transaction_depth}--;
1041       if ($self->auto_savepoint) {
1042         $self->svp_rollback;
1043         $self->svp_release;
1044       }
1045     }
1046     else {
1047       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1048     }
1049   };
1050   if ($@) {
1051     my $error = $@;
1052     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1053     $error =~ /$exception_class/ and $self->throw_exception($error);
1054     # ensure that a failed rollback resets the transaction depth
1055     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1056     $self->throw_exception($error);
1057   }
1058 }
1059
1060 # This used to be the top-half of _execute.  It was split out to make it
1061 #  easier to override in NoBindVars without duping the rest.  It takes up
1062 #  all of _execute's args, and emits $sql, @bind.
1063 sub _prep_for_execute {
1064   my ($self, $op, $extra_bind, $ident, $args) = @_;
1065
1066   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1067     $ident = $ident->from();
1068   }
1069
1070   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1071
1072   unshift(@bind,
1073     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1074       if $extra_bind;
1075   return ($sql, \@bind);
1076 }
1077
1078
1079 sub _fix_bind_params {
1080     my ($self, @bind) = @_;
1081
1082     ### Turn @bind from something like this:
1083     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1084     ### to this:
1085     ###   ( "'1'", "'1'", "'3'" )
1086     return
1087         map {
1088             if ( defined( $_ && $_->[1] ) ) {
1089                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1090             }
1091             else { q{'NULL'}; }
1092         } @bind;
1093 }
1094
1095 sub _query_start {
1096     my ( $self, $sql, @bind ) = @_;
1097
1098     if ( $self->debug ) {
1099         @bind = $self->_fix_bind_params(@bind);
1100
1101         $self->debugobj->query_start( $sql, @bind );
1102     }
1103 }
1104
1105 sub _query_end {
1106     my ( $self, $sql, @bind ) = @_;
1107
1108     if ( $self->debug ) {
1109         @bind = $self->_fix_bind_params(@bind);
1110         $self->debugobj->query_end( $sql, @bind );
1111     }
1112 }
1113
1114 sub _dbh_execute {
1115   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1116
1117   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1118
1119   $self->_query_start( $sql, @$bind );
1120
1121   my $sth = $self->sth($sql,$op);
1122
1123   my $placeholder_index = 1; 
1124
1125   foreach my $bound (@$bind) {
1126     my $attributes = {};
1127     my($column_name, @data) = @$bound;
1128
1129     if ($bind_attributes) {
1130       $attributes = $bind_attributes->{$column_name}
1131       if defined $bind_attributes->{$column_name};
1132     }
1133
1134     foreach my $data (@data) {
1135       my $ref = ref $data;
1136       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1137
1138       $sth->bind_param($placeholder_index, $data, $attributes);
1139       $placeholder_index++;
1140     }
1141   }
1142
1143   # Can this fail without throwing an exception anyways???
1144   my $rv = $sth->execute();
1145   $self->throw_exception($sth->errstr) if !$rv;
1146
1147   $self->_query_end( $sql, @$bind );
1148
1149   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1150 }
1151
1152 sub _execute {
1153     my $self = shift;
1154     $self->dbh_do('_dbh_execute', @_)
1155 }
1156
1157 sub insert {
1158   my ($self, $source, $to_insert) = @_;
1159
1160   my $ident = $source->from;
1161   my $bind_attributes = $self->source_bind_attributes($source);
1162
1163   my $updated_cols = {};
1164
1165   $self->ensure_connected;
1166   foreach my $col ( $source->columns ) {
1167     if ( !defined $to_insert->{$col} ) {
1168       my $col_info = $source->column_info($col);
1169
1170       if ( $col_info->{auto_nextval} ) {
1171         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1172       }
1173     }
1174   }
1175
1176   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1177
1178   return $updated_cols;
1179 }
1180
1181 ## Still not quite perfect, and EXPERIMENTAL
1182 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1183 ## scalar refs, or at least, all the same type as the first set, the statement is
1184 ## only prepped once.
1185 sub insert_bulk {
1186   my ($self, $source, $cols, $data) = @_;
1187   my %colvalues;
1188   my $table = $source->from;
1189   @colvalues{@$cols} = (0..$#$cols);
1190   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1191   
1192   $self->_query_start( $sql, @bind );
1193   my $sth = $self->sth($sql);
1194
1195 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1196
1197   ## This must be an arrayref, else nothing works!
1198   my $tuple_status = [];
1199
1200   ## Get the bind_attributes, if any exist
1201   my $bind_attributes = $self->source_bind_attributes($source);
1202
1203   ## Bind the values and execute
1204   my $placeholder_index = 1; 
1205
1206   foreach my $bound (@bind) {
1207
1208     my $attributes = {};
1209     my ($column_name, $data_index) = @$bound;
1210
1211     if( $bind_attributes ) {
1212       $attributes = $bind_attributes->{$column_name}
1213       if defined $bind_attributes->{$column_name};
1214     }
1215
1216     my @data = map { $_->[$data_index] } @$data;
1217
1218     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1219     $placeholder_index++;
1220   }
1221   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1222   if (my $err = $@) {
1223     my $i = 0;
1224     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1225
1226     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1227       if ($i > $#$tuple_status);
1228
1229     require Data::Dumper;
1230     local $Data::Dumper::Terse = 1;
1231     local $Data::Dumper::Indent = 1;
1232     local $Data::Dumper::Useqq = 1;
1233     local $Data::Dumper::Quotekeys = 0;
1234
1235     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1236       $tuple_status->[$i][1],
1237       Data::Dumper::Dumper(
1238         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1239       ),
1240     );
1241   }
1242   $self->throw_exception($sth->errstr) if !$rv;
1243
1244   $self->_query_end( $sql, @bind );
1245   return (wantarray ? ($rv, $sth, @bind) : $rv);
1246 }
1247
1248 sub update {
1249   my $self = shift @_;
1250   my $source = shift @_;
1251   my $bind_attributes = $self->source_bind_attributes($source);
1252   
1253   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1254 }
1255
1256
1257 sub delete {
1258   my $self = shift @_;
1259   my $source = shift @_;
1260   
1261   my $bind_attrs = $self->source_bind_attributes($source);
1262   
1263   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1264 }
1265
1266 # We were sent here because the $rs contains a complex search
1267 # which will require a subquery to select the correct rows
1268 # (i.e. joined or limited resultsets)
1269 #
1270 # Genarating a single PK column subquery is trivial and supported
1271 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1272 # Look at _multipk_update_delete()
1273 sub _subq_update_delete {
1274   my $self = shift;
1275   my ($rs, $op, $values) = @_;
1276
1277   my $rsrc = $rs->result_source;
1278
1279   # we already check this, but double check naively just in case. Should be removed soon
1280   my $sel = $rs->_resolved_attrs->{select};
1281   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1282   my @pcols = $rsrc->primary_columns;
1283   if (@$sel != @pcols) {
1284     $self->throw_exception (
1285       'Subquery update/delete can not be called on resultsets selecting a'
1286      .' number of columns different than the number of primary keys'
1287     );
1288   }
1289
1290   if (@pcols == 1) {
1291     return $self->$op (
1292       $rsrc,
1293       $op eq 'update' ? $values : (),
1294       { $pcols[0] => { -in => $rs->as_query } },
1295     );
1296   }
1297
1298   else {
1299     return $self->_multipk_update_delete (@_);
1300   }
1301 }
1302
1303 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1304 # resultset update/delete involving subqueries. So by default resort
1305 # to simple (and inefficient) delete_all style per-row opearations,
1306 # while allowing specific storages to override this with a faster
1307 # implementation.
1308 #
1309 sub _multipk_update_delete {
1310   return shift->_per_row_update_delete (@_);
1311 }
1312
1313 # This is the default loop used to delete/update rows for multi PK
1314 # resultsets, and used by mysql exclusively (because it can't do anything
1315 # else).
1316 #
1317 # We do not use $row->$op style queries, because resultset update/delete
1318 # is not expected to cascade (this is what delete_all/update_all is for).
1319 #
1320 # There should be no race conditions as the entire operation is rolled
1321 # in a transaction.
1322 #
1323 sub _per_row_update_delete {
1324   my $self = shift;
1325   my ($rs, $op, $values) = @_;
1326
1327   my $rsrc = $rs->result_source;
1328   my @pcols = $rsrc->primary_columns;
1329
1330   my $guard = $self->txn_scope_guard;
1331
1332   # emulate the return value of $sth->execute for non-selects
1333   my $row_cnt = '0E0';
1334
1335   my $subrs_cur = $rs->cursor;
1336   while (my @pks = $subrs_cur->next) {
1337
1338     my $cond;
1339     for my $i (0.. $#pcols) {
1340       $cond->{$pcols[$i]} = $pks[$i];
1341     }
1342
1343     $self->$op (
1344       $rsrc,
1345       $op eq 'update' ? $values : (),
1346       $cond,
1347     );
1348
1349     $row_cnt++;
1350   }
1351
1352   $guard->commit;
1353
1354   return $row_cnt;
1355 }
1356
1357 sub _select {
1358   my $self = shift;
1359   my $sql_maker = $self->sql_maker;
1360   local $sql_maker->{for};
1361   return $self->_execute($self->_select_args(@_));
1362 }
1363
1364 sub _select_args_to_query {
1365   my $self = shift;
1366
1367   my $sql_maker = $self->sql_maker;
1368   local $sql_maker->{for};
1369
1370   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset) 
1371   #  = $self->_select_args($ident, $select, $cond, $attrs);
1372   my ($op, $bind, $ident, $bind_attrs, @args) =
1373     $self->_select_args(@_);
1374
1375   # my ($sql, $bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1376   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1377
1378   return \[ "($sql)", @{ $prepared_bind || [] }];
1379 }
1380
1381 sub _select_args {
1382   my ($self, $ident, $select, $condition, $attrs) = @_;
1383
1384   my $for = delete $attrs->{for};
1385   my $sql_maker = $self->sql_maker;
1386   $sql_maker->{for} = $for;
1387
1388   my $order = { map
1389     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1390     (qw/order_by group_by having _virtual_order_by/ )
1391   };
1392
1393
1394   my $bind_attrs = {};
1395
1396   my $alias2source = $self->_resolve_ident_sources ($ident);
1397
1398   for my $alias (keys %$alias2source) {
1399     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1400     for my $col (keys %$bindtypes) {
1401
1402       my $fqcn = join ('.', $alias, $col);
1403       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1404
1405       # so that unqualified searches can be bound too
1406       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
1407     }
1408   }
1409
1410   # This would be the point to deflate anything found in $condition
1411   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1412   # expect a row object. And all we have is a resultsource (it is trivial
1413   # to extract deflator coderefs via $alias2source above).
1414   #
1415   # I don't see a way forward other than changing the way deflators are
1416   # invoked, and that's just bad...
1417
1418   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1419   if ($attrs->{software_limit} ||
1420       $sql_maker->_default_limit_syntax eq "GenericSubQ") {
1421         $attrs->{software_limit} = 1;
1422   } else {
1423     $self->throw_exception("rows attribute must be positive if present")
1424       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1425
1426     # MySQL actually recommends this approach.  I cringe.
1427     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1428     push @args, $attrs->{rows}, $attrs->{offset};
1429   }
1430   return @args;
1431 }
1432
1433 sub _resolve_ident_sources {
1434   my ($self, $ident) = @_;
1435
1436   my $alias2source = {};
1437
1438   # the reason this is so contrived is that $ident may be a {from}
1439   # structure, specifying multiple tables to join
1440   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1441     # this is compat mode for insert/update/delete which do not deal with aliases
1442     $alias2source->{me} = $ident;
1443   }
1444   elsif (ref $ident eq 'ARRAY') {
1445
1446     for (@$ident) {
1447       my $tabinfo;
1448       if (ref $_ eq 'HASH') {
1449         $tabinfo = $_;
1450       }
1451       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1452         $tabinfo = $_->[0];
1453       }
1454
1455       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-result_source}
1456         if ($tabinfo->{-result_source});
1457     }
1458   }
1459
1460   return $alias2source;
1461 }
1462
1463 sub count {
1464   my ($self, $source, $attrs) = @_;
1465
1466   my $tmp_attrs = { %$attrs };
1467
1468   # take off any pagers, record_filter is cdbi, and no point of ordering a count
1469   delete $tmp_attrs->{$_} for (qw/select as rows offset page order_by record_filter/);
1470
1471   # overwrite the selector
1472   $tmp_attrs->{select} = { count => '*' };
1473
1474   my $tmp_rs = $source->resultset_class->new($source, $tmp_attrs);
1475   my ($count) = $tmp_rs->cursor->next;
1476
1477   # if the offset/rows attributes are still present, we did not use
1478   # a subquery, so we need to make the calculations in software
1479   $count -= $attrs->{offset} if $attrs->{offset};
1480   $count = $attrs->{rows} if $attrs->{rows} and $attrs->{rows} < $count;
1481   $count = 0 if ($count < 0);
1482
1483   return $count;
1484 }
1485
1486 sub count_grouped {
1487   my ($self, $source, $attrs) = @_;
1488
1489   # copy for the subquery, we need to do some adjustments to it too
1490   my $sub_attrs = { %$attrs };
1491
1492   # these can not go in the subquery, and there is no point of ordering it
1493   delete $sub_attrs->{$_} for qw/prefetch collapse select as order_by/;
1494
1495   # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
1496   # simply deleting group_by suffices, as the code below will re-fill it
1497   # Note: we check $attrs, as $sub_attrs has collapse deleted
1498   if (ref $attrs->{collapse} and keys %{$attrs->{collapse}} ) {
1499     delete $sub_attrs->{group_by};
1500   }
1501
1502   $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($source->primary_columns) ];
1503   $sub_attrs->{select} = $self->_grouped_count_select ($source, $sub_attrs);
1504
1505   $attrs->{from} = [{
1506     count_subq => $source->resultset_class->new ($source, $sub_attrs )->as_query
1507   }];
1508
1509   # the subquery replaces this
1510   delete $attrs->{$_} for qw/where bind prefetch collapse group_by having having_bind rows offset page pager/;
1511
1512   return $self->count ($source, $attrs);
1513 }
1514
1515 #
1516 # Returns a SELECT to go with a supplied GROUP BY
1517 # (caled by count_grouped so a group_by is present)
1518 # Most databases expect them to match, but some
1519 # choke in various ways.
1520 #
1521 sub _grouped_count_select {
1522   my ($self, $source, $rs_args) = @_;
1523   return $rs_args->{group_by};
1524 }
1525
1526 sub source_bind_attributes {
1527   my ($self, $source) = @_;
1528   
1529   my $bind_attributes;
1530   foreach my $column ($source->columns) {
1531   
1532     my $data_type = $source->column_info($column)->{data_type} || '';
1533     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1534      if $data_type;
1535   }
1536
1537   return $bind_attributes;
1538 }
1539
1540 =head2 select
1541
1542 =over 4
1543
1544 =item Arguments: $ident, $select, $condition, $attrs
1545
1546 =back
1547
1548 Handle a SQL select statement.
1549
1550 =cut
1551
1552 sub select {
1553   my $self = shift;
1554   my ($ident, $select, $condition, $attrs) = @_;
1555   return $self->cursor_class->new($self, \@_, $attrs);
1556 }
1557
1558 sub select_single {
1559   my $self = shift;
1560   my ($rv, $sth, @bind) = $self->_select(@_);
1561   my @row = $sth->fetchrow_array;
1562   my @nextrow = $sth->fetchrow_array if @row;
1563   if(@row && @nextrow) {
1564     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1565   }
1566   # Need to call finish() to work round broken DBDs
1567   $sth->finish();
1568   return @row;
1569 }
1570
1571 =head2 sth
1572
1573 =over 4
1574
1575 =item Arguments: $sql
1576
1577 =back
1578
1579 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1580
1581 =cut
1582
1583 sub _dbh_sth {
1584   my ($self, $dbh, $sql) = @_;
1585
1586   # 3 is the if_active parameter which avoids active sth re-use
1587   my $sth = $self->disable_sth_caching
1588     ? $dbh->prepare($sql)
1589     : $dbh->prepare_cached($sql, {}, 3);
1590
1591   # XXX You would think RaiseError would make this impossible,
1592   #  but apparently that's not true :(
1593   $self->throw_exception($dbh->errstr) if !$sth;
1594
1595   $sth;
1596 }
1597
1598 sub sth {
1599   my ($self, $sql) = @_;
1600   $self->dbh_do('_dbh_sth', $sql);
1601 }
1602
1603 sub _dbh_columns_info_for {
1604   my ($self, $dbh, $table) = @_;
1605
1606   if ($dbh->can('column_info')) {
1607     my %result;
1608     eval {
1609       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1610       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1611       $sth->execute();
1612       while ( my $info = $sth->fetchrow_hashref() ){
1613         my %column_info;
1614         $column_info{data_type}   = $info->{TYPE_NAME};
1615         $column_info{size}      = $info->{COLUMN_SIZE};
1616         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1617         $column_info{default_value} = $info->{COLUMN_DEF};
1618         my $col_name = $info->{COLUMN_NAME};
1619         $col_name =~ s/^\"(.*)\"$/$1/;
1620
1621         $result{$col_name} = \%column_info;
1622       }
1623     };
1624     return \%result if !$@ && scalar keys %result;
1625   }
1626
1627   my %result;
1628   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1629   $sth->execute;
1630   my @columns = @{$sth->{NAME_lc}};
1631   for my $i ( 0 .. $#columns ){
1632     my %column_info;
1633     $column_info{data_type} = $sth->{TYPE}->[$i];
1634     $column_info{size} = $sth->{PRECISION}->[$i];
1635     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1636
1637     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1638       $column_info{data_type} = $1;
1639       $column_info{size}    = $2;
1640     }
1641
1642     $result{$columns[$i]} = \%column_info;
1643   }
1644   $sth->finish;
1645
1646   foreach my $col (keys %result) {
1647     my $colinfo = $result{$col};
1648     my $type_num = $colinfo->{data_type};
1649     my $type_name;
1650     if(defined $type_num && $dbh->can('type_info')) {
1651       my $type_info = $dbh->type_info($type_num);
1652       $type_name = $type_info->{TYPE_NAME} if $type_info;
1653       $colinfo->{data_type} = $type_name if $type_name;
1654     }
1655   }
1656
1657   return \%result;
1658 }
1659
1660 sub columns_info_for {
1661   my ($self, $table) = @_;
1662   $self->dbh_do('_dbh_columns_info_for', $table);
1663 }
1664
1665 =head2 last_insert_id
1666
1667 Return the row id of the last insert.
1668
1669 =cut
1670
1671 sub _dbh_last_insert_id {
1672     # All Storage's need to register their own _dbh_last_insert_id
1673     # the old SQLite-based method was highly inappropriate
1674
1675     my $self = shift;
1676     my $class = ref $self;
1677     $self->throw_exception (<<EOE);
1678
1679 No _dbh_last_insert_id() method found in $class.
1680 Since the method of obtaining the autoincrement id of the last insert
1681 operation varies greatly between different databases, this method must be
1682 individually implemented for every storage class.
1683 EOE
1684 }
1685
1686 sub last_insert_id {
1687   my $self = shift;
1688   $self->dbh_do('_dbh_last_insert_id', @_);
1689 }
1690
1691 =head2 sqlt_type
1692
1693 Returns the database driver name.
1694
1695 =cut
1696
1697 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1698
1699 =head2 bind_attribute_by_data_type
1700
1701 Given a datatype from column info, returns a database specific bind
1702 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1703 let the database planner just handle it.
1704
1705 Generally only needed for special case column types, like bytea in postgres.
1706
1707 =cut
1708
1709 sub bind_attribute_by_data_type {
1710     return;
1711 }
1712
1713 =head2 is_datatype_numeric
1714
1715 Given a datatype from column_info, returns a boolean value indicating if
1716 the current RDBMS considers it a numeric value. This controls how
1717 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1718 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1719 be performed instead of the usual C<eq>.
1720
1721 =cut
1722
1723 sub is_datatype_numeric {
1724   my ($self, $dt) = @_;
1725
1726   return 0 unless $dt;
1727
1728   return $dt =~ /^ (?:
1729     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1730   ) $/ix;
1731 }
1732
1733
1734 =head2 create_ddl_dir (EXPERIMENTAL)
1735
1736 =over 4
1737
1738 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1739
1740 =back
1741
1742 Creates a SQL file based on the Schema, for each of the specified
1743 database engines in C<\@databases> in the given directory.
1744 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1745
1746 Given a previous version number, this will also create a file containing
1747 the ALTER TABLE statements to transform the previous schema into the
1748 current one. Note that these statements may contain C<DROP TABLE> or
1749 C<DROP COLUMN> statements that can potentially destroy data.
1750
1751 The file names are created using the C<ddl_filename> method below, please
1752 override this method in your schema if you would like a different file
1753 name format. For the ALTER file, the same format is used, replacing
1754 $version in the name with "$preversion-$version".
1755
1756 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1757 The most common value for this would be C<< { add_drop_table => 1 } >>
1758 to have the SQL produced include a C<DROP TABLE> statement for each table
1759 created. For quoting purposes supply C<quote_table_names> and
1760 C<quote_field_names>.
1761
1762 If no arguments are passed, then the following default values are assumed:
1763
1764 =over 4
1765
1766 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
1767
1768 =item version    - $schema->schema_version
1769
1770 =item directory  - './'
1771
1772 =item preversion - <none>
1773
1774 =back
1775
1776 By default, C<\%sqlt_args> will have
1777
1778  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1779
1780 merged with the hash passed in. To disable any of those features, pass in a 
1781 hashref like the following
1782
1783  { ignore_constraint_names => 0, # ... other options }
1784
1785
1786 Note that this feature is currently EXPERIMENTAL and may not work correctly 
1787 across all databases, or fully handle complex relationships.
1788
1789 WARNING: Please check all SQL files created, before applying them.
1790
1791 =cut
1792
1793 sub create_ddl_dir {
1794   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1795
1796   if(!$dir || !-d $dir) {
1797     carp "No directory given, using ./\n";
1798     $dir = "./";
1799   }
1800   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1801   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1802
1803   my $schema_version = $schema->schema_version || '1.x';
1804   $version ||= $schema_version;
1805
1806   $sqltargs = {
1807     add_drop_table => 1, 
1808     ignore_constraint_names => 1,
1809     ignore_index_names => 1,
1810     %{$sqltargs || {}}
1811   };
1812
1813   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1814       . $self->_check_sqlt_message . q{'})
1815           if !$self->_check_sqlt_version;
1816
1817   my $sqlt = SQL::Translator->new( $sqltargs );
1818
1819   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1820   my $sqlt_schema = $sqlt->translate({ data => $schema })
1821     or $self->throw_exception ($sqlt->error);
1822
1823   foreach my $db (@$databases) {
1824     $sqlt->reset();
1825     $sqlt->{schema} = $sqlt_schema;
1826     $sqlt->producer($db);
1827
1828     my $file;
1829     my $filename = $schema->ddl_filename($db, $version, $dir);
1830     if (-e $filename && ($version eq $schema_version )) {
1831       # if we are dumping the current version, overwrite the DDL
1832       carp "Overwriting existing DDL file - $filename";
1833       unlink($filename);
1834     }
1835
1836     my $output = $sqlt->translate;
1837     if(!$output) {
1838       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1839       next;
1840     }
1841     if(!open($file, ">$filename")) {
1842       $self->throw_exception("Can't open $filename for writing ($!)");
1843       next;
1844     }
1845     print $file $output;
1846     close($file);
1847   
1848     next unless ($preversion);
1849
1850     require SQL::Translator::Diff;
1851
1852     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1853     if(!-e $prefilename) {
1854       carp("No previous schema file found ($prefilename)");
1855       next;
1856     }
1857
1858     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1859     if(-e $difffile) {
1860       carp("Overwriting existing diff file - $difffile");
1861       unlink($difffile);
1862     }
1863     
1864     my $source_schema;
1865     {
1866       my $t = SQL::Translator->new($sqltargs);
1867       $t->debug( 0 );
1868       $t->trace( 0 );
1869
1870       $t->parser( $db )
1871         or $self->throw_exception ($t->error);
1872
1873       my $out = $t->translate( $prefilename )
1874         or $self->throw_exception ($t->error);
1875
1876       $source_schema = $t->schema;
1877
1878       $source_schema->name( $prefilename )
1879         unless ( $source_schema->name );
1880     }
1881
1882     # The "new" style of producers have sane normalization and can support 
1883     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1884     # And we have to diff parsed SQL against parsed SQL.
1885     my $dest_schema = $sqlt_schema;
1886
1887     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1888       my $t = SQL::Translator->new($sqltargs);
1889       $t->debug( 0 );
1890       $t->trace( 0 );
1891
1892       $t->parser( $db )
1893         or $self->throw_exception ($t->error);
1894
1895       my $out = $t->translate( $filename )
1896         or $self->throw_exception ($t->error);
1897
1898       $dest_schema = $t->schema;
1899
1900       $dest_schema->name( $filename )
1901         unless $dest_schema->name;
1902     }
1903     
1904     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1905                                                   $dest_schema,   $db,
1906                                                   $sqltargs
1907                                                  );
1908     if(!open $file, ">$difffile") { 
1909       $self->throw_exception("Can't write to $difffile ($!)");
1910       next;
1911     }
1912     print $file $diff;
1913     close($file);
1914   }
1915 }
1916
1917 =head2 deployment_statements
1918
1919 =over 4
1920
1921 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1922
1923 =back
1924
1925 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1926
1927 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
1928 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
1929
1930 C<$directory> is used to return statements from files in a previously created
1931 L</create_ddl_dir> directory and is optional. The filenames are constructed
1932 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1933
1934 If no C<$directory> is specified then the statements are constructed on the
1935 fly using L<SQL::Translator> and C<$version> is ignored.
1936
1937 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1938
1939 =cut
1940
1941 sub deployment_statements {
1942   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1943   # Need to be connected to get the correct sqlt_type
1944   $self->ensure_connected() unless $type;
1945   $type ||= $self->sqlt_type;
1946   $version ||= $schema->schema_version || '1.x';
1947   $dir ||= './';
1948   my $filename = $schema->ddl_filename($type, $version, $dir);
1949   if(-f $filename)
1950   {
1951       my $file;
1952       open($file, "<$filename") 
1953         or $self->throw_exception("Can't open $filename ($!)");
1954       my @rows = <$file>;
1955       close($file);
1956       return join('', @rows);
1957   }
1958
1959   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1960       . $self->_check_sqlt_message . q{'})
1961           if !$self->_check_sqlt_version;
1962
1963   require SQL::Translator::Parser::DBIx::Class;
1964   eval qq{use SQL::Translator::Producer::${type}};
1965   $self->throw_exception($@) if $@;
1966
1967   # sources needs to be a parser arg, but for simplicty allow at top level 
1968   # coming in
1969   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1970       if exists $sqltargs->{sources};
1971
1972   my $tr = SQL::Translator->new(%$sqltargs);
1973   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1974   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1975 }
1976
1977 sub deploy {
1978   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1979   my $deploy = sub {
1980     my $line = shift;
1981     return if($line =~ /^--/);
1982     return if(!$line);
1983     # next if($line =~ /^DROP/m);
1984     return if($line =~ /^BEGIN TRANSACTION/m);
1985     return if($line =~ /^COMMIT/m);
1986     return if $line =~ /^\s+$/; # skip whitespace only
1987     $self->_query_start($line);
1988     eval {
1989       $self->dbh->do($line); # shouldn't be using ->dbh ?
1990     };
1991     if ($@) {
1992       carp qq{$@ (running "${line}")};
1993     }
1994     $self->_query_end($line);
1995   };
1996   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
1997   if (@statements > 1) {
1998     foreach my $statement (@statements) {
1999       $deploy->( $statement );
2000     }
2001   }
2002   elsif (@statements == 1) {
2003     foreach my $line ( split(";\n", $statements[0])) {
2004       $deploy->( $line );
2005     }
2006   }
2007 }
2008
2009 =head2 datetime_parser
2010
2011 Returns the datetime parser class
2012
2013 =cut
2014
2015 sub datetime_parser {
2016   my $self = shift;
2017   return $self->{datetime_parser} ||= do {
2018     $self->ensure_connected;
2019     $self->build_datetime_parser(@_);
2020   };
2021 }
2022
2023 =head2 datetime_parser_type
2024
2025 Defines (returns) the datetime parser class - currently hardwired to
2026 L<DateTime::Format::MySQL>
2027
2028 =cut
2029
2030 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2031
2032 =head2 build_datetime_parser
2033
2034 See L</datetime_parser>
2035
2036 =cut
2037
2038 sub build_datetime_parser {
2039   my $self = shift;
2040   my $type = $self->datetime_parser_type(@_);
2041   eval "use ${type}";
2042   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2043   return $type;
2044 }
2045
2046 {
2047     my $_check_sqlt_version; # private
2048     my $_check_sqlt_message; # private
2049     sub _check_sqlt_version {
2050         return $_check_sqlt_version if defined $_check_sqlt_version;
2051         eval 'use SQL::Translator "0.09003"';
2052         $_check_sqlt_message = $@ || '';
2053         $_check_sqlt_version = !$@;
2054     }
2055
2056     sub _check_sqlt_message {
2057         _check_sqlt_version if !defined $_check_sqlt_message;
2058         $_check_sqlt_message;
2059     }
2060 }
2061
2062 =head2 is_replicating
2063
2064 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2065 replicate from a master database.  Default is undef, which is the result
2066 returned by databases that don't support replication.
2067
2068 =cut
2069
2070 sub is_replicating {
2071     return;
2072     
2073 }
2074
2075 =head2 lag_behind_master
2076
2077 Returns a number that represents a certain amount of lag behind a master db
2078 when a given storage is replicating.  The number is database dependent, but
2079 starts at zero and increases with the amount of lag. Default in undef
2080
2081 =cut
2082
2083 sub lag_behind_master {
2084     return;
2085 }
2086
2087 sub DESTROY {
2088   my $self = shift;
2089   return if !$self->_dbh;
2090   $self->_verify_pid;
2091   $self->_dbh(undef);
2092 }
2093
2094 1;
2095
2096 =head1 USAGE NOTES
2097
2098 =head2 DBIx::Class and AutoCommit
2099
2100 DBIx::Class can do some wonderful magic with handling exceptions,
2101 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2102 combined with C<txn_do> for transaction support.
2103
2104 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2105 in an assumed transaction between commits, and you're telling us you'd
2106 like to manage that manually.  A lot of the magic protections offered by
2107 this module will go away.  We can't protect you from exceptions due to database
2108 disconnects because we don't know anything about how to restart your
2109 transactions.  You're on your own for handling all sorts of exceptional
2110 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2111 be with raw DBI.
2112
2113
2114
2115 =head1 AUTHORS
2116
2117 Matt S. Trout <mst@shadowcatsystems.co.uk>
2118
2119 Andy Grundman <andy@hybridized.org>
2120
2121 =head1 LICENSE
2122
2123 You may distribute this code under the same terms as Perl itself.
2124
2125 =cut