Consolidate _verify_pid calls
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47
48   my @stuff = $schema->storage->dbh_do(
49     sub {
50       my ($storage, $dbh, @args) = @_;
51       $dbh->do("DROP TABLE authors");
52     },
53     @column_list
54   );
55
56   $schema->resultset('Book')->search({
57      written_on => $schema->storage->datetime_parser(DateTime->now)
58   });
59
60 =head1 DESCRIPTION
61
62 This class represents the connection to an RDBMS via L<DBI>.  See
63 L<DBIx::Class::Storage> for general information.  This pod only
64 documents DBI-specific methods and behaviors.
65
66 =head1 METHODS
67
68 =cut
69
70 sub new {
71   my $new = shift->next::method(@_);
72
73   $new->transaction_depth(0);
74   $new->_sql_maker_opts({});
75   $new->{savepoints} = [];
76   $new->{_in_dbh_do} = 0;
77   $new->{_dbh_gen} = 0;
78
79   $new;
80 }
81
82 =head2 connect_info
83
84 This method is normally called by L<DBIx::Class::Schema/connection>, which
85 encapsulates its argument list in an arrayref before passing them here.
86
87 The argument list may contain:
88
89 =over
90
91 =item *
92
93 The same 4-element argument set one would normally pass to
94 L<DBI/connect>, optionally followed by
95 L<extra attributes|/DBIx::Class specific connection attributes>
96 recognized by DBIx::Class:
97
98   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
99
100 =item *
101
102 A single code reference which returns a connected
103 L<DBI database handle|DBI/connect> optionally followed by
104 L<extra attributes|/DBIx::Class specific connection attributes> recognized
105 by DBIx::Class:
106
107   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
108
109 =item *
110
111 A single hashref with all the attributes and the dsn/user/password
112 mixed together:
113
114   $connect_info_args = [{
115     dsn => $dsn,
116     user => $user,
117     password => $pass,
118     %dbi_attributes,
119     %extra_attributes,
120   }];
121
122   $connect_info_args = [{
123     dbh_maker => sub { DBI->connect (...) },
124     %dbi_attributes,
125     %extra_attributes,
126   }];
127
128 This is particularly useful for L<Catalyst> based applications, allowing the
129 following config (L<Config::General> style):
130
131   <Model::DB>
132     schema_class   App::DB
133     <connect_info>
134       dsn          dbi:mysql:database=test
135       user         testuser
136       password     TestPass
137       AutoCommit   1
138     </connect_info>
139   </Model::DB>
140
141 The C<dsn>/C<user>/C<password> combination can be substituted by the
142 C<dbh_maker> key whose value is a coderef that returns a connected
143 L<DBI database handle|DBI/connect>
144
145 =back
146
147 Please note that the L<DBI> docs recommend that you always explicitly
148 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
149 recommends that it be set to I<1>, and that you perform transactions
150 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
151 to I<1> if you do not do explicitly set it to zero.  This is the default
152 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
153
154 =head3 DBIx::Class specific connection attributes
155
156 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
157 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
158 the following connection options. These options can be mixed in with your other
159 L<DBI> connection attributes, or placed in a seperate hashref
160 (C<\%extra_attributes>) as shown above.
161
162 Every time C<connect_info> is invoked, any previous settings for
163 these options will be cleared before setting the new ones, regardless of
164 whether any options are specified in the new C<connect_info>.
165
166
167 =over
168
169 =item on_connect_do
170
171 Specifies things to do immediately after connecting or re-connecting to
172 the database.  Its value may contain:
173
174 =over
175
176 =item a scalar
177
178 This contains one SQL statement to execute.
179
180 =item an array reference
181
182 This contains SQL statements to execute in order.  Each element contains
183 a string or a code reference that returns a string.
184
185 =item a code reference
186
187 This contains some code to execute.  Unlike code references within an
188 array reference, its return value is ignored.
189
190 =back
191
192 =item on_disconnect_do
193
194 Takes arguments in the same form as L</on_connect_do> and executes them
195 immediately before disconnecting from the database.
196
197 Note, this only runs if you explicitly call L</disconnect> on the
198 storage object.
199
200 =item on_connect_call
201
202 A more generalized form of L</on_connect_do> that calls the specified
203 C<connect_call_METHOD> methods in your storage driver.
204
205   on_connect_do => 'select 1'
206
207 is equivalent to:
208
209   on_connect_call => [ [ do_sql => 'select 1' ] ]
210
211 Its values may contain:
212
213 =over
214
215 =item a scalar
216
217 Will call the C<connect_call_METHOD> method.
218
219 =item a code reference
220
221 Will execute C<< $code->($storage) >>
222
223 =item an array reference
224
225 Each value can be a method name or code reference.
226
227 =item an array of arrays
228
229 For each array, the first item is taken to be the C<connect_call_> method name
230 or code reference, and the rest are parameters to it.
231
232 =back
233
234 Some predefined storage methods you may use:
235
236 =over
237
238 =item do_sql
239
240 Executes a SQL string or a code reference that returns a SQL string. This is
241 what L</on_connect_do> and L</on_disconnect_do> use.
242
243 It can take:
244
245 =over
246
247 =item a scalar
248
249 Will execute the scalar as SQL.
250
251 =item an arrayref
252
253 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
254 attributes hashref and bind values.
255
256 =item a code reference
257
258 Will execute C<< $code->($storage) >> and execute the return array refs as
259 above.
260
261 =back
262
263 =item datetime_setup
264
265 Execute any statements necessary to initialize the database session to return
266 and accept datetime/timestamp values used with
267 L<DBIx::Class::InflateColumn::DateTime>.
268
269 Only necessary for some databases, see your specific storage driver for
270 implementation details.
271
272 =back
273
274 =item on_disconnect_call
275
276 Takes arguments in the same form as L</on_connect_call> and executes them
277 immediately before disconnecting from the database.
278
279 Calls the C<disconnect_call_METHOD> methods as opposed to the
280 C<connect_call_METHOD> methods called by L</on_connect_call>.
281
282 Note, this only runs if you explicitly call L</disconnect> on the
283 storage object.
284
285 =item disable_sth_caching
286
287 If set to a true value, this option will disable the caching of
288 statement handles via L<DBI/prepare_cached>.
289
290 =item limit_dialect
291
292 Sets the limit dialect. This is useful for JDBC-bridge among others
293 where the remote SQL-dialect cannot be determined by the name of the
294 driver alone. See also L<SQL::Abstract::Limit>.
295
296 =item quote_char
297
298 Specifies what characters to use to quote table and column names. If
299 you use this you will want to specify L</name_sep> as well.
300
301 C<quote_char> expects either a single character, in which case is it
302 is placed on either side of the table/column name, or an arrayref of length
303 2 in which case the table/column name is placed between the elements.
304
305 For example under MySQL you should use C<< quote_char => '`' >>, and for
306 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
307
308 =item name_sep
309
310 This only needs to be used in conjunction with C<quote_char>, and is used to
311 specify the charecter that seperates elements (schemas, tables, columns) from
312 each other. In most cases this is simply a C<.>.
313
314 The consequences of not supplying this value is that L<SQL::Abstract>
315 will assume DBIx::Class' uses of aliases to be complete column
316 names. The output will look like I<"me.name"> when it should actually
317 be I<"me"."name">.
318
319 =item unsafe
320
321 This Storage driver normally installs its own C<HandleError>, sets
322 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
323 all database handles, including those supplied by a coderef.  It does this
324 so that it can have consistent and useful error behavior.
325
326 If you set this option to a true value, Storage will not do its usual
327 modifications to the database handle's attributes, and instead relies on
328 the settings in your connect_info DBI options (or the values you set in
329 your connection coderef, in the case that you are connecting via coderef).
330
331 Note that your custom settings can cause Storage to malfunction,
332 especially if you set a C<HandleError> handler that suppresses exceptions
333 and/or disable C<RaiseError>.
334
335 =item auto_savepoint
336
337 If this option is true, L<DBIx::Class> will use savepoints when nesting
338 transactions, making it possible to recover from failure in the inner
339 transaction without having to abort all outer transactions.
340
341 =item cursor_class
342
343 Use this argument to supply a cursor class other than the default
344 L<DBIx::Class::Storage::DBI::Cursor>.
345
346 =back
347
348 Some real-life examples of arguments to L</connect_info> and
349 L<DBIx::Class::Schema/connect>
350
351   # Simple SQLite connection
352   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
353
354   # Connect via subref
355   ->connect_info([ sub { DBI->connect(...) } ]);
356
357   # Connect via subref in hashref
358   ->connect_info([{
359     dbh_maker => sub { DBI->connect(...) },
360     on_connect_do => 'alter session ...',
361   }]);
362
363   # A bit more complicated
364   ->connect_info(
365     [
366       'dbi:Pg:dbname=foo',
367       'postgres',
368       'my_pg_password',
369       { AutoCommit => 1 },
370       { quote_char => q{"}, name_sep => q{.} },
371     ]
372   );
373
374   # Equivalent to the previous example
375   ->connect_info(
376     [
377       'dbi:Pg:dbname=foo',
378       'postgres',
379       'my_pg_password',
380       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
381     ]
382   );
383
384   # Same, but with hashref as argument
385   # See parse_connect_info for explanation
386   ->connect_info(
387     [{
388       dsn         => 'dbi:Pg:dbname=foo',
389       user        => 'postgres',
390       password    => 'my_pg_password',
391       AutoCommit  => 1,
392       quote_char  => q{"},
393       name_sep    => q{.},
394     }]
395   );
396
397   # Subref + DBIx::Class-specific connection options
398   ->connect_info(
399     [
400       sub { DBI->connect(...) },
401       {
402           quote_char => q{`},
403           name_sep => q{@},
404           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
405           disable_sth_caching => 1,
406       },
407     ]
408   );
409
410
411
412 =cut
413
414 sub connect_info {
415   my ($self, $info_arg) = @_;
416
417   return $self->_connect_info if !$info_arg;
418
419   my @args = @$info_arg;  # take a shallow copy for further mutilation
420   $self->_connect_info([@args]); # copy for _connect_info
421
422
423   # combine/pre-parse arguments depending on invocation style
424
425   my %attrs;
426   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
427     %attrs = %{ $args[1] || {} };
428     @args = $args[0];
429   }
430   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
431     %attrs = %{$args[0]};
432     @args = ();
433     if (my $code = delete $attrs{dbh_maker}) {
434       @args = $code;
435
436       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
437       if (@ignored) {
438         carp sprintf (
439             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
440           . "to the result of 'dbh_maker'",
441
442           join (', ', map { "'$_'" } (@ignored) ),
443         );
444       }
445     }
446     else {
447       @args = delete @attrs{qw/dsn user password/};
448     }
449   }
450   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
451     %attrs = (
452       % { $args[3] || {} },
453       % { $args[4] || {} },
454     );
455     @args = @args[0,1,2];
456   }
457
458   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
459   #  the new set of options
460   $self->_sql_maker(undef);
461   $self->_sql_maker_opts({});
462
463   if(keys %attrs) {
464     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
465       if(my $value = delete $attrs{$storage_opt}) {
466         $self->$storage_opt($value);
467       }
468     }
469     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
470       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
471         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
472       }
473     }
474   }
475
476   if (ref $args[0] eq 'CODE') {
477     # _connect() never looks past $args[0] in this case
478     %attrs = ()
479   } else {
480     %attrs = (
481       %{ $self->_default_dbi_connect_attributes || {} },
482       %attrs,
483     );
484   }
485
486   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
487   $self->_connect_info;
488 }
489
490 sub _default_dbi_connect_attributes {
491   return {
492     AutoCommit => 1,
493     RaiseError => 1,
494     PrintError => 0,
495   };
496 }
497
498 =head2 on_connect_do
499
500 This method is deprecated in favour of setting via L</connect_info>.
501
502 =cut
503
504 =head2 on_disconnect_do
505
506 This method is deprecated in favour of setting via L</connect_info>.
507
508 =cut
509
510 sub _parse_connect_do {
511   my ($self, $type) = @_;
512
513   my $val = $self->$type;
514   return () if not defined $val;
515
516   my @res;
517
518   if (not ref($val)) {
519     push @res, [ 'do_sql', $val ];
520   } elsif (ref($val) eq 'CODE') {
521     push @res, $val;
522   } elsif (ref($val) eq 'ARRAY') {
523     push @res, map { [ 'do_sql', $_ ] } @$val;
524   } else {
525     $self->throw_exception("Invalid type for $type: ".ref($val));
526   }
527
528   return \@res;
529 }
530
531 =head2 dbh_do
532
533 Arguments: ($subref | $method_name), @extra_coderef_args?
534
535 Execute the given $subref or $method_name using the new exception-based
536 connection management.
537
538 The first two arguments will be the storage object that C<dbh_do> was called
539 on and a database handle to use.  Any additional arguments will be passed
540 verbatim to the called subref as arguments 2 and onwards.
541
542 Using this (instead of $self->_dbh or $self->dbh) ensures correct
543 exception handling and reconnection (or failover in future subclasses).
544
545 Your subref should have no side-effects outside of the database, as
546 there is the potential for your subref to be partially double-executed
547 if the database connection was stale/dysfunctional.
548
549 Example:
550
551   my @stuff = $schema->storage->dbh_do(
552     sub {
553       my ($storage, $dbh, @cols) = @_;
554       my $cols = join(q{, }, @cols);
555       $dbh->selectrow_array("SELECT $cols FROM foo");
556     },
557     @column_list
558   );
559
560 =cut
561
562 sub dbh_do {
563   my $self = shift;
564   my $code = shift;
565
566   my $dbh = $self->_get_dbh;
567
568   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
569       || $self->{transaction_depth};
570
571   local $self->{_in_dbh_do} = 1;
572
573   my @result;
574   my $want_array = wantarray;
575
576   eval {
577
578     if($want_array) {
579         @result = $self->$code($dbh, @_);
580     }
581     elsif(defined $want_array) {
582         $result[0] = $self->$code($dbh, @_);
583     }
584     else {
585         $self->$code($dbh, @_);
586     }
587   };
588
589   # ->connected might unset $@ - copy
590   my $exception = $@;
591   if(!$exception) { return $want_array ? @result : $result[0] }
592
593   $self->throw_exception($exception) if $self->connected;
594
595   # We were not connected - reconnect and retry, but let any
596   #  exception fall right through this time
597   carp "Retrying $code after catching disconnected exception: $exception"
598     if $ENV{DBIC_DBIRETRY_DEBUG};
599   $self->_populate_dbh;
600   $self->$code($self->_dbh, @_);
601 }
602
603 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
604 # It also informs dbh_do to bypass itself while under the direction of txn_do,
605 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
606 sub txn_do {
607   my $self = shift;
608   my $coderef = shift;
609
610   ref $coderef eq 'CODE' or $self->throw_exception
611     ('$coderef must be a CODE reference');
612
613   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
614
615   local $self->{_in_dbh_do} = 1;
616
617   my @result;
618   my $want_array = wantarray;
619
620   my $tried = 0;
621   while(1) {
622     eval {
623       $self->_get_dbh;
624
625       $self->txn_begin;
626       if($want_array) {
627           @result = $coderef->(@_);
628       }
629       elsif(defined $want_array) {
630           $result[0] = $coderef->(@_);
631       }
632       else {
633           $coderef->(@_);
634       }
635       $self->txn_commit;
636     };
637
638     # ->connected might unset $@ - copy
639     my $exception = $@;
640     if(!$exception) { return $want_array ? @result : $result[0] }
641
642     if($tried++ || $self->connected) {
643       eval { $self->txn_rollback };
644       my $rollback_exception = $@;
645       if($rollback_exception) {
646         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
647         $self->throw_exception($exception)  # propagate nested rollback
648           if $rollback_exception =~ /$exception_class/;
649
650         $self->throw_exception(
651           "Transaction aborted: ${exception}. "
652           . "Rollback failed: ${rollback_exception}"
653         );
654       }
655       $self->throw_exception($exception)
656     }
657
658     # We were not connected, and was first try - reconnect and retry
659     # via the while loop
660     carp "Retrying $coderef after catching disconnected exception: $exception"
661       if $ENV{DBIC_DBIRETRY_DEBUG};
662     $self->_populate_dbh;
663   }
664 }
665
666 =head2 disconnect
667
668 Our C<disconnect> method also performs a rollback first if the
669 database is not in C<AutoCommit> mode.
670
671 =cut
672
673 sub disconnect {
674   my ($self) = @_;
675
676   if( $self->_dbh ) {
677     my @actions;
678
679     push @actions, ( $self->on_disconnect_call || () );
680     push @actions, $self->_parse_connect_do ('on_disconnect_do');
681
682     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
683
684     $self->_dbh->rollback unless $self->_dbh_autocommit;
685     $self->_dbh->disconnect;
686     $self->_dbh(undef);
687     $self->{_dbh_gen}++;
688   }
689 }
690
691 =head2 with_deferred_fk_checks
692
693 =over 4
694
695 =item Arguments: C<$coderef>
696
697 =item Return Value: The return value of $coderef
698
699 =back
700
701 Storage specific method to run the code ref with FK checks deferred or
702 in MySQL's case disabled entirely.
703
704 =cut
705
706 # Storage subclasses should override this
707 sub with_deferred_fk_checks {
708   my ($self, $sub) = @_;
709
710   $sub->();
711 }
712
713 =head2 connected
714
715 =over
716
717 =item Arguments: none
718
719 =item Return Value: 1|0
720
721 =back
722
723 Verifies that the the current database handle is active and ready to execute
724 an SQL statement (i.e. the connection did not get stale, server is still
725 answering, etc.) This method is used internally by L</dbh>.
726
727 =cut
728
729 sub connected {
730   my $self = shift;
731   return 0 unless $self->_seems_connected;
732
733   #be on the safe side
734   local $self->_dbh->{RaiseError} = 1;
735
736   return $self->_ping;
737 }
738
739 sub _seems_connected {
740   my $self = shift;
741
742   my $dbh = $self->_dbh
743     or return 0;
744
745   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
746     $self->_dbh(undef);
747     $self->{_dbh_gen}++;
748     return 0;
749   }
750   else {
751     $self->_verify_pid;
752     return 0 if !$self->_dbh;
753   }
754
755   return $dbh->FETCH('Active');
756 }
757
758 sub _ping {
759   my $self = shift;
760
761   my $dbh = $self->_dbh or return 0;
762
763   return $dbh->ping;
764 }
765
766 # handle pid changes correctly
767 #  NOTE: assumes $self->_dbh is a valid $dbh
768 sub _verify_pid {
769   my ($self) = @_;
770
771   return if defined $self->_conn_pid && $self->_conn_pid == $$;
772
773   $self->_dbh->{InactiveDestroy} = 1;
774   $self->_dbh(undef);
775   $self->{_dbh_gen}++;
776
777   return;
778 }
779
780 sub ensure_connected {
781   my ($self) = @_;
782
783   unless ($self->connected) {
784     $self->_populate_dbh;
785   }
786 }
787
788 =head2 dbh
789
790 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
791 is guaranteed to be healthy by implicitly calling L</connected>, and if
792 necessary performing a reconnection before returning. Keep in mind that this
793 is very B<expensive> on some database engines. Consider using L<dbh_do>
794 instead.
795
796 =cut
797
798 sub dbh {
799   my ($self) = @_;
800
801   if (not $self->_dbh) {
802     $self->_populate_dbh;
803   } else {
804     $self->ensure_connected;
805   }
806   return $self->_dbh;
807 }
808
809 # this is the internal "get dbh or connect (don't check)" method
810 sub _get_dbh {
811   my $self = shift;
812   $self->_verify_pid if $self->_dbh;
813   $self->_populate_dbh unless $self->_dbh;
814   return $self->_dbh;
815 }
816
817 sub _sql_maker_args {
818     my ($self) = @_;
819
820     return (
821       bindtype=>'columns',
822       array_datatypes => 1,
823       limit_dialect => $self->_get_dbh,
824       %{$self->_sql_maker_opts}
825     );
826 }
827
828 sub sql_maker {
829   my ($self) = @_;
830   unless ($self->_sql_maker) {
831     my $sql_maker_class = $self->sql_maker_class;
832     $self->ensure_class_loaded ($sql_maker_class);
833     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
834   }
835   return $self->_sql_maker;
836 }
837
838 sub _rebless {}
839
840 sub _populate_dbh {
841   my ($self) = @_;
842
843   my @info = @{$self->_dbi_connect_info || []};
844   $self->_dbh(undef); # in case ->connected failed we might get sent here
845   $self->_dbh($self->_connect(@info));
846
847   $self->_conn_pid($$);
848   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
849
850   $self->_determine_driver;
851
852   # Always set the transaction depth on connect, since
853   #  there is no transaction in progress by definition
854   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
855
856   $self->_run_connection_actions unless $self->{_in_determine_driver};
857 }
858
859 sub _run_connection_actions {
860   my $self = shift;
861   my @actions;
862
863   push @actions, ( $self->on_connect_call || () );
864   push @actions, $self->_parse_connect_do ('on_connect_do');
865
866   $self->_do_connection_actions(connect_call_ => $_) for @actions;
867 }
868
869 sub _determine_driver {
870   my ($self) = @_;
871
872   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
873     my $started_unconnected = 0;
874     local $self->{_in_determine_driver} = 1;
875
876     if (ref($self) eq __PACKAGE__) {
877       my $driver;
878       if ($self->_dbh) { # we are connected
879         $driver = $self->_dbh->{Driver}{Name};
880       } else {
881         # if connect_info is a CODEREF, we have no choice but to connect
882         if (ref $self->_dbi_connect_info->[0] &&
883             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
884           $self->_populate_dbh;
885           $driver = $self->_dbh->{Driver}{Name};
886         }
887         else {
888           # try to use dsn to not require being connected, the driver may still
889           # force a connection in _rebless to determine version
890           ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
891           $started_unconnected = 1;
892         }
893       }
894
895       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
896       if ($self->load_optional_class($storage_class)) {
897         mro::set_mro($storage_class, 'c3');
898         bless $self, $storage_class;
899         $self->_rebless();
900       }
901     }
902
903     $self->_driver_determined(1);
904
905     $self->_run_connection_actions
906         if $started_unconnected && defined $self->_dbh;
907   }
908 }
909
910 sub _do_connection_actions {
911   my $self          = shift;
912   my $method_prefix = shift;
913   my $call          = shift;
914
915   if (not ref($call)) {
916     my $method = $method_prefix . $call;
917     $self->$method(@_);
918   } elsif (ref($call) eq 'CODE') {
919     $self->$call(@_);
920   } elsif (ref($call) eq 'ARRAY') {
921     if (ref($call->[0]) ne 'ARRAY') {
922       $self->_do_connection_actions($method_prefix, $_) for @$call;
923     } else {
924       $self->_do_connection_actions($method_prefix, @$_) for @$call;
925     }
926   } else {
927     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
928   }
929
930   return $self;
931 }
932
933 sub connect_call_do_sql {
934   my $self = shift;
935   $self->_do_query(@_);
936 }
937
938 sub disconnect_call_do_sql {
939   my $self = shift;
940   $self->_do_query(@_);
941 }
942
943 # override in db-specific backend when necessary
944 sub connect_call_datetime_setup { 1 }
945
946 sub _do_query {
947   my ($self, $action) = @_;
948
949   if (ref $action eq 'CODE') {
950     $action = $action->($self);
951     $self->_do_query($_) foreach @$action;
952   }
953   else {
954     # Most debuggers expect ($sql, @bind), so we need to exclude
955     # the attribute hash which is the second argument to $dbh->do
956     # furthermore the bind values are usually to be presented
957     # as named arrayref pairs, so wrap those here too
958     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
959     my $sql = shift @do_args;
960     my $attrs = shift @do_args;
961     my @bind = map { [ undef, $_ ] } @do_args;
962
963     $self->_query_start($sql, @bind);
964     $self->_get_dbh->do($sql, $attrs, @do_args);
965     $self->_query_end($sql, @bind);
966   }
967
968   return $self;
969 }
970
971 sub _connect {
972   my ($self, @info) = @_;
973
974   $self->throw_exception("You failed to provide any connection info")
975     if !@info;
976
977   my ($old_connect_via, $dbh);
978
979   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
980     $old_connect_via = $DBI::connect_via;
981     $DBI::connect_via = 'connect';
982   }
983
984   eval {
985     if(ref $info[0] eq 'CODE') {
986        $dbh = &{$info[0]}
987     }
988     else {
989        $dbh = DBI->connect(@info);
990     }
991
992     if($dbh && !$self->unsafe) {
993       my $weak_self = $self;
994       Scalar::Util::weaken($weak_self);
995       $dbh->{HandleError} = sub {
996           if ($weak_self) {
997             $weak_self->throw_exception("DBI Exception: $_[0]");
998           }
999           else {
1000             croak ("DBI Exception: $_[0]");
1001           }
1002       };
1003       $dbh->{ShowErrorStatement} = 1;
1004       $dbh->{RaiseError} = 1;
1005       $dbh->{PrintError} = 0;
1006     }
1007   };
1008
1009   $DBI::connect_via = $old_connect_via if $old_connect_via;
1010
1011   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1012     if !$dbh || $@;
1013
1014   $self->_dbh_autocommit($dbh->{AutoCommit});
1015
1016   $dbh;
1017 }
1018
1019 sub svp_begin {
1020   my ($self, $name) = @_;
1021
1022   $name = $self->_svp_generate_name
1023     unless defined $name;
1024
1025   $self->throw_exception ("You can't use savepoints outside a transaction")
1026     if $self->{transaction_depth} == 0;
1027
1028   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1029     unless $self->can('_svp_begin');
1030
1031   push @{ $self->{savepoints} }, $name;
1032
1033   $self->debugobj->svp_begin($name) if $self->debug;
1034
1035   return $self->_svp_begin($name);
1036 }
1037
1038 sub svp_release {
1039   my ($self, $name) = @_;
1040
1041   $self->throw_exception ("You can't use savepoints outside a transaction")
1042     if $self->{transaction_depth} == 0;
1043
1044   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1045     unless $self->can('_svp_release');
1046
1047   if (defined $name) {
1048     $self->throw_exception ("Savepoint '$name' does not exist")
1049       unless grep { $_ eq $name } @{ $self->{savepoints} };
1050
1051     # Dig through the stack until we find the one we are releasing.  This keeps
1052     # the stack up to date.
1053     my $svp;
1054
1055     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1056   } else {
1057     $name = pop @{ $self->{savepoints} };
1058   }
1059
1060   $self->debugobj->svp_release($name) if $self->debug;
1061
1062   return $self->_svp_release($name);
1063 }
1064
1065 sub svp_rollback {
1066   my ($self, $name) = @_;
1067
1068   $self->throw_exception ("You can't use savepoints outside a transaction")
1069     if $self->{transaction_depth} == 0;
1070
1071   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1072     unless $self->can('_svp_rollback');
1073
1074   if (defined $name) {
1075       # If they passed us a name, verify that it exists in the stack
1076       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1077           $self->throw_exception("Savepoint '$name' does not exist!");
1078       }
1079
1080       # Dig through the stack until we find the one we are releasing.  This keeps
1081       # the stack up to date.
1082       while(my $s = pop(@{ $self->{savepoints} })) {
1083           last if($s eq $name);
1084       }
1085       # Add the savepoint back to the stack, as a rollback doesn't remove the
1086       # named savepoint, only everything after it.
1087       push(@{ $self->{savepoints} }, $name);
1088   } else {
1089       # We'll assume they want to rollback to the last savepoint
1090       $name = $self->{savepoints}->[-1];
1091   }
1092
1093   $self->debugobj->svp_rollback($name) if $self->debug;
1094
1095   return $self->_svp_rollback($name);
1096 }
1097
1098 sub _svp_generate_name {
1099     my ($self) = @_;
1100
1101     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1102 }
1103
1104 sub txn_begin {
1105   my $self = shift;
1106   if($self->{transaction_depth} == 0) {
1107     $self->debugobj->txn_begin()
1108       if $self->debug;
1109
1110     # being here implies we have AutoCommit => 1
1111     # if the user is utilizing txn_do - good for
1112     # him, otherwise we need to ensure that the
1113     # $dbh is healthy on BEGIN
1114     my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
1115     $self->$dbh_method->begin_work;
1116
1117   } elsif ($self->auto_savepoint) {
1118     $self->svp_begin;
1119   }
1120   $self->{transaction_depth}++;
1121 }
1122
1123 sub txn_commit {
1124   my $self = shift;
1125   if ($self->{transaction_depth} == 1) {
1126     my $dbh = $self->_dbh;
1127     $self->debugobj->txn_commit()
1128       if ($self->debug);
1129     $dbh->commit;
1130     $self->{transaction_depth} = 0
1131       if $self->_dbh_autocommit;
1132   }
1133   elsif($self->{transaction_depth} > 1) {
1134     $self->{transaction_depth}--;
1135     $self->svp_release
1136       if $self->auto_savepoint;
1137   }
1138 }
1139
1140 sub txn_rollback {
1141   my $self = shift;
1142   my $dbh = $self->_dbh;
1143   eval {
1144     if ($self->{transaction_depth} == 1) {
1145       $self->debugobj->txn_rollback()
1146         if ($self->debug);
1147       $self->{transaction_depth} = 0
1148         if $self->_dbh_autocommit;
1149       $dbh->rollback;
1150     }
1151     elsif($self->{transaction_depth} > 1) {
1152       $self->{transaction_depth}--;
1153       if ($self->auto_savepoint) {
1154         $self->svp_rollback;
1155         $self->svp_release;
1156       }
1157     }
1158     else {
1159       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1160     }
1161   };
1162   if ($@) {
1163     my $error = $@;
1164     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1165     $error =~ /$exception_class/ and $self->throw_exception($error);
1166     # ensure that a failed rollback resets the transaction depth
1167     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1168     $self->throw_exception($error);
1169   }
1170 }
1171
1172 # This used to be the top-half of _execute.  It was split out to make it
1173 #  easier to override in NoBindVars without duping the rest.  It takes up
1174 #  all of _execute's args, and emits $sql, @bind.
1175 sub _prep_for_execute {
1176   my ($self, $op, $extra_bind, $ident, $args) = @_;
1177
1178   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1179     $ident = $ident->from();
1180   }
1181
1182   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1183
1184   unshift(@bind,
1185     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1186       if $extra_bind;
1187   return ($sql, \@bind);
1188 }
1189
1190
1191 sub _fix_bind_params {
1192     my ($self, @bind) = @_;
1193
1194     ### Turn @bind from something like this:
1195     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1196     ### to this:
1197     ###   ( "'1'", "'1'", "'3'" )
1198     return
1199         map {
1200             if ( defined( $_ && $_->[1] ) ) {
1201                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1202             }
1203             else { q{'NULL'}; }
1204         } @bind;
1205 }
1206
1207 sub _query_start {
1208     my ( $self, $sql, @bind ) = @_;
1209
1210     if ( $self->debug ) {
1211         @bind = $self->_fix_bind_params(@bind);
1212
1213         $self->debugobj->query_start( $sql, @bind );
1214     }
1215 }
1216
1217 sub _query_end {
1218     my ( $self, $sql, @bind ) = @_;
1219
1220     if ( $self->debug ) {
1221         @bind = $self->_fix_bind_params(@bind);
1222         $self->debugobj->query_end( $sql, @bind );
1223     }
1224 }
1225
1226 sub _dbh_execute {
1227   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1228
1229   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1230
1231   $self->_query_start( $sql, @$bind );
1232
1233   my $sth = $self->sth($sql,$op);
1234
1235   my $placeholder_index = 1;
1236
1237   foreach my $bound (@$bind) {
1238     my $attributes = {};
1239     my($column_name, @data) = @$bound;
1240
1241     if ($bind_attributes) {
1242       $attributes = $bind_attributes->{$column_name}
1243       if defined $bind_attributes->{$column_name};
1244     }
1245
1246     foreach my $data (@data) {
1247       my $ref = ref $data;
1248       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1249
1250       $sth->bind_param($placeholder_index, $data, $attributes);
1251       $placeholder_index++;
1252     }
1253   }
1254
1255   # Can this fail without throwing an exception anyways???
1256   my $rv = $sth->execute();
1257   $self->throw_exception($sth->errstr) if !$rv;
1258
1259   $self->_query_end( $sql, @$bind );
1260
1261   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1262 }
1263
1264 sub _execute {
1265     my $self = shift;
1266     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1267 }
1268
1269 sub insert {
1270   my ($self, $source, $to_insert) = @_;
1271
1272 # redispatch to insert method of storage we reblessed into, if necessary
1273   if (not $self->_driver_determined) {
1274     $self->_determine_driver;
1275     goto $self->can('insert');
1276   }
1277
1278   my $ident = $source->from;
1279   my $bind_attributes = $self->source_bind_attributes($source);
1280
1281   my $updated_cols = {};
1282
1283   foreach my $col ( $source->columns ) {
1284     if ( !defined $to_insert->{$col} ) {
1285       my $col_info = $source->column_info($col);
1286
1287       if ( $col_info->{auto_nextval} ) {
1288         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1289           'nextval',
1290           $col_info->{sequence} ||
1291             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1292         );
1293       }
1294     }
1295   }
1296
1297   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1298
1299   return $updated_cols;
1300 }
1301
1302 ## Still not quite perfect, and EXPERIMENTAL
1303 ## Currently it is assumed that all values passed will be "normal", i.e. not
1304 ## scalar refs, or at least, all the same type as the first set, the statement is
1305 ## only prepped once.
1306 sub insert_bulk {
1307   my ($self, $source, $cols, $data) = @_;
1308
1309 # redispatch to insert_bulk method of storage we reblessed into, if necessary
1310   if (not $self->_driver_determined) {
1311     $self->_determine_driver;
1312     goto $self->can('insert_bulk');
1313   }
1314
1315   my %colvalues;
1316   my $table = $source->from;
1317   @colvalues{@$cols} = (0..$#$cols);
1318   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1319
1320   $self->_query_start( $sql, @bind );
1321   my $sth = $self->sth($sql);
1322
1323 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1324
1325   ## This must be an arrayref, else nothing works!
1326   my $tuple_status = [];
1327
1328   ## Get the bind_attributes, if any exist
1329   my $bind_attributes = $self->source_bind_attributes($source);
1330
1331   ## Bind the values and execute
1332   my $placeholder_index = 1;
1333
1334   foreach my $bound (@bind) {
1335
1336     my $attributes = {};
1337     my ($column_name, $data_index) = @$bound;
1338
1339     if( $bind_attributes ) {
1340       $attributes = $bind_attributes->{$column_name}
1341       if defined $bind_attributes->{$column_name};
1342     }
1343
1344     my @data = map { $_->[$data_index] } @$data;
1345
1346     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1347     $placeholder_index++;
1348   }
1349   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1350   if (my $err = $@) {
1351     my $i = 0;
1352     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1353
1354     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1355       if ($i > $#$tuple_status);
1356
1357     require Data::Dumper;
1358     local $Data::Dumper::Terse = 1;
1359     local $Data::Dumper::Indent = 1;
1360     local $Data::Dumper::Useqq = 1;
1361     local $Data::Dumper::Quotekeys = 0;
1362
1363     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1364       $tuple_status->[$i][1],
1365       Data::Dumper::Dumper(
1366         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1367       ),
1368     );
1369   }
1370   $self->throw_exception($sth->errstr) if !$rv;
1371
1372   $self->_query_end( $sql, @bind );
1373   return (wantarray ? ($rv, $sth, @bind) : $rv);
1374 }
1375
1376 sub update {
1377   my $self = shift @_;
1378   my $source = shift @_;
1379   $self->_determine_driver;
1380   my $bind_attributes = $self->source_bind_attributes($source);
1381
1382   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1383 }
1384
1385
1386 sub delete {
1387   my $self = shift @_;
1388   my $source = shift @_;
1389   $self->_determine_driver;
1390   my $bind_attrs = $self->source_bind_attributes($source);
1391
1392   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1393 }
1394
1395 # We were sent here because the $rs contains a complex search
1396 # which will require a subquery to select the correct rows
1397 # (i.e. joined or limited resultsets)
1398 #
1399 # Genarating a single PK column subquery is trivial and supported
1400 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1401 # Look at _multipk_update_delete()
1402 sub _subq_update_delete {
1403   my $self = shift;
1404   my ($rs, $op, $values) = @_;
1405
1406   my $rsrc = $rs->result_source;
1407
1408   # we already check this, but double check naively just in case. Should be removed soon
1409   my $sel = $rs->_resolved_attrs->{select};
1410   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1411   my @pcols = $rsrc->primary_columns;
1412   if (@$sel != @pcols) {
1413     $self->throw_exception (
1414       'Subquery update/delete can not be called on resultsets selecting a'
1415      .' number of columns different than the number of primary keys'
1416     );
1417   }
1418
1419   if (@pcols == 1) {
1420     return $self->$op (
1421       $rsrc,
1422       $op eq 'update' ? $values : (),
1423       { $pcols[0] => { -in => $rs->as_query } },
1424     );
1425   }
1426
1427   else {
1428     return $self->_multipk_update_delete (@_);
1429   }
1430 }
1431
1432 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1433 # resultset update/delete involving subqueries. So by default resort
1434 # to simple (and inefficient) delete_all style per-row opearations,
1435 # while allowing specific storages to override this with a faster
1436 # implementation.
1437 #
1438 sub _multipk_update_delete {
1439   return shift->_per_row_update_delete (@_);
1440 }
1441
1442 # This is the default loop used to delete/update rows for multi PK
1443 # resultsets, and used by mysql exclusively (because it can't do anything
1444 # else).
1445 #
1446 # We do not use $row->$op style queries, because resultset update/delete
1447 # is not expected to cascade (this is what delete_all/update_all is for).
1448 #
1449 # There should be no race conditions as the entire operation is rolled
1450 # in a transaction.
1451 #
1452 sub _per_row_update_delete {
1453   my $self = shift;
1454   my ($rs, $op, $values) = @_;
1455
1456   my $rsrc = $rs->result_source;
1457   my @pcols = $rsrc->primary_columns;
1458
1459   my $guard = $self->txn_scope_guard;
1460
1461   # emulate the return value of $sth->execute for non-selects
1462   my $row_cnt = '0E0';
1463
1464   my $subrs_cur = $rs->cursor;
1465   while (my @pks = $subrs_cur->next) {
1466
1467     my $cond;
1468     for my $i (0.. $#pcols) {
1469       $cond->{$pcols[$i]} = $pks[$i];
1470     }
1471
1472     $self->$op (
1473       $rsrc,
1474       $op eq 'update' ? $values : (),
1475       $cond,
1476     );
1477
1478     $row_cnt++;
1479   }
1480
1481   $guard->commit;
1482
1483   return $row_cnt;
1484 }
1485
1486 sub _select {
1487   my $self = shift;
1488
1489   # localization is neccessary as
1490   # 1) there is no infrastructure to pass this around before SQLA2
1491   # 2) _select_args sets it and _prep_for_execute consumes it
1492   my $sql_maker = $self->sql_maker;
1493   local $sql_maker->{_dbic_rs_attrs};
1494
1495   return $self->_execute($self->_select_args(@_));
1496 }
1497
1498 sub _select_args_to_query {
1499   my $self = shift;
1500
1501   # localization is neccessary as
1502   # 1) there is no infrastructure to pass this around before SQLA2
1503   # 2) _select_args sets it and _prep_for_execute consumes it
1504   my $sql_maker = $self->sql_maker;
1505   local $sql_maker->{_dbic_rs_attrs};
1506
1507   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1508   #  = $self->_select_args($ident, $select, $cond, $attrs);
1509   my ($op, $bind, $ident, $bind_attrs, @args) =
1510     $self->_select_args(@_);
1511
1512   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1513   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1514   $prepared_bind ||= [];
1515
1516   return wantarray
1517     ? ($sql, $prepared_bind, $bind_attrs)
1518     : \[ "($sql)", @$prepared_bind ]
1519   ;
1520 }
1521
1522 sub _select_args {
1523   my ($self, $ident, $select, $where, $attrs) = @_;
1524
1525   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1526
1527   my $sql_maker = $self->sql_maker;
1528   $sql_maker->{_dbic_rs_attrs} = {
1529     %$attrs,
1530     select => $select,
1531     from => $ident,
1532     where => $where,
1533     $rs_alias
1534       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1535       : ()
1536     ,
1537   };
1538
1539   # calculate bind_attrs before possible $ident mangling
1540   my $bind_attrs = {};
1541   for my $alias (keys %$alias2source) {
1542     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1543     for my $col (keys %$bindtypes) {
1544
1545       my $fqcn = join ('.', $alias, $col);
1546       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1547
1548       # Unqialified column names are nice, but at the same time can be
1549       # rather ambiguous. What we do here is basically go along with
1550       # the loop, adding an unqualified column slot to $bind_attrs,
1551       # alongside the fully qualified name. As soon as we encounter
1552       # another column by that name (which would imply another table)
1553       # we unset the unqualified slot and never add any info to it
1554       # to avoid erroneous type binding. If this happens the users
1555       # only choice will be to fully qualify his column name
1556
1557       if (exists $bind_attrs->{$col}) {
1558         $bind_attrs->{$col} = {};
1559       }
1560       else {
1561         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1562       }
1563     }
1564   }
1565
1566   # adjust limits
1567   if (
1568     $attrs->{software_limit}
1569       ||
1570     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1571   ) {
1572     $attrs->{software_limit} = 1;
1573   }
1574   else {
1575     $self->throw_exception("rows attribute must be positive if present")
1576       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1577
1578     # MySQL actually recommends this approach.  I cringe.
1579     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1580   }
1581
1582   my @limit;
1583
1584   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1585   # otherwise delegate the limiting to the storage, unless software limit was requested
1586   if (
1587     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1588        ||
1589     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1590       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1591   ) {
1592     ($ident, $select, $where, $attrs)
1593       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1594   }
1595   elsif (! $attrs->{software_limit} ) {
1596     push @limit, $attrs->{rows}, $attrs->{offset};
1597   }
1598
1599 ###
1600   # This would be the point to deflate anything found in $where
1601   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1602   # expect a row object. And all we have is a resultsource (it is trivial
1603   # to extract deflator coderefs via $alias2source above).
1604   #
1605   # I don't see a way forward other than changing the way deflators are
1606   # invoked, and that's just bad...
1607 ###
1608
1609   my $order = { map
1610     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1611     (qw/order_by group_by having/ )
1612   };
1613
1614   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1615 }
1616
1617 #
1618 # This is the code producing joined subqueries like:
1619 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1620 #
1621 sub _adjust_select_args_for_complex_prefetch {
1622   my ($self, $from, $select, $where, $attrs) = @_;
1623
1624   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1625     if not @{$attrs->{_prefetch_select}};
1626
1627   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1628     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1629
1630
1631   # generate inner/outer attribute lists, remove stuff that doesn't apply
1632   my $outer_attrs = { %$attrs };
1633   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1634
1635   my $inner_attrs = { %$attrs };
1636   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1637
1638
1639   # bring over all non-collapse-induced order_by into the inner query (if any)
1640   # the outer one will have to keep them all
1641   delete $inner_attrs->{order_by};
1642   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1643     $inner_attrs->{order_by} = [
1644       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1645     ];
1646   }
1647
1648
1649   # generate the inner/outer select lists
1650   # for inside we consider only stuff *not* brought in by the prefetch
1651   # on the outside we substitute any function for its alias
1652   my $outer_select = [ @$select ];
1653   my $inner_select = [];
1654   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1655     my $sel = $outer_select->[$i];
1656
1657     if (ref $sel eq 'HASH' ) {
1658       $sel->{-as} ||= $attrs->{as}[$i];
1659       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1660     }
1661
1662     push @$inner_select, $sel;
1663   }
1664
1665   # normalize a copy of $from, so it will be easier to work with further
1666   # down (i.e. promote the initial hashref to an AoH)
1667   $from = [ @$from ];
1668   $from->[0] = [ $from->[0] ];
1669   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1670
1671
1672   # decide which parts of the join will remain in either part of
1673   # the outer/inner query
1674
1675   # First we compose a list of which aliases are used in restrictions
1676   # (i.e. conditions/order/grouping/etc). Since we do not have
1677   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1678   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1679   # need to appear in the resulting sql.
1680   # It may not be very efficient, but it's a reasonable stop-gap
1681   # Also unqualified column names will not be considered, but more often
1682   # than not this is actually ok
1683   #
1684   # In the same loop we enumerate part of the selection aliases, as
1685   # it requires the same sqla hack for the time being
1686   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1687   {
1688     # produce stuff unquoted, so it can be scanned
1689     my $sql_maker = $self->sql_maker;
1690     local $sql_maker->{quote_char};
1691     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1692     $sep = "\Q$sep\E";
1693
1694     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1695     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1696     my $where_sql = $sql_maker->where ($where);
1697     my $group_by_sql = $sql_maker->_order_by({
1698       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1699     });
1700     my @non_prefetch_order_by_chunks = (map
1701       { ref $_ ? $_->[0] : $_ }
1702       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1703     );
1704
1705
1706     for my $alias (keys %original_join_info) {
1707       my $seen_re = qr/\b $alias $sep/x;
1708
1709       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1710         if ($piece =~ $seen_re) {
1711           $restrict_aliases->{$alias} = 1;
1712         }
1713       }
1714
1715       if ($non_prefetch_select_sql =~ $seen_re) {
1716           $select_aliases->{$alias} = 1;
1717       }
1718
1719       if ($prefetch_select_sql =~ $seen_re) {
1720           $prefetch_aliases->{$alias} = 1;
1721       }
1722
1723     }
1724   }
1725
1726   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1727   for my $j (values %original_join_info) {
1728     my $alias = $j->{-alias} or next;
1729     $restrict_aliases->{$alias} = 1 if (
1730       (not $j->{-join_type})
1731         or
1732       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1733     );
1734   }
1735
1736   # mark all join parents as mentioned
1737   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1738   for my $collection ($restrict_aliases, $select_aliases) {
1739     for my $alias (keys %$collection) {
1740       $collection->{$_} = 1
1741         for (@{ $original_join_info{$alias}{-join_path} || [] });
1742     }
1743   }
1744
1745   # construct the inner $from for the subquery
1746   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1747   my @inner_from;
1748   for my $j (@$from) {
1749     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1750   }
1751
1752   # if a multi-type join was needed in the subquery ("multi" is indicated by
1753   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1754   unless ($inner_attrs->{group_by}) {
1755     for my $alias (keys %inner_joins) {
1756
1757       # the dot comes from some weirdness in collapse
1758       # remove after the rewrite
1759       if ($attrs->{collapse}{".$alias"}) {
1760         $inner_attrs->{group_by} ||= $inner_select;
1761         last;
1762       }
1763     }
1764   }
1765
1766   # demote the inner_from head
1767   $inner_from[0] = $inner_from[0][0];
1768
1769   # generate the subquery
1770   my $subq = $self->_select_args_to_query (
1771     \@inner_from,
1772     $inner_select,
1773     $where,
1774     $inner_attrs,
1775   );
1776
1777   my $subq_joinspec = {
1778     -alias => $attrs->{alias},
1779     -source_handle => $inner_from[0]{-source_handle},
1780     $attrs->{alias} => $subq,
1781   };
1782
1783   # Generate the outer from - this is relatively easy (really just replace
1784   # the join slot with the subquery), with a major caveat - we can not
1785   # join anything that is non-selecting (not part of the prefetch), but at
1786   # the same time is a multi-type relationship, as it will explode the result.
1787   #
1788   # There are two possibilities here
1789   # - either the join is non-restricting, in which case we simply throw it away
1790   # - it is part of the restrictions, in which case we need to collapse the outer
1791   #   result by tackling yet another group_by to the outside of the query
1792
1793   # so first generate the outer_from, up to the substitution point
1794   my @outer_from;
1795   while (my $j = shift @$from) {
1796     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1797       push @outer_from, [
1798         $subq_joinspec,
1799         @{$j}[1 .. $#$j],
1800       ];
1801       last; # we'll take care of what's left in $from below
1802     }
1803     else {
1804       push @outer_from, $j;
1805     }
1806   }
1807
1808   # see what's left - throw away if not selecting/restricting
1809   # also throw in a group_by if restricting to guard against
1810   # cross-join explosions
1811   #
1812   while (my $j = shift @$from) {
1813     my $alias = $j->[0]{-alias};
1814
1815     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1816       push @outer_from, $j;
1817     }
1818     elsif ($restrict_aliases->{$alias}) {
1819       push @outer_from, $j;
1820
1821       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1822       # have restrict_inner and restrict_outer... or something to that
1823       # effect... I think...
1824
1825       # FIXME2 - I can't find a clean way to determine if a particular join
1826       # is a multi - instead I am just treating everything as a potential
1827       # explosive join (ribasushi)
1828       #
1829       # if (my $handle = $j->[0]{-source_handle}) {
1830       #   my $rsrc = $handle->resolve;
1831       #   ... need to bail out of the following if this is not a multi,
1832       #       as it will be much easier on the db ...
1833
1834           $outer_attrs->{group_by} ||= $outer_select;
1835       # }
1836     }
1837   }
1838
1839   # demote the outer_from head
1840   $outer_from[0] = $outer_from[0][0];
1841
1842   # This is totally horrific - the $where ends up in both the inner and outer query
1843   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1844   # then if where conditions apply to the *right* side of the prefetch, you may have
1845   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1846   # the outer select to exclude joins you didin't want in the first place
1847   #
1848   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1849   return (\@outer_from, $outer_select, $where, $outer_attrs);
1850 }
1851
1852 sub _resolve_ident_sources {
1853   my ($self, $ident) = @_;
1854
1855   my $alias2source = {};
1856   my $rs_alias;
1857
1858   # the reason this is so contrived is that $ident may be a {from}
1859   # structure, specifying multiple tables to join
1860   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1861     # this is compat mode for insert/update/delete which do not deal with aliases
1862     $alias2source->{me} = $ident;
1863     $rs_alias = 'me';
1864   }
1865   elsif (ref $ident eq 'ARRAY') {
1866
1867     for (@$ident) {
1868       my $tabinfo;
1869       if (ref $_ eq 'HASH') {
1870         $tabinfo = $_;
1871         $rs_alias = $tabinfo->{-alias};
1872       }
1873       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1874         $tabinfo = $_->[0];
1875       }
1876
1877       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1878         if ($tabinfo->{-source_handle});
1879     }
1880   }
1881
1882   return ($alias2source, $rs_alias);
1883 }
1884
1885 # Takes $ident, \@column_names
1886 #
1887 # returns { $column_name => \%column_info, ... }
1888 # also note: this adds -result_source => $rsrc to the column info
1889 #
1890 # usage:
1891 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1892 sub _resolve_column_info {
1893   my ($self, $ident, $colnames) = @_;
1894   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1895
1896   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1897   $sep = "\Q$sep\E";
1898
1899   my (%return, %seen_cols);
1900
1901   # compile a global list of column names, to be able to properly
1902   # disambiguate unqualified column names (if at all possible)
1903   for my $alias (keys %$alias2src) {
1904     my $rsrc = $alias2src->{$alias};
1905     for my $colname ($rsrc->columns) {
1906       push @{$seen_cols{$colname}}, $alias;
1907     }
1908   }
1909
1910   COLUMN:
1911   foreach my $col (@$colnames) {
1912     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1913
1914     unless ($alias) {
1915       # see if the column was seen exactly once (so we know which rsrc it came from)
1916       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1917         $alias = $seen_cols{$colname}[0];
1918       }
1919       else {
1920         next COLUMN;
1921       }
1922     }
1923
1924     my $rsrc = $alias2src->{$alias};
1925     $return{$col} = $rsrc && {
1926       %{$rsrc->column_info($colname)},
1927       -result_source => $rsrc,
1928       -source_alias => $alias,
1929     };
1930   }
1931
1932   return \%return;
1933 }
1934
1935 # Returns a counting SELECT for a simple count
1936 # query. Abstracted so that a storage could override
1937 # this to { count => 'firstcol' } or whatever makes
1938 # sense as a performance optimization
1939 sub _count_select {
1940   #my ($self, $source, $rs_attrs) = @_;
1941   return { count => '*' };
1942 }
1943
1944 # Returns a SELECT which will end up in the subselect
1945 # There may or may not be a group_by, as the subquery
1946 # might have been called to accomodate a limit
1947 #
1948 # Most databases would be happy with whatever ends up
1949 # here, but some choke in various ways.
1950 #
1951 sub _subq_count_select {
1952   my ($self, $source, $rs_attrs) = @_;
1953   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1954
1955   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1956   return @pcols ? \@pcols : [ 1 ];
1957 }
1958
1959
1960 sub source_bind_attributes {
1961   my ($self, $source) = @_;
1962
1963   my $bind_attributes;
1964   foreach my $column ($source->columns) {
1965
1966     my $data_type = $source->column_info($column)->{data_type} || '';
1967     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1968      if $data_type;
1969   }
1970
1971   return $bind_attributes;
1972 }
1973
1974 =head2 select
1975
1976 =over 4
1977
1978 =item Arguments: $ident, $select, $condition, $attrs
1979
1980 =back
1981
1982 Handle a SQL select statement.
1983
1984 =cut
1985
1986 sub select {
1987   my $self = shift;
1988   my ($ident, $select, $condition, $attrs) = @_;
1989   return $self->cursor_class->new($self, \@_, $attrs);
1990 }
1991
1992 sub select_single {
1993   my $self = shift;
1994   my ($rv, $sth, @bind) = $self->_select(@_);
1995   my @row = $sth->fetchrow_array;
1996   my @nextrow = $sth->fetchrow_array if @row;
1997   if(@row && @nextrow) {
1998     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1999   }
2000   # Need to call finish() to work round broken DBDs
2001   $sth->finish();
2002   return @row;
2003 }
2004
2005 =head2 sth
2006
2007 =over 4
2008
2009 =item Arguments: $sql
2010
2011 =back
2012
2013 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2014
2015 =cut
2016
2017 sub _dbh_sth {
2018   my ($self, $dbh, $sql) = @_;
2019
2020   # 3 is the if_active parameter which avoids active sth re-use
2021   my $sth = $self->disable_sth_caching
2022     ? $dbh->prepare($sql)
2023     : $dbh->prepare_cached($sql, {}, 3);
2024
2025   # XXX You would think RaiseError would make this impossible,
2026   #  but apparently that's not true :(
2027   $self->throw_exception($dbh->errstr) if !$sth;
2028
2029   $sth;
2030 }
2031
2032 sub sth {
2033   my ($self, $sql) = @_;
2034   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2035 }
2036
2037 sub _dbh_columns_info_for {
2038   my ($self, $dbh, $table) = @_;
2039
2040   if ($dbh->can('column_info')) {
2041     my %result;
2042     eval {
2043       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2044       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2045       $sth->execute();
2046       while ( my $info = $sth->fetchrow_hashref() ){
2047         my %column_info;
2048         $column_info{data_type}   = $info->{TYPE_NAME};
2049         $column_info{size}      = $info->{COLUMN_SIZE};
2050         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2051         $column_info{default_value} = $info->{COLUMN_DEF};
2052         my $col_name = $info->{COLUMN_NAME};
2053         $col_name =~ s/^\"(.*)\"$/$1/;
2054
2055         $result{$col_name} = \%column_info;
2056       }
2057     };
2058     return \%result if !$@ && scalar keys %result;
2059   }
2060
2061   my %result;
2062   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2063   $sth->execute;
2064   my @columns = @{$sth->{NAME_lc}};
2065   for my $i ( 0 .. $#columns ){
2066     my %column_info;
2067     $column_info{data_type} = $sth->{TYPE}->[$i];
2068     $column_info{size} = $sth->{PRECISION}->[$i];
2069     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2070
2071     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2072       $column_info{data_type} = $1;
2073       $column_info{size}    = $2;
2074     }
2075
2076     $result{$columns[$i]} = \%column_info;
2077   }
2078   $sth->finish;
2079
2080   foreach my $col (keys %result) {
2081     my $colinfo = $result{$col};
2082     my $type_num = $colinfo->{data_type};
2083     my $type_name;
2084     if(defined $type_num && $dbh->can('type_info')) {
2085       my $type_info = $dbh->type_info($type_num);
2086       $type_name = $type_info->{TYPE_NAME} if $type_info;
2087       $colinfo->{data_type} = $type_name if $type_name;
2088     }
2089   }
2090
2091   return \%result;
2092 }
2093
2094 sub columns_info_for {
2095   my ($self, $table) = @_;
2096   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2097 }
2098
2099 =head2 last_insert_id
2100
2101 Return the row id of the last insert.
2102
2103 =cut
2104
2105 sub _dbh_last_insert_id {
2106     # All Storage's need to register their own _dbh_last_insert_id
2107     # the old SQLite-based method was highly inappropriate
2108
2109     my $self = shift;
2110     my $class = ref $self;
2111     $self->throw_exception (<<EOE);
2112
2113 No _dbh_last_insert_id() method found in $class.
2114 Since the method of obtaining the autoincrement id of the last insert
2115 operation varies greatly between different databases, this method must be
2116 individually implemented for every storage class.
2117 EOE
2118 }
2119
2120 sub last_insert_id {
2121   my $self = shift;
2122   $self->_dbh_last_insert_id ($self->_dbh, @_);
2123 }
2124
2125 =head2 _native_data_type
2126
2127 =over 4
2128
2129 =item Arguments: $type_name
2130
2131 =back
2132
2133 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2134 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2135 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2136
2137 The default implementation returns C<undef>, implement in your Storage driver if
2138 you need this functionality.
2139
2140 Should map types from other databases to the native RDBMS type, for example
2141 C<VARCHAR2> to C<VARCHAR>.
2142
2143 Types with modifiers should map to the underlying data type. For example,
2144 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2145
2146 Composite types should map to the container type, for example
2147 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2148
2149 =cut
2150
2151 sub _native_data_type {
2152   #my ($self, $data_type) = @_;
2153   return undef
2154 }
2155
2156 =head2 sqlt_type
2157
2158 Returns the database driver name.
2159
2160 =cut
2161
2162 sub sqlt_type {
2163   my ($self) = @_;
2164
2165   if (not $self->_driver_determined) {
2166     $self->_determine_driver;
2167     goto $self->can ('sqlt_type');
2168   }
2169
2170   $self->_get_dbh->{Driver}->{Name};
2171 }
2172
2173 =head2 bind_attribute_by_data_type
2174
2175 Given a datatype from column info, returns a database specific bind
2176 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2177 let the database planner just handle it.
2178
2179 Generally only needed for special case column types, like bytea in postgres.
2180
2181 =cut
2182
2183 sub bind_attribute_by_data_type {
2184     return;
2185 }
2186
2187 =head2 is_datatype_numeric
2188
2189 Given a datatype from column_info, returns a boolean value indicating if
2190 the current RDBMS considers it a numeric value. This controls how
2191 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2192 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2193 be performed instead of the usual C<eq>.
2194
2195 =cut
2196
2197 sub is_datatype_numeric {
2198   my ($self, $dt) = @_;
2199
2200   return 0 unless $dt;
2201
2202   return $dt =~ /^ (?:
2203     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2204   ) $/ix;
2205 }
2206
2207
2208 =head2 create_ddl_dir (EXPERIMENTAL)
2209
2210 =over 4
2211
2212 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2213
2214 =back
2215
2216 Creates a SQL file based on the Schema, for each of the specified
2217 database engines in C<\@databases> in the given directory.
2218 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2219
2220 Given a previous version number, this will also create a file containing
2221 the ALTER TABLE statements to transform the previous schema into the
2222 current one. Note that these statements may contain C<DROP TABLE> or
2223 C<DROP COLUMN> statements that can potentially destroy data.
2224
2225 The file names are created using the C<ddl_filename> method below, please
2226 override this method in your schema if you would like a different file
2227 name format. For the ALTER file, the same format is used, replacing
2228 $version in the name with "$preversion-$version".
2229
2230 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2231 The most common value for this would be C<< { add_drop_table => 1 } >>
2232 to have the SQL produced include a C<DROP TABLE> statement for each table
2233 created. For quoting purposes supply C<quote_table_names> and
2234 C<quote_field_names>.
2235
2236 If no arguments are passed, then the following default values are assumed:
2237
2238 =over 4
2239
2240 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2241
2242 =item version    - $schema->schema_version
2243
2244 =item directory  - './'
2245
2246 =item preversion - <none>
2247
2248 =back
2249
2250 By default, C<\%sqlt_args> will have
2251
2252  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2253
2254 merged with the hash passed in. To disable any of those features, pass in a
2255 hashref like the following
2256
2257  { ignore_constraint_names => 0, # ... other options }
2258
2259
2260 Note that this feature is currently EXPERIMENTAL and may not work correctly
2261 across all databases, or fully handle complex relationships.
2262
2263 WARNING: Please check all SQL files created, before applying them.
2264
2265 =cut
2266
2267 sub create_ddl_dir {
2268   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2269
2270   if(!$dir || !-d $dir) {
2271     carp "No directory given, using ./\n";
2272     $dir = "./";
2273   }
2274   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2275   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2276
2277   my $schema_version = $schema->schema_version || '1.x';
2278   $version ||= $schema_version;
2279
2280   $sqltargs = {
2281     add_drop_table => 1,
2282     ignore_constraint_names => 1,
2283     ignore_index_names => 1,
2284     %{$sqltargs || {}}
2285   };
2286
2287   $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
2288     if !$self->_sqlt_version_ok;
2289
2290   my $sqlt = SQL::Translator->new( $sqltargs );
2291
2292   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2293   my $sqlt_schema = $sqlt->translate({ data => $schema })
2294     or $self->throw_exception ($sqlt->error);
2295
2296   foreach my $db (@$databases) {
2297     $sqlt->reset();
2298     $sqlt->{schema} = $sqlt_schema;
2299     $sqlt->producer($db);
2300
2301     my $file;
2302     my $filename = $schema->ddl_filename($db, $version, $dir);
2303     if (-e $filename && ($version eq $schema_version )) {
2304       # if we are dumping the current version, overwrite the DDL
2305       carp "Overwriting existing DDL file - $filename";
2306       unlink($filename);
2307     }
2308
2309     my $output = $sqlt->translate;
2310     if(!$output) {
2311       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2312       next;
2313     }
2314     if(!open($file, ">$filename")) {
2315       $self->throw_exception("Can't open $filename for writing ($!)");
2316       next;
2317     }
2318     print $file $output;
2319     close($file);
2320
2321     next unless ($preversion);
2322
2323     require SQL::Translator::Diff;
2324
2325     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2326     if(!-e $prefilename) {
2327       carp("No previous schema file found ($prefilename)");
2328       next;
2329     }
2330
2331     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2332     if(-e $difffile) {
2333       carp("Overwriting existing diff file - $difffile");
2334       unlink($difffile);
2335     }
2336
2337     my $source_schema;
2338     {
2339       my $t = SQL::Translator->new($sqltargs);
2340       $t->debug( 0 );
2341       $t->trace( 0 );
2342
2343       $t->parser( $db )
2344         or $self->throw_exception ($t->error);
2345
2346       my $out = $t->translate( $prefilename )
2347         or $self->throw_exception ($t->error);
2348
2349       $source_schema = $t->schema;
2350
2351       $source_schema->name( $prefilename )
2352         unless ( $source_schema->name );
2353     }
2354
2355     # The "new" style of producers have sane normalization and can support
2356     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2357     # And we have to diff parsed SQL against parsed SQL.
2358     my $dest_schema = $sqlt_schema;
2359
2360     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2361       my $t = SQL::Translator->new($sqltargs);
2362       $t->debug( 0 );
2363       $t->trace( 0 );
2364
2365       $t->parser( $db )
2366         or $self->throw_exception ($t->error);
2367
2368       my $out = $t->translate( $filename )
2369         or $self->throw_exception ($t->error);
2370
2371       $dest_schema = $t->schema;
2372
2373       $dest_schema->name( $filename )
2374         unless $dest_schema->name;
2375     }
2376
2377     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2378                                                   $dest_schema,   $db,
2379                                                   $sqltargs
2380                                                  );
2381     if(!open $file, ">$difffile") {
2382       $self->throw_exception("Can't write to $difffile ($!)");
2383       next;
2384     }
2385     print $file $diff;
2386     close($file);
2387   }
2388 }
2389
2390 =head2 deployment_statements
2391
2392 =over 4
2393
2394 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2395
2396 =back
2397
2398 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2399
2400 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2401 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2402
2403 C<$directory> is used to return statements from files in a previously created
2404 L</create_ddl_dir> directory and is optional. The filenames are constructed
2405 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2406
2407 If no C<$directory> is specified then the statements are constructed on the
2408 fly using L<SQL::Translator> and C<$version> is ignored.
2409
2410 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2411
2412 =cut
2413
2414 sub deployment_statements {
2415   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2416   $type ||= $self->sqlt_type;
2417   $version ||= $schema->schema_version || '1.x';
2418   $dir ||= './';
2419   my $filename = $schema->ddl_filename($type, $version, $dir);
2420   if(-f $filename)
2421   {
2422       my $file;
2423       open($file, "<$filename")
2424         or $self->throw_exception("Can't open $filename ($!)");
2425       my @rows = <$file>;
2426       close($file);
2427       return join('', @rows);
2428   }
2429
2430   $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
2431     if !$self->_sqlt_version_ok;
2432
2433   # sources needs to be a parser arg, but for simplicty allow at top level
2434   # coming in
2435   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2436       if exists $sqltargs->{sources};
2437
2438   my $tr = SQL::Translator->new(
2439     producer => "SQL::Translator::Producer::${type}",
2440     %$sqltargs,
2441     parser => 'SQL::Translator::Parser::DBIx::Class',
2442     data => $schema,
2443   );
2444   return $tr->translate;
2445 }
2446
2447 sub deploy {
2448   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2449   my $deploy = sub {
2450     my $line = shift;
2451     return if($line =~ /^--/);
2452     return if(!$line);
2453     # next if($line =~ /^DROP/m);
2454     return if($line =~ /^BEGIN TRANSACTION/m);
2455     return if($line =~ /^COMMIT/m);
2456     return if $line =~ /^\s+$/; # skip whitespace only
2457     $self->_query_start($line);
2458     eval {
2459       # do a dbh_do cycle here, as we need some error checking in
2460       # place (even though we will ignore errors)
2461       $self->dbh_do (sub { $_[1]->do($line) });
2462     };
2463     if ($@) {
2464       carp qq{$@ (running "${line}")};
2465     }
2466     $self->_query_end($line);
2467   };
2468   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2469   if (@statements > 1) {
2470     foreach my $statement (@statements) {
2471       $deploy->( $statement );
2472     }
2473   }
2474   elsif (@statements == 1) {
2475     foreach my $line ( split(";\n", $statements[0])) {
2476       $deploy->( $line );
2477     }
2478   }
2479 }
2480
2481 =head2 datetime_parser
2482
2483 Returns the datetime parser class
2484
2485 =cut
2486
2487 sub datetime_parser {
2488   my $self = shift;
2489   return $self->{datetime_parser} ||= do {
2490     $self->_populate_dbh unless $self->_dbh;
2491     $self->build_datetime_parser(@_);
2492   };
2493 }
2494
2495 =head2 datetime_parser_type
2496
2497 Defines (returns) the datetime parser class - currently hardwired to
2498 L<DateTime::Format::MySQL>
2499
2500 =cut
2501
2502 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2503
2504 =head2 build_datetime_parser
2505
2506 See L</datetime_parser>
2507
2508 =cut
2509
2510 sub build_datetime_parser {
2511   my $self = shift;
2512   my $type = $self->datetime_parser_type(@_);
2513   $self->ensure_class_loaded ($type);
2514   return $type;
2515 }
2516
2517
2518 =head2 is_replicating
2519
2520 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2521 replicate from a master database.  Default is undef, which is the result
2522 returned by databases that don't support replication.
2523
2524 =cut
2525
2526 sub is_replicating {
2527     return;
2528
2529 }
2530
2531 =head2 lag_behind_master
2532
2533 Returns a number that represents a certain amount of lag behind a master db
2534 when a given storage is replicating.  The number is database dependent, but
2535 starts at zero and increases with the amount of lag. Default in undef
2536
2537 =cut
2538
2539 sub lag_behind_master {
2540     return;
2541 }
2542
2543 sub DESTROY {
2544   my $self = shift;
2545   $self->_verify_pid if $self->_dbh;
2546
2547   # some databases need this to stop spewing warnings
2548   if (my $dbh = $self->_dbh) {
2549     eval { $dbh->disconnect };
2550   }
2551
2552   $self->_dbh(undef);
2553 }
2554
2555 1;
2556
2557 =head1 USAGE NOTES
2558
2559 =head2 DBIx::Class and AutoCommit
2560
2561 DBIx::Class can do some wonderful magic with handling exceptions,
2562 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2563 (the default) combined with C<txn_do> for transaction support.
2564
2565 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2566 in an assumed transaction between commits, and you're telling us you'd
2567 like to manage that manually.  A lot of the magic protections offered by
2568 this module will go away.  We can't protect you from exceptions due to database
2569 disconnects because we don't know anything about how to restart your
2570 transactions.  You're on your own for handling all sorts of exceptional
2571 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2572 be with raw DBI.
2573
2574
2575 =head1 AUTHORS
2576
2577 Matt S. Trout <mst@shadowcatsystems.co.uk>
2578
2579 Andy Grundman <andy@hybridized.org>
2580
2581 =head1 LICENSE
2582
2583 You may distribute this code under the same terms as Perl itself.
2584
2585 =cut