Merge 'trunk' into 'on_connect_call'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use Carp::Clan qw/^DBIx::Class/;
9 use DBI;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util();
13 use List::Util();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
17        _conn_pid _conn_tid transaction_depth _dbh_autocommit _on_connect_do
18        _on_disconnect_do _on_connect_do_store _on_disconnect_do_store
19        savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call disable_sth_caching unsafe auto_savepoint
26 /;
27 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
28
29
30 # default cursor class, overridable in connect_info attributes
31 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
32
33 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
34 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
35
36
37 =head1 NAME
38
39 DBIx::Class::Storage::DBI - DBI storage handler
40
41 =head1 SYNOPSIS
42
43   my $schema = MySchema->connect('dbi:SQLite:my.db');
44
45   $schema->storage->debug(1);
46   $schema->dbh_do("DROP TABLE authors");
47
48   $schema->resultset('Book')->search({
49      written_on => $schema->storage->datetime_parser(DateTime->now)
50   });
51
52 =head1 DESCRIPTION
53
54 This class represents the connection to an RDBMS via L<DBI>.  See
55 L<DBIx::Class::Storage> for general information.  This pod only
56 documents DBI-specific methods and behaviors.
57
58 =head1 METHODS
59
60 =cut
61
62 sub new {
63   my $new = shift->next::method(@_);
64
65   $new->transaction_depth(0);
66   $new->_sql_maker_opts({});
67   $new->{savepoints} = [];
68   $new->{_in_dbh_do} = 0;
69   $new->{_dbh_gen} = 0;
70
71   $new;
72 }
73
74 =head2 connect_info
75
76 This method is normally called by L<DBIx::Class::Schema/connection>, which
77 encapsulates its argument list in an arrayref before passing them here.
78
79 The argument list may contain:
80
81 =over
82
83 =item *
84
85 The same 4-element argument set one would normally pass to
86 L<DBI/connect>, optionally followed by
87 L<extra attributes|/DBIx::Class specific connection attributes>
88 recognized by DBIx::Class:
89
90   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
91
92 =item *
93
94 A single code reference which returns a connected 
95 L<DBI database handle|DBI/connect> optionally followed by 
96 L<extra attributes|/DBIx::Class specific connection attributes> recognized
97 by DBIx::Class:
98
99   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
100
101 =item *
102
103 A single hashref with all the attributes and the dsn/user/password
104 mixed together:
105
106   $connect_info_args = [{
107     dsn => $dsn,
108     user => $user,
109     password => $pass,
110     %dbi_attributes,
111     %extra_attributes,
112   }];
113
114 This is particularly useful for L<Catalyst> based applications, allowing the 
115 following config (L<Config::General> style):
116
117   <Model::DB>
118     schema_class   App::DB
119     <connect_info>
120       dsn          dbi:mysql:database=test
121       user         testuser
122       password     TestPass
123       AutoCommit   1
124     </connect_info>
125   </Model::DB>
126
127 =back
128
129 Please note that the L<DBI> docs recommend that you always explicitly
130 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
131 recommends that it be set to I<1>, and that you perform transactions
132 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
133 to I<1> if you do not do explicitly set it to zero.  This is the default 
134 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
135
136 =head3 DBIx::Class specific connection attributes
137
138 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
139 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
140 the following connection options. These options can be mixed in with your other
141 L<DBI> connection attributes, or placed in a seperate hashref
142 (C<\%extra_attributes>) as shown above.
143
144 Every time C<connect_info> is invoked, any previous settings for
145 these options will be cleared before setting the new ones, regardless of
146 whether any options are specified in the new C<connect_info>.
147
148
149 =over
150
151 =item on_connect_do
152
153 Specifies things to do immediately after connecting or re-connecting to
154 the database.  Its value may contain:
155
156 =over
157
158 =item a scalar
159
160 This contains one SQL statement to execute.
161
162 =item an array reference
163
164 This contains SQL statements to execute in order.  Each element contains
165 a string or a code reference that returns a string.
166
167 =item a code reference
168
169 This contains some code to execute.  Unlike code references within an
170 array reference, its return value is ignored.
171
172 =back
173
174 =item on_disconnect_do
175
176 Takes arguments in the same form as L</on_connect_do> and executes them
177 immediately before disconnecting from the database.
178
179 Note, this only runs if you explicitly call L</disconnect> on the
180 storage object.
181
182 =item on_connect_call
183
184 A more generalized form of L</on_connect_do> that calls the specified
185 C<connect_call_METHOD> methods in your storage driver.
186
187   on_connect_do => 'select 1'
188
189 is equivalent to:
190
191   on_connect_call => [ [ do_sql => 'select 1' ] ]
192
193 Its values may contain:
194
195 =over
196
197 =item a scalar
198
199 Will call the C<connect_call_METHOD> method.
200
201 =item a code reference
202
203 Will execute C<< $code->($storage) >>
204
205 =item an array reference
206
207 Each value can be a method name or code reference.
208
209 =item an array of arrays
210
211 For each array, the first item is taken to be the C<connect_call_> method name
212 or code reference, and the rest are parameters to it.
213
214 =back
215
216 Some predefined storage methods you may use:
217
218 =over
219
220 =item do_sql
221
222 Executes a SQL string or a code reference that returns a SQL string. This is
223 what L</on_connect_do> and L</on_disconnect_do> use.
224
225 It can take:
226
227 =over
228
229 =item a scalar
230
231 Will execute the scalar as SQL.
232
233 =item an arrayref
234
235 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
236 attributes hashref and bind values.
237
238 =item a code reference
239
240 Will execute C<< $code->($storage) >> and execute the return array refs as
241 above.
242
243 =back
244
245 =item datetime_setup
246
247 Execute any statements necessary to initialize the database session to return
248 and accept datetime/timestamp values used with
249 L<DBIx::Class::InflateColumn::DateTime>.
250
251 Only necessary for some databases, see your specific storage driver for
252 implementation details.
253
254 =back
255
256 =item on_disconnect_call
257
258 Takes arguments in the same form as L</on_connect_call> and executes them
259 immediately before disconnecting from the database.
260
261 Calls the C<disconnect_call_METHOD> methods as opposed to the
262 C<connect_call_METHOD> methods called by L</on_connect_call>.
263
264 Note, this only runs if you explicitly call L</disconnect> on the
265 storage object.
266
267 =item disable_sth_caching
268
269 If set to a true value, this option will disable the caching of
270 statement handles via L<DBI/prepare_cached>.
271
272 =item limit_dialect 
273
274 Sets the limit dialect. This is useful for JDBC-bridge among others
275 where the remote SQL-dialect cannot be determined by the name of the
276 driver alone. See also L<SQL::Abstract::Limit>.
277
278 =item quote_char
279
280 Specifies what characters to use to quote table and column names. If 
281 you use this you will want to specify L</name_sep> as well.
282
283 C<quote_char> expects either a single character, in which case is it
284 is placed on either side of the table/column name, or an arrayref of length
285 2 in which case the table/column name is placed between the elements.
286
287 For example under MySQL you should use C<< quote_char => '`' >>, and for
288 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
289
290 =item name_sep
291
292 This only needs to be used in conjunction with C<quote_char>, and is used to 
293 specify the charecter that seperates elements (schemas, tables, columns) from 
294 each other. In most cases this is simply a C<.>.
295
296 The consequences of not supplying this value is that L<SQL::Abstract>
297 will assume DBIx::Class' uses of aliases to be complete column
298 names. The output will look like I<"me.name"> when it should actually
299 be I<"me"."name">.
300
301 =item unsafe
302
303 This Storage driver normally installs its own C<HandleError>, sets
304 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
305 all database handles, including those supplied by a coderef.  It does this
306 so that it can have consistent and useful error behavior.
307
308 If you set this option to a true value, Storage will not do its usual
309 modifications to the database handle's attributes, and instead relies on
310 the settings in your connect_info DBI options (or the values you set in
311 your connection coderef, in the case that you are connecting via coderef).
312
313 Note that your custom settings can cause Storage to malfunction,
314 especially if you set a C<HandleError> handler that suppresses exceptions
315 and/or disable C<RaiseError>.
316
317 =item auto_savepoint
318
319 If this option is true, L<DBIx::Class> will use savepoints when nesting
320 transactions, making it possible to recover from failure in the inner
321 transaction without having to abort all outer transactions.
322
323 =item cursor_class
324
325 Use this argument to supply a cursor class other than the default
326 L<DBIx::Class::Storage::DBI::Cursor>.
327
328 =back
329
330 Some real-life examples of arguments to L</connect_info> and
331 L<DBIx::Class::Schema/connect>
332
333   # Simple SQLite connection
334   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
335
336   # Connect via subref
337   ->connect_info([ sub { DBI->connect(...) } ]);
338
339   # A bit more complicated
340   ->connect_info(
341     [
342       'dbi:Pg:dbname=foo',
343       'postgres',
344       'my_pg_password',
345       { AutoCommit => 1 },
346       { quote_char => q{"}, name_sep => q{.} },
347     ]
348   );
349
350   # Equivalent to the previous example
351   ->connect_info(
352     [
353       'dbi:Pg:dbname=foo',
354       'postgres',
355       'my_pg_password',
356       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
357     ]
358   );
359
360   # Same, but with hashref as argument
361   # See parse_connect_info for explanation
362   ->connect_info(
363     [{
364       dsn         => 'dbi:Pg:dbname=foo',
365       user        => 'postgres',
366       password    => 'my_pg_password',
367       AutoCommit  => 1,
368       quote_char  => q{"},
369       name_sep    => q{.},
370     }]
371   );
372
373   # Subref + DBIx::Class-specific connection options
374   ->connect_info(
375     [
376       sub { DBI->connect(...) },
377       {
378           quote_char => q{`},
379           name_sep => q{@},
380           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
381           disable_sth_caching => 1,
382       },
383     ]
384   );
385
386
387
388 =cut
389
390 sub connect_info {
391   my ($self, $info_arg) = @_;
392
393   return $self->_connect_info if !$info_arg;
394
395   my @args = @$info_arg;  # take a shallow copy for further mutilation
396   $self->_connect_info([@args]); # copy for _connect_info
397
398
399   # combine/pre-parse arguments depending on invocation style
400
401   my %attrs;
402   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
403     %attrs = %{ $args[1] || {} };
404     @args = $args[0];
405   }
406   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
407     %attrs = %{$args[0]};
408     @args = ();
409     for (qw/password user dsn/) {
410       unshift @args, delete $attrs{$_};
411     }
412   }
413   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
414     %attrs = (
415       % { $args[3] || {} },
416       % { $args[4] || {} },
417     );
418     @args = @args[0,1,2];
419   }
420
421   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
422   #  the new set of options
423   $self->_sql_maker(undef);
424   $self->_sql_maker_opts({});
425
426   if(keys %attrs) {
427     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
428       if(my $value = delete $attrs{$storage_opt}) {
429         $self->$storage_opt($value);
430       }
431     }
432     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
433       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
434         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
435       }
436     }
437     for my $connect_do_opt (qw/on_connect_do on_disconnect_do/) {
438       if(my $opt_val = delete $attrs{$connect_do_opt}) {
439         $self->$connect_do_opt($opt_val);
440       }
441     }
442   }
443
444   %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
445
446   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
447   $self->_connect_info;
448 }
449
450 =head2 on_connect_do
451
452 This method is deprecated in favour of setting via L</connect_info>.
453
454 =cut
455
456 sub on_connect_do {
457   my $self = shift;
458   $self->_setup_connect_do(on_connect_do => @_);
459 }
460
461 =head2 on_disconnect_do
462
463 This method is deprecated in favour of setting via L</connect_info>.
464
465 =cut
466
467 sub on_disconnect_do {
468   my $self = shift;
469   $self->_setup_connect_do(on_disconnect_do => @_);
470 }
471
472 sub _setup_connect_do {
473   my ($self, $opt) = (shift, shift);
474
475   my $accessor = "_$opt";
476   my $store    = "_${opt}_store";
477
478   return $self->$accessor if not @_;
479
480   my $val = shift;
481
482   if (not defined $val) {
483     $self->$accessor(undef);
484     $self->$store(undef);
485     return;
486   }
487
488   my @store;
489
490   if (not ref($val)) {
491     push @store, [ 'do_sql', $val ];
492   } elsif (ref($val) eq 'CODE') {
493     push @store, $val;
494   } elsif (ref($val) eq 'ARRAY') {
495     push @store, map [ 'do_sql', $_ ], @$val;
496   } else {
497     $self->throw_exception("Invalid type for $opt ".ref($val));
498   }
499
500   $self->$store(\@store);
501   $self->$accessor($val);
502 }
503
504 =head2 dbh_do
505
506 Arguments: ($subref | $method_name), @extra_coderef_args?
507
508 Execute the given $subref or $method_name using the new exception-based
509 connection management.
510
511 The first two arguments will be the storage object that C<dbh_do> was called
512 on and a database handle to use.  Any additional arguments will be passed
513 verbatim to the called subref as arguments 2 and onwards.
514
515 Using this (instead of $self->_dbh or $self->dbh) ensures correct
516 exception handling and reconnection (or failover in future subclasses).
517
518 Your subref should have no side-effects outside of the database, as
519 there is the potential for your subref to be partially double-executed
520 if the database connection was stale/dysfunctional.
521
522 Example:
523
524   my @stuff = $schema->storage->dbh_do(
525     sub {
526       my ($storage, $dbh, @cols) = @_;
527       my $cols = join(q{, }, @cols);
528       $dbh->selectrow_array("SELECT $cols FROM foo");
529     },
530     @column_list
531   );
532
533 =cut
534
535 sub dbh_do {
536   my $self = shift;
537   my $code = shift;
538
539   my $dbh = $self->_dbh;
540
541   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
542       || $self->{transaction_depth};
543
544   local $self->{_in_dbh_do} = 1;
545
546   my @result;
547   my $want_array = wantarray;
548
549   eval {
550     $self->_verify_pid if $dbh;
551     if(!$self->_dbh) {
552         $self->_populate_dbh;
553         $dbh = $self->_dbh;
554     }
555
556     if($want_array) {
557         @result = $self->$code($dbh, @_);
558     }
559     elsif(defined $want_array) {
560         $result[0] = $self->$code($dbh, @_);
561     }
562     else {
563         $self->$code($dbh, @_);
564     }
565   };
566
567   my $exception = $@;
568   if(!$exception) { return $want_array ? @result : $result[0] }
569
570   $self->throw_exception($exception) if $self->connected;
571
572   # We were not connected - reconnect and retry, but let any
573   #  exception fall right through this time
574   $self->_populate_dbh;
575   $self->$code($self->_dbh, @_);
576 }
577
578 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
579 # It also informs dbh_do to bypass itself while under the direction of txn_do,
580 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
581 sub txn_do {
582   my $self = shift;
583   my $coderef = shift;
584
585   ref $coderef eq 'CODE' or $self->throw_exception
586     ('$coderef must be a CODE reference');
587
588   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
589
590   local $self->{_in_dbh_do} = 1;
591
592   my @result;
593   my $want_array = wantarray;
594
595   my $tried = 0;
596   while(1) {
597     eval {
598       $self->_verify_pid if $self->_dbh;
599       $self->_populate_dbh if !$self->_dbh;
600
601       $self->txn_begin;
602       if($want_array) {
603           @result = $coderef->(@_);
604       }
605       elsif(defined $want_array) {
606           $result[0] = $coderef->(@_);
607       }
608       else {
609           $coderef->(@_);
610       }
611       $self->txn_commit;
612     };
613
614     my $exception = $@;
615     if(!$exception) { return $want_array ? @result : $result[0] }
616
617     if($tried++ > 0 || $self->connected) {
618       eval { $self->txn_rollback };
619       my $rollback_exception = $@;
620       if($rollback_exception) {
621         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
622         $self->throw_exception($exception)  # propagate nested rollback
623           if $rollback_exception =~ /$exception_class/;
624
625         $self->throw_exception(
626           "Transaction aborted: ${exception}. "
627           . "Rollback failed: ${rollback_exception}"
628         );
629       }
630       $self->throw_exception($exception)
631     }
632
633     # We were not connected, and was first try - reconnect and retry
634     # via the while loop
635     $self->_populate_dbh;
636   }
637 }
638
639 =head2 disconnect
640
641 Our C<disconnect> method also performs a rollback first if the
642 database is not in C<AutoCommit> mode.
643
644 =cut
645
646 sub disconnect {
647   my ($self) = @_;
648
649   if( $self->connected ) {
650     if (my $connection_call = $self->on_disconnect_call) {
651       $self->_do_connection_actions(disconnect_call_ => $connection_call)
652     }
653     if (my $connection_do   = $self->_on_disconnect_do_store) {
654       $self->_do_connection_actions(disconnect_call_ => $connection_do)
655     }
656
657     $self->_dbh->rollback unless $self->_dbh_autocommit;
658     $self->_dbh->disconnect;
659     $self->_dbh(undef);
660     $self->{_dbh_gen}++;
661   }
662 }
663
664 =head2 with_deferred_fk_checks
665
666 =over 4
667
668 =item Arguments: C<$coderef>
669
670 =item Return Value: The return value of $coderef
671
672 =back
673
674 Storage specific method to run the code ref with FK checks deferred or
675 in MySQL's case disabled entirely.
676
677 =cut
678
679 # Storage subclasses should override this
680 sub with_deferred_fk_checks {
681   my ($self, $sub) = @_;
682
683   $sub->();
684 }
685
686 sub connected {
687   my ($self) = @_;
688
689   if(my $dbh = $self->_dbh) {
690       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
691           $self->_dbh(undef);
692           $self->{_dbh_gen}++;
693           return;
694       }
695       else {
696           $self->_verify_pid;
697           return 0 if !$self->_dbh;
698       }
699       return ($dbh->FETCH('Active') && $dbh->ping);
700   }
701
702   return 0;
703 }
704
705 # handle pid changes correctly
706 #  NOTE: assumes $self->_dbh is a valid $dbh
707 sub _verify_pid {
708   my ($self) = @_;
709
710   return if defined $self->_conn_pid && $self->_conn_pid == $$;
711
712   $self->_dbh->{InactiveDestroy} = 1;
713   $self->_dbh(undef);
714   $self->{_dbh_gen}++;
715
716   return;
717 }
718
719 sub ensure_connected {
720   my ($self) = @_;
721
722   unless ($self->connected) {
723     $self->_populate_dbh;
724   }
725 }
726
727 =head2 dbh
728
729 Returns the dbh - a data base handle of class L<DBI>.
730
731 =cut
732
733 sub dbh {
734   my ($self) = @_;
735
736   $self->ensure_connected;
737   return $self->_dbh;
738 }
739
740 sub _sql_maker_args {
741     my ($self) = @_;
742     
743     return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
744 }
745
746 sub sql_maker {
747   my ($self) = @_;
748   unless ($self->_sql_maker) {
749     my $sql_maker_class = $self->sql_maker_class;
750     $self->ensure_class_loaded ($sql_maker_class);
751     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
752   }
753   return $self->_sql_maker;
754 }
755
756 sub _rebless {}
757
758 sub _populate_dbh {
759   my ($self) = @_;
760   my @info = @{$self->_dbi_connect_info || []};
761   $self->_dbh($self->_connect(@info));
762
763   $self->_conn_pid($$);
764   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
765
766   $self->_determine_driver;
767
768   # Always set the transaction depth on connect, since
769   #  there is no transaction in progress by definition
770   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
771
772   if (my $connection_call = $self->on_connect_call) {
773     $self->_do_connection_actions(connect_call_ => $connection_call)
774   }
775   if (my $connection_do = $self->_on_connect_do_store) {
776     $self->_do_connection_actions(connect_call_ => $connection_do)
777   }
778 }
779
780 sub _determine_driver {
781   my ($self) = @_;
782
783   if (ref $self eq 'DBIx::Class::Storage::DBI') {
784     my $driver;
785
786     if ($self->_dbh) { # we are connected
787       $driver = $self->_dbh->{Driver}{Name};
788     } else {
789       # try to use dsn to not require being connected, the driver may still
790       # force a connection in _rebless to determine version
791       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
792     }
793
794     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
795       bless $self, "DBIx::Class::Storage::DBI::${driver}";
796       $self->_rebless();
797     }
798   }
799 }
800
801 sub _do_connection_actions {
802   my $self          = shift;
803   my $method_prefix = shift;
804   my $call          = shift;
805
806   if (not ref($call)) {
807     my $method = $method_prefix . $call;
808     $self->$method(@_);
809   } elsif (ref($call) eq 'CODE') {
810     $self->$call(@_);
811   } elsif (ref($call) eq 'ARRAY') {
812     if (ref($call->[0]) ne 'ARRAY') {
813       $self->_do_connection_actions($method_prefix, $_) for @$call;
814     } else {
815       $self->_do_connection_actions($method_prefix, @$_) for @$call;
816     }
817   } else {
818     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
819   }
820
821   return $self;
822 }
823
824 sub connect_call_do_sql {
825   my $self = shift;
826   $self->_do_query(@_);
827 }
828
829 sub disconnect_call_do_sql {
830   my $self = shift;
831   $self->_do_query(@_);
832 }
833
834 # override in db-specific backend when necessary
835 sub connect_call_datetime_setup { 1 }
836
837 sub _do_query {
838   my ($self, $action) = @_;
839
840   if (ref $action eq 'CODE') {
841     $action = $action->($self);
842     $self->_do_query($_) foreach @$action;
843   }
844   else {
845     # Most debuggers expect ($sql, @bind), so we need to exclude
846     # the attribute hash which is the second argument to $dbh->do
847     # furthermore the bind values are usually to be presented
848     # as named arrayref pairs, so wrap those here too
849     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
850     my $sql = shift @do_args;
851     my $attrs = shift @do_args;
852     my @bind = map { [ undef, $_ ] } @do_args;
853
854     $self->_query_start($sql, @bind);
855     $self->_dbh->do($sql, $attrs, @do_args);
856     $self->_query_end($sql, @bind);
857   }
858
859   return $self;
860 }
861
862 sub _connect {
863   my ($self, @info) = @_;
864
865   $self->throw_exception("You failed to provide any connection info")
866     if !@info;
867
868   my ($old_connect_via, $dbh);
869
870   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
871     $old_connect_via = $DBI::connect_via;
872     $DBI::connect_via = 'connect';
873   }
874
875   eval {
876     if(ref $info[0] eq 'CODE') {
877        $dbh = &{$info[0]}
878     }
879     else {
880        $dbh = DBI->connect(@info);
881     }
882
883     if($dbh && !$self->unsafe) {
884       my $weak_self = $self;
885       Scalar::Util::weaken($weak_self);
886       $dbh->{HandleError} = sub {
887           if ($weak_self) {
888             $weak_self->throw_exception("DBI Exception: $_[0]");
889           }
890           else {
891             croak ("DBI Exception: $_[0]");
892           }
893       };
894       $dbh->{ShowErrorStatement} = 1;
895       $dbh->{RaiseError} = 1;
896       $dbh->{PrintError} = 0;
897     }
898   };
899
900   $DBI::connect_via = $old_connect_via if $old_connect_via;
901
902   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
903     if !$dbh || $@;
904
905   $self->_dbh_autocommit($dbh->{AutoCommit});
906
907   $dbh;
908 }
909
910 sub svp_begin {
911   my ($self, $name) = @_;
912
913   $name = $self->_svp_generate_name
914     unless defined $name;
915
916   $self->throw_exception ("You can't use savepoints outside a transaction")
917     if $self->{transaction_depth} == 0;
918
919   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
920     unless $self->can('_svp_begin');
921   
922   push @{ $self->{savepoints} }, $name;
923
924   $self->debugobj->svp_begin($name) if $self->debug;
925   
926   return $self->_svp_begin($name);
927 }
928
929 sub svp_release {
930   my ($self, $name) = @_;
931
932   $self->throw_exception ("You can't use savepoints outside a transaction")
933     if $self->{transaction_depth} == 0;
934
935   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
936     unless $self->can('_svp_release');
937
938   if (defined $name) {
939     $self->throw_exception ("Savepoint '$name' does not exist")
940       unless grep { $_ eq $name } @{ $self->{savepoints} };
941
942     # Dig through the stack until we find the one we are releasing.  This keeps
943     # the stack up to date.
944     my $svp;
945
946     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
947   } else {
948     $name = pop @{ $self->{savepoints} };
949   }
950
951   $self->debugobj->svp_release($name) if $self->debug;
952
953   return $self->_svp_release($name);
954 }
955
956 sub svp_rollback {
957   my ($self, $name) = @_;
958
959   $self->throw_exception ("You can't use savepoints outside a transaction")
960     if $self->{transaction_depth} == 0;
961
962   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
963     unless $self->can('_svp_rollback');
964
965   if (defined $name) {
966       # If they passed us a name, verify that it exists in the stack
967       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
968           $self->throw_exception("Savepoint '$name' does not exist!");
969       }
970
971       # Dig through the stack until we find the one we are releasing.  This keeps
972       # the stack up to date.
973       while(my $s = pop(@{ $self->{savepoints} })) {
974           last if($s eq $name);
975       }
976       # Add the savepoint back to the stack, as a rollback doesn't remove the
977       # named savepoint, only everything after it.
978       push(@{ $self->{savepoints} }, $name);
979   } else {
980       # We'll assume they want to rollback to the last savepoint
981       $name = $self->{savepoints}->[-1];
982   }
983
984   $self->debugobj->svp_rollback($name) if $self->debug;
985   
986   return $self->_svp_rollback($name);
987 }
988
989 sub _svp_generate_name {
990     my ($self) = @_;
991
992     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
993 }
994
995 sub txn_begin {
996   my $self = shift;
997   $self->ensure_connected();
998   if($self->{transaction_depth} == 0) {
999     $self->debugobj->txn_begin()
1000       if $self->debug;
1001     # this isn't ->_dbh-> because
1002     #  we should reconnect on begin_work
1003     #  for AutoCommit users
1004     $self->dbh->begin_work;
1005   } elsif ($self->auto_savepoint) {
1006     $self->svp_begin;
1007   }
1008   $self->{transaction_depth}++;
1009 }
1010
1011 sub txn_commit {
1012   my $self = shift;
1013   if ($self->{transaction_depth} == 1) {
1014     my $dbh = $self->_dbh;
1015     $self->debugobj->txn_commit()
1016       if ($self->debug);
1017     $dbh->commit;
1018     $self->{transaction_depth} = 0
1019       if $self->_dbh_autocommit;
1020   }
1021   elsif($self->{transaction_depth} > 1) {
1022     $self->{transaction_depth}--;
1023     $self->svp_release
1024       if $self->auto_savepoint;
1025   }
1026 }
1027
1028 sub txn_rollback {
1029   my $self = shift;
1030   my $dbh = $self->_dbh;
1031   eval {
1032     if ($self->{transaction_depth} == 1) {
1033       $self->debugobj->txn_rollback()
1034         if ($self->debug);
1035       $self->{transaction_depth} = 0
1036         if $self->_dbh_autocommit;
1037       $dbh->rollback;
1038     }
1039     elsif($self->{transaction_depth} > 1) {
1040       $self->{transaction_depth}--;
1041       if ($self->auto_savepoint) {
1042         $self->svp_rollback;
1043         $self->svp_release;
1044       }
1045     }
1046     else {
1047       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1048     }
1049   };
1050   if ($@) {
1051     my $error = $@;
1052     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1053     $error =~ /$exception_class/ and $self->throw_exception($error);
1054     # ensure that a failed rollback resets the transaction depth
1055     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1056     $self->throw_exception($error);
1057   }
1058 }
1059
1060 # This used to be the top-half of _execute.  It was split out to make it
1061 #  easier to override in NoBindVars without duping the rest.  It takes up
1062 #  all of _execute's args, and emits $sql, @bind.
1063 sub _prep_for_execute {
1064   my ($self, $op, $extra_bind, $ident, $args) = @_;
1065
1066   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1067     $ident = $ident->from();
1068   }
1069
1070   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1071
1072   unshift(@bind,
1073     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1074       if $extra_bind;
1075   return ($sql, \@bind);
1076 }
1077
1078
1079 sub _fix_bind_params {
1080     my ($self, @bind) = @_;
1081
1082     ### Turn @bind from something like this:
1083     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1084     ### to this:
1085     ###   ( "'1'", "'1'", "'3'" )
1086     return
1087         map {
1088             if ( defined( $_ && $_->[1] ) ) {
1089                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1090             }
1091             else { q{'NULL'}; }
1092         } @bind;
1093 }
1094
1095 sub _query_start {
1096     my ( $self, $sql, @bind ) = @_;
1097
1098     if ( $self->debug ) {
1099         @bind = $self->_fix_bind_params(@bind);
1100
1101         $self->debugobj->query_start( $sql, @bind );
1102     }
1103 }
1104
1105 sub _query_end {
1106     my ( $self, $sql, @bind ) = @_;
1107
1108     if ( $self->debug ) {
1109         @bind = $self->_fix_bind_params(@bind);
1110         $self->debugobj->query_end( $sql, @bind );
1111     }
1112 }
1113
1114 sub _dbh_execute {
1115   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1116
1117   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1118
1119   $self->_query_start( $sql, @$bind );
1120
1121   my $sth = $self->sth($sql,$op);
1122
1123   my $placeholder_index = 1; 
1124
1125   foreach my $bound (@$bind) {
1126     my $attributes = {};
1127     my($column_name, @data) = @$bound;
1128
1129     if ($bind_attributes) {
1130       $attributes = $bind_attributes->{$column_name}
1131       if defined $bind_attributes->{$column_name};
1132     }
1133
1134     foreach my $data (@data) {
1135       my $ref = ref $data;
1136       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1137
1138       $sth->bind_param($placeholder_index, $data, $attributes);
1139       $placeholder_index++;
1140     }
1141   }
1142
1143   # Can this fail without throwing an exception anyways???
1144   my $rv = $sth->execute();
1145   $self->throw_exception($sth->errstr) if !$rv;
1146
1147   $self->_query_end( $sql, @$bind );
1148
1149   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1150 }
1151
1152 sub _execute {
1153     my $self = shift;
1154     $self->dbh_do('_dbh_execute', @_)
1155 }
1156
1157 sub insert {
1158   my ($self, $source, $to_insert) = @_;
1159
1160   my $ident = $source->from;
1161   my $bind_attributes = $self->source_bind_attributes($source);
1162
1163   my $updated_cols = {};
1164
1165   $self->ensure_connected;
1166   foreach my $col ( $source->columns ) {
1167     if ( !defined $to_insert->{$col} ) {
1168       my $col_info = $source->column_info($col);
1169
1170       if ( $col_info->{auto_nextval} ) {
1171         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1172       }
1173     }
1174   }
1175
1176   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1177
1178   return $updated_cols;
1179 }
1180
1181 ## Still not quite perfect, and EXPERIMENTAL
1182 ## Currently it is assumed that all values passed will be "normal", i.e. not 
1183 ## scalar refs, or at least, all the same type as the first set, the statement is
1184 ## only prepped once.
1185 sub insert_bulk {
1186   my ($self, $source, $cols, $data) = @_;
1187   my %colvalues;
1188   my $table = $source->from;
1189   @colvalues{@$cols} = (0..$#$cols);
1190   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1191   
1192   $self->_query_start( $sql, @bind );
1193   my $sth = $self->sth($sql);
1194
1195 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1196
1197   ## This must be an arrayref, else nothing works!
1198   my $tuple_status = [];
1199
1200   ## Get the bind_attributes, if any exist
1201   my $bind_attributes = $self->source_bind_attributes($source);
1202
1203   ## Bind the values and execute
1204   my $placeholder_index = 1; 
1205
1206   foreach my $bound (@bind) {
1207
1208     my $attributes = {};
1209     my ($column_name, $data_index) = @$bound;
1210
1211     if( $bind_attributes ) {
1212       $attributes = $bind_attributes->{$column_name}
1213       if defined $bind_attributes->{$column_name};
1214     }
1215
1216     my @data = map { $_->[$data_index] } @$data;
1217
1218     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1219     $placeholder_index++;
1220   }
1221   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1222   if (my $err = $@) {
1223     my $i = 0;
1224     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1225
1226     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1227       if ($i > $#$tuple_status);
1228
1229     require Data::Dumper;
1230     local $Data::Dumper::Terse = 1;
1231     local $Data::Dumper::Indent = 1;
1232     local $Data::Dumper::Useqq = 1;
1233     local $Data::Dumper::Quotekeys = 0;
1234
1235     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1236       $tuple_status->[$i][1],
1237       Data::Dumper::Dumper(
1238         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1239       ),
1240     );
1241   }
1242   $self->throw_exception($sth->errstr) if !$rv;
1243
1244   $self->_query_end( $sql, @bind );
1245   return (wantarray ? ($rv, $sth, @bind) : $rv);
1246 }
1247
1248 sub update {
1249   my $self = shift @_;
1250   my $source = shift @_;
1251   my $bind_attributes = $self->source_bind_attributes($source);
1252   
1253   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1254 }
1255
1256
1257 sub delete {
1258   my $self = shift @_;
1259   my $source = shift @_;
1260   
1261   my $bind_attrs = $self->source_bind_attributes($source);
1262   
1263   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1264 }
1265
1266 # We were sent here because the $rs contains a complex search
1267 # which will require a subquery to select the correct rows
1268 # (i.e. joined or limited resultsets)
1269 #
1270 # Genarating a single PK column subquery is trivial and supported
1271 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1272 # Look at _multipk_update_delete()
1273 sub _subq_update_delete {
1274   my $self = shift;
1275   my ($rs, $op, $values) = @_;
1276
1277   my $rsrc = $rs->result_source;
1278
1279   # we already check this, but double check naively just in case. Should be removed soon
1280   my $sel = $rs->_resolved_attrs->{select};
1281   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1282   my @pcols = $rsrc->primary_columns;
1283   if (@$sel != @pcols) {
1284     $self->throw_exception (
1285       'Subquery update/delete can not be called on resultsets selecting a'
1286      .' number of columns different than the number of primary keys'
1287     );
1288   }
1289
1290   if (@pcols == 1) {
1291     return $self->$op (
1292       $rsrc,
1293       $op eq 'update' ? $values : (),
1294       { $pcols[0] => { -in => $rs->as_query } },
1295     );
1296   }
1297
1298   else {
1299     return $self->_multipk_update_delete (@_);
1300   }
1301 }
1302
1303 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1304 # resultset update/delete involving subqueries. So by default resort
1305 # to simple (and inefficient) delete_all style per-row opearations,
1306 # while allowing specific storages to override this with a faster
1307 # implementation.
1308 #
1309 sub _multipk_update_delete {
1310   return shift->_per_row_update_delete (@_);
1311 }
1312
1313 # This is the default loop used to delete/update rows for multi PK
1314 # resultsets, and used by mysql exclusively (because it can't do anything
1315 # else).
1316 #
1317 # We do not use $row->$op style queries, because resultset update/delete
1318 # is not expected to cascade (this is what delete_all/update_all is for).
1319 #
1320 # There should be no race conditions as the entire operation is rolled
1321 # in a transaction.
1322 #
1323 sub _per_row_update_delete {
1324   my $self = shift;
1325   my ($rs, $op, $values) = @_;
1326
1327   my $rsrc = $rs->result_source;
1328   my @pcols = $rsrc->primary_columns;
1329
1330   my $guard = $self->txn_scope_guard;
1331
1332   # emulate the return value of $sth->execute for non-selects
1333   my $row_cnt = '0E0';
1334
1335   my $subrs_cur = $rs->cursor;
1336   while (my @pks = $subrs_cur->next) {
1337
1338     my $cond;
1339     for my $i (0.. $#pcols) {
1340       $cond->{$pcols[$i]} = $pks[$i];
1341     }
1342
1343     $self->$op (
1344       $rsrc,
1345       $op eq 'update' ? $values : (),
1346       $cond,
1347     );
1348
1349     $row_cnt++;
1350   }
1351
1352   $guard->commit;
1353
1354   return $row_cnt;
1355 }
1356
1357 sub _select {
1358   my $self = shift;
1359
1360   # localization is neccessary as
1361   # 1) there is no infrastructure to pass this around (easy to do, but will wait)
1362   # 2) _select_args sets it and _prep_for_execute consumes it
1363   my $sql_maker = $self->sql_maker;
1364   local $sql_maker->{for};
1365
1366   return $self->_execute($self->_select_args(@_));
1367 }
1368
1369 sub _select_args_to_query {
1370   my $self = shift;
1371
1372   # localization is neccessary as
1373   # 1) there is no infrastructure to pass this around (easy to do, but will wait)
1374   # 2) _select_args sets it and _prep_for_execute consumes it
1375   my $sql_maker = $self->sql_maker;
1376   local $sql_maker->{for};
1377
1378   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1379   #  = $self->_select_args($ident, $select, $cond, $attrs);
1380   my ($op, $bind, $ident, $bind_attrs, @args) =
1381     $self->_select_args(@_);
1382
1383   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1384   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1385   $prepared_bind ||= [];
1386
1387   return wantarray
1388     ? ($sql, $prepared_bind, $bind_attrs)
1389     : \[ "($sql)", @$prepared_bind ]
1390   ;
1391 }
1392
1393 sub _select_args {
1394   my ($self, $ident, $select, $condition, $attrs) = @_;
1395
1396   my $sql_maker = $self->sql_maker;
1397   $sql_maker->{for} = delete $attrs->{for};
1398
1399   my $order = { map
1400     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1401     (qw/order_by group_by having _virtual_order_by/ )
1402   };
1403
1404
1405   my $bind_attrs = {};
1406
1407   my $alias2source = $self->_resolve_ident_sources ($ident);
1408
1409   for my $alias (keys %$alias2source) {
1410     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1411     for my $col (keys %$bindtypes) {
1412
1413       my $fqcn = join ('.', $alias, $col);
1414       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1415
1416       # so that unqualified searches can be bound too
1417       $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
1418     }
1419   }
1420
1421   # This would be the point to deflate anything found in $condition
1422   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1423   # expect a row object. And all we have is a resultsource (it is trivial
1424   # to extract deflator coderefs via $alias2source above).
1425   #
1426   # I don't see a way forward other than changing the way deflators are
1427   # invoked, and that's just bad...
1428
1429   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1430   if ($attrs->{software_limit} ||
1431       $sql_maker->_default_limit_syntax eq "GenericSubQ") {
1432         $attrs->{software_limit} = 1;
1433   } else {
1434     $self->throw_exception("rows attribute must be positive if present")
1435       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1436
1437     # MySQL actually recommends this approach.  I cringe.
1438     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1439     push @args, $attrs->{rows}, $attrs->{offset};
1440   }
1441   return @args;
1442 }
1443
1444 sub _resolve_ident_sources {
1445   my ($self, $ident) = @_;
1446
1447   my $alias2source = {};
1448
1449   # the reason this is so contrived is that $ident may be a {from}
1450   # structure, specifying multiple tables to join
1451   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1452     # this is compat mode for insert/update/delete which do not deal with aliases
1453     $alias2source->{me} = $ident;
1454   }
1455   elsif (ref $ident eq 'ARRAY') {
1456
1457     for (@$ident) {
1458       my $tabinfo;
1459       if (ref $_ eq 'HASH') {
1460         $tabinfo = $_;
1461       }
1462       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1463         $tabinfo = $_->[0];
1464       }
1465
1466       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1467         if ($tabinfo->{-source_handle});
1468     }
1469   }
1470
1471   return $alias2source;
1472 }
1473
1474 sub count {
1475   my ($self, $source, $attrs) = @_;
1476
1477   my $tmp_attrs = { %$attrs };
1478
1479   # take off any pagers, record_filter is cdbi, and no point of ordering a count
1480   delete $tmp_attrs->{$_} for (qw/select as rows offset page order_by record_filter/);
1481
1482   # overwrite the selector
1483   $tmp_attrs->{select} = { count => '*' };
1484
1485   my $tmp_rs = $source->resultset_class->new($source, $tmp_attrs);
1486   my ($count) = $tmp_rs->cursor->next;
1487
1488   # if the offset/rows attributes are still present, we did not use
1489   # a subquery, so we need to make the calculations in software
1490   $count -= $attrs->{offset} if $attrs->{offset};
1491   $count = $attrs->{rows} if $attrs->{rows} and $attrs->{rows} < $count;
1492   $count = 0 if ($count < 0);
1493
1494   return $count;
1495 }
1496
1497 sub count_grouped {
1498   my ($self, $source, $attrs) = @_;
1499
1500   # copy for the subquery, we need to do some adjustments to it too
1501   my $sub_attrs = { %$attrs };
1502
1503   # these can not go in the subquery, and there is no point of ordering it
1504   delete $sub_attrs->{$_} for qw/prefetch collapse select as order_by/;
1505
1506   # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
1507   # simply deleting group_by suffices, as the code below will re-fill it
1508   # Note: we check $attrs, as $sub_attrs has collapse deleted
1509   if (ref $attrs->{collapse} and keys %{$attrs->{collapse}} ) {
1510     delete $sub_attrs->{group_by};
1511   }
1512
1513   $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($source->primary_columns) ];
1514   $sub_attrs->{select} = $self->_grouped_count_select ($source, $sub_attrs);
1515
1516   $attrs->{from} = [{
1517     count_subq => $source->resultset_class->new ($source, $sub_attrs )->as_query
1518   }];
1519
1520   # the subquery replaces this
1521   delete $attrs->{$_} for qw/where bind prefetch collapse group_by having having_bind rows offset page pager/;
1522
1523   return $self->count ($source, $attrs);
1524 }
1525
1526 #
1527 # Returns a SELECT to go with a supplied GROUP BY
1528 # (caled by count_grouped so a group_by is present)
1529 # Most databases expect them to match, but some
1530 # choke in various ways.
1531 #
1532 sub _grouped_count_select {
1533   my ($self, $source, $rs_args) = @_;
1534   return $rs_args->{group_by};
1535 }
1536
1537 sub source_bind_attributes {
1538   my ($self, $source) = @_;
1539   
1540   my $bind_attributes;
1541   foreach my $column ($source->columns) {
1542   
1543     my $data_type = $source->column_info($column)->{data_type} || '';
1544     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1545      if $data_type;
1546   }
1547
1548   return $bind_attributes;
1549 }
1550
1551 =head2 select
1552
1553 =over 4
1554
1555 =item Arguments: $ident, $select, $condition, $attrs
1556
1557 =back
1558
1559 Handle a SQL select statement.
1560
1561 =cut
1562
1563 sub select {
1564   my $self = shift;
1565   my ($ident, $select, $condition, $attrs) = @_;
1566   return $self->cursor_class->new($self, \@_, $attrs);
1567 }
1568
1569 sub select_single {
1570   my $self = shift;
1571   my ($rv, $sth, @bind) = $self->_select(@_);
1572   my @row = $sth->fetchrow_array;
1573   my @nextrow = $sth->fetchrow_array if @row;
1574   if(@row && @nextrow) {
1575     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1576   }
1577   # Need to call finish() to work round broken DBDs
1578   $sth->finish();
1579   return @row;
1580 }
1581
1582 =head2 sth
1583
1584 =over 4
1585
1586 =item Arguments: $sql
1587
1588 =back
1589
1590 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1591
1592 =cut
1593
1594 sub _dbh_sth {
1595   my ($self, $dbh, $sql) = @_;
1596
1597   # 3 is the if_active parameter which avoids active sth re-use
1598   my $sth = $self->disable_sth_caching
1599     ? $dbh->prepare($sql)
1600     : $dbh->prepare_cached($sql, {}, 3);
1601
1602   # XXX You would think RaiseError would make this impossible,
1603   #  but apparently that's not true :(
1604   $self->throw_exception($dbh->errstr) if !$sth;
1605
1606   $sth;
1607 }
1608
1609 sub sth {
1610   my ($self, $sql) = @_;
1611   $self->dbh_do('_dbh_sth', $sql);
1612 }
1613
1614 sub _dbh_columns_info_for {
1615   my ($self, $dbh, $table) = @_;
1616
1617   if ($dbh->can('column_info')) {
1618     my %result;
1619     eval {
1620       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1621       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1622       $sth->execute();
1623       while ( my $info = $sth->fetchrow_hashref() ){
1624         my %column_info;
1625         $column_info{data_type}   = $info->{TYPE_NAME};
1626         $column_info{size}      = $info->{COLUMN_SIZE};
1627         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1628         $column_info{default_value} = $info->{COLUMN_DEF};
1629         my $col_name = $info->{COLUMN_NAME};
1630         $col_name =~ s/^\"(.*)\"$/$1/;
1631
1632         $result{$col_name} = \%column_info;
1633       }
1634     };
1635     return \%result if !$@ && scalar keys %result;
1636   }
1637
1638   my %result;
1639   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1640   $sth->execute;
1641   my @columns = @{$sth->{NAME_lc}};
1642   for my $i ( 0 .. $#columns ){
1643     my %column_info;
1644     $column_info{data_type} = $sth->{TYPE}->[$i];
1645     $column_info{size} = $sth->{PRECISION}->[$i];
1646     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1647
1648     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1649       $column_info{data_type} = $1;
1650       $column_info{size}    = $2;
1651     }
1652
1653     $result{$columns[$i]} = \%column_info;
1654   }
1655   $sth->finish;
1656
1657   foreach my $col (keys %result) {
1658     my $colinfo = $result{$col};
1659     my $type_num = $colinfo->{data_type};
1660     my $type_name;
1661     if(defined $type_num && $dbh->can('type_info')) {
1662       my $type_info = $dbh->type_info($type_num);
1663       $type_name = $type_info->{TYPE_NAME} if $type_info;
1664       $colinfo->{data_type} = $type_name if $type_name;
1665     }
1666   }
1667
1668   return \%result;
1669 }
1670
1671 sub columns_info_for {
1672   my ($self, $table) = @_;
1673   $self->dbh_do('_dbh_columns_info_for', $table);
1674 }
1675
1676 =head2 last_insert_id
1677
1678 Return the row id of the last insert.
1679
1680 =cut
1681
1682 sub _dbh_last_insert_id {
1683     # All Storage's need to register their own _dbh_last_insert_id
1684     # the old SQLite-based method was highly inappropriate
1685
1686     my $self = shift;
1687     my $class = ref $self;
1688     $self->throw_exception (<<EOE);
1689
1690 No _dbh_last_insert_id() method found in $class.
1691 Since the method of obtaining the autoincrement id of the last insert
1692 operation varies greatly between different databases, this method must be
1693 individually implemented for every storage class.
1694 EOE
1695 }
1696
1697 sub last_insert_id {
1698   my $self = shift;
1699   $self->dbh_do('_dbh_last_insert_id', @_);
1700 }
1701
1702 =head2 sqlt_type
1703
1704 Returns the database driver name.
1705
1706 =cut
1707
1708 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1709
1710 =head2 bind_attribute_by_data_type
1711
1712 Given a datatype from column info, returns a database specific bind
1713 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
1714 let the database planner just handle it.
1715
1716 Generally only needed for special case column types, like bytea in postgres.
1717
1718 =cut
1719
1720 sub bind_attribute_by_data_type {
1721     return;
1722 }
1723
1724 =head2 is_datatype_numeric
1725
1726 Given a datatype from column_info, returns a boolean value indicating if
1727 the current RDBMS considers it a numeric value. This controls how
1728 L<DBIx::Class::Row/set_column> decides whether to mark the column as
1729 dirty - when the datatype is deemed numeric a C<< != >> comparison will
1730 be performed instead of the usual C<eq>.
1731
1732 =cut
1733
1734 sub is_datatype_numeric {
1735   my ($self, $dt) = @_;
1736
1737   return 0 unless $dt;
1738
1739   return $dt =~ /^ (?:
1740     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
1741   ) $/ix;
1742 }
1743
1744
1745 =head2 create_ddl_dir (EXPERIMENTAL)
1746
1747 =over 4
1748
1749 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
1750
1751 =back
1752
1753 Creates a SQL file based on the Schema, for each of the specified
1754 database engines in C<\@databases> in the given directory.
1755 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
1756
1757 Given a previous version number, this will also create a file containing
1758 the ALTER TABLE statements to transform the previous schema into the
1759 current one. Note that these statements may contain C<DROP TABLE> or
1760 C<DROP COLUMN> statements that can potentially destroy data.
1761
1762 The file names are created using the C<ddl_filename> method below, please
1763 override this method in your schema if you would like a different file
1764 name format. For the ALTER file, the same format is used, replacing
1765 $version in the name with "$preversion-$version".
1766
1767 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
1768 The most common value for this would be C<< { add_drop_table => 1 } >>
1769 to have the SQL produced include a C<DROP TABLE> statement for each table
1770 created. For quoting purposes supply C<quote_table_names> and
1771 C<quote_field_names>.
1772
1773 If no arguments are passed, then the following default values are assumed:
1774
1775 =over 4
1776
1777 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
1778
1779 =item version    - $schema->schema_version
1780
1781 =item directory  - './'
1782
1783 =item preversion - <none>
1784
1785 =back
1786
1787 By default, C<\%sqlt_args> will have
1788
1789  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1790
1791 merged with the hash passed in. To disable any of those features, pass in a 
1792 hashref like the following
1793
1794  { ignore_constraint_names => 0, # ... other options }
1795
1796
1797 Note that this feature is currently EXPERIMENTAL and may not work correctly 
1798 across all databases, or fully handle complex relationships.
1799
1800 WARNING: Please check all SQL files created, before applying them.
1801
1802 =cut
1803
1804 sub create_ddl_dir {
1805   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1806
1807   if(!$dir || !-d $dir) {
1808     carp "No directory given, using ./\n";
1809     $dir = "./";
1810   }
1811   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1812   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1813
1814   my $schema_version = $schema->schema_version || '1.x';
1815   $version ||= $schema_version;
1816
1817   $sqltargs = {
1818     add_drop_table => 1, 
1819     ignore_constraint_names => 1,
1820     ignore_index_names => 1,
1821     %{$sqltargs || {}}
1822   };
1823
1824   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
1825       . $self->_check_sqlt_message . q{'})
1826           if !$self->_check_sqlt_version;
1827
1828   my $sqlt = SQL::Translator->new( $sqltargs );
1829
1830   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1831   my $sqlt_schema = $sqlt->translate({ data => $schema })
1832     or $self->throw_exception ($sqlt->error);
1833
1834   foreach my $db (@$databases) {
1835     $sqlt->reset();
1836     $sqlt->{schema} = $sqlt_schema;
1837     $sqlt->producer($db);
1838
1839     my $file;
1840     my $filename = $schema->ddl_filename($db, $version, $dir);
1841     if (-e $filename && ($version eq $schema_version )) {
1842       # if we are dumping the current version, overwrite the DDL
1843       carp "Overwriting existing DDL file - $filename";
1844       unlink($filename);
1845     }
1846
1847     my $output = $sqlt->translate;
1848     if(!$output) {
1849       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1850       next;
1851     }
1852     if(!open($file, ">$filename")) {
1853       $self->throw_exception("Can't open $filename for writing ($!)");
1854       next;
1855     }
1856     print $file $output;
1857     close($file);
1858   
1859     next unless ($preversion);
1860
1861     require SQL::Translator::Diff;
1862
1863     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1864     if(!-e $prefilename) {
1865       carp("No previous schema file found ($prefilename)");
1866       next;
1867     }
1868
1869     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1870     if(-e $difffile) {
1871       carp("Overwriting existing diff file - $difffile");
1872       unlink($difffile);
1873     }
1874     
1875     my $source_schema;
1876     {
1877       my $t = SQL::Translator->new($sqltargs);
1878       $t->debug( 0 );
1879       $t->trace( 0 );
1880
1881       $t->parser( $db )
1882         or $self->throw_exception ($t->error);
1883
1884       my $out = $t->translate( $prefilename )
1885         or $self->throw_exception ($t->error);
1886
1887       $source_schema = $t->schema;
1888
1889       $source_schema->name( $prefilename )
1890         unless ( $source_schema->name );
1891     }
1892
1893     # The "new" style of producers have sane normalization and can support 
1894     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1895     # And we have to diff parsed SQL against parsed SQL.
1896     my $dest_schema = $sqlt_schema;
1897
1898     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1899       my $t = SQL::Translator->new($sqltargs);
1900       $t->debug( 0 );
1901       $t->trace( 0 );
1902
1903       $t->parser( $db )
1904         or $self->throw_exception ($t->error);
1905
1906       my $out = $t->translate( $filename )
1907         or $self->throw_exception ($t->error);
1908
1909       $dest_schema = $t->schema;
1910
1911       $dest_schema->name( $filename )
1912         unless $dest_schema->name;
1913     }
1914     
1915     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1916                                                   $dest_schema,   $db,
1917                                                   $sqltargs
1918                                                  );
1919     if(!open $file, ">$difffile") { 
1920       $self->throw_exception("Can't write to $difffile ($!)");
1921       next;
1922     }
1923     print $file $diff;
1924     close($file);
1925   }
1926 }
1927
1928 =head2 deployment_statements
1929
1930 =over 4
1931
1932 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1933
1934 =back
1935
1936 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1937
1938 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
1939 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
1940
1941 C<$directory> is used to return statements from files in a previously created
1942 L</create_ddl_dir> directory and is optional. The filenames are constructed
1943 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1944
1945 If no C<$directory> is specified then the statements are constructed on the
1946 fly using L<SQL::Translator> and C<$version> is ignored.
1947
1948 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1949
1950 =cut
1951
1952 sub deployment_statements {
1953   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1954   # Need to be connected to get the correct sqlt_type
1955   $self->ensure_connected() unless $type;
1956   $type ||= $self->sqlt_type;
1957   $version ||= $schema->schema_version || '1.x';
1958   $dir ||= './';
1959   my $filename = $schema->ddl_filename($type, $version, $dir);
1960   if(-f $filename)
1961   {
1962       my $file;
1963       open($file, "<$filename") 
1964         or $self->throw_exception("Can't open $filename ($!)");
1965       my @rows = <$file>;
1966       close($file);
1967       return join('', @rows);
1968   }
1969
1970   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
1971       . $self->_check_sqlt_message . q{'})
1972           if !$self->_check_sqlt_version;
1973
1974   require SQL::Translator::Parser::DBIx::Class;
1975   eval qq{use SQL::Translator::Producer::${type}};
1976   $self->throw_exception($@) if $@;
1977
1978   # sources needs to be a parser arg, but for simplicty allow at top level 
1979   # coming in
1980   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1981       if exists $sqltargs->{sources};
1982
1983   my $tr = SQL::Translator->new(%$sqltargs);
1984   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1985   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1986 }
1987
1988 sub deploy {
1989   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1990   my $deploy = sub {
1991     my $line = shift;
1992     return if($line =~ /^--/);
1993     return if(!$line);
1994     # next if($line =~ /^DROP/m);
1995     return if($line =~ /^BEGIN TRANSACTION/m);
1996     return if($line =~ /^COMMIT/m);
1997     return if $line =~ /^\s+$/; # skip whitespace only
1998     $self->_query_start($line);
1999     eval {
2000       $self->dbh->do($line); # shouldn't be using ->dbh ?
2001     };
2002     if ($@) {
2003       carp qq{$@ (running "${line}")};
2004     }
2005     $self->_query_end($line);
2006   };
2007   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2008   if (@statements > 1) {
2009     foreach my $statement (@statements) {
2010       $deploy->( $statement );
2011     }
2012   }
2013   elsif (@statements == 1) {
2014     foreach my $line ( split(";\n", $statements[0])) {
2015       $deploy->( $line );
2016     }
2017   }
2018 }
2019
2020 =head2 datetime_parser
2021
2022 Returns the datetime parser class
2023
2024 =cut
2025
2026 sub datetime_parser {
2027   my $self = shift;
2028   return $self->{datetime_parser} ||= do {
2029     $self->ensure_connected;
2030     $self->build_datetime_parser(@_);
2031   };
2032 }
2033
2034 =head2 datetime_parser_type
2035
2036 Defines (returns) the datetime parser class - currently hardwired to
2037 L<DateTime::Format::MySQL>
2038
2039 =cut
2040
2041 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2042
2043 =head2 build_datetime_parser
2044
2045 See L</datetime_parser>
2046
2047 =cut
2048
2049 sub build_datetime_parser {
2050   my $self = shift;
2051   my $type = $self->datetime_parser_type(@_);
2052   eval "use ${type}";
2053   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2054   return $type;
2055 }
2056
2057 {
2058     my $_check_sqlt_version; # private
2059     my $_check_sqlt_message; # private
2060     sub _check_sqlt_version {
2061         return $_check_sqlt_version if defined $_check_sqlt_version;
2062         eval 'use SQL::Translator "0.09003"';
2063         $_check_sqlt_message = $@ || '';
2064         $_check_sqlt_version = !$@;
2065     }
2066
2067     sub _check_sqlt_message {
2068         _check_sqlt_version if !defined $_check_sqlt_message;
2069         $_check_sqlt_message;
2070     }
2071 }
2072
2073 =head2 is_replicating
2074
2075 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2076 replicate from a master database.  Default is undef, which is the result
2077 returned by databases that don't support replication.
2078
2079 =cut
2080
2081 sub is_replicating {
2082     return;
2083     
2084 }
2085
2086 =head2 lag_behind_master
2087
2088 Returns a number that represents a certain amount of lag behind a master db
2089 when a given storage is replicating.  The number is database dependent, but
2090 starts at zero and increases with the amount of lag. Default in undef
2091
2092 =cut
2093
2094 sub lag_behind_master {
2095     return;
2096 }
2097
2098 sub DESTROY {
2099   my $self = shift;
2100   return if !$self->_dbh;
2101   $self->_verify_pid;
2102   $self->_dbh(undef);
2103 }
2104
2105 1;
2106
2107 =head1 USAGE NOTES
2108
2109 =head2 DBIx::Class and AutoCommit
2110
2111 DBIx::Class can do some wonderful magic with handling exceptions,
2112 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2113 combined with C<txn_do> for transaction support.
2114
2115 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2116 in an assumed transaction between commits, and you're telling us you'd
2117 like to manage that manually.  A lot of the magic protections offered by
2118 this module will go away.  We can't protect you from exceptions due to database
2119 disconnects because we don't know anything about how to restart your
2120 transactions.  You're on your own for handling all sorts of exceptional
2121 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2122 be with raw DBI.
2123
2124
2125
2126 =head1 AUTHORS
2127
2128 Matt S. Trout <mst@shadowcatsystems.co.uk>
2129
2130 Andy Grundman <andy@hybridized.org>
2131
2132 =head1 LICENSE
2133
2134 You may distribute this code under the same terms as Perl itself.
2135
2136 =cut