Put the ocmment back
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47   $schema->dbh_do("DROP TABLE authors");
48
49   $schema->resultset('Book')->search({
50      written_on => $schema->storage->datetime_parser(DateTime->now)
51   });
52
53 =head1 DESCRIPTION
54
55 This class represents the connection to an RDBMS via L<DBI>.  See
56 L<DBIx::Class::Storage> for general information.  This pod only
57 documents DBI-specific methods and behaviors.
58
59 =head1 METHODS
60
61 =cut
62
63 sub new {
64   my $new = shift->next::method(@_);
65
66   $new->transaction_depth(0);
67   $new->_sql_maker_opts({});
68   $new->{savepoints} = [];
69   $new->{_in_dbh_do} = 0;
70   $new->{_dbh_gen} = 0;
71
72   $new;
73 }
74
75 =head2 connect_info
76
77 This method is normally called by L<DBIx::Class::Schema/connection>, which
78 encapsulates its argument list in an arrayref before passing them here.
79
80 The argument list may contain:
81
82 =over
83
84 =item *
85
86 The same 4-element argument set one would normally pass to
87 L<DBI/connect>, optionally followed by
88 L<extra attributes|/DBIx::Class specific connection attributes>
89 recognized by DBIx::Class:
90
91   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92
93 =item *
94
95 A single code reference which returns a connected
96 L<DBI database handle|DBI/connect> optionally followed by
97 L<extra attributes|/DBIx::Class specific connection attributes> recognized
98 by DBIx::Class:
99
100   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
101
102 =item *
103
104 A single hashref with all the attributes and the dsn/user/password
105 mixed together:
106
107   $connect_info_args = [{
108     dsn => $dsn,
109     user => $user,
110     password => $pass,
111     %dbi_attributes,
112     %extra_attributes,
113   }];
114
115   $connect_info_args = [{
116     dbh_maker => sub { DBI->connect (...) },
117     %dbi_attributes,
118     %extra_attributes,
119   }];
120
121 This is particularly useful for L<Catalyst> based applications, allowing the
122 following config (L<Config::General> style):
123
124   <Model::DB>
125     schema_class   App::DB
126     <connect_info>
127       dsn          dbi:mysql:database=test
128       user         testuser
129       password     TestPass
130       AutoCommit   1
131     </connect_info>
132   </Model::DB>
133
134 The C<dsn>/C<user>/C<password> combination can be substituted by the
135 C<dbh_maker> key whose value is a coderef that returns a connected
136 L<DBI database handle|DBI/connect>
137
138 =back
139
140 Please note that the L<DBI> docs recommend that you always explicitly
141 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
142 recommends that it be set to I<1>, and that you perform transactions
143 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
144 to I<1> if you do not do explicitly set it to zero.  This is the default
145 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
146
147 =head3 DBIx::Class specific connection attributes
148
149 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
150 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
151 the following connection options. These options can be mixed in with your other
152 L<DBI> connection attributes, or placed in a seperate hashref
153 (C<\%extra_attributes>) as shown above.
154
155 Every time C<connect_info> is invoked, any previous settings for
156 these options will be cleared before setting the new ones, regardless of
157 whether any options are specified in the new C<connect_info>.
158
159
160 =over
161
162 =item on_connect_do
163
164 Specifies things to do immediately after connecting or re-connecting to
165 the database.  Its value may contain:
166
167 =over
168
169 =item a scalar
170
171 This contains one SQL statement to execute.
172
173 =item an array reference
174
175 This contains SQL statements to execute in order.  Each element contains
176 a string or a code reference that returns a string.
177
178 =item a code reference
179
180 This contains some code to execute.  Unlike code references within an
181 array reference, its return value is ignored.
182
183 =back
184
185 =item on_disconnect_do
186
187 Takes arguments in the same form as L</on_connect_do> and executes them
188 immediately before disconnecting from the database.
189
190 Note, this only runs if you explicitly call L</disconnect> on the
191 storage object.
192
193 =item on_connect_call
194
195 A more generalized form of L</on_connect_do> that calls the specified
196 C<connect_call_METHOD> methods in your storage driver.
197
198   on_connect_do => 'select 1'
199
200 is equivalent to:
201
202   on_connect_call => [ [ do_sql => 'select 1' ] ]
203
204 Its values may contain:
205
206 =over
207
208 =item a scalar
209
210 Will call the C<connect_call_METHOD> method.
211
212 =item a code reference
213
214 Will execute C<< $code->($storage) >>
215
216 =item an array reference
217
218 Each value can be a method name or code reference.
219
220 =item an array of arrays
221
222 For each array, the first item is taken to be the C<connect_call_> method name
223 or code reference, and the rest are parameters to it.
224
225 =back
226
227 Some predefined storage methods you may use:
228
229 =over
230
231 =item do_sql
232
233 Executes a SQL string or a code reference that returns a SQL string. This is
234 what L</on_connect_do> and L</on_disconnect_do> use.
235
236 It can take:
237
238 =over
239
240 =item a scalar
241
242 Will execute the scalar as SQL.
243
244 =item an arrayref
245
246 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
247 attributes hashref and bind values.
248
249 =item a code reference
250
251 Will execute C<< $code->($storage) >> and execute the return array refs as
252 above.
253
254 =back
255
256 =item datetime_setup
257
258 Execute any statements necessary to initialize the database session to return
259 and accept datetime/timestamp values used with
260 L<DBIx::Class::InflateColumn::DateTime>.
261
262 Only necessary for some databases, see your specific storage driver for
263 implementation details.
264
265 =back
266
267 =item on_disconnect_call
268
269 Takes arguments in the same form as L</on_connect_call> and executes them
270 immediately before disconnecting from the database.
271
272 Calls the C<disconnect_call_METHOD> methods as opposed to the
273 C<connect_call_METHOD> methods called by L</on_connect_call>.
274
275 Note, this only runs if you explicitly call L</disconnect> on the
276 storage object.
277
278 =item disable_sth_caching
279
280 If set to a true value, this option will disable the caching of
281 statement handles via L<DBI/prepare_cached>.
282
283 =item limit_dialect
284
285 Sets the limit dialect. This is useful for JDBC-bridge among others
286 where the remote SQL-dialect cannot be determined by the name of the
287 driver alone. See also L<SQL::Abstract::Limit>.
288
289 =item quote_char
290
291 Specifies what characters to use to quote table and column names. If
292 you use this you will want to specify L</name_sep> as well.
293
294 C<quote_char> expects either a single character, in which case is it
295 is placed on either side of the table/column name, or an arrayref of length
296 2 in which case the table/column name is placed between the elements.
297
298 For example under MySQL you should use C<< quote_char => '`' >>, and for
299 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
300
301 =item name_sep
302
303 This only needs to be used in conjunction with C<quote_char>, and is used to
304 specify the charecter that seperates elements (schemas, tables, columns) from
305 each other. In most cases this is simply a C<.>.
306
307 The consequences of not supplying this value is that L<SQL::Abstract>
308 will assume DBIx::Class' uses of aliases to be complete column
309 names. The output will look like I<"me.name"> when it should actually
310 be I<"me"."name">.
311
312 =item unsafe
313
314 This Storage driver normally installs its own C<HandleError>, sets
315 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
316 all database handles, including those supplied by a coderef.  It does this
317 so that it can have consistent and useful error behavior.
318
319 If you set this option to a true value, Storage will not do its usual
320 modifications to the database handle's attributes, and instead relies on
321 the settings in your connect_info DBI options (or the values you set in
322 your connection coderef, in the case that you are connecting via coderef).
323
324 Note that your custom settings can cause Storage to malfunction,
325 especially if you set a C<HandleError> handler that suppresses exceptions
326 and/or disable C<RaiseError>.
327
328 =item auto_savepoint
329
330 If this option is true, L<DBIx::Class> will use savepoints when nesting
331 transactions, making it possible to recover from failure in the inner
332 transaction without having to abort all outer transactions.
333
334 =item cursor_class
335
336 Use this argument to supply a cursor class other than the default
337 L<DBIx::Class::Storage::DBI::Cursor>.
338
339 =back
340
341 Some real-life examples of arguments to L</connect_info> and
342 L<DBIx::Class::Schema/connect>
343
344   # Simple SQLite connection
345   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
346
347   # Connect via subref
348   ->connect_info([ sub { DBI->connect(...) } ]);
349
350   # Connect via subref in hashref
351   ->connect_info([{
352     dbh_maker => sub { DBI->connect(...) },
353     on_connect_do => 'alter session ...',
354   }]);
355
356   # A bit more complicated
357   ->connect_info(
358     [
359       'dbi:Pg:dbname=foo',
360       'postgres',
361       'my_pg_password',
362       { AutoCommit => 1 },
363       { quote_char => q{"}, name_sep => q{.} },
364     ]
365   );
366
367   # Equivalent to the previous example
368   ->connect_info(
369     [
370       'dbi:Pg:dbname=foo',
371       'postgres',
372       'my_pg_password',
373       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
374     ]
375   );
376
377   # Same, but with hashref as argument
378   # See parse_connect_info for explanation
379   ->connect_info(
380     [{
381       dsn         => 'dbi:Pg:dbname=foo',
382       user        => 'postgres',
383       password    => 'my_pg_password',
384       AutoCommit  => 1,
385       quote_char  => q{"},
386       name_sep    => q{.},
387     }]
388   );
389
390   # Subref + DBIx::Class-specific connection options
391   ->connect_info(
392     [
393       sub { DBI->connect(...) },
394       {
395           quote_char => q{`},
396           name_sep => q{@},
397           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
398           disable_sth_caching => 1,
399       },
400     ]
401   );
402
403
404
405 =cut
406
407 sub connect_info {
408   my ($self, $info_arg) = @_;
409
410   return $self->_connect_info if !$info_arg;
411
412   my @args = @$info_arg;  # take a shallow copy for further mutilation
413   $self->_connect_info([@args]); # copy for _connect_info
414
415
416   # combine/pre-parse arguments depending on invocation style
417
418   my %attrs;
419   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
420     %attrs = %{ $args[1] || {} };
421     @args = $args[0];
422   }
423   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
424     %attrs = %{$args[0]};
425     @args = ();
426     if (my $code = delete $attrs{dbh_maker}) {
427       @args = $code;
428
429       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
430       if (@ignored) {
431         carp sprintf (
432             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
433           . "to the result of 'dbh_maker'",
434
435           join (', ', map { "'$_'" } (@ignored) ),
436         );
437       }
438     }
439     else {
440       @args = delete @attrs{qw/dsn user password/};
441     }
442   }
443   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
444     %attrs = (
445       % { $args[3] || {} },
446       % { $args[4] || {} },
447     );
448     @args = @args[0,1,2];
449   }
450
451   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
452   #  the new set of options
453   $self->_sql_maker(undef);
454   $self->_sql_maker_opts({});
455
456   if(keys %attrs) {
457     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
458       if(my $value = delete $attrs{$storage_opt}) {
459         $self->$storage_opt($value);
460       }
461     }
462     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
463       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
464         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
465       }
466     }
467   }
468
469   if (ref $args[0] eq 'CODE') {
470     # _connect() never looks past $args[0] in this case
471     %attrs = ()
472   } else {
473     %attrs = (
474       %{ $self->_default_dbi_connect_attributes || {} },
475       %attrs,
476     );
477   }
478
479   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
480   $self->_connect_info;
481 }
482
483 sub _default_dbi_connect_attributes {
484   return {
485     AutoCommit => 1,
486     RaiseError => 1,
487     PrintError => 0,
488   };
489 }
490
491 =head2 on_connect_do
492
493 This method is deprecated in favour of setting via L</connect_info>.
494
495 =cut
496
497 =head2 on_disconnect_do
498
499 This method is deprecated in favour of setting via L</connect_info>.
500
501 =cut
502
503 sub _parse_connect_do {
504   my ($self, $type) = @_;
505
506   my $val = $self->$type;
507   return () if not defined $val;
508
509   my @res;
510
511   if (not ref($val)) {
512     push @res, [ 'do_sql', $val ];
513   } elsif (ref($val) eq 'CODE') {
514     push @res, $val;
515   } elsif (ref($val) eq 'ARRAY') {
516     push @res, map { [ 'do_sql', $_ ] } @$val;
517   } else {
518     $self->throw_exception("Invalid type for $type: ".ref($val));
519   }
520
521   return \@res;
522 }
523
524 =head2 dbh_do
525
526 Arguments: ($subref | $method_name), @extra_coderef_args?
527
528 Execute the given $subref or $method_name using the new exception-based
529 connection management.
530
531 The first two arguments will be the storage object that C<dbh_do> was called
532 on and a database handle to use.  Any additional arguments will be passed
533 verbatim to the called subref as arguments 2 and onwards.
534
535 Using this (instead of $self->_dbh or $self->dbh) ensures correct
536 exception handling and reconnection (or failover in future subclasses).
537
538 Your subref should have no side-effects outside of the database, as
539 there is the potential for your subref to be partially double-executed
540 if the database connection was stale/dysfunctional.
541
542 Example:
543
544   my @stuff = $schema->storage->dbh_do(
545     sub {
546       my ($storage, $dbh, @cols) = @_;
547       my $cols = join(q{, }, @cols);
548       $dbh->selectrow_array("SELECT $cols FROM foo");
549     },
550     @column_list
551   );
552
553 =cut
554
555 sub dbh_do {
556   my $self = shift;
557   my $code = shift;
558
559   my $dbh = $self->_dbh;
560
561   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
562       || $self->{transaction_depth};
563
564   local $self->{_in_dbh_do} = 1;
565
566   my @result;
567   my $want_array = wantarray;
568
569   eval {
570     $self->_verify_pid if $dbh;
571     if(!$self->_dbh) {
572         $self->_populate_dbh;
573         $dbh = $self->_dbh;
574     }
575
576     if($want_array) {
577         @result = $self->$code($dbh, @_);
578     }
579     elsif(defined $want_array) {
580         $result[0] = $self->$code($dbh, @_);
581     }
582     else {
583         $self->$code($dbh, @_);
584     }
585   };
586
587   # ->connected might unset $@ - copy
588   my $exception = $@;
589   if(!$exception) { return $want_array ? @result : $result[0] }
590
591   $self->throw_exception($exception) if $self->connected;
592
593   # We were not connected - reconnect and retry, but let any
594   #  exception fall right through this time
595   carp "Retrying $code after catching disconnected exception: $exception"
596     if $ENV{DBIC_DBIRETRY_DEBUG};
597   $self->_populate_dbh;
598   $self->$code($self->_dbh, @_);
599 }
600
601 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
602 # It also informs dbh_do to bypass itself while under the direction of txn_do,
603 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
604 sub txn_do {
605   my $self = shift;
606   my $coderef = shift;
607
608   ref $coderef eq 'CODE' or $self->throw_exception
609     ('$coderef must be a CODE reference');
610
611   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
612
613   local $self->{_in_dbh_do} = 1;
614
615   my @result;
616   my $want_array = wantarray;
617
618   my $tried = 0;
619   while(1) {
620     eval {
621       $self->_verify_pid if $self->_dbh;
622       $self->_populate_dbh if !$self->_dbh;
623
624       $self->txn_begin;
625       if($want_array) {
626           @result = $coderef->(@_);
627       }
628       elsif(defined $want_array) {
629           $result[0] = $coderef->(@_);
630       }
631       else {
632           $coderef->(@_);
633       }
634       $self->txn_commit;
635     };
636
637     # ->connected might unset $@ - copy
638     my $exception = $@;
639     if(!$exception) { return $want_array ? @result : $result[0] }
640
641     if($tried++ || $self->connected) {
642       eval { $self->txn_rollback };
643       my $rollback_exception = $@;
644       if($rollback_exception) {
645         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
646         $self->throw_exception($exception)  # propagate nested rollback
647           if $rollback_exception =~ /$exception_class/;
648
649         $self->throw_exception(
650           "Transaction aborted: ${exception}. "
651           . "Rollback failed: ${rollback_exception}"
652         );
653       }
654       $self->throw_exception($exception)
655     }
656
657     # We were not connected, and was first try - reconnect and retry
658     # via the while loop
659     carp "Retrying $coderef after catching disconnected exception: $exception"
660       if $ENV{DBIC_DBIRETRY_DEBUG};
661     $self->_populate_dbh;
662   }
663 }
664
665 =head2 disconnect
666
667 Our C<disconnect> method also performs a rollback first if the
668 database is not in C<AutoCommit> mode.
669
670 =cut
671
672 sub disconnect {
673   my ($self) = @_;
674
675   if( $self->_dbh ) {
676     my @actions;
677
678     push @actions, ( $self->on_disconnect_call || () );
679     push @actions, $self->_parse_connect_do ('on_disconnect_do');
680
681     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
682
683     $self->_dbh_rollback unless $self->_dbh_autocommit;
684
685     $self->_dbh->disconnect;
686     $self->_dbh(undef);
687     $self->{_dbh_gen}++;
688   }
689 }
690
691 =head2 with_deferred_fk_checks
692
693 =over 4
694
695 =item Arguments: C<$coderef>
696
697 =item Return Value: The return value of $coderef
698
699 =back
700
701 Storage specific method to run the code ref with FK checks deferred or
702 in MySQL's case disabled entirely.
703
704 =cut
705
706 # Storage subclasses should override this
707 sub with_deferred_fk_checks {
708   my ($self, $sub) = @_;
709
710   $sub->();
711 }
712
713 =head2 connected
714
715 =over
716
717 =item Arguments: none
718
719 =item Return Value: 1|0
720
721 =back
722
723 Verifies that the the current database handle is active and ready to execute
724 an SQL statement (i.e. the connection did not get stale, server is still
725 answering, etc.) This method is used internally by L</dbh>.
726
727 =cut
728
729 sub connected {
730   my $self = shift;
731   return 0 unless $self->_seems_connected;
732
733   #be on the safe side
734   local $self->_dbh->{RaiseError} = 1;
735
736   return $self->_ping;
737 }
738
739 sub _seems_connected {
740   my $self = shift;
741
742   my $dbh = $self->_dbh
743     or return 0;
744
745   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
746     $self->_dbh(undef);
747     $self->{_dbh_gen}++;
748     return 0;
749   }
750   else {
751     $self->_verify_pid;
752     return 0 if !$self->_dbh;
753   }
754
755   return $dbh->FETCH('Active');
756 }
757
758 sub _ping {
759   my $self = shift;
760
761   my $dbh = $self->_dbh or return 0;
762
763   return $dbh->ping;
764 }
765
766 # handle pid changes correctly
767 #  NOTE: assumes $self->_dbh is a valid $dbh
768 sub _verify_pid {
769   my ($self) = @_;
770
771   return if defined $self->_conn_pid && $self->_conn_pid == $$;
772
773   $self->_dbh->{InactiveDestroy} = 1;
774   $self->_dbh(undef);
775   $self->{_dbh_gen}++;
776
777   return;
778 }
779
780 sub ensure_connected {
781   my ($self) = @_;
782
783   unless ($self->connected) {
784     $self->_populate_dbh;
785   }
786 }
787
788 =head2 dbh
789
790 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
791 is guaranteed to be healthy by implicitly calling L</connected>, and if
792 necessary performing a reconnection before returning. Keep in mind that this
793 is very B<expensive> on some database engines. Consider using L<dbh_do>
794 instead.
795
796 =cut
797
798 sub dbh {
799   my ($self) = @_;
800
801   if (not $self->_dbh) {
802     $self->_populate_dbh;
803   } else {
804     $self->ensure_connected;
805   }
806   return $self->_dbh;
807 }
808
809 # this is the internal "get dbh or connect (don't check)" method
810 sub _get_dbh {
811   my $self = shift;
812   $self->_populate_dbh unless $self->_dbh;
813   return $self->_dbh;
814 }
815
816 sub _sql_maker_args {
817     my ($self) = @_;
818
819     return (
820       bindtype=>'columns',
821       array_datatypes => 1,
822       limit_dialect => $self->_get_dbh,
823       %{$self->_sql_maker_opts}
824     );
825 }
826
827 sub sql_maker {
828   my ($self) = @_;
829   unless ($self->_sql_maker) {
830     my $sql_maker_class = $self->sql_maker_class;
831     $self->ensure_class_loaded ($sql_maker_class);
832     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
833   }
834   return $self->_sql_maker;
835 }
836
837 sub _rebless {}
838
839 sub _populate_dbh {
840   my ($self) = @_;
841
842   my @info = @{$self->_dbi_connect_info || []};
843   $self->_dbh(undef); # in case ->connected failed we might get sent here
844   $self->_dbh($self->_connect(@info));
845
846   $self->_conn_pid($$);
847   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
848
849   $self->_determine_driver;
850
851   # Always set the transaction depth on connect, since
852   #  there is no transaction in progress by definition
853   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
854
855   $self->_run_connection_actions unless $self->{_in_determine_driver};
856 }
857
858 sub _run_connection_actions {
859   my $self = shift;
860   my @actions;
861
862   push @actions, ( $self->on_connect_call || () );
863   push @actions, $self->_parse_connect_do ('on_connect_do');
864
865   $self->_do_connection_actions(connect_call_ => $_) for @actions;
866 }
867
868 sub _determine_driver {
869   my ($self) = @_;
870
871   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
872     my $started_unconnected = 0;
873     local $self->{_in_determine_driver} = 1;
874
875     if (ref($self) eq __PACKAGE__) {
876       my $driver;
877       if ($self->_dbh) { # we are connected
878         $driver = $self->_dbh->{Driver}{Name};
879       } else {
880         # try to use dsn to not require being connected, the driver may still
881         # force a connection in _rebless to determine version
882         ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
883         $started_unconnected = 1;
884       }
885
886       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
887       if ($self->load_optional_class($storage_class)) {
888         mro::set_mro($storage_class, 'c3');
889         bless $self, $storage_class;
890         $self->_rebless();
891       }
892     }
893
894     $self->_driver_determined(1);
895
896     $self->_run_connection_actions
897         if $started_unconnected && defined $self->_dbh;
898   }
899 }
900
901 sub _do_connection_actions {
902   my $self          = shift;
903   my $method_prefix = shift;
904   my $call          = shift;
905
906   if (not ref($call)) {
907     my $method = $method_prefix . $call;
908     $self->$method(@_);
909   } elsif (ref($call) eq 'CODE') {
910     $self->$call(@_);
911   } elsif (ref($call) eq 'ARRAY') {
912     if (ref($call->[0]) ne 'ARRAY') {
913       $self->_do_connection_actions($method_prefix, $_) for @$call;
914     } else {
915       $self->_do_connection_actions($method_prefix, @$_) for @$call;
916     }
917   } else {
918     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
919   }
920
921   return $self;
922 }
923
924 sub connect_call_do_sql {
925   my $self = shift;
926   $self->_do_query(@_);
927 }
928
929 sub disconnect_call_do_sql {
930   my $self = shift;
931   $self->_do_query(@_);
932 }
933
934 # override in db-specific backend when necessary
935 sub connect_call_datetime_setup { 1 }
936
937 sub _do_query {
938   my ($self, $action) = @_;
939
940   if (ref $action eq 'CODE') {
941     $action = $action->($self);
942     $self->_do_query($_) foreach @$action;
943   }
944   else {
945     # Most debuggers expect ($sql, @bind), so we need to exclude
946     # the attribute hash which is the second argument to $dbh->do
947     # furthermore the bind values are usually to be presented
948     # as named arrayref pairs, so wrap those here too
949     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
950     my $sql = shift @do_args;
951     my $attrs = shift @do_args;
952     my @bind = map { [ undef, $_ ] } @do_args;
953
954     $self->_query_start($sql, @bind);
955     $self->_dbh->do($sql, $attrs, @do_args);
956     $self->_query_end($sql, @bind);
957   }
958
959   return $self;
960 }
961
962 sub _connect {
963   my ($self, @info) = @_;
964
965   $self->throw_exception("You failed to provide any connection info")
966     if !@info;
967
968   my ($old_connect_via, $dbh);
969
970   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
971     $old_connect_via = $DBI::connect_via;
972     $DBI::connect_via = 'connect';
973   }
974
975   eval {
976     if(ref $info[0] eq 'CODE') {
977        $dbh = &{$info[0]}
978     }
979     else {
980        $dbh = DBI->connect(@info);
981     }
982
983     if($dbh && !$self->unsafe) {
984       my $weak_self = $self;
985       Scalar::Util::weaken($weak_self);
986       $dbh->{HandleError} = sub {
987           if ($weak_self) {
988             $weak_self->throw_exception("DBI Exception: $_[0]");
989           }
990           else {
991             croak ("DBI Exception: $_[0]");
992           }
993       };
994       $dbh->{ShowErrorStatement} = 1;
995       $dbh->{RaiseError} = 1;
996       $dbh->{PrintError} = 0;
997     }
998   };
999
1000   $DBI::connect_via = $old_connect_via if $old_connect_via;
1001
1002   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1003     if !$dbh || $@;
1004
1005   $self->_dbh_autocommit($dbh->{AutoCommit});
1006
1007   $dbh;
1008 }
1009
1010 sub svp_begin {
1011   my ($self, $name) = @_;
1012
1013   $name = $self->_svp_generate_name
1014     unless defined $name;
1015
1016   $self->throw_exception ("You can't use savepoints outside a transaction")
1017     if $self->{transaction_depth} == 0;
1018
1019   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1020     unless $self->can('_svp_begin');
1021
1022   push @{ $self->{savepoints} }, $name;
1023
1024   $self->debugobj->svp_begin($name) if $self->debug;
1025
1026   return $self->_svp_begin($name);
1027 }
1028
1029 sub svp_release {
1030   my ($self, $name) = @_;
1031
1032   $self->throw_exception ("You can't use savepoints outside a transaction")
1033     if $self->{transaction_depth} == 0;
1034
1035   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1036     unless $self->can('_svp_release');
1037
1038   if (defined $name) {
1039     $self->throw_exception ("Savepoint '$name' does not exist")
1040       unless grep { $_ eq $name } @{ $self->{savepoints} };
1041
1042     # Dig through the stack until we find the one we are releasing.  This keeps
1043     # the stack up to date.
1044     my $svp;
1045
1046     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1047   } else {
1048     $name = pop @{ $self->{savepoints} };
1049   }
1050
1051   $self->debugobj->svp_release($name) if $self->debug;
1052
1053   return $self->_svp_release($name);
1054 }
1055
1056 sub svp_rollback {
1057   my ($self, $name) = @_;
1058
1059   $self->throw_exception ("You can't use savepoints outside a transaction")
1060     if $self->{transaction_depth} == 0;
1061
1062   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1063     unless $self->can('_svp_rollback');
1064
1065   if (defined $name) {
1066       # If they passed us a name, verify that it exists in the stack
1067       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1068           $self->throw_exception("Savepoint '$name' does not exist!");
1069       }
1070
1071       # Dig through the stack until we find the one we are releasing.  This keeps
1072       # the stack up to date.
1073       while(my $s = pop(@{ $self->{savepoints} })) {
1074           last if($s eq $name);
1075       }
1076       # Add the savepoint back to the stack, as a rollback doesn't remove the
1077       # named savepoint, only everything after it.
1078       push(@{ $self->{savepoints} }, $name);
1079   } else {
1080       # We'll assume they want to rollback to the last savepoint
1081       $name = $self->{savepoints}->[-1];
1082   }
1083
1084   $self->debugobj->svp_rollback($name) if $self->debug;
1085
1086   return $self->_svp_rollback($name);
1087 }
1088
1089 sub _svp_generate_name {
1090     my ($self) = @_;
1091
1092     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1093 }
1094
1095 sub txn_begin {
1096   my $self = shift;
1097   if($self->{transaction_depth} == 0) {
1098     $self->debugobj->txn_begin()
1099       if $self->debug;
1100     $self->_dbh_begin_work;
1101   }
1102   elsif ($self->auto_savepoint) {
1103     $self->svp_begin;
1104   }
1105   $self->{transaction_depth}++;
1106 }
1107
1108 sub _dbh_begin_work {
1109   my $self = shift;
1110
1111   # if the user is utilizing txn_do - good for him, otherwise we need to
1112   # ensure that the $dbh is healthy on BEGIN.
1113   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1114   # will be replaced by a failure of begin_work itself (which will be
1115   # then retried on reconnect)
1116   if ($self->{_in_dbh_do}) {
1117     $self->_dbh->begin_work;
1118   } else {
1119     $self->dbh_do(sub { $_[1]->begin_work });
1120   }
1121 }
1122
1123 sub txn_commit {
1124   my $self = shift;
1125   if ($self->{transaction_depth} == 1) {
1126     my $dbh = $self->_dbh;
1127     $self->debugobj->txn_commit()
1128       if ($self->debug);
1129     $self->_dbh_commit;
1130     $self->{transaction_depth} = 0
1131       if $self->_dbh_autocommit;
1132   }
1133   elsif($self->{transaction_depth} > 1) {
1134     $self->{transaction_depth}--;
1135     $self->svp_release
1136       if $self->auto_savepoint;
1137   }
1138 }
1139
1140 sub _dbh_commit {
1141   my $self = shift;
1142   $self->_dbh->commit;
1143 }
1144
1145 sub txn_rollback {
1146   my $self = shift;
1147   my $dbh = $self->_dbh;
1148   eval {
1149     if ($self->{transaction_depth} == 1) {
1150       $self->debugobj->txn_rollback()
1151         if ($self->debug);
1152       $self->{transaction_depth} = 0
1153         if $self->_dbh_autocommit;
1154       $self->_dbh_rollback;
1155     }
1156     elsif($self->{transaction_depth} > 1) {
1157       $self->{transaction_depth}--;
1158       if ($self->auto_savepoint) {
1159         $self->svp_rollback;
1160         $self->svp_release;
1161       }
1162     }
1163     else {
1164       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1165     }
1166   };
1167   if ($@) {
1168     my $error = $@;
1169     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1170     $error =~ /$exception_class/ and $self->throw_exception($error);
1171     # ensure that a failed rollback resets the transaction depth
1172     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1173     $self->throw_exception($error);
1174   }
1175 }
1176
1177 sub _dbh_rollback {
1178   my $self = shift;
1179   $self->_dbh->rollback;
1180 }
1181
1182 # This used to be the top-half of _execute.  It was split out to make it
1183 #  easier to override in NoBindVars without duping the rest.  It takes up
1184 #  all of _execute's args, and emits $sql, @bind.
1185 sub _prep_for_execute {
1186   my ($self, $op, $extra_bind, $ident, $args) = @_;
1187
1188   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1189     $ident = $ident->from();
1190   }
1191
1192   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1193
1194   unshift(@bind,
1195     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1196       if $extra_bind;
1197   return ($sql, \@bind);
1198 }
1199
1200
1201 sub _fix_bind_params {
1202     my ($self, @bind) = @_;
1203
1204     ### Turn @bind from something like this:
1205     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1206     ### to this:
1207     ###   ( "'1'", "'1'", "'3'" )
1208     return
1209         map {
1210             if ( defined( $_ && $_->[1] ) ) {
1211                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1212             }
1213             else { q{'NULL'}; }
1214         } @bind;
1215 }
1216
1217 sub _query_start {
1218     my ( $self, $sql, @bind ) = @_;
1219
1220     if ( $self->debug ) {
1221         @bind = $self->_fix_bind_params(@bind);
1222
1223         $self->debugobj->query_start( $sql, @bind );
1224     }
1225 }
1226
1227 sub _query_end {
1228     my ( $self, $sql, @bind ) = @_;
1229
1230     if ( $self->debug ) {
1231         @bind = $self->_fix_bind_params(@bind);
1232         $self->debugobj->query_end( $sql, @bind );
1233     }
1234 }
1235
1236 sub _dbh_execute {
1237   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1238
1239   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1240
1241   $self->_query_start( $sql, @$bind );
1242
1243   my $sth = $self->sth($sql,$op);
1244
1245   my $placeholder_index = 1;
1246
1247   foreach my $bound (@$bind) {
1248     my $attributes = {};
1249     my($column_name, @data) = @$bound;
1250
1251     if ($bind_attributes) {
1252       $attributes = $bind_attributes->{$column_name}
1253       if defined $bind_attributes->{$column_name};
1254     }
1255
1256     foreach my $data (@data) {
1257       my $ref = ref $data;
1258       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1259
1260       $sth->bind_param($placeholder_index, $data, $attributes);
1261       $placeholder_index++;
1262     }
1263   }
1264
1265   # Can this fail without throwing an exception anyways???
1266   my $rv = $sth->execute();
1267   $self->throw_exception($sth->errstr) if !$rv;
1268
1269   $self->_query_end( $sql, @$bind );
1270
1271   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1272 }
1273
1274 sub _execute {
1275     my $self = shift;
1276     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1277 }
1278
1279 sub insert {
1280   my ($self, $source, $to_insert) = @_;
1281
1282 # redispatch to insert method of storage we reblessed into, if necessary
1283   if (not $self->_driver_determined) {
1284     $self->_determine_driver;
1285     goto $self->can('insert');
1286   }
1287
1288   my $ident = $source->from;
1289   my $bind_attributes = $self->source_bind_attributes($source);
1290
1291   my $updated_cols = {};
1292
1293   foreach my $col ( $source->columns ) {
1294     if ( !defined $to_insert->{$col} ) {
1295       my $col_info = $source->column_info($col);
1296
1297       if ( $col_info->{auto_nextval} ) {
1298         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1299           'nextval',
1300           $col_info->{sequence} ||
1301             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1302         );
1303       }
1304     }
1305   }
1306
1307   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1308
1309   return $updated_cols;
1310 }
1311
1312 ## Still not quite perfect, and EXPERIMENTAL
1313 ## Currently it is assumed that all values passed will be "normal", i.e. not
1314 ## scalar refs, or at least, all the same type as the first set, the statement is
1315 ## only prepped once.
1316 sub insert_bulk {
1317   my ($self, $source, $cols, $data) = @_;
1318   my %colvalues;
1319   my $table = $source->from;
1320   @colvalues{@$cols} = (0..$#$cols);
1321   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1322
1323   $self->_determine_driver;
1324
1325   $self->_query_start( $sql, @bind );
1326   my $sth = $self->sth($sql);
1327
1328 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1329
1330   ## This must be an arrayref, else nothing works!
1331   my $tuple_status = [];
1332
1333   ## Get the bind_attributes, if any exist
1334   my $bind_attributes = $self->source_bind_attributes($source);
1335
1336   ## Bind the values and execute
1337   my $placeholder_index = 1;
1338
1339   foreach my $bound (@bind) {
1340
1341     my $attributes = {};
1342     my ($column_name, $data_index) = @$bound;
1343
1344     if( $bind_attributes ) {
1345       $attributes = $bind_attributes->{$column_name}
1346       if defined $bind_attributes->{$column_name};
1347     }
1348
1349     my @data = map { $_->[$data_index] } @$data;
1350
1351     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1352     $placeholder_index++;
1353   }
1354   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1355   if (my $err = $@) {
1356     my $i = 0;
1357     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1358
1359     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1360       if ($i > $#$tuple_status);
1361
1362     require Data::Dumper;
1363     local $Data::Dumper::Terse = 1;
1364     local $Data::Dumper::Indent = 1;
1365     local $Data::Dumper::Useqq = 1;
1366     local $Data::Dumper::Quotekeys = 0;
1367
1368     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1369       $tuple_status->[$i][1],
1370       Data::Dumper::Dumper(
1371         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1372       ),
1373     );
1374   }
1375   $self->throw_exception($sth->errstr) if !$rv;
1376
1377   $self->_query_end( $sql, @bind );
1378   return (wantarray ? ($rv, $sth, @bind) : $rv);
1379 }
1380
1381 sub update {
1382   my ($self, $source, @args) = @_; 
1383
1384 # redispatch to update method of storage we reblessed into, if necessary
1385   if (not $self->_driver_determined) {
1386     $self->_determine_driver;
1387     goto $self->can('update');
1388   }
1389
1390   my $bind_attributes = $self->source_bind_attributes($source);
1391
1392   return $self->_execute('update' => [], $source, $bind_attributes, @args);
1393 }
1394
1395
1396 sub delete {
1397   my $self = shift @_;
1398   my $source = shift @_;
1399   $self->_determine_driver;
1400   my $bind_attrs = $self->source_bind_attributes($source);
1401
1402   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1403 }
1404
1405 # We were sent here because the $rs contains a complex search
1406 # which will require a subquery to select the correct rows
1407 # (i.e. joined or limited resultsets)
1408 #
1409 # Genarating a single PK column subquery is trivial and supported
1410 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1411 # Look at _multipk_update_delete()
1412 sub _subq_update_delete {
1413   my $self = shift;
1414   my ($rs, $op, $values) = @_;
1415
1416   my $rsrc = $rs->result_source;
1417
1418   # we already check this, but double check naively just in case. Should be removed soon
1419   my $sel = $rs->_resolved_attrs->{select};
1420   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1421   my @pcols = $rsrc->primary_columns;
1422   if (@$sel != @pcols) {
1423     $self->throw_exception (
1424       'Subquery update/delete can not be called on resultsets selecting a'
1425      .' number of columns different than the number of primary keys'
1426     );
1427   }
1428
1429   if (@pcols == 1) {
1430     return $self->$op (
1431       $rsrc,
1432       $op eq 'update' ? $values : (),
1433       { $pcols[0] => { -in => $rs->as_query } },
1434     );
1435   }
1436
1437   else {
1438     return $self->_multipk_update_delete (@_);
1439   }
1440 }
1441
1442 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1443 # resultset update/delete involving subqueries. So by default resort
1444 # to simple (and inefficient) delete_all style per-row opearations,
1445 # while allowing specific storages to override this with a faster
1446 # implementation.
1447 #
1448 sub _multipk_update_delete {
1449   return shift->_per_row_update_delete (@_);
1450 }
1451
1452 # This is the default loop used to delete/update rows for multi PK
1453 # resultsets, and used by mysql exclusively (because it can't do anything
1454 # else).
1455 #
1456 # We do not use $row->$op style queries, because resultset update/delete
1457 # is not expected to cascade (this is what delete_all/update_all is for).
1458 #
1459 # There should be no race conditions as the entire operation is rolled
1460 # in a transaction.
1461 #
1462 sub _per_row_update_delete {
1463   my $self = shift;
1464   my ($rs, $op, $values) = @_;
1465
1466   my $rsrc = $rs->result_source;
1467   my @pcols = $rsrc->primary_columns;
1468
1469   my $guard = $self->txn_scope_guard;
1470
1471   # emulate the return value of $sth->execute for non-selects
1472   my $row_cnt = '0E0';
1473
1474   my $subrs_cur = $rs->cursor;
1475   while (my @pks = $subrs_cur->next) {
1476
1477     my $cond;
1478     for my $i (0.. $#pcols) {
1479       $cond->{$pcols[$i]} = $pks[$i];
1480     }
1481
1482     $self->$op (
1483       $rsrc,
1484       $op eq 'update' ? $values : (),
1485       $cond,
1486     );
1487
1488     $row_cnt++;
1489   }
1490
1491   $guard->commit;
1492
1493   return $row_cnt;
1494 }
1495
1496 sub _select {
1497   my $self = shift;
1498
1499   # localization is neccessary as
1500   # 1) there is no infrastructure to pass this around before SQLA2
1501   # 2) _select_args sets it and _prep_for_execute consumes it
1502   my $sql_maker = $self->sql_maker;
1503   local $sql_maker->{_dbic_rs_attrs};
1504
1505   return $self->_execute($self->_select_args(@_));
1506 }
1507
1508 sub _select_args_to_query {
1509   my $self = shift;
1510
1511   # localization is neccessary as
1512   # 1) there is no infrastructure to pass this around before SQLA2
1513   # 2) _select_args sets it and _prep_for_execute consumes it
1514   my $sql_maker = $self->sql_maker;
1515   local $sql_maker->{_dbic_rs_attrs};
1516
1517   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1518   #  = $self->_select_args($ident, $select, $cond, $attrs);
1519   my ($op, $bind, $ident, $bind_attrs, @args) =
1520     $self->_select_args(@_);
1521
1522   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1523   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1524   $prepared_bind ||= [];
1525
1526   return wantarray
1527     ? ($sql, $prepared_bind, $bind_attrs)
1528     : \[ "($sql)", @$prepared_bind ]
1529   ;
1530 }
1531
1532 sub _select_args {
1533   my ($self, $ident, $select, $where, $attrs) = @_;
1534
1535   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1536
1537   my $sql_maker = $self->sql_maker;
1538   $sql_maker->{_dbic_rs_attrs} = {
1539     %$attrs,
1540     select => $select,
1541     from => $ident,
1542     where => $where,
1543     $rs_alias
1544       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1545       : ()
1546     ,
1547   };
1548
1549   # calculate bind_attrs before possible $ident mangling
1550   my $bind_attrs = {};
1551   for my $alias (keys %$alias2source) {
1552     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1553     for my $col (keys %$bindtypes) {
1554
1555       my $fqcn = join ('.', $alias, $col);
1556       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1557
1558       # Unqialified column names are nice, but at the same time can be
1559       # rather ambiguous. What we do here is basically go along with
1560       # the loop, adding an unqualified column slot to $bind_attrs,
1561       # alongside the fully qualified name. As soon as we encounter
1562       # another column by that name (which would imply another table)
1563       # we unset the unqualified slot and never add any info to it
1564       # to avoid erroneous type binding. If this happens the users
1565       # only choice will be to fully qualify his column name
1566
1567       if (exists $bind_attrs->{$col}) {
1568         $bind_attrs->{$col} = {};
1569       }
1570       else {
1571         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1572       }
1573     }
1574   }
1575
1576   # adjust limits
1577   if (
1578     $attrs->{software_limit}
1579       ||
1580     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1581   ) {
1582     $attrs->{software_limit} = 1;
1583   }
1584   else {
1585     $self->throw_exception("rows attribute must be positive if present")
1586       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1587
1588     # MySQL actually recommends this approach.  I cringe.
1589     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1590   }
1591
1592   my @limit;
1593
1594   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1595   # otherwise delegate the limiting to the storage, unless software limit was requested
1596   if (
1597     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1598        ||
1599     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1600       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1601   ) {
1602     ($ident, $select, $where, $attrs)
1603       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1604   }
1605   elsif (! $attrs->{software_limit} ) {
1606     push @limit, $attrs->{rows}, $attrs->{offset};
1607   }
1608
1609 ###
1610   # This would be the point to deflate anything found in $where
1611   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1612   # expect a row object. And all we have is a resultsource (it is trivial
1613   # to extract deflator coderefs via $alias2source above).
1614   #
1615   # I don't see a way forward other than changing the way deflators are
1616   # invoked, and that's just bad...
1617 ###
1618
1619   my $order = { map
1620     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1621     (qw/order_by group_by having/ )
1622   };
1623
1624   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1625 }
1626
1627 #
1628 # This is the code producing joined subqueries like:
1629 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1630 #
1631 sub _adjust_select_args_for_complex_prefetch {
1632   my ($self, $from, $select, $where, $attrs) = @_;
1633
1634   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1635     if not @{$attrs->{_prefetch_select}};
1636
1637   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1638     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1639
1640
1641   # generate inner/outer attribute lists, remove stuff that doesn't apply
1642   my $outer_attrs = { %$attrs };
1643   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1644
1645   my $inner_attrs = { %$attrs };
1646   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1647
1648
1649   # bring over all non-collapse-induced order_by into the inner query (if any)
1650   # the outer one will have to keep them all
1651   delete $inner_attrs->{order_by};
1652   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1653     $inner_attrs->{order_by} = [
1654       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1655     ];
1656   }
1657
1658
1659   # generate the inner/outer select lists
1660   # for inside we consider only stuff *not* brought in by the prefetch
1661   # on the outside we substitute any function for its alias
1662   my $outer_select = [ @$select ];
1663   my $inner_select = [];
1664   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1665     my $sel = $outer_select->[$i];
1666
1667     if (ref $sel eq 'HASH' ) {
1668       $sel->{-as} ||= $attrs->{as}[$i];
1669       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1670     }
1671
1672     push @$inner_select, $sel;
1673   }
1674
1675   # normalize a copy of $from, so it will be easier to work with further
1676   # down (i.e. promote the initial hashref to an AoH)
1677   $from = [ @$from ];
1678   $from->[0] = [ $from->[0] ];
1679   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1680
1681
1682   # decide which parts of the join will remain in either part of
1683   # the outer/inner query
1684
1685   # First we compose a list of which aliases are used in restrictions
1686   # (i.e. conditions/order/grouping/etc). Since we do not have
1687   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1688   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1689   # need to appear in the resulting sql.
1690   # It may not be very efficient, but it's a reasonable stop-gap
1691   # Also unqualified column names will not be considered, but more often
1692   # than not this is actually ok
1693   #
1694   # In the same loop we enumerate part of the selection aliases, as
1695   # it requires the same sqla hack for the time being
1696   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1697   {
1698     # produce stuff unquoted, so it can be scanned
1699     my $sql_maker = $self->sql_maker;
1700     local $sql_maker->{quote_char};
1701     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1702     $sep = "\Q$sep\E";
1703
1704     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1705     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1706     my $where_sql = $sql_maker->where ($where);
1707     my $group_by_sql = $sql_maker->_order_by({
1708       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1709     });
1710     my @non_prefetch_order_by_chunks = (map
1711       { ref $_ ? $_->[0] : $_ }
1712       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1713     );
1714
1715
1716     for my $alias (keys %original_join_info) {
1717       my $seen_re = qr/\b $alias $sep/x;
1718
1719       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1720         if ($piece =~ $seen_re) {
1721           $restrict_aliases->{$alias} = 1;
1722         }
1723       }
1724
1725       if ($non_prefetch_select_sql =~ $seen_re) {
1726           $select_aliases->{$alias} = 1;
1727       }
1728
1729       if ($prefetch_select_sql =~ $seen_re) {
1730           $prefetch_aliases->{$alias} = 1;
1731       }
1732
1733     }
1734   }
1735
1736   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1737   for my $j (values %original_join_info) {
1738     my $alias = $j->{-alias} or next;
1739     $restrict_aliases->{$alias} = 1 if (
1740       (not $j->{-join_type})
1741         or
1742       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1743     );
1744   }
1745
1746   # mark all join parents as mentioned
1747   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1748   for my $collection ($restrict_aliases, $select_aliases) {
1749     for my $alias (keys %$collection) {
1750       $collection->{$_} = 1
1751         for (@{ $original_join_info{$alias}{-join_path} || [] });
1752     }
1753   }
1754
1755   # construct the inner $from for the subquery
1756   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1757   my @inner_from;
1758   for my $j (@$from) {
1759     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1760   }
1761
1762   # if a multi-type join was needed in the subquery ("multi" is indicated by
1763   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1764   unless ($inner_attrs->{group_by}) {
1765     for my $alias (keys %inner_joins) {
1766
1767       # the dot comes from some weirdness in collapse
1768       # remove after the rewrite
1769       if ($attrs->{collapse}{".$alias"}) {
1770         $inner_attrs->{group_by} ||= $inner_select;
1771         last;
1772       }
1773     }
1774   }
1775
1776   # demote the inner_from head
1777   $inner_from[0] = $inner_from[0][0];
1778
1779   # generate the subquery
1780   my $subq = $self->_select_args_to_query (
1781     \@inner_from,
1782     $inner_select,
1783     $where,
1784     $inner_attrs,
1785   );
1786
1787   my $subq_joinspec = {
1788     -alias => $attrs->{alias},
1789     -source_handle => $inner_from[0]{-source_handle},
1790     $attrs->{alias} => $subq,
1791   };
1792
1793   # Generate the outer from - this is relatively easy (really just replace
1794   # the join slot with the subquery), with a major caveat - we can not
1795   # join anything that is non-selecting (not part of the prefetch), but at
1796   # the same time is a multi-type relationship, as it will explode the result.
1797   #
1798   # There are two possibilities here
1799   # - either the join is non-restricting, in which case we simply throw it away
1800   # - it is part of the restrictions, in which case we need to collapse the outer
1801   #   result by tackling yet another group_by to the outside of the query
1802
1803   # so first generate the outer_from, up to the substitution point
1804   my @outer_from;
1805   while (my $j = shift @$from) {
1806     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1807       push @outer_from, [
1808         $subq_joinspec,
1809         @{$j}[1 .. $#$j],
1810       ];
1811       last; # we'll take care of what's left in $from below
1812     }
1813     else {
1814       push @outer_from, $j;
1815     }
1816   }
1817
1818   # see what's left - throw away if not selecting/restricting
1819   # also throw in a group_by if restricting to guard against
1820   # cross-join explosions
1821   #
1822   while (my $j = shift @$from) {
1823     my $alias = $j->[0]{-alias};
1824
1825     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1826       push @outer_from, $j;
1827     }
1828     elsif ($restrict_aliases->{$alias}) {
1829       push @outer_from, $j;
1830
1831       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1832       # have restrict_inner and restrict_outer... or something to that
1833       # effect... I think...
1834
1835       # FIXME2 - I can't find a clean way to determine if a particular join
1836       # is a multi - instead I am just treating everything as a potential
1837       # explosive join (ribasushi)
1838       #
1839       # if (my $handle = $j->[0]{-source_handle}) {
1840       #   my $rsrc = $handle->resolve;
1841       #   ... need to bail out of the following if this is not a multi,
1842       #       as it will be much easier on the db ...
1843
1844           $outer_attrs->{group_by} ||= $outer_select;
1845       # }
1846     }
1847   }
1848
1849   # demote the outer_from head
1850   $outer_from[0] = $outer_from[0][0];
1851
1852   # This is totally horrific - the $where ends up in both the inner and outer query
1853   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1854   # then if where conditions apply to the *right* side of the prefetch, you may have
1855   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1856   # the outer select to exclude joins you didin't want in the first place
1857   #
1858   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1859   return (\@outer_from, $outer_select, $where, $outer_attrs);
1860 }
1861
1862 sub _resolve_ident_sources {
1863   my ($self, $ident) = @_;
1864
1865   my $alias2source = {};
1866   my $rs_alias;
1867
1868   # the reason this is so contrived is that $ident may be a {from}
1869   # structure, specifying multiple tables to join
1870   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1871     # this is compat mode for insert/update/delete which do not deal with aliases
1872     $alias2source->{me} = $ident;
1873     $rs_alias = 'me';
1874   }
1875   elsif (ref $ident eq 'ARRAY') {
1876
1877     for (@$ident) {
1878       my $tabinfo;
1879       if (ref $_ eq 'HASH') {
1880         $tabinfo = $_;
1881         $rs_alias = $tabinfo->{-alias};
1882       }
1883       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1884         $tabinfo = $_->[0];
1885       }
1886
1887       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1888         if ($tabinfo->{-source_handle});
1889     }
1890   }
1891
1892   return ($alias2source, $rs_alias);
1893 }
1894
1895 # Takes $ident, \@column_names
1896 #
1897 # returns { $column_name => \%column_info, ... }
1898 # also note: this adds -result_source => $rsrc to the column info
1899 #
1900 # usage:
1901 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1902 sub _resolve_column_info {
1903   my ($self, $ident, $colnames) = @_;
1904   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1905
1906   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1907   $sep = "\Q$sep\E";
1908
1909   my (%return, %seen_cols);
1910
1911   # compile a global list of column names, to be able to properly
1912   # disambiguate unqualified column names (if at all possible)
1913   for my $alias (keys %$alias2src) {
1914     my $rsrc = $alias2src->{$alias};
1915     for my $colname ($rsrc->columns) {
1916       push @{$seen_cols{$colname}}, $alias;
1917     }
1918   }
1919
1920   COLUMN:
1921   foreach my $col (@$colnames) {
1922     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1923
1924     unless ($alias) {
1925       # see if the column was seen exactly once (so we know which rsrc it came from)
1926       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1927         $alias = $seen_cols{$colname}[0];
1928       }
1929       else {
1930         next COLUMN;
1931       }
1932     }
1933
1934     my $rsrc = $alias2src->{$alias};
1935     $return{$col} = $rsrc && {
1936       %{$rsrc->column_info($colname)},
1937       -result_source => $rsrc,
1938       -source_alias => $alias,
1939     };
1940   }
1941
1942   return \%return;
1943 }
1944
1945 # Returns a counting SELECT for a simple count
1946 # query. Abstracted so that a storage could override
1947 # this to { count => 'firstcol' } or whatever makes
1948 # sense as a performance optimization
1949 sub _count_select {
1950   #my ($self, $source, $rs_attrs) = @_;
1951   return { count => '*' };
1952 }
1953
1954 # Returns a SELECT which will end up in the subselect
1955 # There may or may not be a group_by, as the subquery
1956 # might have been called to accomodate a limit
1957 #
1958 # Most databases would be happy with whatever ends up
1959 # here, but some choke in various ways.
1960 #
1961 sub _subq_count_select {
1962   my ($self, $source, $rs_attrs) = @_;
1963   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1964
1965   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1966   return @pcols ? \@pcols : [ 1 ];
1967 }
1968
1969 #
1970 # Returns an ordered list of column names before they are used
1971 # in a SELECT statement. By default simply returns the list
1972 # passed in.
1973 #
1974 # This may be overridden in a specific storage when there are
1975 # requirements such as moving BLOB columns to the end of the 
1976 # SELECT list.
1977 sub _order_select_columns {
1978   #my ($self, $source, $columns) = @_;
1979   return @{$_[2]};
1980 }
1981
1982 sub source_bind_attributes {
1983   my ($self, $source) = @_;
1984
1985   my $bind_attributes;
1986   foreach my $column ($source->columns) {
1987
1988     my $data_type = $source->column_info($column)->{data_type} || '';
1989     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1990      if $data_type;
1991   }
1992
1993   return $bind_attributes;
1994 }
1995
1996 =head2 select
1997
1998 =over 4
1999
2000 =item Arguments: $ident, $select, $condition, $attrs
2001
2002 =back
2003
2004 Handle a SQL select statement.
2005
2006 =cut
2007
2008 sub select {
2009   my $self = shift;
2010   my ($ident, $select, $condition, $attrs) = @_;
2011   return $self->cursor_class->new($self, \@_, $attrs);
2012 }
2013
2014 sub select_single {
2015   my $self = shift;
2016   my ($rv, $sth, @bind) = $self->_select(@_);
2017   my @row = $sth->fetchrow_array;
2018   my @nextrow = $sth->fetchrow_array if @row;
2019   if(@row && @nextrow) {
2020     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2021   }
2022   # Need to call finish() to work round broken DBDs
2023   $sth->finish();
2024   return @row;
2025 }
2026
2027 =head2 sth
2028
2029 =over 4
2030
2031 =item Arguments: $sql
2032
2033 =back
2034
2035 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2036
2037 =cut
2038
2039 sub _dbh_sth {
2040   my ($self, $dbh, $sql) = @_;
2041
2042   # 3 is the if_active parameter which avoids active sth re-use
2043   my $sth = $self->disable_sth_caching
2044     ? $dbh->prepare($sql)
2045     : $dbh->prepare_cached($sql, {}, 3);
2046
2047   # XXX You would think RaiseError would make this impossible,
2048   #  but apparently that's not true :(
2049   $self->throw_exception($dbh->errstr) if !$sth;
2050
2051   $sth;
2052 }
2053
2054 sub sth {
2055   my ($self, $sql) = @_;
2056   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2057 }
2058
2059 sub _dbh_columns_info_for {
2060   my ($self, $dbh, $table) = @_;
2061
2062   if ($dbh->can('column_info')) {
2063     my %result;
2064     eval {
2065       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2066       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2067       $sth->execute();
2068       while ( my $info = $sth->fetchrow_hashref() ){
2069         my %column_info;
2070         $column_info{data_type}   = $info->{TYPE_NAME};
2071         $column_info{size}      = $info->{COLUMN_SIZE};
2072         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2073         $column_info{default_value} = $info->{COLUMN_DEF};
2074         my $col_name = $info->{COLUMN_NAME};
2075         $col_name =~ s/^\"(.*)\"$/$1/;
2076
2077         $result{$col_name} = \%column_info;
2078       }
2079     };
2080     return \%result if !$@ && scalar keys %result;
2081   }
2082
2083   my %result;
2084   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2085   $sth->execute;
2086   my @columns = @{$sth->{NAME_lc}};
2087   for my $i ( 0 .. $#columns ){
2088     my %column_info;
2089     $column_info{data_type} = $sth->{TYPE}->[$i];
2090     $column_info{size} = $sth->{PRECISION}->[$i];
2091     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2092
2093     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2094       $column_info{data_type} = $1;
2095       $column_info{size}    = $2;
2096     }
2097
2098     $result{$columns[$i]} = \%column_info;
2099   }
2100   $sth->finish;
2101
2102   foreach my $col (keys %result) {
2103     my $colinfo = $result{$col};
2104     my $type_num = $colinfo->{data_type};
2105     my $type_name;
2106     if(defined $type_num && $dbh->can('type_info')) {
2107       my $type_info = $dbh->type_info($type_num);
2108       $type_name = $type_info->{TYPE_NAME} if $type_info;
2109       $colinfo->{data_type} = $type_name if $type_name;
2110     }
2111   }
2112
2113   return \%result;
2114 }
2115
2116 sub columns_info_for {
2117   my ($self, $table) = @_;
2118   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2119 }
2120
2121 =head2 last_insert_id
2122
2123 Return the row id of the last insert.
2124
2125 =cut
2126
2127 sub _dbh_last_insert_id {
2128     # All Storage's need to register their own _dbh_last_insert_id
2129     # the old SQLite-based method was highly inappropriate
2130
2131     my $self = shift;
2132     my $class = ref $self;
2133     $self->throw_exception (<<EOE);
2134
2135 No _dbh_last_insert_id() method found in $class.
2136 Since the method of obtaining the autoincrement id of the last insert
2137 operation varies greatly between different databases, this method must be
2138 individually implemented for every storage class.
2139 EOE
2140 }
2141
2142 sub last_insert_id {
2143   my $self = shift;
2144   $self->_dbh_last_insert_id ($self->_dbh, @_);
2145 }
2146
2147 =head2 _native_data_type
2148
2149 =over 4
2150
2151 =item Arguments: $type_name
2152
2153 =back
2154
2155 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2156 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2157 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2158
2159 The default implementation returns C<undef>, implement in your Storage driver if
2160 you need this functionality.
2161
2162 Should map types from other databases to the native RDBMS type, for example
2163 C<VARCHAR2> to C<VARCHAR>.
2164
2165 Types with modifiers should map to the underlying data type. For example,
2166 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2167
2168 Composite types should map to the container type, for example
2169 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2170
2171 =cut
2172
2173 sub _native_data_type {
2174   #my ($self, $data_type) = @_;
2175   return undef
2176 }
2177
2178 # Check if placeholders are supported at all
2179 sub _placeholders_supported {
2180   my $self = shift;
2181   my $dbh  = $self->_get_dbh;
2182
2183   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2184   # but it is inaccurate more often than not
2185   eval {
2186     local $dbh->{PrintError} = 0;
2187     local $dbh->{RaiseError} = 1;
2188     $dbh->do('select ?', {}, 1);
2189   };
2190   return $@ ? 0 : 1;
2191 }
2192
2193 # Check if placeholders bound to non-string types throw exceptions
2194 #
2195 sub _typeless_placeholders_supported {
2196   my $self = shift;
2197   my $dbh  = $self->_get_dbh;
2198
2199   eval {
2200     local $dbh->{PrintError} = 0;
2201     local $dbh->{RaiseError} = 1;
2202     # this specifically tests a bind that is NOT a string
2203     $dbh->do('select 1 where 1 = ?', {}, 1);
2204   };
2205   return $@ ? 0 : 1;
2206 }
2207
2208 =head2 sqlt_type
2209
2210 Returns the database driver name.
2211
2212 =cut
2213
2214 sub sqlt_type {
2215   my ($self) = @_;
2216
2217   if (not $self->_driver_determined) {
2218     $self->_determine_driver;
2219     goto $self->can ('sqlt_type');
2220   }
2221
2222   $self->_get_dbh->{Driver}->{Name};
2223 }
2224
2225 =head2 bind_attribute_by_data_type
2226
2227 Given a datatype from column info, returns a database specific bind
2228 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2229 let the database planner just handle it.
2230
2231 Generally only needed for special case column types, like bytea in postgres.
2232
2233 =cut
2234
2235 sub bind_attribute_by_data_type {
2236     return;
2237 }
2238
2239 =head2 is_datatype_numeric
2240
2241 Given a datatype from column_info, returns a boolean value indicating if
2242 the current RDBMS considers it a numeric value. This controls how
2243 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2244 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2245 be performed instead of the usual C<eq>.
2246
2247 =cut
2248
2249 sub is_datatype_numeric {
2250   my ($self, $dt) = @_;
2251
2252   return 0 unless $dt;
2253
2254   return $dt =~ /^ (?:
2255     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2256   ) $/ix;
2257 }
2258
2259
2260 =head2 create_ddl_dir (EXPERIMENTAL)
2261
2262 =over 4
2263
2264 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2265
2266 =back
2267
2268 Creates a SQL file based on the Schema, for each of the specified
2269 database engines in C<\@databases> in the given directory.
2270 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2271
2272 Given a previous version number, this will also create a file containing
2273 the ALTER TABLE statements to transform the previous schema into the
2274 current one. Note that these statements may contain C<DROP TABLE> or
2275 C<DROP COLUMN> statements that can potentially destroy data.
2276
2277 The file names are created using the C<ddl_filename> method below, please
2278 override this method in your schema if you would like a different file
2279 name format. For the ALTER file, the same format is used, replacing
2280 $version in the name with "$preversion-$version".
2281
2282 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2283 The most common value for this would be C<< { add_drop_table => 1 } >>
2284 to have the SQL produced include a C<DROP TABLE> statement for each table
2285 created. For quoting purposes supply C<quote_table_names> and
2286 C<quote_field_names>.
2287
2288 If no arguments are passed, then the following default values are assumed:
2289
2290 =over 4
2291
2292 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2293
2294 =item version    - $schema->schema_version
2295
2296 =item directory  - './'
2297
2298 =item preversion - <none>
2299
2300 =back
2301
2302 By default, C<\%sqlt_args> will have
2303
2304  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2305
2306 merged with the hash passed in. To disable any of those features, pass in a
2307 hashref like the following
2308
2309  { ignore_constraint_names => 0, # ... other options }
2310
2311
2312 Note that this feature is currently EXPERIMENTAL and may not work correctly
2313 across all databases, or fully handle complex relationships.
2314
2315 WARNING: Please check all SQL files created, before applying them.
2316
2317 =cut
2318
2319 sub create_ddl_dir {
2320   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2321
2322   if(!$dir || !-d $dir) {
2323     carp "No directory given, using ./\n";
2324     $dir = "./";
2325   }
2326   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2327   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2328
2329   my $schema_version = $schema->schema_version || '1.x';
2330   $version ||= $schema_version;
2331
2332   $sqltargs = {
2333     add_drop_table => 1,
2334     ignore_constraint_names => 1,
2335     ignore_index_names => 1,
2336     %{$sqltargs || {}}
2337   };
2338
2339   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
2340       . $self->_check_sqlt_message . q{'})
2341           if !$self->_check_sqlt_version;
2342
2343   my $sqlt = SQL::Translator->new( $sqltargs );
2344
2345   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2346   my $sqlt_schema = $sqlt->translate({ data => $schema })
2347     or $self->throw_exception ($sqlt->error);
2348
2349   foreach my $db (@$databases) {
2350     $sqlt->reset();
2351     $sqlt->{schema} = $sqlt_schema;
2352     $sqlt->producer($db);
2353
2354     my $file;
2355     my $filename = $schema->ddl_filename($db, $version, $dir);
2356     if (-e $filename && ($version eq $schema_version )) {
2357       # if we are dumping the current version, overwrite the DDL
2358       carp "Overwriting existing DDL file - $filename";
2359       unlink($filename);
2360     }
2361
2362     my $output = $sqlt->translate;
2363     if(!$output) {
2364       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2365       next;
2366     }
2367     if(!open($file, ">$filename")) {
2368       $self->throw_exception("Can't open $filename for writing ($!)");
2369       next;
2370     }
2371     print $file $output;
2372     close($file);
2373
2374     next unless ($preversion);
2375
2376     require SQL::Translator::Diff;
2377
2378     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2379     if(!-e $prefilename) {
2380       carp("No previous schema file found ($prefilename)");
2381       next;
2382     }
2383
2384     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2385     if(-e $difffile) {
2386       carp("Overwriting existing diff file - $difffile");
2387       unlink($difffile);
2388     }
2389
2390     my $source_schema;
2391     {
2392       my $t = SQL::Translator->new($sqltargs);
2393       $t->debug( 0 );
2394       $t->trace( 0 );
2395
2396       $t->parser( $db )
2397         or $self->throw_exception ($t->error);
2398
2399       my $out = $t->translate( $prefilename )
2400         or $self->throw_exception ($t->error);
2401
2402       $source_schema = $t->schema;
2403
2404       $source_schema->name( $prefilename )
2405         unless ( $source_schema->name );
2406     }
2407
2408     # The "new" style of producers have sane normalization and can support
2409     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2410     # And we have to diff parsed SQL against parsed SQL.
2411     my $dest_schema = $sqlt_schema;
2412
2413     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2414       my $t = SQL::Translator->new($sqltargs);
2415       $t->debug( 0 );
2416       $t->trace( 0 );
2417
2418       $t->parser( $db )
2419         or $self->throw_exception ($t->error);
2420
2421       my $out = $t->translate( $filename )
2422         or $self->throw_exception ($t->error);
2423
2424       $dest_schema = $t->schema;
2425
2426       $dest_schema->name( $filename )
2427         unless $dest_schema->name;
2428     }
2429
2430     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2431                                                   $dest_schema,   $db,
2432                                                   $sqltargs
2433                                                  );
2434     if(!open $file, ">$difffile") {
2435       $self->throw_exception("Can't write to $difffile ($!)");
2436       next;
2437     }
2438     print $file $diff;
2439     close($file);
2440   }
2441 }
2442
2443 =head2 deployment_statements
2444
2445 =over 4
2446
2447 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2448
2449 =back
2450
2451 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2452
2453 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2454 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2455
2456 C<$directory> is used to return statements from files in a previously created
2457 L</create_ddl_dir> directory and is optional. The filenames are constructed
2458 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2459
2460 If no C<$directory> is specified then the statements are constructed on the
2461 fly using L<SQL::Translator> and C<$version> is ignored.
2462
2463 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2464
2465 =cut
2466
2467 sub deployment_statements {
2468   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2469   $type ||= $self->sqlt_type;
2470   $version ||= $schema->schema_version || '1.x';
2471   $dir ||= './';
2472   my $filename = $schema->ddl_filename($type, $version, $dir);
2473   if(-f $filename)
2474   {
2475       my $file;
2476       open($file, "<$filename")
2477         or $self->throw_exception("Can't open $filename ($!)");
2478       my @rows = <$file>;
2479       close($file);
2480       return join('', @rows);
2481   }
2482
2483   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2484       . $self->_check_sqlt_message . q{'})
2485           if !$self->_check_sqlt_version;
2486
2487   # sources needs to be a parser arg, but for simplicty allow at top level
2488   # coming in
2489   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2490       if exists $sqltargs->{sources};
2491
2492   my $tr = SQL::Translator->new(
2493     producer => "SQL::Translator::Producer::${type}",
2494     %$sqltargs,
2495     parser => 'SQL::Translator::Parser::DBIx::Class',
2496     data => $schema,
2497   );
2498   return $tr->translate;
2499 }
2500
2501 sub deploy {
2502   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2503   my $deploy = sub {
2504     my $line = shift;
2505     return if($line =~ /^--/);
2506     return if(!$line);
2507     # next if($line =~ /^DROP/m);
2508     return if($line =~ /^BEGIN TRANSACTION/m);
2509     return if($line =~ /^COMMIT/m);
2510     return if $line =~ /^\s+$/; # skip whitespace only
2511     $self->_query_start($line);
2512     eval {
2513       # do a dbh_do cycle here, as we need some error checking in
2514       # place (even though we will ignore errors)
2515       $self->dbh_do (sub { $_[1]->do($line) });
2516     };
2517     if ($@) {
2518       carp qq{$@ (running "${line}")};
2519     }
2520     $self->_query_end($line);
2521   };
2522   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2523   if (@statements > 1) {
2524     foreach my $statement (@statements) {
2525       $deploy->( $statement );
2526     }
2527   }
2528   elsif (@statements == 1) {
2529     foreach my $line ( split(";\n", $statements[0])) {
2530       $deploy->( $line );
2531     }
2532   }
2533 }
2534
2535 =head2 datetime_parser
2536
2537 Returns the datetime parser class
2538
2539 =cut
2540
2541 sub datetime_parser {
2542   my $self = shift;
2543   return $self->{datetime_parser} ||= do {
2544     $self->_populate_dbh unless $self->_dbh;
2545     $self->build_datetime_parser(@_);
2546   };
2547 }
2548
2549 =head2 datetime_parser_type
2550
2551 Defines (returns) the datetime parser class - currently hardwired to
2552 L<DateTime::Format::MySQL>
2553
2554 =cut
2555
2556 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2557
2558 =head2 build_datetime_parser
2559
2560 See L</datetime_parser>
2561
2562 =cut
2563
2564 sub build_datetime_parser {
2565   my $self = shift;
2566   my $type = $self->datetime_parser_type(@_);
2567   eval "use ${type}";
2568   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2569   return $type;
2570 }
2571
2572 {
2573     my $_check_sqlt_version; # private
2574     my $_check_sqlt_message; # private
2575     sub _check_sqlt_version {
2576         return $_check_sqlt_version if defined $_check_sqlt_version;
2577         eval 'use SQL::Translator "0.09003"';
2578         $_check_sqlt_message = $@ || '';
2579         $_check_sqlt_version = !$@;
2580     }
2581
2582     sub _check_sqlt_message {
2583         _check_sqlt_version if !defined $_check_sqlt_message;
2584         $_check_sqlt_message;
2585     }
2586 }
2587
2588 =head2 is_replicating
2589
2590 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2591 replicate from a master database.  Default is undef, which is the result
2592 returned by databases that don't support replication.
2593
2594 =cut
2595
2596 sub is_replicating {
2597     return;
2598
2599 }
2600
2601 =head2 lag_behind_master
2602
2603 Returns a number that represents a certain amount of lag behind a master db
2604 when a given storage is replicating.  The number is database dependent, but
2605 starts at zero and increases with the amount of lag. Default in undef
2606
2607 =cut
2608
2609 sub lag_behind_master {
2610     return;
2611 }
2612
2613 sub DESTROY {
2614   my $self = shift;
2615   $self->_verify_pid if $self->_dbh;
2616
2617   # some databases need this to stop spewing warnings
2618   if (my $dbh = $self->_dbh) {
2619     eval { $dbh->disconnect };
2620   }
2621
2622   $self->_dbh(undef);
2623 }
2624
2625 1;
2626
2627 =head1 USAGE NOTES
2628
2629 =head2 DBIx::Class and AutoCommit
2630
2631 DBIx::Class can do some wonderful magic with handling exceptions,
2632 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2633 (the default) combined with C<txn_do> for transaction support.
2634
2635 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2636 in an assumed transaction between commits, and you're telling us you'd
2637 like to manage that manually.  A lot of the magic protections offered by
2638 this module will go away.  We can't protect you from exceptions due to database
2639 disconnects because we don't know anything about how to restart your
2640 transactions.  You're on your own for handling all sorts of exceptional
2641 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2642 be with raw DBI.
2643
2644
2645 =head1 AUTHORS
2646
2647 Matt S. Trout <mst@shadowcatsystems.co.uk>
2648
2649 Andy Grundman <andy@hybridized.org>
2650
2651 =head1 LICENSE
2652
2653 You may distribute this code under the same terms as Perl itself.
2654
2655 =cut