Fix insert_bulk with rebless
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47   $schema->dbh_do("DROP TABLE authors");
48
49   $schema->resultset('Book')->search({
50      written_on => $schema->storage->datetime_parser(DateTime->now)
51   });
52
53 =head1 DESCRIPTION
54
55 This class represents the connection to an RDBMS via L<DBI>.  See
56 L<DBIx::Class::Storage> for general information.  This pod only
57 documents DBI-specific methods and behaviors.
58
59 =head1 METHODS
60
61 =cut
62
63 sub new {
64   my $new = shift->next::method(@_);
65
66   $new->transaction_depth(0);
67   $new->_sql_maker_opts({});
68   $new->{savepoints} = [];
69   $new->{_in_dbh_do} = 0;
70   $new->{_dbh_gen} = 0;
71
72   $new;
73 }
74
75 =head2 connect_info
76
77 This method is normally called by L<DBIx::Class::Schema/connection>, which
78 encapsulates its argument list in an arrayref before passing them here.
79
80 The argument list may contain:
81
82 =over
83
84 =item *
85
86 The same 4-element argument set one would normally pass to
87 L<DBI/connect>, optionally followed by
88 L<extra attributes|/DBIx::Class specific connection attributes>
89 recognized by DBIx::Class:
90
91   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92
93 =item *
94
95 A single code reference which returns a connected
96 L<DBI database handle|DBI/connect> optionally followed by
97 L<extra attributes|/DBIx::Class specific connection attributes> recognized
98 by DBIx::Class:
99
100   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
101
102 =item *
103
104 A single hashref with all the attributes and the dsn/user/password
105 mixed together:
106
107   $connect_info_args = [{
108     dsn => $dsn,
109     user => $user,
110     password => $pass,
111     %dbi_attributes,
112     %extra_attributes,
113   }];
114
115   $connect_info_args = [{
116     dbh_maker => sub { DBI->connect (...) },
117     %dbi_attributes,
118     %extra_attributes,
119   }];
120
121 This is particularly useful for L<Catalyst> based applications, allowing the
122 following config (L<Config::General> style):
123
124   <Model::DB>
125     schema_class   App::DB
126     <connect_info>
127       dsn          dbi:mysql:database=test
128       user         testuser
129       password     TestPass
130       AutoCommit   1
131     </connect_info>
132   </Model::DB>
133
134 The C<dsn>/C<user>/C<password> combination can be substituted by the
135 C<dbh_maker> key whose value is a coderef that returns a connected
136 L<DBI database handle|DBI/connect>
137
138 =back
139
140 Please note that the L<DBI> docs recommend that you always explicitly
141 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
142 recommends that it be set to I<1>, and that you perform transactions
143 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
144 to I<1> if you do not do explicitly set it to zero.  This is the default
145 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
146
147 =head3 DBIx::Class specific connection attributes
148
149 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
150 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
151 the following connection options. These options can be mixed in with your other
152 L<DBI> connection attributes, or placed in a seperate hashref
153 (C<\%extra_attributes>) as shown above.
154
155 Every time C<connect_info> is invoked, any previous settings for
156 these options will be cleared before setting the new ones, regardless of
157 whether any options are specified in the new C<connect_info>.
158
159
160 =over
161
162 =item on_connect_do
163
164 Specifies things to do immediately after connecting or re-connecting to
165 the database.  Its value may contain:
166
167 =over
168
169 =item a scalar
170
171 This contains one SQL statement to execute.
172
173 =item an array reference
174
175 This contains SQL statements to execute in order.  Each element contains
176 a string or a code reference that returns a string.
177
178 =item a code reference
179
180 This contains some code to execute.  Unlike code references within an
181 array reference, its return value is ignored.
182
183 =back
184
185 =item on_disconnect_do
186
187 Takes arguments in the same form as L</on_connect_do> and executes them
188 immediately before disconnecting from the database.
189
190 Note, this only runs if you explicitly call L</disconnect> on the
191 storage object.
192
193 =item on_connect_call
194
195 A more generalized form of L</on_connect_do> that calls the specified
196 C<connect_call_METHOD> methods in your storage driver.
197
198   on_connect_do => 'select 1'
199
200 is equivalent to:
201
202   on_connect_call => [ [ do_sql => 'select 1' ] ]
203
204 Its values may contain:
205
206 =over
207
208 =item a scalar
209
210 Will call the C<connect_call_METHOD> method.
211
212 =item a code reference
213
214 Will execute C<< $code->($storage) >>
215
216 =item an array reference
217
218 Each value can be a method name or code reference.
219
220 =item an array of arrays
221
222 For each array, the first item is taken to be the C<connect_call_> method name
223 or code reference, and the rest are parameters to it.
224
225 =back
226
227 Some predefined storage methods you may use:
228
229 =over
230
231 =item do_sql
232
233 Executes a SQL string or a code reference that returns a SQL string. This is
234 what L</on_connect_do> and L</on_disconnect_do> use.
235
236 It can take:
237
238 =over
239
240 =item a scalar
241
242 Will execute the scalar as SQL.
243
244 =item an arrayref
245
246 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
247 attributes hashref and bind values.
248
249 =item a code reference
250
251 Will execute C<< $code->($storage) >> and execute the return array refs as
252 above.
253
254 =back
255
256 =item datetime_setup
257
258 Execute any statements necessary to initialize the database session to return
259 and accept datetime/timestamp values used with
260 L<DBIx::Class::InflateColumn::DateTime>.
261
262 Only necessary for some databases, see your specific storage driver for
263 implementation details.
264
265 =back
266
267 =item on_disconnect_call
268
269 Takes arguments in the same form as L</on_connect_call> and executes them
270 immediately before disconnecting from the database.
271
272 Calls the C<disconnect_call_METHOD> methods as opposed to the
273 C<connect_call_METHOD> methods called by L</on_connect_call>.
274
275 Note, this only runs if you explicitly call L</disconnect> on the
276 storage object.
277
278 =item disable_sth_caching
279
280 If set to a true value, this option will disable the caching of
281 statement handles via L<DBI/prepare_cached>.
282
283 =item limit_dialect
284
285 Sets the limit dialect. This is useful for JDBC-bridge among others
286 where the remote SQL-dialect cannot be determined by the name of the
287 driver alone. See also L<SQL::Abstract::Limit>.
288
289 =item quote_char
290
291 Specifies what characters to use to quote table and column names. If
292 you use this you will want to specify L</name_sep> as well.
293
294 C<quote_char> expects either a single character, in which case is it
295 is placed on either side of the table/column name, or an arrayref of length
296 2 in which case the table/column name is placed between the elements.
297
298 For example under MySQL you should use C<< quote_char => '`' >>, and for
299 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
300
301 =item name_sep
302
303 This only needs to be used in conjunction with C<quote_char>, and is used to
304 specify the charecter that seperates elements (schemas, tables, columns) from
305 each other. In most cases this is simply a C<.>.
306
307 The consequences of not supplying this value is that L<SQL::Abstract>
308 will assume DBIx::Class' uses of aliases to be complete column
309 names. The output will look like I<"me.name"> when it should actually
310 be I<"me"."name">.
311
312 =item unsafe
313
314 This Storage driver normally installs its own C<HandleError>, sets
315 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
316 all database handles, including those supplied by a coderef.  It does this
317 so that it can have consistent and useful error behavior.
318
319 If you set this option to a true value, Storage will not do its usual
320 modifications to the database handle's attributes, and instead relies on
321 the settings in your connect_info DBI options (or the values you set in
322 your connection coderef, in the case that you are connecting via coderef).
323
324 Note that your custom settings can cause Storage to malfunction,
325 especially if you set a C<HandleError> handler that suppresses exceptions
326 and/or disable C<RaiseError>.
327
328 =item auto_savepoint
329
330 If this option is true, L<DBIx::Class> will use savepoints when nesting
331 transactions, making it possible to recover from failure in the inner
332 transaction without having to abort all outer transactions.
333
334 =item cursor_class
335
336 Use this argument to supply a cursor class other than the default
337 L<DBIx::Class::Storage::DBI::Cursor>.
338
339 =back
340
341 Some real-life examples of arguments to L</connect_info> and
342 L<DBIx::Class::Schema/connect>
343
344   # Simple SQLite connection
345   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
346
347   # Connect via subref
348   ->connect_info([ sub { DBI->connect(...) } ]);
349
350   # Connect via subref in hashref
351   ->connect_info([{
352     dbh_maker => sub { DBI->connect(...) },
353     on_connect_do => 'alter session ...',
354   }]);
355
356   # A bit more complicated
357   ->connect_info(
358     [
359       'dbi:Pg:dbname=foo',
360       'postgres',
361       'my_pg_password',
362       { AutoCommit => 1 },
363       { quote_char => q{"}, name_sep => q{.} },
364     ]
365   );
366
367   # Equivalent to the previous example
368   ->connect_info(
369     [
370       'dbi:Pg:dbname=foo',
371       'postgres',
372       'my_pg_password',
373       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
374     ]
375   );
376
377   # Same, but with hashref as argument
378   # See parse_connect_info for explanation
379   ->connect_info(
380     [{
381       dsn         => 'dbi:Pg:dbname=foo',
382       user        => 'postgres',
383       password    => 'my_pg_password',
384       AutoCommit  => 1,
385       quote_char  => q{"},
386       name_sep    => q{.},
387     }]
388   );
389
390   # Subref + DBIx::Class-specific connection options
391   ->connect_info(
392     [
393       sub { DBI->connect(...) },
394       {
395           quote_char => q{`},
396           name_sep => q{@},
397           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
398           disable_sth_caching => 1,
399       },
400     ]
401   );
402
403
404
405 =cut
406
407 sub connect_info {
408   my ($self, $info_arg) = @_;
409
410   return $self->_connect_info if !$info_arg;
411
412   my @args = @$info_arg;  # take a shallow copy for further mutilation
413   $self->_connect_info([@args]); # copy for _connect_info
414
415
416   # combine/pre-parse arguments depending on invocation style
417
418   my %attrs;
419   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
420     %attrs = %{ $args[1] || {} };
421     @args = $args[0];
422   }
423   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
424     %attrs = %{$args[0]};
425     @args = ();
426     if (my $code = delete $attrs{dbh_maker}) {
427       @args = $code;
428
429       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
430       if (@ignored) {
431         carp sprintf (
432             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
433           . "to the result of 'dbh_maker'",
434
435           join (', ', map { "'$_'" } (@ignored) ),
436         );
437       }
438     }
439     else {
440       @args = delete @attrs{qw/dsn user password/};
441     }
442   }
443   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
444     %attrs = (
445       % { $args[3] || {} },
446       % { $args[4] || {} },
447     );
448     @args = @args[0,1,2];
449   }
450
451   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
452   #  the new set of options
453   $self->_sql_maker(undef);
454   $self->_sql_maker_opts({});
455
456   if(keys %attrs) {
457     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
458       if(my $value = delete $attrs{$storage_opt}) {
459         $self->$storage_opt($value);
460       }
461     }
462     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
463       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
464         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
465       }
466     }
467   }
468
469   if (ref $args[0] eq 'CODE') {
470     # _connect() never looks past $args[0] in this case
471     %attrs = ()
472   } else {
473     %attrs = (
474       %{ $self->_default_dbi_connect_attributes || {} },
475       %attrs,
476     );
477   }
478
479   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
480   $self->_connect_info;
481 }
482
483 sub _default_dbi_connect_attributes {
484   return {
485     AutoCommit => 1,
486     RaiseError => 1,
487     PrintError => 0,
488   };
489 }
490
491 =head2 on_connect_do
492
493 This method is deprecated in favour of setting via L</connect_info>.
494
495 =cut
496
497 =head2 on_disconnect_do
498
499 This method is deprecated in favour of setting via L</connect_info>.
500
501 =cut
502
503 sub _parse_connect_do {
504   my ($self, $type) = @_;
505
506   my $val = $self->$type;
507   return () if not defined $val;
508
509   my @res;
510
511   if (not ref($val)) {
512     push @res, [ 'do_sql', $val ];
513   } elsif (ref($val) eq 'CODE') {
514     push @res, $val;
515   } elsif (ref($val) eq 'ARRAY') {
516     push @res, map { [ 'do_sql', $_ ] } @$val;
517   } else {
518     $self->throw_exception("Invalid type for $type: ".ref($val));
519   }
520
521   return \@res;
522 }
523
524 =head2 dbh_do
525
526 Arguments: ($subref | $method_name), @extra_coderef_args?
527
528 Execute the given $subref or $method_name using the new exception-based
529 connection management.
530
531 The first two arguments will be the storage object that C<dbh_do> was called
532 on and a database handle to use.  Any additional arguments will be passed
533 verbatim to the called subref as arguments 2 and onwards.
534
535 Using this (instead of $self->_dbh or $self->dbh) ensures correct
536 exception handling and reconnection (or failover in future subclasses).
537
538 Your subref should have no side-effects outside of the database, as
539 there is the potential for your subref to be partially double-executed
540 if the database connection was stale/dysfunctional.
541
542 Example:
543
544   my @stuff = $schema->storage->dbh_do(
545     sub {
546       my ($storage, $dbh, @cols) = @_;
547       my $cols = join(q{, }, @cols);
548       $dbh->selectrow_array("SELECT $cols FROM foo");
549     },
550     @column_list
551   );
552
553 =cut
554
555 sub dbh_do {
556   my $self = shift;
557   my $code = shift;
558
559   my $dbh = $self->_dbh;
560
561   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
562       || $self->{transaction_depth};
563
564   local $self->{_in_dbh_do} = 1;
565
566   my @result;
567   my $want_array = wantarray;
568
569   eval {
570     $self->_verify_pid if $dbh;
571     if(!$self->_dbh) {
572         $self->_populate_dbh;
573         $dbh = $self->_dbh;
574     }
575
576     if($want_array) {
577         @result = $self->$code($dbh, @_);
578     }
579     elsif(defined $want_array) {
580         $result[0] = $self->$code($dbh, @_);
581     }
582     else {
583         $self->$code($dbh, @_);
584     }
585   };
586
587   # ->connected might unset $@ - copy
588   my $exception = $@;
589   if(!$exception) { return $want_array ? @result : $result[0] }
590
591   $self->throw_exception($exception) if $self->connected;
592
593   # We were not connected - reconnect and retry, but let any
594   #  exception fall right through this time
595   carp "Retrying $code after catching disconnected exception: $exception"
596     if $ENV{DBIC_DBIRETRY_DEBUG};
597   $self->_populate_dbh;
598   $self->$code($self->_dbh, @_);
599 }
600
601 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
602 # It also informs dbh_do to bypass itself while under the direction of txn_do,
603 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
604 sub txn_do {
605   my $self = shift;
606   my $coderef = shift;
607
608   ref $coderef eq 'CODE' or $self->throw_exception
609     ('$coderef must be a CODE reference');
610
611   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
612
613   local $self->{_in_dbh_do} = 1;
614
615   my @result;
616   my $want_array = wantarray;
617
618   my $tried = 0;
619   while(1) {
620     eval {
621       $self->_verify_pid if $self->_dbh;
622       $self->_populate_dbh if !$self->_dbh;
623
624       $self->txn_begin;
625       if($want_array) {
626           @result = $coderef->(@_);
627       }
628       elsif(defined $want_array) {
629           $result[0] = $coderef->(@_);
630       }
631       else {
632           $coderef->(@_);
633       }
634       $self->txn_commit;
635     };
636
637     # ->connected might unset $@ - copy
638     my $exception = $@;
639     if(!$exception) { return $want_array ? @result : $result[0] }
640
641     if($tried++ || $self->connected) {
642       eval { $self->txn_rollback };
643       my $rollback_exception = $@;
644       if($rollback_exception) {
645         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
646         $self->throw_exception($exception)  # propagate nested rollback
647           if $rollback_exception =~ /$exception_class/;
648
649         $self->throw_exception(
650           "Transaction aborted: ${exception}. "
651           . "Rollback failed: ${rollback_exception}"
652         );
653       }
654       $self->throw_exception($exception)
655     }
656
657     # We were not connected, and was first try - reconnect and retry
658     # via the while loop
659     carp "Retrying $coderef after catching disconnected exception: $exception"
660       if $ENV{DBIC_DBIRETRY_DEBUG};
661     $self->_populate_dbh;
662   }
663 }
664
665 =head2 disconnect
666
667 Our C<disconnect> method also performs a rollback first if the
668 database is not in C<AutoCommit> mode.
669
670 =cut
671
672 sub disconnect {
673   my ($self) = @_;
674
675   if( $self->_dbh ) {
676     my @actions;
677
678     push @actions, ( $self->on_disconnect_call || () );
679     push @actions, $self->_parse_connect_do ('on_disconnect_do');
680
681     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
682
683     $self->_dbh->rollback unless $self->_dbh_autocommit;
684     $self->_dbh->disconnect;
685     $self->_dbh(undef);
686     $self->{_dbh_gen}++;
687   }
688 }
689
690 =head2 with_deferred_fk_checks
691
692 =over 4
693
694 =item Arguments: C<$coderef>
695
696 =item Return Value: The return value of $coderef
697
698 =back
699
700 Storage specific method to run the code ref with FK checks deferred or
701 in MySQL's case disabled entirely.
702
703 =cut
704
705 # Storage subclasses should override this
706 sub with_deferred_fk_checks {
707   my ($self, $sub) = @_;
708
709   $sub->();
710 }
711
712 =head2 connected
713
714 =over
715
716 =item Arguments: none
717
718 =item Return Value: 1|0
719
720 =back
721
722 Verifies that the the current database handle is active and ready to execute
723 an SQL statement (i.e. the connection did not get stale, server is still
724 answering, etc.) This method is used internally by L</dbh>.
725
726 =cut
727
728 sub connected {
729   my $self = shift;
730   return 0 unless $self->_seems_connected;
731
732   #be on the safe side
733   local $self->_dbh->{RaiseError} = 1;
734
735   return $self->_ping;
736 }
737
738 sub _seems_connected {
739   my $self = shift;
740
741   my $dbh = $self->_dbh
742     or return 0;
743
744   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
745     $self->_dbh(undef);
746     $self->{_dbh_gen}++;
747     return 0;
748   }
749   else {
750     $self->_verify_pid;
751     return 0 if !$self->_dbh;
752   }
753
754   return $dbh->FETCH('Active');
755 }
756
757 sub _ping {
758   my $self = shift;
759
760   my $dbh = $self->_dbh or return 0;
761
762   return $dbh->ping;
763 }
764
765 # handle pid changes correctly
766 #  NOTE: assumes $self->_dbh is a valid $dbh
767 sub _verify_pid {
768   my ($self) = @_;
769
770   return if defined $self->_conn_pid && $self->_conn_pid == $$;
771
772   $self->_dbh->{InactiveDestroy} = 1;
773   $self->_dbh(undef);
774   $self->{_dbh_gen}++;
775
776   return;
777 }
778
779 sub ensure_connected {
780   my ($self) = @_;
781
782   unless ($self->connected) {
783     $self->_populate_dbh;
784   }
785 }
786
787 =head2 dbh
788
789 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
790 is guaranteed to be healthy by implicitly calling L</connected>, and if
791 necessary performing a reconnection before returning. Keep in mind that this
792 is very B<expensive> on some database engines. Consider using L<dbh_do>
793 instead.
794
795 =cut
796
797 sub dbh {
798   my ($self) = @_;
799
800   if (not $self->_dbh) {
801     $self->_populate_dbh;
802   } else {
803     $self->ensure_connected;
804   }
805   return $self->_dbh;
806 }
807
808 # this is the internal "get dbh or connect (don't check)" method
809 sub _get_dbh {
810   my $self = shift;
811   $self->_populate_dbh unless $self->_dbh;
812   return $self->_dbh;
813 }
814
815 sub _sql_maker_args {
816     my ($self) = @_;
817
818     return (
819       bindtype=>'columns',
820       array_datatypes => 1,
821       limit_dialect => $self->_get_dbh,
822       %{$self->_sql_maker_opts}
823     );
824 }
825
826 sub sql_maker {
827   my ($self) = @_;
828   unless ($self->_sql_maker) {
829     my $sql_maker_class = $self->sql_maker_class;
830     $self->ensure_class_loaded ($sql_maker_class);
831     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
832   }
833   return $self->_sql_maker;
834 }
835
836 sub _rebless {}
837
838 sub _populate_dbh {
839   my ($self) = @_;
840
841   my @info = @{$self->_dbi_connect_info || []};
842   $self->_dbh(undef); # in case ->connected failed we might get sent here
843   $self->_dbh($self->_connect(@info));
844
845   $self->_conn_pid($$);
846   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
847
848   $self->_determine_driver;
849
850   # Always set the transaction depth on connect, since
851   #  there is no transaction in progress by definition
852   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
853
854   $self->_run_connection_actions unless $self->{_in_determine_driver};
855 }
856
857 sub _run_connection_actions {
858   my $self = shift;
859   my @actions;
860
861   push @actions, ( $self->on_connect_call || () );
862   push @actions, $self->_parse_connect_do ('on_connect_do');
863
864   $self->_do_connection_actions(connect_call_ => $_) for @actions;
865 }
866
867 sub _determine_driver {
868   my ($self) = @_;
869
870   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
871     my $started_unconnected = 0;
872     local $self->{_in_determine_driver} = 1;
873
874     if (ref($self) eq __PACKAGE__) {
875       my $driver;
876       if ($self->_dbh) { # we are connected
877         $driver = $self->_dbh->{Driver}{Name};
878       } else {
879         # if connect_info is a CODEREF, we have no choice but to connect
880         if (ref $self->_dbi_connect_info->[0] &&
881             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
882           $self->_populate_dbh;
883           $driver = $self->_dbh->{Driver}{Name};
884         }
885         else {
886           # try to use dsn to not require being connected, the driver may still
887           # force a connection in _rebless to determine version
888           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
889           $started_unconnected = 1;
890         }
891       }
892
893       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
894       if ($self->load_optional_class($storage_class)) {
895         mro::set_mro($storage_class, 'c3');
896         bless $self, $storage_class;
897         $self->_rebless();
898       }
899     }
900
901     $self->_driver_determined(1);
902
903     $self->_run_connection_actions
904         if $started_unconnected && defined $self->_dbh;
905   }
906 }
907
908 sub _do_connection_actions {
909   my $self          = shift;
910   my $method_prefix = shift;
911   my $call          = shift;
912
913   if (not ref($call)) {
914     my $method = $method_prefix . $call;
915     $self->$method(@_);
916   } elsif (ref($call) eq 'CODE') {
917     $self->$call(@_);
918   } elsif (ref($call) eq 'ARRAY') {
919     if (ref($call->[0]) ne 'ARRAY') {
920       $self->_do_connection_actions($method_prefix, $_) for @$call;
921     } else {
922       $self->_do_connection_actions($method_prefix, @$_) for @$call;
923     }
924   } else {
925     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
926   }
927
928   return $self;
929 }
930
931 sub connect_call_do_sql {
932   my $self = shift;
933   $self->_do_query(@_);
934 }
935
936 sub disconnect_call_do_sql {
937   my $self = shift;
938   $self->_do_query(@_);
939 }
940
941 # override in db-specific backend when necessary
942 sub connect_call_datetime_setup { 1 }
943
944 sub _do_query {
945   my ($self, $action) = @_;
946
947   if (ref $action eq 'CODE') {
948     $action = $action->($self);
949     $self->_do_query($_) foreach @$action;
950   }
951   else {
952     # Most debuggers expect ($sql, @bind), so we need to exclude
953     # the attribute hash which is the second argument to $dbh->do
954     # furthermore the bind values are usually to be presented
955     # as named arrayref pairs, so wrap those here too
956     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
957     my $sql = shift @do_args;
958     my $attrs = shift @do_args;
959     my @bind = map { [ undef, $_ ] } @do_args;
960
961     $self->_query_start($sql, @bind);
962     $self->_dbh->do($sql, $attrs, @do_args);
963     $self->_query_end($sql, @bind);
964   }
965
966   return $self;
967 }
968
969 sub _connect {
970   my ($self, @info) = @_;
971
972   $self->throw_exception("You failed to provide any connection info")
973     if !@info;
974
975   my ($old_connect_via, $dbh);
976
977   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
978     $old_connect_via = $DBI::connect_via;
979     $DBI::connect_via = 'connect';
980   }
981
982   eval {
983     if(ref $info[0] eq 'CODE') {
984        $dbh = &{$info[0]}
985     }
986     else {
987        $dbh = DBI->connect(@info);
988     }
989
990     if($dbh && !$self->unsafe) {
991       my $weak_self = $self;
992       Scalar::Util::weaken($weak_self);
993       $dbh->{HandleError} = sub {
994           if ($weak_self) {
995             $weak_self->throw_exception("DBI Exception: $_[0]");
996           }
997           else {
998             croak ("DBI Exception: $_[0]");
999           }
1000       };
1001       $dbh->{ShowErrorStatement} = 1;
1002       $dbh->{RaiseError} = 1;
1003       $dbh->{PrintError} = 0;
1004     }
1005   };
1006
1007   $DBI::connect_via = $old_connect_via if $old_connect_via;
1008
1009   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1010     if !$dbh || $@;
1011
1012   $self->_dbh_autocommit($dbh->{AutoCommit});
1013
1014   $dbh;
1015 }
1016
1017 sub svp_begin {
1018   my ($self, $name) = @_;
1019
1020   $name = $self->_svp_generate_name
1021     unless defined $name;
1022
1023   $self->throw_exception ("You can't use savepoints outside a transaction")
1024     if $self->{transaction_depth} == 0;
1025
1026   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1027     unless $self->can('_svp_begin');
1028
1029   push @{ $self->{savepoints} }, $name;
1030
1031   $self->debugobj->svp_begin($name) if $self->debug;
1032
1033   return $self->_svp_begin($name);
1034 }
1035
1036 sub svp_release {
1037   my ($self, $name) = @_;
1038
1039   $self->throw_exception ("You can't use savepoints outside a transaction")
1040     if $self->{transaction_depth} == 0;
1041
1042   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1043     unless $self->can('_svp_release');
1044
1045   if (defined $name) {
1046     $self->throw_exception ("Savepoint '$name' does not exist")
1047       unless grep { $_ eq $name } @{ $self->{savepoints} };
1048
1049     # Dig through the stack until we find the one we are releasing.  This keeps
1050     # the stack up to date.
1051     my $svp;
1052
1053     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1054   } else {
1055     $name = pop @{ $self->{savepoints} };
1056   }
1057
1058   $self->debugobj->svp_release($name) if $self->debug;
1059
1060   return $self->_svp_release($name);
1061 }
1062
1063 sub svp_rollback {
1064   my ($self, $name) = @_;
1065
1066   $self->throw_exception ("You can't use savepoints outside a transaction")
1067     if $self->{transaction_depth} == 0;
1068
1069   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1070     unless $self->can('_svp_rollback');
1071
1072   if (defined $name) {
1073       # If they passed us a name, verify that it exists in the stack
1074       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1075           $self->throw_exception("Savepoint '$name' does not exist!");
1076       }
1077
1078       # Dig through the stack until we find the one we are releasing.  This keeps
1079       # the stack up to date.
1080       while(my $s = pop(@{ $self->{savepoints} })) {
1081           last if($s eq $name);
1082       }
1083       # Add the savepoint back to the stack, as a rollback doesn't remove the
1084       # named savepoint, only everything after it.
1085       push(@{ $self->{savepoints} }, $name);
1086   } else {
1087       # We'll assume they want to rollback to the last savepoint
1088       $name = $self->{savepoints}->[-1];
1089   }
1090
1091   $self->debugobj->svp_rollback($name) if $self->debug;
1092
1093   return $self->_svp_rollback($name);
1094 }
1095
1096 sub _svp_generate_name {
1097     my ($self) = @_;
1098
1099     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1100 }
1101
1102 sub txn_begin {
1103   my $self = shift;
1104   if($self->{transaction_depth} == 0) {
1105     $self->debugobj->txn_begin()
1106       if $self->debug;
1107
1108     # being here implies we have AutoCommit => 1
1109     # if the user is utilizing txn_do - good for
1110     # him, otherwise we need to ensure that the
1111     # $dbh is healthy on BEGIN
1112     my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
1113     $self->$dbh_method->begin_work;
1114
1115   } elsif ($self->auto_savepoint) {
1116     $self->svp_begin;
1117   }
1118   $self->{transaction_depth}++;
1119 }
1120
1121 sub txn_commit {
1122   my $self = shift;
1123   if ($self->{transaction_depth} == 1) {
1124     my $dbh = $self->_dbh;
1125     $self->debugobj->txn_commit()
1126       if ($self->debug);
1127     $dbh->commit;
1128     $self->{transaction_depth} = 0
1129       if $self->_dbh_autocommit;
1130   }
1131   elsif($self->{transaction_depth} > 1) {
1132     $self->{transaction_depth}--;
1133     $self->svp_release
1134       if $self->auto_savepoint;
1135   }
1136 }
1137
1138 sub txn_rollback {
1139   my $self = shift;
1140   my $dbh = $self->_dbh;
1141   eval {
1142     if ($self->{transaction_depth} == 1) {
1143       $self->debugobj->txn_rollback()
1144         if ($self->debug);
1145       $self->{transaction_depth} = 0
1146         if $self->_dbh_autocommit;
1147       $dbh->rollback;
1148     }
1149     elsif($self->{transaction_depth} > 1) {
1150       $self->{transaction_depth}--;
1151       if ($self->auto_savepoint) {
1152         $self->svp_rollback;
1153         $self->svp_release;
1154       }
1155     }
1156     else {
1157       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1158     }
1159   };
1160   if ($@) {
1161     my $error = $@;
1162     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1163     $error =~ /$exception_class/ and $self->throw_exception($error);
1164     # ensure that a failed rollback resets the transaction depth
1165     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1166     $self->throw_exception($error);
1167   }
1168 }
1169
1170 # This used to be the top-half of _execute.  It was split out to make it
1171 #  easier to override in NoBindVars without duping the rest.  It takes up
1172 #  all of _execute's args, and emits $sql, @bind.
1173 sub _prep_for_execute {
1174   my ($self, $op, $extra_bind, $ident, $args) = @_;
1175
1176   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1177     $ident = $ident->from();
1178   }
1179
1180   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1181
1182   unshift(@bind,
1183     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1184       if $extra_bind;
1185   return ($sql, \@bind);
1186 }
1187
1188
1189 sub _fix_bind_params {
1190     my ($self, @bind) = @_;
1191
1192     ### Turn @bind from something like this:
1193     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1194     ### to this:
1195     ###   ( "'1'", "'1'", "'3'" )
1196     return
1197         map {
1198             if ( defined( $_ && $_->[1] ) ) {
1199                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1200             }
1201             else { q{'NULL'}; }
1202         } @bind;
1203 }
1204
1205 sub _query_start {
1206     my ( $self, $sql, @bind ) = @_;
1207
1208     if ( $self->debug ) {
1209         @bind = $self->_fix_bind_params(@bind);
1210
1211         $self->debugobj->query_start( $sql, @bind );
1212     }
1213 }
1214
1215 sub _query_end {
1216     my ( $self, $sql, @bind ) = @_;
1217
1218     if ( $self->debug ) {
1219         @bind = $self->_fix_bind_params(@bind);
1220         $self->debugobj->query_end( $sql, @bind );
1221     }
1222 }
1223
1224 sub _dbh_execute {
1225   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1226
1227   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1228
1229   $self->_query_start( $sql, @$bind );
1230
1231   my $sth = $self->sth($sql,$op);
1232
1233   my $placeholder_index = 1;
1234
1235   foreach my $bound (@$bind) {
1236     my $attributes = {};
1237     my($column_name, @data) = @$bound;
1238
1239     if ($bind_attributes) {
1240       $attributes = $bind_attributes->{$column_name}
1241       if defined $bind_attributes->{$column_name};
1242     }
1243
1244     foreach my $data (@data) {
1245       my $ref = ref $data;
1246       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1247
1248       $sth->bind_param($placeholder_index, $data, $attributes);
1249       $placeholder_index++;
1250     }
1251   }
1252
1253   # Can this fail without throwing an exception anyways???
1254   my $rv = $sth->execute();
1255   $self->throw_exception($sth->errstr) if !$rv;
1256
1257   $self->_query_end( $sql, @$bind );
1258
1259   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1260 }
1261
1262 sub _execute {
1263     my $self = shift;
1264     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1265 }
1266
1267 sub insert {
1268   my ($self, $source, $to_insert) = @_;
1269
1270 # redispatch to insert method of storage we reblessed into, if necessary
1271   if (not $self->_driver_determined) {
1272     $self->_determine_driver;
1273     goto $self->can('insert');
1274   }
1275
1276   my $ident = $source->from;
1277   my $bind_attributes = $self->source_bind_attributes($source);
1278
1279   my $updated_cols = {};
1280
1281   foreach my $col ( $source->columns ) {
1282     if ( !defined $to_insert->{$col} ) {
1283       my $col_info = $source->column_info($col);
1284
1285       if ( $col_info->{auto_nextval} ) {
1286         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1287           'nextval',
1288           $col_info->{sequence} ||
1289             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1290         );
1291       }
1292     }
1293   }
1294
1295   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1296
1297   return $updated_cols;
1298 }
1299
1300 ## Still not quite perfect, and EXPERIMENTAL
1301 ## Currently it is assumed that all values passed will be "normal", i.e. not
1302 ## scalar refs, or at least, all the same type as the first set, the statement is
1303 ## only prepped once.
1304 sub insert_bulk {
1305   my ($self, $source, $cols, $data) = @_;
1306
1307 # redispatch to insert_bulk method of storage we reblessed into, if necessary
1308   if (not $self->_driver_determined) {
1309     $self->_determine_driver;
1310     goto $self->can('insert_bulk');
1311   }
1312
1313   my %colvalues;
1314   my $table = $source->from;
1315   @colvalues{@$cols} = (0..$#$cols);
1316   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1317
1318   $self->_query_start( $sql, @bind );
1319   my $sth = $self->sth($sql);
1320
1321 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1322
1323   ## This must be an arrayref, else nothing works!
1324   my $tuple_status = [];
1325
1326   ## Get the bind_attributes, if any exist
1327   my $bind_attributes = $self->source_bind_attributes($source);
1328
1329   ## Bind the values and execute
1330   my $placeholder_index = 1;
1331
1332   foreach my $bound (@bind) {
1333
1334     my $attributes = {};
1335     my ($column_name, $data_index) = @$bound;
1336
1337     if( $bind_attributes ) {
1338       $attributes = $bind_attributes->{$column_name}
1339       if defined $bind_attributes->{$column_name};
1340     }
1341
1342     my @data = map { $_->[$data_index] } @$data;
1343
1344     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1345     $placeholder_index++;
1346   }
1347   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1348   if (my $err = $@) {
1349     my $i = 0;
1350     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1351
1352     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1353       if ($i > $#$tuple_status);
1354
1355     require Data::Dumper;
1356     local $Data::Dumper::Terse = 1;
1357     local $Data::Dumper::Indent = 1;
1358     local $Data::Dumper::Useqq = 1;
1359     local $Data::Dumper::Quotekeys = 0;
1360
1361     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1362       $tuple_status->[$i][1],
1363       Data::Dumper::Dumper(
1364         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1365       ),
1366     );
1367   }
1368   $self->throw_exception($sth->errstr) if !$rv;
1369
1370   $self->_query_end( $sql, @bind );
1371   return (wantarray ? ($rv, $sth, @bind) : $rv);
1372 }
1373
1374 sub update {
1375   my $self = shift @_;
1376   my $source = shift @_;
1377   $self->_determine_driver;
1378   my $bind_attributes = $self->source_bind_attributes($source);
1379
1380   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1381 }
1382
1383
1384 sub delete {
1385   my $self = shift @_;
1386   my $source = shift @_;
1387   $self->_determine_driver;
1388   my $bind_attrs = $self->source_bind_attributes($source);
1389
1390   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1391 }
1392
1393 # We were sent here because the $rs contains a complex search
1394 # which will require a subquery to select the correct rows
1395 # (i.e. joined or limited resultsets)
1396 #
1397 # Genarating a single PK column subquery is trivial and supported
1398 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1399 # Look at _multipk_update_delete()
1400 sub _subq_update_delete {
1401   my $self = shift;
1402   my ($rs, $op, $values) = @_;
1403
1404   my $rsrc = $rs->result_source;
1405
1406   # we already check this, but double check naively just in case. Should be removed soon
1407   my $sel = $rs->_resolved_attrs->{select};
1408   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1409   my @pcols = $rsrc->primary_columns;
1410   if (@$sel != @pcols) {
1411     $self->throw_exception (
1412       'Subquery update/delete can not be called on resultsets selecting a'
1413      .' number of columns different than the number of primary keys'
1414     );
1415   }
1416
1417   if (@pcols == 1) {
1418     return $self->$op (
1419       $rsrc,
1420       $op eq 'update' ? $values : (),
1421       { $pcols[0] => { -in => $rs->as_query } },
1422     );
1423   }
1424
1425   else {
1426     return $self->_multipk_update_delete (@_);
1427   }
1428 }
1429
1430 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1431 # resultset update/delete involving subqueries. So by default resort
1432 # to simple (and inefficient) delete_all style per-row opearations,
1433 # while allowing specific storages to override this with a faster
1434 # implementation.
1435 #
1436 sub _multipk_update_delete {
1437   return shift->_per_row_update_delete (@_);
1438 }
1439
1440 # This is the default loop used to delete/update rows for multi PK
1441 # resultsets, and used by mysql exclusively (because it can't do anything
1442 # else).
1443 #
1444 # We do not use $row->$op style queries, because resultset update/delete
1445 # is not expected to cascade (this is what delete_all/update_all is for).
1446 #
1447 # There should be no race conditions as the entire operation is rolled
1448 # in a transaction.
1449 #
1450 sub _per_row_update_delete {
1451   my $self = shift;
1452   my ($rs, $op, $values) = @_;
1453
1454   my $rsrc = $rs->result_source;
1455   my @pcols = $rsrc->primary_columns;
1456
1457   my $guard = $self->txn_scope_guard;
1458
1459   # emulate the return value of $sth->execute for non-selects
1460   my $row_cnt = '0E0';
1461
1462   my $subrs_cur = $rs->cursor;
1463   while (my @pks = $subrs_cur->next) {
1464
1465     my $cond;
1466     for my $i (0.. $#pcols) {
1467       $cond->{$pcols[$i]} = $pks[$i];
1468     }
1469
1470     $self->$op (
1471       $rsrc,
1472       $op eq 'update' ? $values : (),
1473       $cond,
1474     );
1475
1476     $row_cnt++;
1477   }
1478
1479   $guard->commit;
1480
1481   return $row_cnt;
1482 }
1483
1484 sub _select {
1485   my $self = shift;
1486
1487   # localization is neccessary as
1488   # 1) there is no infrastructure to pass this around before SQLA2
1489   # 2) _select_args sets it and _prep_for_execute consumes it
1490   my $sql_maker = $self->sql_maker;
1491   local $sql_maker->{_dbic_rs_attrs};
1492
1493   return $self->_execute($self->_select_args(@_));
1494 }
1495
1496 sub _select_args_to_query {
1497   my $self = shift;
1498
1499   # localization is neccessary as
1500   # 1) there is no infrastructure to pass this around before SQLA2
1501   # 2) _select_args sets it and _prep_for_execute consumes it
1502   my $sql_maker = $self->sql_maker;
1503   local $sql_maker->{_dbic_rs_attrs};
1504
1505   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1506   #  = $self->_select_args($ident, $select, $cond, $attrs);
1507   my ($op, $bind, $ident, $bind_attrs, @args) =
1508     $self->_select_args(@_);
1509
1510   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1511   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1512   $prepared_bind ||= [];
1513
1514   return wantarray
1515     ? ($sql, $prepared_bind, $bind_attrs)
1516     : \[ "($sql)", @$prepared_bind ]
1517   ;
1518 }
1519
1520 sub _select_args {
1521   my ($self, $ident, $select, $where, $attrs) = @_;
1522
1523   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1524
1525   my $sql_maker = $self->sql_maker;
1526   $sql_maker->{_dbic_rs_attrs} = {
1527     %$attrs,
1528     select => $select,
1529     from => $ident,
1530     where => $where,
1531     $rs_alias
1532       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1533       : ()
1534     ,
1535   };
1536
1537   # calculate bind_attrs before possible $ident mangling
1538   my $bind_attrs = {};
1539   for my $alias (keys %$alias2source) {
1540     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1541     for my $col (keys %$bindtypes) {
1542
1543       my $fqcn = join ('.', $alias, $col);
1544       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1545
1546       # Unqialified column names are nice, but at the same time can be
1547       # rather ambiguous. What we do here is basically go along with
1548       # the loop, adding an unqualified column slot to $bind_attrs,
1549       # alongside the fully qualified name. As soon as we encounter
1550       # another column by that name (which would imply another table)
1551       # we unset the unqualified slot and never add any info to it
1552       # to avoid erroneous type binding. If this happens the users
1553       # only choice will be to fully qualify his column name
1554
1555       if (exists $bind_attrs->{$col}) {
1556         $bind_attrs->{$col} = {};
1557       }
1558       else {
1559         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1560       }
1561     }
1562   }
1563
1564   # adjust limits
1565   if (
1566     $attrs->{software_limit}
1567       ||
1568     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1569   ) {
1570     $attrs->{software_limit} = 1;
1571   }
1572   else {
1573     $self->throw_exception("rows attribute must be positive if present")
1574       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1575
1576     # MySQL actually recommends this approach.  I cringe.
1577     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1578   }
1579
1580   my @limit;
1581
1582   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1583   # otherwise delegate the limiting to the storage, unless software limit was requested
1584   if (
1585     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1586        ||
1587     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1588       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1589   ) {
1590     ($ident, $select, $where, $attrs)
1591       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1592   }
1593   elsif (! $attrs->{software_limit} ) {
1594     push @limit, $attrs->{rows}, $attrs->{offset};
1595   }
1596
1597 ###
1598   # This would be the point to deflate anything found in $where
1599   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1600   # expect a row object. And all we have is a resultsource (it is trivial
1601   # to extract deflator coderefs via $alias2source above).
1602   #
1603   # I don't see a way forward other than changing the way deflators are
1604   # invoked, and that's just bad...
1605 ###
1606
1607   my $order = { map
1608     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1609     (qw/order_by group_by having/ )
1610   };
1611
1612   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1613 }
1614
1615 #
1616 # This is the code producing joined subqueries like:
1617 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1618 #
1619 sub _adjust_select_args_for_complex_prefetch {
1620   my ($self, $from, $select, $where, $attrs) = @_;
1621
1622   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1623     if not @{$attrs->{_prefetch_select}};
1624
1625   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1626     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1627
1628
1629   # generate inner/outer attribute lists, remove stuff that doesn't apply
1630   my $outer_attrs = { %$attrs };
1631   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1632
1633   my $inner_attrs = { %$attrs };
1634   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1635
1636
1637   # bring over all non-collapse-induced order_by into the inner query (if any)
1638   # the outer one will have to keep them all
1639   delete $inner_attrs->{order_by};
1640   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1641     $inner_attrs->{order_by} = [
1642       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1643     ];
1644   }
1645
1646
1647   # generate the inner/outer select lists
1648   # for inside we consider only stuff *not* brought in by the prefetch
1649   # on the outside we substitute any function for its alias
1650   my $outer_select = [ @$select ];
1651   my $inner_select = [];
1652   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1653     my $sel = $outer_select->[$i];
1654
1655     if (ref $sel eq 'HASH' ) {
1656       $sel->{-as} ||= $attrs->{as}[$i];
1657       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1658     }
1659
1660     push @$inner_select, $sel;
1661   }
1662
1663   # normalize a copy of $from, so it will be easier to work with further
1664   # down (i.e. promote the initial hashref to an AoH)
1665   $from = [ @$from ];
1666   $from->[0] = [ $from->[0] ];
1667   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1668
1669
1670   # decide which parts of the join will remain in either part of
1671   # the outer/inner query
1672
1673   # First we compose a list of which aliases are used in restrictions
1674   # (i.e. conditions/order/grouping/etc). Since we do not have
1675   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1676   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1677   # need to appear in the resulting sql.
1678   # It may not be very efficient, but it's a reasonable stop-gap
1679   # Also unqualified column names will not be considered, but more often
1680   # than not this is actually ok
1681   #
1682   # In the same loop we enumerate part of the selection aliases, as
1683   # it requires the same sqla hack for the time being
1684   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1685   {
1686     # produce stuff unquoted, so it can be scanned
1687     my $sql_maker = $self->sql_maker;
1688     local $sql_maker->{quote_char};
1689     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1690     $sep = "\Q$sep\E";
1691
1692     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1693     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1694     my $where_sql = $sql_maker->where ($where);
1695     my $group_by_sql = $sql_maker->_order_by({
1696       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1697     });
1698     my @non_prefetch_order_by_chunks = (map
1699       { ref $_ ? $_->[0] : $_ }
1700       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1701     );
1702
1703
1704     for my $alias (keys %original_join_info) {
1705       my $seen_re = qr/\b $alias $sep/x;
1706
1707       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1708         if ($piece =~ $seen_re) {
1709           $restrict_aliases->{$alias} = 1;
1710         }
1711       }
1712
1713       if ($non_prefetch_select_sql =~ $seen_re) {
1714           $select_aliases->{$alias} = 1;
1715       }
1716
1717       if ($prefetch_select_sql =~ $seen_re) {
1718           $prefetch_aliases->{$alias} = 1;
1719       }
1720
1721     }
1722   }
1723
1724   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1725   for my $j (values %original_join_info) {
1726     my $alias = $j->{-alias} or next;
1727     $restrict_aliases->{$alias} = 1 if (
1728       (not $j->{-join_type})
1729         or
1730       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1731     );
1732   }
1733
1734   # mark all join parents as mentioned
1735   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1736   for my $collection ($restrict_aliases, $select_aliases) {
1737     for my $alias (keys %$collection) {
1738       $collection->{$_} = 1
1739         for (@{ $original_join_info{$alias}{-join_path} || [] });
1740     }
1741   }
1742
1743   # construct the inner $from for the subquery
1744   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1745   my @inner_from;
1746   for my $j (@$from) {
1747     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1748   }
1749
1750   # if a multi-type join was needed in the subquery ("multi" is indicated by
1751   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1752   unless ($inner_attrs->{group_by}) {
1753     for my $alias (keys %inner_joins) {
1754
1755       # the dot comes from some weirdness in collapse
1756       # remove after the rewrite
1757       if ($attrs->{collapse}{".$alias"}) {
1758         $inner_attrs->{group_by} ||= $inner_select;
1759         last;
1760       }
1761     }
1762   }
1763
1764   # demote the inner_from head
1765   $inner_from[0] = $inner_from[0][0];
1766
1767   # generate the subquery
1768   my $subq = $self->_select_args_to_query (
1769     \@inner_from,
1770     $inner_select,
1771     $where,
1772     $inner_attrs,
1773   );
1774
1775   my $subq_joinspec = {
1776     -alias => $attrs->{alias},
1777     -source_handle => $inner_from[0]{-source_handle},
1778     $attrs->{alias} => $subq,
1779   };
1780
1781   # Generate the outer from - this is relatively easy (really just replace
1782   # the join slot with the subquery), with a major caveat - we can not
1783   # join anything that is non-selecting (not part of the prefetch), but at
1784   # the same time is a multi-type relationship, as it will explode the result.
1785   #
1786   # There are two possibilities here
1787   # - either the join is non-restricting, in which case we simply throw it away
1788   # - it is part of the restrictions, in which case we need to collapse the outer
1789   #   result by tackling yet another group_by to the outside of the query
1790
1791   # so first generate the outer_from, up to the substitution point
1792   my @outer_from;
1793   while (my $j = shift @$from) {
1794     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1795       push @outer_from, [
1796         $subq_joinspec,
1797         @{$j}[1 .. $#$j],
1798       ];
1799       last; # we'll take care of what's left in $from below
1800     }
1801     else {
1802       push @outer_from, $j;
1803     }
1804   }
1805
1806   # see what's left - throw away if not selecting/restricting
1807   # also throw in a group_by if restricting to guard against
1808   # cross-join explosions
1809   #
1810   while (my $j = shift @$from) {
1811     my $alias = $j->[0]{-alias};
1812
1813     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1814       push @outer_from, $j;
1815     }
1816     elsif ($restrict_aliases->{$alias}) {
1817       push @outer_from, $j;
1818
1819       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1820       # have restrict_inner and restrict_outer... or something to that
1821       # effect... I think...
1822
1823       # FIXME2 - I can't find a clean way to determine if a particular join
1824       # is a multi - instead I am just treating everything as a potential
1825       # explosive join (ribasushi)
1826       #
1827       # if (my $handle = $j->[0]{-source_handle}) {
1828       #   my $rsrc = $handle->resolve;
1829       #   ... need to bail out of the following if this is not a multi,
1830       #       as it will be much easier on the db ...
1831
1832           $outer_attrs->{group_by} ||= $outer_select;
1833       # }
1834     }
1835   }
1836
1837   # demote the outer_from head
1838   $outer_from[0] = $outer_from[0][0];
1839
1840   # This is totally horrific - the $where ends up in both the inner and outer query
1841   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1842   # then if where conditions apply to the *right* side of the prefetch, you may have
1843   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1844   # the outer select to exclude joins you didin't want in the first place
1845   #
1846   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1847   return (\@outer_from, $outer_select, $where, $outer_attrs);
1848 }
1849
1850 sub _resolve_ident_sources {
1851   my ($self, $ident) = @_;
1852
1853   my $alias2source = {};
1854   my $rs_alias;
1855
1856   # the reason this is so contrived is that $ident may be a {from}
1857   # structure, specifying multiple tables to join
1858   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1859     # this is compat mode for insert/update/delete which do not deal with aliases
1860     $alias2source->{me} = $ident;
1861     $rs_alias = 'me';
1862   }
1863   elsif (ref $ident eq 'ARRAY') {
1864
1865     for (@$ident) {
1866       my $tabinfo;
1867       if (ref $_ eq 'HASH') {
1868         $tabinfo = $_;
1869         $rs_alias = $tabinfo->{-alias};
1870       }
1871       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1872         $tabinfo = $_->[0];
1873       }
1874
1875       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1876         if ($tabinfo->{-source_handle});
1877     }
1878   }
1879
1880   return ($alias2source, $rs_alias);
1881 }
1882
1883 # Takes $ident, \@column_names
1884 #
1885 # returns { $column_name => \%column_info, ... }
1886 # also note: this adds -result_source => $rsrc to the column info
1887 #
1888 # usage:
1889 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1890 sub _resolve_column_info {
1891   my ($self, $ident, $colnames) = @_;
1892   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1893
1894   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1895   $sep = "\Q$sep\E";
1896
1897   my (%return, %seen_cols);
1898
1899   # compile a global list of column names, to be able to properly
1900   # disambiguate unqualified column names (if at all possible)
1901   for my $alias (keys %$alias2src) {
1902     my $rsrc = $alias2src->{$alias};
1903     for my $colname ($rsrc->columns) {
1904       push @{$seen_cols{$colname}}, $alias;
1905     }
1906   }
1907
1908   COLUMN:
1909   foreach my $col (@$colnames) {
1910     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1911
1912     unless ($alias) {
1913       # see if the column was seen exactly once (so we know which rsrc it came from)
1914       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1915         $alias = $seen_cols{$colname}[0];
1916       }
1917       else {
1918         next COLUMN;
1919       }
1920     }
1921
1922     my $rsrc = $alias2src->{$alias};
1923     $return{$col} = $rsrc && {
1924       %{$rsrc->column_info($colname)},
1925       -result_source => $rsrc,
1926       -source_alias => $alias,
1927     };
1928   }
1929
1930   return \%return;
1931 }
1932
1933 # Returns a counting SELECT for a simple count
1934 # query. Abstracted so that a storage could override
1935 # this to { count => 'firstcol' } or whatever makes
1936 # sense as a performance optimization
1937 sub _count_select {
1938   #my ($self, $source, $rs_attrs) = @_;
1939   return { count => '*' };
1940 }
1941
1942 # Returns a SELECT which will end up in the subselect
1943 # There may or may not be a group_by, as the subquery
1944 # might have been called to accomodate a limit
1945 #
1946 # Most databases would be happy with whatever ends up
1947 # here, but some choke in various ways.
1948 #
1949 sub _subq_count_select {
1950   my ($self, $source, $rs_attrs) = @_;
1951   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1952
1953   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1954   return @pcols ? \@pcols : [ 1 ];
1955 }
1956
1957
1958 sub source_bind_attributes {
1959   my ($self, $source) = @_;
1960
1961   my $bind_attributes;
1962   foreach my $column ($source->columns) {
1963
1964     my $data_type = $source->column_info($column)->{data_type} || '';
1965     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1966      if $data_type;
1967   }
1968
1969   return $bind_attributes;
1970 }
1971
1972 =head2 select
1973
1974 =over 4
1975
1976 =item Arguments: $ident, $select, $condition, $attrs
1977
1978 =back
1979
1980 Handle a SQL select statement.
1981
1982 =cut
1983
1984 sub select {
1985   my $self = shift;
1986   my ($ident, $select, $condition, $attrs) = @_;
1987   return $self->cursor_class->new($self, \@_, $attrs);
1988 }
1989
1990 sub select_single {
1991   my $self = shift;
1992   my ($rv, $sth, @bind) = $self->_select(@_);
1993   my @row = $sth->fetchrow_array;
1994   my @nextrow = $sth->fetchrow_array if @row;
1995   if(@row && @nextrow) {
1996     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1997   }
1998   # Need to call finish() to work round broken DBDs
1999   $sth->finish();
2000   return @row;
2001 }
2002
2003 =head2 sth
2004
2005 =over 4
2006
2007 =item Arguments: $sql
2008
2009 =back
2010
2011 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2012
2013 =cut
2014
2015 sub _dbh_sth {
2016   my ($self, $dbh, $sql) = @_;
2017
2018   # 3 is the if_active parameter which avoids active sth re-use
2019   my $sth = $self->disable_sth_caching
2020     ? $dbh->prepare($sql)
2021     : $dbh->prepare_cached($sql, {}, 3);
2022
2023   # XXX You would think RaiseError would make this impossible,
2024   #  but apparently that's not true :(
2025   $self->throw_exception($dbh->errstr) if !$sth;
2026
2027   $sth;
2028 }
2029
2030 sub sth {
2031   my ($self, $sql) = @_;
2032   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2033 }
2034
2035 sub _dbh_columns_info_for {
2036   my ($self, $dbh, $table) = @_;
2037
2038   if ($dbh->can('column_info')) {
2039     my %result;
2040     eval {
2041       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2042       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2043       $sth->execute();
2044       while ( my $info = $sth->fetchrow_hashref() ){
2045         my %column_info;
2046         $column_info{data_type}   = $info->{TYPE_NAME};
2047         $column_info{size}      = $info->{COLUMN_SIZE};
2048         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2049         $column_info{default_value} = $info->{COLUMN_DEF};
2050         my $col_name = $info->{COLUMN_NAME};
2051         $col_name =~ s/^\"(.*)\"$/$1/;
2052
2053         $result{$col_name} = \%column_info;
2054       }
2055     };
2056     return \%result if !$@ && scalar keys %result;
2057   }
2058
2059   my %result;
2060   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2061   $sth->execute;
2062   my @columns = @{$sth->{NAME_lc}};
2063   for my $i ( 0 .. $#columns ){
2064     my %column_info;
2065     $column_info{data_type} = $sth->{TYPE}->[$i];
2066     $column_info{size} = $sth->{PRECISION}->[$i];
2067     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2068
2069     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2070       $column_info{data_type} = $1;
2071       $column_info{size}    = $2;
2072     }
2073
2074     $result{$columns[$i]} = \%column_info;
2075   }
2076   $sth->finish;
2077
2078   foreach my $col (keys %result) {
2079     my $colinfo = $result{$col};
2080     my $type_num = $colinfo->{data_type};
2081     my $type_name;
2082     if(defined $type_num && $dbh->can('type_info')) {
2083       my $type_info = $dbh->type_info($type_num);
2084       $type_name = $type_info->{TYPE_NAME} if $type_info;
2085       $colinfo->{data_type} = $type_name if $type_name;
2086     }
2087   }
2088
2089   return \%result;
2090 }
2091
2092 sub columns_info_for {
2093   my ($self, $table) = @_;
2094   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2095 }
2096
2097 =head2 last_insert_id
2098
2099 Return the row id of the last insert.
2100
2101 =cut
2102
2103 sub _dbh_last_insert_id {
2104     # All Storage's need to register their own _dbh_last_insert_id
2105     # the old SQLite-based method was highly inappropriate
2106
2107     my $self = shift;
2108     my $class = ref $self;
2109     $self->throw_exception (<<EOE);
2110
2111 No _dbh_last_insert_id() method found in $class.
2112 Since the method of obtaining the autoincrement id of the last insert
2113 operation varies greatly between different databases, this method must be
2114 individually implemented for every storage class.
2115 EOE
2116 }
2117
2118 sub last_insert_id {
2119   my $self = shift;
2120   $self->_dbh_last_insert_id ($self->_dbh, @_);
2121 }
2122
2123 =head2 _native_data_type
2124
2125 =over 4
2126
2127 =item Arguments: $type_name
2128
2129 =back
2130
2131 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2132 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2133 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2134
2135 The default implementation returns C<undef>, implement in your Storage driver if
2136 you need this functionality.
2137
2138 Should map types from other databases to the native RDBMS type, for example
2139 C<VARCHAR2> to C<VARCHAR>.
2140
2141 Types with modifiers should map to the underlying data type. For example,
2142 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2143
2144 Composite types should map to the container type, for example
2145 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2146
2147 =cut
2148
2149 sub _native_data_type {
2150   #my ($self, $data_type) = @_;
2151   return undef
2152 }
2153
2154 =head2 sqlt_type
2155
2156 Returns the database driver name.
2157
2158 =cut
2159
2160 sub sqlt_type {
2161   my ($self) = @_;
2162
2163   if (not $self->_driver_determined) {
2164     $self->_determine_driver;
2165     goto $self->can ('sqlt_type');
2166   }
2167
2168   $self->_get_dbh->{Driver}->{Name};
2169 }
2170
2171 =head2 bind_attribute_by_data_type
2172
2173 Given a datatype from column info, returns a database specific bind
2174 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2175 let the database planner just handle it.
2176
2177 Generally only needed for special case column types, like bytea in postgres.
2178
2179 =cut
2180
2181 sub bind_attribute_by_data_type {
2182     return;
2183 }
2184
2185 =head2 is_datatype_numeric
2186
2187 Given a datatype from column_info, returns a boolean value indicating if
2188 the current RDBMS considers it a numeric value. This controls how
2189 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2190 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2191 be performed instead of the usual C<eq>.
2192
2193 =cut
2194
2195 sub is_datatype_numeric {
2196   my ($self, $dt) = @_;
2197
2198   return 0 unless $dt;
2199
2200   return $dt =~ /^ (?:
2201     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2202   ) $/ix;
2203 }
2204
2205
2206 =head2 create_ddl_dir (EXPERIMENTAL)
2207
2208 =over 4
2209
2210 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2211
2212 =back
2213
2214 Creates a SQL file based on the Schema, for each of the specified
2215 database engines in C<\@databases> in the given directory.
2216 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2217
2218 Given a previous version number, this will also create a file containing
2219 the ALTER TABLE statements to transform the previous schema into the
2220 current one. Note that these statements may contain C<DROP TABLE> or
2221 C<DROP COLUMN> statements that can potentially destroy data.
2222
2223 The file names are created using the C<ddl_filename> method below, please
2224 override this method in your schema if you would like a different file
2225 name format. For the ALTER file, the same format is used, replacing
2226 $version in the name with "$preversion-$version".
2227
2228 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2229 The most common value for this would be C<< { add_drop_table => 1 } >>
2230 to have the SQL produced include a C<DROP TABLE> statement for each table
2231 created. For quoting purposes supply C<quote_table_names> and
2232 C<quote_field_names>.
2233
2234 If no arguments are passed, then the following default values are assumed:
2235
2236 =over 4
2237
2238 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2239
2240 =item version    - $schema->schema_version
2241
2242 =item directory  - './'
2243
2244 =item preversion - <none>
2245
2246 =back
2247
2248 By default, C<\%sqlt_args> will have
2249
2250  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2251
2252 merged with the hash passed in. To disable any of those features, pass in a
2253 hashref like the following
2254
2255  { ignore_constraint_names => 0, # ... other options }
2256
2257
2258 Note that this feature is currently EXPERIMENTAL and may not work correctly
2259 across all databases, or fully handle complex relationships.
2260
2261 WARNING: Please check all SQL files created, before applying them.
2262
2263 =cut
2264
2265 sub create_ddl_dir {
2266   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2267
2268   if(!$dir || !-d $dir) {
2269     carp "No directory given, using ./\n";
2270     $dir = "./";
2271   }
2272   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2273   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2274
2275   my $schema_version = $schema->schema_version || '1.x';
2276   $version ||= $schema_version;
2277
2278   $sqltargs = {
2279     add_drop_table => 1,
2280     ignore_constraint_names => 1,
2281     ignore_index_names => 1,
2282     %{$sqltargs || {}}
2283   };
2284
2285   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
2286       . $self->_check_sqlt_message . q{'})
2287           if !$self->_check_sqlt_version;
2288
2289   my $sqlt = SQL::Translator->new( $sqltargs );
2290
2291   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2292   my $sqlt_schema = $sqlt->translate({ data => $schema })
2293     or $self->throw_exception ($sqlt->error);
2294
2295   foreach my $db (@$databases) {
2296     $sqlt->reset();
2297     $sqlt->{schema} = $sqlt_schema;
2298     $sqlt->producer($db);
2299
2300     my $file;
2301     my $filename = $schema->ddl_filename($db, $version, $dir);
2302     if (-e $filename && ($version eq $schema_version )) {
2303       # if we are dumping the current version, overwrite the DDL
2304       carp "Overwriting existing DDL file - $filename";
2305       unlink($filename);
2306     }
2307
2308     my $output = $sqlt->translate;
2309     if(!$output) {
2310       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2311       next;
2312     }
2313     if(!open($file, ">$filename")) {
2314       $self->throw_exception("Can't open $filename for writing ($!)");
2315       next;
2316     }
2317     print $file $output;
2318     close($file);
2319
2320     next unless ($preversion);
2321
2322     require SQL::Translator::Diff;
2323
2324     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2325     if(!-e $prefilename) {
2326       carp("No previous schema file found ($prefilename)");
2327       next;
2328     }
2329
2330     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2331     if(-e $difffile) {
2332       carp("Overwriting existing diff file - $difffile");
2333       unlink($difffile);
2334     }
2335
2336     my $source_schema;
2337     {
2338       my $t = SQL::Translator->new($sqltargs);
2339       $t->debug( 0 );
2340       $t->trace( 0 );
2341
2342       $t->parser( $db )
2343         or $self->throw_exception ($t->error);
2344
2345       my $out = $t->translate( $prefilename )
2346         or $self->throw_exception ($t->error);
2347
2348       $source_schema = $t->schema;
2349
2350       $source_schema->name( $prefilename )
2351         unless ( $source_schema->name );
2352     }
2353
2354     # The "new" style of producers have sane normalization and can support
2355     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2356     # And we have to diff parsed SQL against parsed SQL.
2357     my $dest_schema = $sqlt_schema;
2358
2359     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2360       my $t = SQL::Translator->new($sqltargs);
2361       $t->debug( 0 );
2362       $t->trace( 0 );
2363
2364       $t->parser( $db )
2365         or $self->throw_exception ($t->error);
2366
2367       my $out = $t->translate( $filename )
2368         or $self->throw_exception ($t->error);
2369
2370       $dest_schema = $t->schema;
2371
2372       $dest_schema->name( $filename )
2373         unless $dest_schema->name;
2374     }
2375
2376     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2377                                                   $dest_schema,   $db,
2378                                                   $sqltargs
2379                                                  );
2380     if(!open $file, ">$difffile") {
2381       $self->throw_exception("Can't write to $difffile ($!)");
2382       next;
2383     }
2384     print $file $diff;
2385     close($file);
2386   }
2387 }
2388
2389 =head2 deployment_statements
2390
2391 =over 4
2392
2393 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2394
2395 =back
2396
2397 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2398
2399 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2400 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2401
2402 C<$directory> is used to return statements from files in a previously created
2403 L</create_ddl_dir> directory and is optional. The filenames are constructed
2404 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2405
2406 If no C<$directory> is specified then the statements are constructed on the
2407 fly using L<SQL::Translator> and C<$version> is ignored.
2408
2409 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2410
2411 =cut
2412
2413 sub deployment_statements {
2414   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2415   $type ||= $self->sqlt_type;
2416   $version ||= $schema->schema_version || '1.x';
2417   $dir ||= './';
2418   my $filename = $schema->ddl_filename($type, $version, $dir);
2419   if(-f $filename)
2420   {
2421       my $file;
2422       open($file, "<$filename")
2423         or $self->throw_exception("Can't open $filename ($!)");
2424       my @rows = <$file>;
2425       close($file);
2426       return join('', @rows);
2427   }
2428
2429   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2430       . $self->_check_sqlt_message . q{'})
2431           if !$self->_check_sqlt_version;
2432
2433   # sources needs to be a parser arg, but for simplicty allow at top level
2434   # coming in
2435   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2436       if exists $sqltargs->{sources};
2437
2438   my $tr = SQL::Translator->new(
2439     producer => "SQL::Translator::Producer::${type}",
2440     %$sqltargs,
2441     parser => 'SQL::Translator::Parser::DBIx::Class',
2442     data => $schema,
2443   );
2444   return $tr->translate;
2445 }
2446
2447 sub deploy {
2448   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2449   my $deploy = sub {
2450     my $line = shift;
2451     return if($line =~ /^--/);
2452     return if(!$line);
2453     # next if($line =~ /^DROP/m);
2454     return if($line =~ /^BEGIN TRANSACTION/m);
2455     return if($line =~ /^COMMIT/m);
2456     return if $line =~ /^\s+$/; # skip whitespace only
2457     $self->_query_start($line);
2458     eval {
2459       # do a dbh_do cycle here, as we need some error checking in
2460       # place (even though we will ignore errors)
2461       $self->dbh_do (sub { $_[1]->do($line) });
2462     };
2463     if ($@) {
2464       carp qq{$@ (running "${line}")};
2465     }
2466     $self->_query_end($line);
2467   };
2468   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2469   if (@statements > 1) {
2470     foreach my $statement (@statements) {
2471       $deploy->( $statement );
2472     }
2473   }
2474   elsif (@statements == 1) {
2475     foreach my $line ( split(";\n", $statements[0])) {
2476       $deploy->( $line );
2477     }
2478   }
2479 }
2480
2481 =head2 datetime_parser
2482
2483 Returns the datetime parser class
2484
2485 =cut
2486
2487 sub datetime_parser {
2488   my $self = shift;
2489   return $self->{datetime_parser} ||= do {
2490     $self->_populate_dbh unless $self->_dbh;
2491     $self->build_datetime_parser(@_);
2492   };
2493 }
2494
2495 =head2 datetime_parser_type
2496
2497 Defines (returns) the datetime parser class - currently hardwired to
2498 L<DateTime::Format::MySQL>
2499
2500 =cut
2501
2502 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2503
2504 =head2 build_datetime_parser
2505
2506 See L</datetime_parser>
2507
2508 =cut
2509
2510 sub build_datetime_parser {
2511   my $self = shift;
2512   my $type = $self->datetime_parser_type(@_);
2513   eval "use ${type}";
2514   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2515   return $type;
2516 }
2517
2518 {
2519     my $_check_sqlt_version; # private
2520     my $_check_sqlt_message; # private
2521     sub _check_sqlt_version {
2522         return $_check_sqlt_version if defined $_check_sqlt_version;
2523         eval 'use SQL::Translator "0.09003"';
2524         $_check_sqlt_message = $@ || '';
2525         $_check_sqlt_version = !$@;
2526     }
2527
2528     sub _check_sqlt_message {
2529         _check_sqlt_version if !defined $_check_sqlt_message;
2530         $_check_sqlt_message;
2531     }
2532 }
2533
2534 =head2 is_replicating
2535
2536 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2537 replicate from a master database.  Default is undef, which is the result
2538 returned by databases that don't support replication.
2539
2540 =cut
2541
2542 sub is_replicating {
2543     return;
2544
2545 }
2546
2547 =head2 lag_behind_master
2548
2549 Returns a number that represents a certain amount of lag behind a master db
2550 when a given storage is replicating.  The number is database dependent, but
2551 starts at zero and increases with the amount of lag. Default in undef
2552
2553 =cut
2554
2555 sub lag_behind_master {
2556     return;
2557 }
2558
2559 sub DESTROY {
2560   my $self = shift;
2561   $self->_verify_pid if $self->_dbh;
2562
2563   # some databases need this to stop spewing warnings
2564   if (my $dbh = $self->_dbh) {
2565     eval { $dbh->disconnect };
2566   }
2567
2568   $self->_dbh(undef);
2569 }
2570
2571 1;
2572
2573 =head1 USAGE NOTES
2574
2575 =head2 DBIx::Class and AutoCommit
2576
2577 DBIx::Class can do some wonderful magic with handling exceptions,
2578 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2579 (the default) combined with C<txn_do> for transaction support.
2580
2581 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2582 in an assumed transaction between commits, and you're telling us you'd
2583 like to manage that manually.  A lot of the magic protections offered by
2584 this module will go away.  We can't protect you from exceptions due to database
2585 disconnects because we don't know anything about how to restart your
2586 transactions.  You're on your own for handling all sorts of exceptional
2587 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2588 be with raw DBI.
2589
2590
2591 =head1 AUTHORS
2592
2593 Matt S. Trout <mst@shadowcatsystems.co.uk>
2594
2595 Andy Grundman <andy@hybridized.org>
2596
2597 =head1 LICENSE
2598
2599 You may distribute this code under the same terms as Perl itself.
2600
2601 =cut