abb253392ce8c5978c8e428dd4bd53685f673765
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base 'DBIx::Class::Storage';
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16
17 __PACKAGE__->mk_group_accessors('simple' =>
18   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
19      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
20 );
21
22 # the values for these accessors are picked out (and deleted) from
23 # the attribute hashref passed to connect_info
24 my @storage_options = qw/
25   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
26   disable_sth_caching unsafe auto_savepoint
27 /;
28 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
29
30
31 # default cursor class, overridable in connect_info attributes
32 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
33
34 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
35 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
36
37
38 =head1 NAME
39
40 DBIx::Class::Storage::DBI - DBI storage handler
41
42 =head1 SYNOPSIS
43
44   my $schema = MySchema->connect('dbi:SQLite:my.db');
45
46   $schema->storage->debug(1);
47   $schema->dbh_do("DROP TABLE authors");
48
49   $schema->resultset('Book')->search({
50      written_on => $schema->storage->datetime_parser(DateTime->now)
51   });
52
53 =head1 DESCRIPTION
54
55 This class represents the connection to an RDBMS via L<DBI>.  See
56 L<DBIx::Class::Storage> for general information.  This pod only
57 documents DBI-specific methods and behaviors.
58
59 =head1 METHODS
60
61 =cut
62
63 sub new {
64   my $new = shift->next::method(@_);
65
66   $new->transaction_depth(0);
67   $new->_sql_maker_opts({});
68   $new->{savepoints} = [];
69   $new->{_in_dbh_do} = 0;
70   $new->{_dbh_gen} = 0;
71
72   $new;
73 }
74
75 =head2 connect_info
76
77 This method is normally called by L<DBIx::Class::Schema/connection>, which
78 encapsulates its argument list in an arrayref before passing them here.
79
80 The argument list may contain:
81
82 =over
83
84 =item *
85
86 The same 4-element argument set one would normally pass to
87 L<DBI/connect>, optionally followed by
88 L<extra attributes|/DBIx::Class specific connection attributes>
89 recognized by DBIx::Class:
90
91   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92
93 =item *
94
95 A single code reference which returns a connected
96 L<DBI database handle|DBI/connect> optionally followed by
97 L<extra attributes|/DBIx::Class specific connection attributes> recognized
98 by DBIx::Class:
99
100   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
101
102 =item *
103
104 A single hashref with all the attributes and the dsn/user/password
105 mixed together:
106
107   $connect_info_args = [{
108     dsn => $dsn,
109     user => $user,
110     password => $pass,
111     %dbi_attributes,
112     %extra_attributes,
113   }];
114
115 This is particularly useful for L<Catalyst> based applications, allowing the
116 following config (L<Config::General> style):
117
118   <Model::DB>
119     schema_class   App::DB
120     <connect_info>
121       dsn          dbi:mysql:database=test
122       user         testuser
123       password     TestPass
124       AutoCommit   1
125     </connect_info>
126   </Model::DB>
127
128 The C<dsn>/C<user>/C<password> combination can be substituted by the
129 C<dbh_maker> key whose value is a coderef that returns the C<$dbh>.
130
131 =back
132
133 Please note that the L<DBI> docs recommend that you always explicitly
134 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
135 recommends that it be set to I<1>, and that you perform transactions
136 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
137 to I<1> if you do not do explicitly set it to zero.  This is the default
138 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
139
140 =head3 DBIx::Class specific connection attributes
141
142 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
143 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
144 the following connection options. These options can be mixed in with your other
145 L<DBI> connection attributes, or placed in a seperate hashref
146 (C<\%extra_attributes>) as shown above.
147
148 Every time C<connect_info> is invoked, any previous settings for
149 these options will be cleared before setting the new ones, regardless of
150 whether any options are specified in the new C<connect_info>.
151
152
153 =over
154
155 =item on_connect_do
156
157 Specifies things to do immediately after connecting or re-connecting to
158 the database.  Its value may contain:
159
160 =over
161
162 =item a scalar
163
164 This contains one SQL statement to execute.
165
166 =item an array reference
167
168 This contains SQL statements to execute in order.  Each element contains
169 a string or a code reference that returns a string.
170
171 =item a code reference
172
173 This contains some code to execute.  Unlike code references within an
174 array reference, its return value is ignored.
175
176 =back
177
178 =item on_disconnect_do
179
180 Takes arguments in the same form as L</on_connect_do> and executes them
181 immediately before disconnecting from the database.
182
183 Note, this only runs if you explicitly call L</disconnect> on the
184 storage object.
185
186 =item on_connect_call
187
188 A more generalized form of L</on_connect_do> that calls the specified
189 C<connect_call_METHOD> methods in your storage driver.
190
191   on_connect_do => 'select 1'
192
193 is equivalent to:
194
195   on_connect_call => [ [ do_sql => 'select 1' ] ]
196
197 Its values may contain:
198
199 =over
200
201 =item a scalar
202
203 Will call the C<connect_call_METHOD> method.
204
205 =item a code reference
206
207 Will execute C<< $code->($storage) >>
208
209 =item an array reference
210
211 Each value can be a method name or code reference.
212
213 =item an array of arrays
214
215 For each array, the first item is taken to be the C<connect_call_> method name
216 or code reference, and the rest are parameters to it.
217
218 =back
219
220 Some predefined storage methods you may use:
221
222 =over
223
224 =item do_sql
225
226 Executes a SQL string or a code reference that returns a SQL string. This is
227 what L</on_connect_do> and L</on_disconnect_do> use.
228
229 It can take:
230
231 =over
232
233 =item a scalar
234
235 Will execute the scalar as SQL.
236
237 =item an arrayref
238
239 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
240 attributes hashref and bind values.
241
242 =item a code reference
243
244 Will execute C<< $code->($storage) >> and execute the return array refs as
245 above.
246
247 =back
248
249 =item datetime_setup
250
251 Execute any statements necessary to initialize the database session to return
252 and accept datetime/timestamp values used with
253 L<DBIx::Class::InflateColumn::DateTime>.
254
255 Only necessary for some databases, see your specific storage driver for
256 implementation details.
257
258 =back
259
260 =item on_disconnect_call
261
262 Takes arguments in the same form as L</on_connect_call> and executes them
263 immediately before disconnecting from the database.
264
265 Calls the C<disconnect_call_METHOD> methods as opposed to the
266 C<connect_call_METHOD> methods called by L</on_connect_call>.
267
268 Note, this only runs if you explicitly call L</disconnect> on the
269 storage object.
270
271 =item disable_sth_caching
272
273 If set to a true value, this option will disable the caching of
274 statement handles via L<DBI/prepare_cached>.
275
276 =item limit_dialect
277
278 Sets the limit dialect. This is useful for JDBC-bridge among others
279 where the remote SQL-dialect cannot be determined by the name of the
280 driver alone. See also L<SQL::Abstract::Limit>.
281
282 =item quote_char
283
284 Specifies what characters to use to quote table and column names. If
285 you use this you will want to specify L</name_sep> as well.
286
287 C<quote_char> expects either a single character, in which case is it
288 is placed on either side of the table/column name, or an arrayref of length
289 2 in which case the table/column name is placed between the elements.
290
291 For example under MySQL you should use C<< quote_char => '`' >>, and for
292 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
293
294 =item name_sep
295
296 This only needs to be used in conjunction with C<quote_char>, and is used to
297 specify the charecter that seperates elements (schemas, tables, columns) from
298 each other. In most cases this is simply a C<.>.
299
300 The consequences of not supplying this value is that L<SQL::Abstract>
301 will assume DBIx::Class' uses of aliases to be complete column
302 names. The output will look like I<"me.name"> when it should actually
303 be I<"me"."name">.
304
305 =item unsafe
306
307 This Storage driver normally installs its own C<HandleError>, sets
308 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
309 all database handles, including those supplied by a coderef.  It does this
310 so that it can have consistent and useful error behavior.
311
312 If you set this option to a true value, Storage will not do its usual
313 modifications to the database handle's attributes, and instead relies on
314 the settings in your connect_info DBI options (or the values you set in
315 your connection coderef, in the case that you are connecting via coderef).
316
317 Note that your custom settings can cause Storage to malfunction,
318 especially if you set a C<HandleError> handler that suppresses exceptions
319 and/or disable C<RaiseError>.
320
321 =item auto_savepoint
322
323 If this option is true, L<DBIx::Class> will use savepoints when nesting
324 transactions, making it possible to recover from failure in the inner
325 transaction without having to abort all outer transactions.
326
327 =item cursor_class
328
329 Use this argument to supply a cursor class other than the default
330 L<DBIx::Class::Storage::DBI::Cursor>.
331
332 =back
333
334 Some real-life examples of arguments to L</connect_info> and
335 L<DBIx::Class::Schema/connect>
336
337   # Simple SQLite connection
338   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
339
340   # Connect via subref
341   ->connect_info([ sub { DBI->connect(...) } ]);
342
343   # Connect via subref in hashref
344   ->connect_info([{
345     dbh_maker => sub { DBI->connect(...) },
346     on_connect_do => 'alter session ...',
347   }]);
348
349   # A bit more complicated
350   ->connect_info(
351     [
352       'dbi:Pg:dbname=foo',
353       'postgres',
354       'my_pg_password',
355       { AutoCommit => 1 },
356       { quote_char => q{"}, name_sep => q{.} },
357     ]
358   );
359
360   # Equivalent to the previous example
361   ->connect_info(
362     [
363       'dbi:Pg:dbname=foo',
364       'postgres',
365       'my_pg_password',
366       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
367     ]
368   );
369
370   # Same, but with hashref as argument
371   # See parse_connect_info for explanation
372   ->connect_info(
373     [{
374       dsn         => 'dbi:Pg:dbname=foo',
375       user        => 'postgres',
376       password    => 'my_pg_password',
377       AutoCommit  => 1,
378       quote_char  => q{"},
379       name_sep    => q{.},
380     }]
381   );
382
383   # Subref + DBIx::Class-specific connection options
384   ->connect_info(
385     [
386       sub { DBI->connect(...) },
387       {
388           quote_char => q{`},
389           name_sep => q{@},
390           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
391           disable_sth_caching => 1,
392       },
393     ]
394   );
395
396
397
398 =cut
399
400 sub connect_info {
401   my ($self, $info_arg) = @_;
402
403   return $self->_connect_info if !$info_arg;
404
405   my @args = @$info_arg;  # take a shallow copy for further mutilation
406   $self->_connect_info([@args]); # copy for _connect_info
407
408
409   # combine/pre-parse arguments depending on invocation style
410
411   my %attrs;
412   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
413     %attrs = %{ $args[1] || {} };
414     @args = $args[0];
415   }
416   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
417     %attrs = %{$args[0]};
418     @args = ();
419     if (my $code = delete $attrs{dbh_maker}) {
420       @args = $code;
421       if (delete @attrs{qw/dsn user password/}) {
422         warn 'dsn/user/password ignored when dbh_maker coderef used in ' .
423              'connect_info';
424       }
425     }
426     else {
427       @args = delete @attrs{qw/dsn user password/};
428     }
429   }
430   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
431     %attrs = (
432       % { $args[3] || {} },
433       % { $args[4] || {} },
434     );
435     @args = @args[0,1,2];
436   }
437
438   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
439   #  the new set of options
440   $self->_sql_maker(undef);
441   $self->_sql_maker_opts({});
442
443   if(keys %attrs) {
444     for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
445       if(my $value = delete $attrs{$storage_opt}) {
446         $self->$storage_opt($value);
447       }
448     }
449     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
450       if(my $opt_val = delete $attrs{$sql_maker_opt}) {
451         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
452       }
453     }
454   }
455
456   if (ref $args[0] eq 'CODE') {
457     # _connect() never looks past $args[0] in this case
458     %attrs = ()
459   } else {
460     %attrs = (
461       %{ $self->_default_dbi_connect_attributes || {} },
462       %attrs,
463     );
464   }
465
466   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
467   $self->_connect_info;
468 }
469
470 sub _default_dbi_connect_attributes {
471   return {
472     AutoCommit => 1,
473     RaiseError => 1,
474     PrintError => 0,
475   };
476 }
477
478 =head2 on_connect_do
479
480 This method is deprecated in favour of setting via L</connect_info>.
481
482 =cut
483
484 =head2 on_disconnect_do
485
486 This method is deprecated in favour of setting via L</connect_info>.
487
488 =cut
489
490 sub _parse_connect_do {
491   my ($self, $type) = @_;
492
493   my $val = $self->$type;
494   return () if not defined $val;
495
496   my @res;
497
498   if (not ref($val)) {
499     push @res, [ 'do_sql', $val ];
500   } elsif (ref($val) eq 'CODE') {
501     push @res, $val;
502   } elsif (ref($val) eq 'ARRAY') {
503     push @res, map { [ 'do_sql', $_ ] } @$val;
504   } else {
505     $self->throw_exception("Invalid type for $type: ".ref($val));
506   }
507
508   return \@res;
509 }
510
511 =head2 dbh_do
512
513 Arguments: ($subref | $method_name), @extra_coderef_args?
514
515 Execute the given $subref or $method_name using the new exception-based
516 connection management.
517
518 The first two arguments will be the storage object that C<dbh_do> was called
519 on and a database handle to use.  Any additional arguments will be passed
520 verbatim to the called subref as arguments 2 and onwards.
521
522 Using this (instead of $self->_dbh or $self->dbh) ensures correct
523 exception handling and reconnection (or failover in future subclasses).
524
525 Your subref should have no side-effects outside of the database, as
526 there is the potential for your subref to be partially double-executed
527 if the database connection was stale/dysfunctional.
528
529 Example:
530
531   my @stuff = $schema->storage->dbh_do(
532     sub {
533       my ($storage, $dbh, @cols) = @_;
534       my $cols = join(q{, }, @cols);
535       $dbh->selectrow_array("SELECT $cols FROM foo");
536     },
537     @column_list
538   );
539
540 =cut
541
542 sub dbh_do {
543   my $self = shift;
544   my $code = shift;
545
546   my $dbh = $self->_dbh;
547
548   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
549       || $self->{transaction_depth};
550
551   local $self->{_in_dbh_do} = 1;
552
553   my @result;
554   my $want_array = wantarray;
555
556   eval {
557     $self->_verify_pid if $dbh;
558     if(!$self->_dbh) {
559         $self->_populate_dbh;
560         $dbh = $self->_dbh;
561     }
562
563     if($want_array) {
564         @result = $self->$code($dbh, @_);
565     }
566     elsif(defined $want_array) {
567         $result[0] = $self->$code($dbh, @_);
568     }
569     else {
570         $self->$code($dbh, @_);
571     }
572   };
573
574   # ->connected might unset $@ - copy
575   my $exception = $@;
576   if(!$exception) { return $want_array ? @result : $result[0] }
577
578   $self->throw_exception($exception) if $self->connected;
579
580   # We were not connected - reconnect and retry, but let any
581   #  exception fall right through this time
582   carp "Retrying $code after catching disconnected exception: $exception"
583     if $ENV{DBIC_DBIRETRY_DEBUG};
584   $self->_populate_dbh;
585   $self->$code($self->_dbh, @_);
586 }
587
588 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
589 # It also informs dbh_do to bypass itself while under the direction of txn_do,
590 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
591 sub txn_do {
592   my $self = shift;
593   my $coderef = shift;
594
595   ref $coderef eq 'CODE' or $self->throw_exception
596     ('$coderef must be a CODE reference');
597
598   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
599
600   local $self->{_in_dbh_do} = 1;
601
602   my @result;
603   my $want_array = wantarray;
604
605   my $tried = 0;
606   while(1) {
607     eval {
608       $self->_verify_pid if $self->_dbh;
609       $self->_populate_dbh if !$self->_dbh;
610
611       $self->txn_begin;
612       if($want_array) {
613           @result = $coderef->(@_);
614       }
615       elsif(defined $want_array) {
616           $result[0] = $coderef->(@_);
617       }
618       else {
619           $coderef->(@_);
620       }
621       $self->txn_commit;
622     };
623
624     # ->connected might unset $@ - copy
625     my $exception = $@;
626     if(!$exception) { return $want_array ? @result : $result[0] }
627
628     if($tried++ || $self->connected) {
629       eval { $self->txn_rollback };
630       my $rollback_exception = $@;
631       if($rollback_exception) {
632         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
633         $self->throw_exception($exception)  # propagate nested rollback
634           if $rollback_exception =~ /$exception_class/;
635
636         $self->throw_exception(
637           "Transaction aborted: ${exception}. "
638           . "Rollback failed: ${rollback_exception}"
639         );
640       }
641       $self->throw_exception($exception)
642     }
643
644     # We were not connected, and was first try - reconnect and retry
645     # via the while loop
646     carp "Retrying $coderef after catching disconnected exception: $exception"
647       if $ENV{DBIC_DBIRETRY_DEBUG};
648     $self->_populate_dbh;
649   }
650 }
651
652 =head2 disconnect
653
654 Our C<disconnect> method also performs a rollback first if the
655 database is not in C<AutoCommit> mode.
656
657 =cut
658
659 sub disconnect {
660   my ($self) = @_;
661
662   if( $self->_dbh ) {
663     my @actions;
664
665     push @actions, ( $self->on_disconnect_call || () );
666     push @actions, $self->_parse_connect_do ('on_disconnect_do');
667
668     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
669
670     $self->_dbh->rollback unless $self->_dbh_autocommit;
671     $self->_dbh->disconnect;
672     $self->_dbh(undef);
673     $self->{_dbh_gen}++;
674   }
675 }
676
677 =head2 with_deferred_fk_checks
678
679 =over 4
680
681 =item Arguments: C<$coderef>
682
683 =item Return Value: The return value of $coderef
684
685 =back
686
687 Storage specific method to run the code ref with FK checks deferred or
688 in MySQL's case disabled entirely.
689
690 =cut
691
692 # Storage subclasses should override this
693 sub with_deferred_fk_checks {
694   my ($self, $sub) = @_;
695
696   $sub->();
697 }
698
699 =head2 connected
700
701 =over
702
703 =item Arguments: none
704
705 =item Return Value: 1|0
706
707 =back
708
709 Verifies that the the current database handle is active and ready to execute
710 an SQL statement (i.e. the connection did not get stale, server is still
711 answering, etc.) This method is used internally by L</dbh>.
712
713 =cut
714
715 sub connected {
716   my $self = shift;
717   return 0 unless $self->_seems_connected;
718
719   #be on the safe side
720   local $self->_dbh->{RaiseError} = 1;
721
722   return $self->_ping;
723 }
724
725 sub _seems_connected {
726   my $self = shift;
727
728   my $dbh = $self->_dbh
729     or return 0;
730
731   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
732     $self->_dbh(undef);
733     $self->{_dbh_gen}++;
734     return 0;
735   }
736   else {
737     $self->_verify_pid;
738     return 0 if !$self->_dbh;
739   }
740
741   return $dbh->FETCH('Active');
742 }
743
744 sub _ping {
745   my $self = shift;
746
747   my $dbh = $self->_dbh or return 0;
748
749   return $dbh->ping;
750 }
751
752 # handle pid changes correctly
753 #  NOTE: assumes $self->_dbh is a valid $dbh
754 sub _verify_pid {
755   my ($self) = @_;
756
757   return if defined $self->_conn_pid && $self->_conn_pid == $$;
758
759   $self->_dbh->{InactiveDestroy} = 1;
760   $self->_dbh(undef);
761   $self->{_dbh_gen}++;
762
763   return;
764 }
765
766 sub ensure_connected {
767   my ($self) = @_;
768
769   unless ($self->connected) {
770     $self->_populate_dbh;
771   }
772 }
773
774 =head2 dbh
775
776 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
777 is guaranteed to be healthy by implicitly calling L</connected>, and if
778 necessary performing a reconnection before returning. Keep in mind that this
779 is very B<expensive> on some database engines. Consider using L<dbh_do>
780 instead.
781
782 =cut
783
784 sub dbh {
785   my ($self) = @_;
786
787   if (not $self->_dbh) {
788     $self->_populate_dbh;
789   } else {
790     $self->ensure_connected;
791   }
792   return $self->_dbh;
793 }
794
795 # this is the internal "get dbh or connect (don't check)" method
796 sub _get_dbh {
797   my $self = shift;
798   $self->_populate_dbh unless $self->_dbh;
799   return $self->_dbh;
800 }
801
802 sub _sql_maker_args {
803     my ($self) = @_;
804
805     return (
806       bindtype=>'columns',
807       array_datatypes => 1,
808       limit_dialect => $self->_get_dbh,
809       %{$self->_sql_maker_opts}
810     );
811 }
812
813 sub sql_maker {
814   my ($self) = @_;
815   unless ($self->_sql_maker) {
816     my $sql_maker_class = $self->sql_maker_class;
817     $self->ensure_class_loaded ($sql_maker_class);
818     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
819   }
820   return $self->_sql_maker;
821 }
822
823 sub _rebless {}
824
825 sub _populate_dbh {
826   my ($self) = @_;
827
828   my @info = @{$self->_dbi_connect_info || []};
829   $self->_dbh(undef); # in case ->connected failed we might get sent here
830   $self->_dbh($self->_connect(@info));
831
832   $self->_conn_pid($$);
833   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
834
835   $self->_determine_driver;
836
837   # Always set the transaction depth on connect, since
838   #  there is no transaction in progress by definition
839   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
840
841   $self->_run_connection_actions unless $self->{_in_determine_driver};
842 }
843
844 sub _run_connection_actions {
845   my $self = shift;
846   my @actions;
847
848   push @actions, ( $self->on_connect_call || () );
849   push @actions, $self->_parse_connect_do ('on_connect_do');
850
851   $self->_do_connection_actions(connect_call_ => $_) for @actions;
852 }
853
854 sub _determine_driver {
855   my ($self) = @_;
856
857   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
858     my $started_unconnected = 0;
859     local $self->{_in_determine_driver} = 1;
860
861     if (ref($self) eq __PACKAGE__) {
862       my $driver;
863       if ($self->_dbh) { # we are connected
864         $driver = $self->_dbh->{Driver}{Name};
865       } else {
866         # try to use dsn to not require being connected, the driver may still
867         # force a connection in _rebless to determine version
868         ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
869         $started_unconnected = 1;
870       }
871
872       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
873       if ($self->load_optional_class($storage_class)) {
874         mro::set_mro($storage_class, 'c3');
875         bless $self, $storage_class;
876         $self->_rebless();
877       }
878     }
879
880     $self->_driver_determined(1);
881
882     $self->_run_connection_actions
883         if $started_unconnected && defined $self->_dbh;
884   }
885 }
886
887 sub _do_connection_actions {
888   my $self          = shift;
889   my $method_prefix = shift;
890   my $call          = shift;
891
892   if (not ref($call)) {
893     my $method = $method_prefix . $call;
894     $self->$method(@_);
895   } elsif (ref($call) eq 'CODE') {
896     $self->$call(@_);
897   } elsif (ref($call) eq 'ARRAY') {
898     if (ref($call->[0]) ne 'ARRAY') {
899       $self->_do_connection_actions($method_prefix, $_) for @$call;
900     } else {
901       $self->_do_connection_actions($method_prefix, @$_) for @$call;
902     }
903   } else {
904     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
905   }
906
907   return $self;
908 }
909
910 sub connect_call_do_sql {
911   my $self = shift;
912   $self->_do_query(@_);
913 }
914
915 sub disconnect_call_do_sql {
916   my $self = shift;
917   $self->_do_query(@_);
918 }
919
920 # override in db-specific backend when necessary
921 sub connect_call_datetime_setup { 1 }
922
923 sub _do_query {
924   my ($self, $action) = @_;
925
926   if (ref $action eq 'CODE') {
927     $action = $action->($self);
928     $self->_do_query($_) foreach @$action;
929   }
930   else {
931     # Most debuggers expect ($sql, @bind), so we need to exclude
932     # the attribute hash which is the second argument to $dbh->do
933     # furthermore the bind values are usually to be presented
934     # as named arrayref pairs, so wrap those here too
935     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
936     my $sql = shift @do_args;
937     my $attrs = shift @do_args;
938     my @bind = map { [ undef, $_ ] } @do_args;
939
940     $self->_query_start($sql, @bind);
941     $self->_dbh->do($sql, $attrs, @do_args);
942     $self->_query_end($sql, @bind);
943   }
944
945   return $self;
946 }
947
948 sub _connect {
949   my ($self, @info) = @_;
950
951   $self->throw_exception("You failed to provide any connection info")
952     if !@info;
953
954   my ($old_connect_via, $dbh);
955
956   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
957     $old_connect_via = $DBI::connect_via;
958     $DBI::connect_via = 'connect';
959   }
960
961   eval {
962     if(ref $info[0] eq 'CODE') {
963        $dbh = &{$info[0]}
964     }
965     else {
966        $dbh = DBI->connect(@info);
967     }
968
969     if($dbh && !$self->unsafe) {
970       my $weak_self = $self;
971       Scalar::Util::weaken($weak_self);
972       $dbh->{HandleError} = sub {
973           if ($weak_self) {
974             $weak_self->throw_exception("DBI Exception: $_[0]");
975           }
976           else {
977             croak ("DBI Exception: $_[0]");
978           }
979       };
980       $dbh->{ShowErrorStatement} = 1;
981       $dbh->{RaiseError} = 1;
982       $dbh->{PrintError} = 0;
983     }
984   };
985
986   $DBI::connect_via = $old_connect_via if $old_connect_via;
987
988   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
989     if !$dbh || $@;
990
991   $self->_dbh_autocommit($dbh->{AutoCommit});
992
993   $dbh;
994 }
995
996 sub svp_begin {
997   my ($self, $name) = @_;
998
999   $name = $self->_svp_generate_name
1000     unless defined $name;
1001
1002   $self->throw_exception ("You can't use savepoints outside a transaction")
1003     if $self->{transaction_depth} == 0;
1004
1005   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1006     unless $self->can('_svp_begin');
1007
1008   push @{ $self->{savepoints} }, $name;
1009
1010   $self->debugobj->svp_begin($name) if $self->debug;
1011
1012   return $self->_svp_begin($name);
1013 }
1014
1015 sub svp_release {
1016   my ($self, $name) = @_;
1017
1018   $self->throw_exception ("You can't use savepoints outside a transaction")
1019     if $self->{transaction_depth} == 0;
1020
1021   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1022     unless $self->can('_svp_release');
1023
1024   if (defined $name) {
1025     $self->throw_exception ("Savepoint '$name' does not exist")
1026       unless grep { $_ eq $name } @{ $self->{savepoints} };
1027
1028     # Dig through the stack until we find the one we are releasing.  This keeps
1029     # the stack up to date.
1030     my $svp;
1031
1032     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1033   } else {
1034     $name = pop @{ $self->{savepoints} };
1035   }
1036
1037   $self->debugobj->svp_release($name) if $self->debug;
1038
1039   return $self->_svp_release($name);
1040 }
1041
1042 sub svp_rollback {
1043   my ($self, $name) = @_;
1044
1045   $self->throw_exception ("You can't use savepoints outside a transaction")
1046     if $self->{transaction_depth} == 0;
1047
1048   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1049     unless $self->can('_svp_rollback');
1050
1051   if (defined $name) {
1052       # If they passed us a name, verify that it exists in the stack
1053       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1054           $self->throw_exception("Savepoint '$name' does not exist!");
1055       }
1056
1057       # Dig through the stack until we find the one we are releasing.  This keeps
1058       # the stack up to date.
1059       while(my $s = pop(@{ $self->{savepoints} })) {
1060           last if($s eq $name);
1061       }
1062       # Add the savepoint back to the stack, as a rollback doesn't remove the
1063       # named savepoint, only everything after it.
1064       push(@{ $self->{savepoints} }, $name);
1065   } else {
1066       # We'll assume they want to rollback to the last savepoint
1067       $name = $self->{savepoints}->[-1];
1068   }
1069
1070   $self->debugobj->svp_rollback($name) if $self->debug;
1071
1072   return $self->_svp_rollback($name);
1073 }
1074
1075 sub _svp_generate_name {
1076     my ($self) = @_;
1077
1078     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1079 }
1080
1081 sub txn_begin {
1082   my $self = shift;
1083   if($self->{transaction_depth} == 0) {
1084     $self->debugobj->txn_begin()
1085       if $self->debug;
1086
1087     # being here implies we have AutoCommit => 1
1088     # if the user is utilizing txn_do - good for
1089     # him, otherwise we need to ensure that the
1090     # $dbh is healthy on BEGIN
1091     my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
1092     $self->$dbh_method->begin_work;
1093
1094   } elsif ($self->auto_savepoint) {
1095     $self->svp_begin;
1096   }
1097   $self->{transaction_depth}++;
1098 }
1099
1100 sub txn_commit {
1101   my $self = shift;
1102   if ($self->{transaction_depth} == 1) {
1103     my $dbh = $self->_dbh;
1104     $self->debugobj->txn_commit()
1105       if ($self->debug);
1106     $dbh->commit;
1107     $self->{transaction_depth} = 0
1108       if $self->_dbh_autocommit;
1109   }
1110   elsif($self->{transaction_depth} > 1) {
1111     $self->{transaction_depth}--;
1112     $self->svp_release
1113       if $self->auto_savepoint;
1114   }
1115 }
1116
1117 sub txn_rollback {
1118   my $self = shift;
1119   my $dbh = $self->_dbh;
1120   eval {
1121     if ($self->{transaction_depth} == 1) {
1122       $self->debugobj->txn_rollback()
1123         if ($self->debug);
1124       $self->{transaction_depth} = 0
1125         if $self->_dbh_autocommit;
1126       $dbh->rollback;
1127     }
1128     elsif($self->{transaction_depth} > 1) {
1129       $self->{transaction_depth}--;
1130       if ($self->auto_savepoint) {
1131         $self->svp_rollback;
1132         $self->svp_release;
1133       }
1134     }
1135     else {
1136       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1137     }
1138   };
1139   if ($@) {
1140     my $error = $@;
1141     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1142     $error =~ /$exception_class/ and $self->throw_exception($error);
1143     # ensure that a failed rollback resets the transaction depth
1144     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1145     $self->throw_exception($error);
1146   }
1147 }
1148
1149 # This used to be the top-half of _execute.  It was split out to make it
1150 #  easier to override in NoBindVars without duping the rest.  It takes up
1151 #  all of _execute's args, and emits $sql, @bind.
1152 sub _prep_for_execute {
1153   my ($self, $op, $extra_bind, $ident, $args) = @_;
1154
1155   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1156     $ident = $ident->from();
1157   }
1158
1159   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1160
1161   unshift(@bind,
1162     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1163       if $extra_bind;
1164   return ($sql, \@bind);
1165 }
1166
1167
1168 sub _fix_bind_params {
1169     my ($self, @bind) = @_;
1170
1171     ### Turn @bind from something like this:
1172     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1173     ### to this:
1174     ###   ( "'1'", "'1'", "'3'" )
1175     return
1176         map {
1177             if ( defined( $_ && $_->[1] ) ) {
1178                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1179             }
1180             else { q{'NULL'}; }
1181         } @bind;
1182 }
1183
1184 sub _query_start {
1185     my ( $self, $sql, @bind ) = @_;
1186
1187     if ( $self->debug ) {
1188         @bind = $self->_fix_bind_params(@bind);
1189
1190         $self->debugobj->query_start( $sql, @bind );
1191     }
1192 }
1193
1194 sub _query_end {
1195     my ( $self, $sql, @bind ) = @_;
1196
1197     if ( $self->debug ) {
1198         @bind = $self->_fix_bind_params(@bind);
1199         $self->debugobj->query_end( $sql, @bind );
1200     }
1201 }
1202
1203 sub _dbh_execute {
1204   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1205
1206   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1207
1208   $self->_query_start( $sql, @$bind );
1209
1210   my $sth = $self->sth($sql,$op);
1211
1212   my $placeholder_index = 1;
1213
1214   foreach my $bound (@$bind) {
1215     my $attributes = {};
1216     my($column_name, @data) = @$bound;
1217
1218     if ($bind_attributes) {
1219       $attributes = $bind_attributes->{$column_name}
1220       if defined $bind_attributes->{$column_name};
1221     }
1222
1223     foreach my $data (@data) {
1224       my $ref = ref $data;
1225       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1226
1227       $sth->bind_param($placeholder_index, $data, $attributes);
1228       $placeholder_index++;
1229     }
1230   }
1231
1232   # Can this fail without throwing an exception anyways???
1233   my $rv = $sth->execute();
1234   $self->throw_exception($sth->errstr) if !$rv;
1235
1236   $self->_query_end( $sql, @$bind );
1237
1238   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1239 }
1240
1241 sub _execute {
1242     my $self = shift;
1243     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1244 }
1245
1246 sub insert {
1247   my ($self, $source, $to_insert) = @_;
1248
1249 # redispatch to insert method of storage we reblessed into, if necessary
1250   if (not $self->_driver_determined) {
1251     $self->_determine_driver;
1252     goto $self->can('insert');
1253   }
1254
1255   my $ident = $source->from;
1256   my $bind_attributes = $self->source_bind_attributes($source);
1257
1258   my $updated_cols = {};
1259
1260   foreach my $col ( $source->columns ) {
1261     if ( !defined $to_insert->{$col} ) {
1262       my $col_info = $source->column_info($col);
1263
1264       if ( $col_info->{auto_nextval} ) {
1265         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1266           'nextval',
1267           $col_info->{sequence} ||
1268             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
1269         );
1270       }
1271     }
1272   }
1273
1274   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
1275
1276   return $updated_cols;
1277 }
1278
1279 ## Still not quite perfect, and EXPERIMENTAL
1280 ## Currently it is assumed that all values passed will be "normal", i.e. not
1281 ## scalar refs, or at least, all the same type as the first set, the statement is
1282 ## only prepped once.
1283 sub insert_bulk {
1284   my ($self, $source, $cols, $data) = @_;
1285   my %colvalues;
1286   my $table = $source->from;
1287   @colvalues{@$cols} = (0..$#$cols);
1288   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
1289
1290   $self->_determine_driver;
1291
1292   $self->_query_start( $sql, @bind );
1293   my $sth = $self->sth($sql);
1294
1295 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1296
1297   ## This must be an arrayref, else nothing works!
1298   my $tuple_status = [];
1299
1300   ## Get the bind_attributes, if any exist
1301   my $bind_attributes = $self->source_bind_attributes($source);
1302
1303   ## Bind the values and execute
1304   my $placeholder_index = 1;
1305
1306   foreach my $bound (@bind) {
1307
1308     my $attributes = {};
1309     my ($column_name, $data_index) = @$bound;
1310
1311     if( $bind_attributes ) {
1312       $attributes = $bind_attributes->{$column_name}
1313       if defined $bind_attributes->{$column_name};
1314     }
1315
1316     my @data = map { $_->[$data_index] } @$data;
1317
1318     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1319     $placeholder_index++;
1320   }
1321   my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
1322   if (my $err = $@) {
1323     my $i = 0;
1324     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1325
1326     $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
1327       if ($i > $#$tuple_status);
1328
1329     require Data::Dumper;
1330     local $Data::Dumper::Terse = 1;
1331     local $Data::Dumper::Indent = 1;
1332     local $Data::Dumper::Useqq = 1;
1333     local $Data::Dumper::Quotekeys = 0;
1334
1335     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1336       $tuple_status->[$i][1],
1337       Data::Dumper::Dumper(
1338         { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
1339       ),
1340     );
1341   }
1342   $self->throw_exception($sth->errstr) if !$rv;
1343
1344   $self->_query_end( $sql, @bind );
1345   return (wantarray ? ($rv, $sth, @bind) : $rv);
1346 }
1347
1348 sub update {
1349   my $self = shift @_;
1350   my $source = shift @_;
1351   $self->_determine_driver;
1352   my $bind_attributes = $self->source_bind_attributes($source);
1353
1354   return $self->_execute('update' => [], $source, $bind_attributes, @_);
1355 }
1356
1357
1358 sub delete {
1359   my $self = shift @_;
1360   my $source = shift @_;
1361   $self->_determine_driver;
1362   my $bind_attrs = $self->source_bind_attributes($source);
1363
1364   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1365 }
1366
1367 # We were sent here because the $rs contains a complex search
1368 # which will require a subquery to select the correct rows
1369 # (i.e. joined or limited resultsets)
1370 #
1371 # Genarating a single PK column subquery is trivial and supported
1372 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1373 # Look at _multipk_update_delete()
1374 sub _subq_update_delete {
1375   my $self = shift;
1376   my ($rs, $op, $values) = @_;
1377
1378   my $rsrc = $rs->result_source;
1379
1380   # we already check this, but double check naively just in case. Should be removed soon
1381   my $sel = $rs->_resolved_attrs->{select};
1382   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1383   my @pcols = $rsrc->primary_columns;
1384   if (@$sel != @pcols) {
1385     $self->throw_exception (
1386       'Subquery update/delete can not be called on resultsets selecting a'
1387      .' number of columns different than the number of primary keys'
1388     );
1389   }
1390
1391   if (@pcols == 1) {
1392     return $self->$op (
1393       $rsrc,
1394       $op eq 'update' ? $values : (),
1395       { $pcols[0] => { -in => $rs->as_query } },
1396     );
1397   }
1398
1399   else {
1400     return $self->_multipk_update_delete (@_);
1401   }
1402 }
1403
1404 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1405 # resultset update/delete involving subqueries. So by default resort
1406 # to simple (and inefficient) delete_all style per-row opearations,
1407 # while allowing specific storages to override this with a faster
1408 # implementation.
1409 #
1410 sub _multipk_update_delete {
1411   return shift->_per_row_update_delete (@_);
1412 }
1413
1414 # This is the default loop used to delete/update rows for multi PK
1415 # resultsets, and used by mysql exclusively (because it can't do anything
1416 # else).
1417 #
1418 # We do not use $row->$op style queries, because resultset update/delete
1419 # is not expected to cascade (this is what delete_all/update_all is for).
1420 #
1421 # There should be no race conditions as the entire operation is rolled
1422 # in a transaction.
1423 #
1424 sub _per_row_update_delete {
1425   my $self = shift;
1426   my ($rs, $op, $values) = @_;
1427
1428   my $rsrc = $rs->result_source;
1429   my @pcols = $rsrc->primary_columns;
1430
1431   my $guard = $self->txn_scope_guard;
1432
1433   # emulate the return value of $sth->execute for non-selects
1434   my $row_cnt = '0E0';
1435
1436   my $subrs_cur = $rs->cursor;
1437   while (my @pks = $subrs_cur->next) {
1438
1439     my $cond;
1440     for my $i (0.. $#pcols) {
1441       $cond->{$pcols[$i]} = $pks[$i];
1442     }
1443
1444     $self->$op (
1445       $rsrc,
1446       $op eq 'update' ? $values : (),
1447       $cond,
1448     );
1449
1450     $row_cnt++;
1451   }
1452
1453   $guard->commit;
1454
1455   return $row_cnt;
1456 }
1457
1458 sub _select {
1459   my $self = shift;
1460
1461   # localization is neccessary as
1462   # 1) there is no infrastructure to pass this around before SQLA2
1463   # 2) _select_args sets it and _prep_for_execute consumes it
1464   my $sql_maker = $self->sql_maker;
1465   local $sql_maker->{_dbic_rs_attrs};
1466
1467   return $self->_execute($self->_select_args(@_));
1468 }
1469
1470 sub _select_args_to_query {
1471   my $self = shift;
1472
1473   # localization is neccessary as
1474   # 1) there is no infrastructure to pass this around before SQLA2
1475   # 2) _select_args sets it and _prep_for_execute consumes it
1476   my $sql_maker = $self->sql_maker;
1477   local $sql_maker->{_dbic_rs_attrs};
1478
1479   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1480   #  = $self->_select_args($ident, $select, $cond, $attrs);
1481   my ($op, $bind, $ident, $bind_attrs, @args) =
1482     $self->_select_args(@_);
1483
1484   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1485   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1486   $prepared_bind ||= [];
1487
1488   return wantarray
1489     ? ($sql, $prepared_bind, $bind_attrs)
1490     : \[ "($sql)", @$prepared_bind ]
1491   ;
1492 }
1493
1494 sub _select_args {
1495   my ($self, $ident, $select, $where, $attrs) = @_;
1496
1497   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1498
1499   my $sql_maker = $self->sql_maker;
1500   $sql_maker->{_dbic_rs_attrs} = {
1501     %$attrs,
1502     select => $select,
1503     from => $ident,
1504     where => $where,
1505     $rs_alias
1506       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1507       : ()
1508     ,
1509   };
1510
1511   # calculate bind_attrs before possible $ident mangling
1512   my $bind_attrs = {};
1513   for my $alias (keys %$alias2source) {
1514     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1515     for my $col (keys %$bindtypes) {
1516
1517       my $fqcn = join ('.', $alias, $col);
1518       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1519
1520       # Unqialified column names are nice, but at the same time can be
1521       # rather ambiguous. What we do here is basically go along with
1522       # the loop, adding an unqualified column slot to $bind_attrs,
1523       # alongside the fully qualified name. As soon as we encounter
1524       # another column by that name (which would imply another table)
1525       # we unset the unqualified slot and never add any info to it
1526       # to avoid erroneous type binding. If this happens the users
1527       # only choice will be to fully qualify his column name
1528
1529       if (exists $bind_attrs->{$col}) {
1530         $bind_attrs->{$col} = {};
1531       }
1532       else {
1533         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1534       }
1535     }
1536   }
1537
1538   # adjust limits
1539   if (
1540     $attrs->{software_limit}
1541       ||
1542     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1543   ) {
1544     $attrs->{software_limit} = 1;
1545   }
1546   else {
1547     $self->throw_exception("rows attribute must be positive if present")
1548       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1549
1550     # MySQL actually recommends this approach.  I cringe.
1551     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1552   }
1553
1554   my @limit;
1555
1556   # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
1557   # otherwise delegate the limiting to the storage, unless software limit was requested
1558   if (
1559     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1560        ||
1561     ( $attrs->{group_by} && @{$attrs->{group_by}} &&
1562       $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
1563   ) {
1564     ($ident, $select, $where, $attrs)
1565       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1566   }
1567   elsif (! $attrs->{software_limit} ) {
1568     push @limit, $attrs->{rows}, $attrs->{offset};
1569   }
1570
1571 ###
1572   # This would be the point to deflate anything found in $where
1573   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1574   # expect a row object. And all we have is a resultsource (it is trivial
1575   # to extract deflator coderefs via $alias2source above).
1576   #
1577   # I don't see a way forward other than changing the way deflators are
1578   # invoked, and that's just bad...
1579 ###
1580
1581   my $order = { map
1582     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1583     (qw/order_by group_by having/ )
1584   };
1585
1586   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1587 }
1588
1589 #
1590 # This is the code producing joined subqueries like:
1591 # SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
1592 #
1593 sub _adjust_select_args_for_complex_prefetch {
1594   my ($self, $from, $select, $where, $attrs) = @_;
1595
1596   $self->throw_exception ('Nothing to prefetch... how did we get here?!')
1597     if not @{$attrs->{_prefetch_select}};
1598
1599   $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
1600     if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
1601
1602
1603   # generate inner/outer attribute lists, remove stuff that doesn't apply
1604   my $outer_attrs = { %$attrs };
1605   delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
1606
1607   my $inner_attrs = { %$attrs };
1608   delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
1609
1610
1611   # bring over all non-collapse-induced order_by into the inner query (if any)
1612   # the outer one will have to keep them all
1613   delete $inner_attrs->{order_by};
1614   if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
1615     $inner_attrs->{order_by} = [
1616       @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
1617     ];
1618   }
1619
1620
1621   # generate the inner/outer select lists
1622   # for inside we consider only stuff *not* brought in by the prefetch
1623   # on the outside we substitute any function for its alias
1624   my $outer_select = [ @$select ];
1625   my $inner_select = [];
1626   for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
1627     my $sel = $outer_select->[$i];
1628
1629     if (ref $sel eq 'HASH' ) {
1630       $sel->{-as} ||= $attrs->{as}[$i];
1631       $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
1632     }
1633
1634     push @$inner_select, $sel;
1635   }
1636
1637   # normalize a copy of $from, so it will be easier to work with further
1638   # down (i.e. promote the initial hashref to an AoH)
1639   $from = [ @$from ];
1640   $from->[0] = [ $from->[0] ];
1641   my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
1642
1643
1644   # decide which parts of the join will remain in either part of
1645   # the outer/inner query
1646
1647   # First we compose a list of which aliases are used in restrictions
1648   # (i.e. conditions/order/grouping/etc). Since we do not have
1649   # introspectable SQLA, we fall back to ugly scanning of raw SQL for
1650   # WHERE, and for pieces of ORDER BY in order to determine which aliases
1651   # need to appear in the resulting sql.
1652   # It may not be very efficient, but it's a reasonable stop-gap
1653   # Also unqualified column names will not be considered, but more often
1654   # than not this is actually ok
1655   #
1656   # In the same loop we enumerate part of the selection aliases, as
1657   # it requires the same sqla hack for the time being
1658   my ($restrict_aliases, $select_aliases, $prefetch_aliases);
1659   {
1660     # produce stuff unquoted, so it can be scanned
1661     my $sql_maker = $self->sql_maker;
1662     local $sql_maker->{quote_char};
1663     my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1664     $sep = "\Q$sep\E";
1665
1666     my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
1667     my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
1668     my $where_sql = $sql_maker->where ($where);
1669     my $group_by_sql = $sql_maker->_order_by({
1670       map { $_ => $inner_attrs->{$_} } qw/group_by having/
1671     });
1672     my @non_prefetch_order_by_chunks = (map
1673       { ref $_ ? $_->[0] : $_ }
1674       $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
1675     );
1676
1677
1678     for my $alias (keys %original_join_info) {
1679       my $seen_re = qr/\b $alias $sep/x;
1680
1681       for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
1682         if ($piece =~ $seen_re) {
1683           $restrict_aliases->{$alias} = 1;
1684         }
1685       }
1686
1687       if ($non_prefetch_select_sql =~ $seen_re) {
1688           $select_aliases->{$alias} = 1;
1689       }
1690
1691       if ($prefetch_select_sql =~ $seen_re) {
1692           $prefetch_aliases->{$alias} = 1;
1693       }
1694
1695     }
1696   }
1697
1698   # Add any non-left joins to the restriction list (such joins are indeed restrictions)
1699   for my $j (values %original_join_info) {
1700     my $alias = $j->{-alias} or next;
1701     $restrict_aliases->{$alias} = 1 if (
1702       (not $j->{-join_type})
1703         or
1704       ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
1705     );
1706   }
1707
1708   # mark all join parents as mentioned
1709   # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
1710   for my $collection ($restrict_aliases, $select_aliases) {
1711     for my $alias (keys %$collection) {
1712       $collection->{$_} = 1
1713         for (@{ $original_join_info{$alias}{-join_path} || [] });
1714     }
1715   }
1716
1717   # construct the inner $from for the subquery
1718   my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
1719   my @inner_from;
1720   for my $j (@$from) {
1721     push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
1722   }
1723
1724   # if a multi-type join was needed in the subquery ("multi" is indicated by
1725   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
1726   unless ($inner_attrs->{group_by}) {
1727     for my $alias (keys %inner_joins) {
1728
1729       # the dot comes from some weirdness in collapse
1730       # remove after the rewrite
1731       if ($attrs->{collapse}{".$alias"}) {
1732         $inner_attrs->{group_by} ||= $inner_select;
1733         last;
1734       }
1735     }
1736   }
1737
1738   # demote the inner_from head
1739   $inner_from[0] = $inner_from[0][0];
1740
1741   # generate the subquery
1742   my $subq = $self->_select_args_to_query (
1743     \@inner_from,
1744     $inner_select,
1745     $where,
1746     $inner_attrs,
1747   );
1748
1749   my $subq_joinspec = {
1750     -alias => $attrs->{alias},
1751     -source_handle => $inner_from[0]{-source_handle},
1752     $attrs->{alias} => $subq,
1753   };
1754
1755   # Generate the outer from - this is relatively easy (really just replace
1756   # the join slot with the subquery), with a major caveat - we can not
1757   # join anything that is non-selecting (not part of the prefetch), but at
1758   # the same time is a multi-type relationship, as it will explode the result.
1759   #
1760   # There are two possibilities here
1761   # - either the join is non-restricting, in which case we simply throw it away
1762   # - it is part of the restrictions, in which case we need to collapse the outer
1763   #   result by tackling yet another group_by to the outside of the query
1764
1765   # so first generate the outer_from, up to the substitution point
1766   my @outer_from;
1767   while (my $j = shift @$from) {
1768     if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
1769       push @outer_from, [
1770         $subq_joinspec,
1771         @{$j}[1 .. $#$j],
1772       ];
1773       last; # we'll take care of what's left in $from below
1774     }
1775     else {
1776       push @outer_from, $j;
1777     }
1778   }
1779
1780   # see what's left - throw away if not selecting/restricting
1781   # also throw in a group_by if restricting to guard against
1782   # cross-join explosions
1783   #
1784   while (my $j = shift @$from) {
1785     my $alias = $j->[0]{-alias};
1786
1787     if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
1788       push @outer_from, $j;
1789     }
1790     elsif ($restrict_aliases->{$alias}) {
1791       push @outer_from, $j;
1792
1793       # FIXME - this should be obviated by SQLA2, as I'll be able to 
1794       # have restrict_inner and restrict_outer... or something to that
1795       # effect... I think...
1796
1797       # FIXME2 - I can't find a clean way to determine if a particular join
1798       # is a multi - instead I am just treating everything as a potential
1799       # explosive join (ribasushi)
1800       #
1801       # if (my $handle = $j->[0]{-source_handle}) {
1802       #   my $rsrc = $handle->resolve;
1803       #   ... need to bail out of the following if this is not a multi,
1804       #       as it will be much easier on the db ...
1805
1806           $outer_attrs->{group_by} ||= $outer_select;
1807       # }
1808     }
1809   }
1810
1811   # demote the outer_from head
1812   $outer_from[0] = $outer_from[0][0];
1813
1814   # This is totally horrific - the $where ends up in both the inner and outer query
1815   # Unfortunately not much can be done until SQLA2 introspection arrives, and even
1816   # then if where conditions apply to the *right* side of the prefetch, you may have
1817   # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
1818   # the outer select to exclude joins you didin't want in the first place
1819   #
1820   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
1821   return (\@outer_from, $outer_select, $where, $outer_attrs);
1822 }
1823
1824 sub _resolve_ident_sources {
1825   my ($self, $ident) = @_;
1826
1827   my $alias2source = {};
1828   my $rs_alias;
1829
1830   # the reason this is so contrived is that $ident may be a {from}
1831   # structure, specifying multiple tables to join
1832   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1833     # this is compat mode for insert/update/delete which do not deal with aliases
1834     $alias2source->{me} = $ident;
1835     $rs_alias = 'me';
1836   }
1837   elsif (ref $ident eq 'ARRAY') {
1838
1839     for (@$ident) {
1840       my $tabinfo;
1841       if (ref $_ eq 'HASH') {
1842         $tabinfo = $_;
1843         $rs_alias = $tabinfo->{-alias};
1844       }
1845       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
1846         $tabinfo = $_->[0];
1847       }
1848
1849       $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
1850         if ($tabinfo->{-source_handle});
1851     }
1852   }
1853
1854   return ($alias2source, $rs_alias);
1855 }
1856
1857 # Takes $ident, \@column_names
1858 #
1859 # returns { $column_name => \%column_info, ... }
1860 # also note: this adds -result_source => $rsrc to the column info
1861 #
1862 # usage:
1863 #   my $col_sources = $self->_resolve_column_info($ident, @column_names);
1864 sub _resolve_column_info {
1865   my ($self, $ident, $colnames) = @_;
1866   my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
1867
1868   my $sep = $self->_sql_maker_opts->{name_sep} || '.';
1869   $sep = "\Q$sep\E";
1870
1871   my (%return, %seen_cols);
1872
1873   # compile a global list of column names, to be able to properly
1874   # disambiguate unqualified column names (if at all possible)
1875   for my $alias (keys %$alias2src) {
1876     my $rsrc = $alias2src->{$alias};
1877     for my $colname ($rsrc->columns) {
1878       push @{$seen_cols{$colname}}, $alias;
1879     }
1880   }
1881
1882   COLUMN:
1883   foreach my $col (@$colnames) {
1884     my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
1885
1886     unless ($alias) {
1887       # see if the column was seen exactly once (so we know which rsrc it came from)
1888       if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
1889         $alias = $seen_cols{$colname}[0];
1890       }
1891       else {
1892         next COLUMN;
1893       }
1894     }
1895
1896     my $rsrc = $alias2src->{$alias};
1897     $return{$col} = $rsrc && {
1898       %{$rsrc->column_info($colname)},
1899       -result_source => $rsrc,
1900       -source_alias => $alias,
1901     };
1902   }
1903
1904   return \%return;
1905 }
1906
1907 # Returns a counting SELECT for a simple count
1908 # query. Abstracted so that a storage could override
1909 # this to { count => 'firstcol' } or whatever makes
1910 # sense as a performance optimization
1911 sub _count_select {
1912   #my ($self, $source, $rs_attrs) = @_;
1913   return { count => '*' };
1914 }
1915
1916 # Returns a SELECT which will end up in the subselect
1917 # There may or may not be a group_by, as the subquery
1918 # might have been called to accomodate a limit
1919 #
1920 # Most databases would be happy with whatever ends up
1921 # here, but some choke in various ways.
1922 #
1923 sub _subq_count_select {
1924   my ($self, $source, $rs_attrs) = @_;
1925   return $rs_attrs->{group_by} if $rs_attrs->{group_by};
1926
1927   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1928   return @pcols ? \@pcols : [ 1 ];
1929 }
1930
1931
1932 sub source_bind_attributes {
1933   my ($self, $source) = @_;
1934
1935   my $bind_attributes;
1936   foreach my $column ($source->columns) {
1937
1938     my $data_type = $source->column_info($column)->{data_type} || '';
1939     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1940      if $data_type;
1941   }
1942
1943   return $bind_attributes;
1944 }
1945
1946 =head2 select
1947
1948 =over 4
1949
1950 =item Arguments: $ident, $select, $condition, $attrs
1951
1952 =back
1953
1954 Handle a SQL select statement.
1955
1956 =cut
1957
1958 sub select {
1959   my $self = shift;
1960   my ($ident, $select, $condition, $attrs) = @_;
1961   return $self->cursor_class->new($self, \@_, $attrs);
1962 }
1963
1964 sub select_single {
1965   my $self = shift;
1966   my ($rv, $sth, @bind) = $self->_select(@_);
1967   my @row = $sth->fetchrow_array;
1968   my @nextrow = $sth->fetchrow_array if @row;
1969   if(@row && @nextrow) {
1970     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1971   }
1972   # Need to call finish() to work round broken DBDs
1973   $sth->finish();
1974   return @row;
1975 }
1976
1977 =head2 sth
1978
1979 =over 4
1980
1981 =item Arguments: $sql
1982
1983 =back
1984
1985 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1986
1987 =cut
1988
1989 sub _dbh_sth {
1990   my ($self, $dbh, $sql) = @_;
1991
1992   # 3 is the if_active parameter which avoids active sth re-use
1993   my $sth = $self->disable_sth_caching
1994     ? $dbh->prepare($sql)
1995     : $dbh->prepare_cached($sql, {}, 3);
1996
1997   # XXX You would think RaiseError would make this impossible,
1998   #  but apparently that's not true :(
1999   $self->throw_exception($dbh->errstr) if !$sth;
2000
2001   $sth;
2002 }
2003
2004 sub sth {
2005   my ($self, $sql) = @_;
2006   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2007 }
2008
2009 sub _dbh_columns_info_for {
2010   my ($self, $dbh, $table) = @_;
2011
2012   if ($dbh->can('column_info')) {
2013     my %result;
2014     eval {
2015       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2016       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2017       $sth->execute();
2018       while ( my $info = $sth->fetchrow_hashref() ){
2019         my %column_info;
2020         $column_info{data_type}   = $info->{TYPE_NAME};
2021         $column_info{size}      = $info->{COLUMN_SIZE};
2022         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2023         $column_info{default_value} = $info->{COLUMN_DEF};
2024         my $col_name = $info->{COLUMN_NAME};
2025         $col_name =~ s/^\"(.*)\"$/$1/;
2026
2027         $result{$col_name} = \%column_info;
2028       }
2029     };
2030     return \%result if !$@ && scalar keys %result;
2031   }
2032
2033   my %result;
2034   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2035   $sth->execute;
2036   my @columns = @{$sth->{NAME_lc}};
2037   for my $i ( 0 .. $#columns ){
2038     my %column_info;
2039     $column_info{data_type} = $sth->{TYPE}->[$i];
2040     $column_info{size} = $sth->{PRECISION}->[$i];
2041     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2042
2043     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2044       $column_info{data_type} = $1;
2045       $column_info{size}    = $2;
2046     }
2047
2048     $result{$columns[$i]} = \%column_info;
2049   }
2050   $sth->finish;
2051
2052   foreach my $col (keys %result) {
2053     my $colinfo = $result{$col};
2054     my $type_num = $colinfo->{data_type};
2055     my $type_name;
2056     if(defined $type_num && $dbh->can('type_info')) {
2057       my $type_info = $dbh->type_info($type_num);
2058       $type_name = $type_info->{TYPE_NAME} if $type_info;
2059       $colinfo->{data_type} = $type_name if $type_name;
2060     }
2061   }
2062
2063   return \%result;
2064 }
2065
2066 sub columns_info_for {
2067   my ($self, $table) = @_;
2068   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2069 }
2070
2071 =head2 last_insert_id
2072
2073 Return the row id of the last insert.
2074
2075 =cut
2076
2077 sub _dbh_last_insert_id {
2078     # All Storage's need to register their own _dbh_last_insert_id
2079     # the old SQLite-based method was highly inappropriate
2080
2081     my $self = shift;
2082     my $class = ref $self;
2083     $self->throw_exception (<<EOE);
2084
2085 No _dbh_last_insert_id() method found in $class.
2086 Since the method of obtaining the autoincrement id of the last insert
2087 operation varies greatly between different databases, this method must be
2088 individually implemented for every storage class.
2089 EOE
2090 }
2091
2092 sub last_insert_id {
2093   my $self = shift;
2094   $self->_dbh_last_insert_id ($self->_dbh, @_);
2095 }
2096
2097 =head2 _native_data_type
2098
2099 =over 4
2100
2101 =item Arguments: $type_name
2102
2103 =back
2104
2105 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2106 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2107 L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
2108
2109 The default implementation returns C<undef>, implement in your Storage driver if
2110 you need this functionality.
2111
2112 Should map types from other databases to the native RDBMS type, for example
2113 C<VARCHAR2> to C<VARCHAR>.
2114
2115 Types with modifiers should map to the underlying data type. For example,
2116 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2117
2118 Composite types should map to the container type, for example
2119 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2120
2121 =cut
2122
2123 sub _native_data_type {
2124   #my ($self, $data_type) = @_;
2125   return undef
2126 }
2127
2128 =head2 sqlt_type
2129
2130 Returns the database driver name.
2131
2132 =cut
2133
2134 sub sqlt_type {
2135   my ($self) = @_;
2136
2137   if (not $self->_driver_determined) {
2138     $self->_determine_driver;
2139     goto $self->can ('sqlt_type');
2140   }
2141
2142   $self->_get_dbh->{Driver}->{Name};
2143 }
2144
2145 =head2 bind_attribute_by_data_type
2146
2147 Given a datatype from column info, returns a database specific bind
2148 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2149 let the database planner just handle it.
2150
2151 Generally only needed for special case column types, like bytea in postgres.
2152
2153 =cut
2154
2155 sub bind_attribute_by_data_type {
2156     return;
2157 }
2158
2159 =head2 is_datatype_numeric
2160
2161 Given a datatype from column_info, returns a boolean value indicating if
2162 the current RDBMS considers it a numeric value. This controls how
2163 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2164 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2165 be performed instead of the usual C<eq>.
2166
2167 =cut
2168
2169 sub is_datatype_numeric {
2170   my ($self, $dt) = @_;
2171
2172   return 0 unless $dt;
2173
2174   return $dt =~ /^ (?:
2175     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2176   ) $/ix;
2177 }
2178
2179
2180 =head2 create_ddl_dir (EXPERIMENTAL)
2181
2182 =over 4
2183
2184 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2185
2186 =back
2187
2188 Creates a SQL file based on the Schema, for each of the specified
2189 database engines in C<\@databases> in the given directory.
2190 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2191
2192 Given a previous version number, this will also create a file containing
2193 the ALTER TABLE statements to transform the previous schema into the
2194 current one. Note that these statements may contain C<DROP TABLE> or
2195 C<DROP COLUMN> statements that can potentially destroy data.
2196
2197 The file names are created using the C<ddl_filename> method below, please
2198 override this method in your schema if you would like a different file
2199 name format. For the ALTER file, the same format is used, replacing
2200 $version in the name with "$preversion-$version".
2201
2202 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2203 The most common value for this would be C<< { add_drop_table => 1 } >>
2204 to have the SQL produced include a C<DROP TABLE> statement for each table
2205 created. For quoting purposes supply C<quote_table_names> and
2206 C<quote_field_names>.
2207
2208 If no arguments are passed, then the following default values are assumed:
2209
2210 =over 4
2211
2212 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2213
2214 =item version    - $schema->schema_version
2215
2216 =item directory  - './'
2217
2218 =item preversion - <none>
2219
2220 =back
2221
2222 By default, C<\%sqlt_args> will have
2223
2224  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2225
2226 merged with the hash passed in. To disable any of those features, pass in a
2227 hashref like the following
2228
2229  { ignore_constraint_names => 0, # ... other options }
2230
2231
2232 Note that this feature is currently EXPERIMENTAL and may not work correctly
2233 across all databases, or fully handle complex relationships.
2234
2235 WARNING: Please check all SQL files created, before applying them.
2236
2237 =cut
2238
2239 sub create_ddl_dir {
2240   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2241
2242   if(!$dir || !-d $dir) {
2243     carp "No directory given, using ./\n";
2244     $dir = "./";
2245   }
2246   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2247   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2248
2249   my $schema_version = $schema->schema_version || '1.x';
2250   $version ||= $schema_version;
2251
2252   $sqltargs = {
2253     add_drop_table => 1,
2254     ignore_constraint_names => 1,
2255     ignore_index_names => 1,
2256     %{$sqltargs || {}}
2257   };
2258
2259   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
2260       . $self->_check_sqlt_message . q{'})
2261           if !$self->_check_sqlt_version;
2262
2263   my $sqlt = SQL::Translator->new( $sqltargs );
2264
2265   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2266   my $sqlt_schema = $sqlt->translate({ data => $schema })
2267     or $self->throw_exception ($sqlt->error);
2268
2269   foreach my $db (@$databases) {
2270     $sqlt->reset();
2271     $sqlt->{schema} = $sqlt_schema;
2272     $sqlt->producer($db);
2273
2274     my $file;
2275     my $filename = $schema->ddl_filename($db, $version, $dir);
2276     if (-e $filename && ($version eq $schema_version )) {
2277       # if we are dumping the current version, overwrite the DDL
2278       carp "Overwriting existing DDL file - $filename";
2279       unlink($filename);
2280     }
2281
2282     my $output = $sqlt->translate;
2283     if(!$output) {
2284       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2285       next;
2286     }
2287     if(!open($file, ">$filename")) {
2288       $self->throw_exception("Can't open $filename for writing ($!)");
2289       next;
2290     }
2291     print $file $output;
2292     close($file);
2293
2294     next unless ($preversion);
2295
2296     require SQL::Translator::Diff;
2297
2298     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2299     if(!-e $prefilename) {
2300       carp("No previous schema file found ($prefilename)");
2301       next;
2302     }
2303
2304     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2305     if(-e $difffile) {
2306       carp("Overwriting existing diff file - $difffile");
2307       unlink($difffile);
2308     }
2309
2310     my $source_schema;
2311     {
2312       my $t = SQL::Translator->new($sqltargs);
2313       $t->debug( 0 );
2314       $t->trace( 0 );
2315
2316       $t->parser( $db )
2317         or $self->throw_exception ($t->error);
2318
2319       my $out = $t->translate( $prefilename )
2320         or $self->throw_exception ($t->error);
2321
2322       $source_schema = $t->schema;
2323
2324       $source_schema->name( $prefilename )
2325         unless ( $source_schema->name );
2326     }
2327
2328     # The "new" style of producers have sane normalization and can support
2329     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2330     # And we have to diff parsed SQL against parsed SQL.
2331     my $dest_schema = $sqlt_schema;
2332
2333     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2334       my $t = SQL::Translator->new($sqltargs);
2335       $t->debug( 0 );
2336       $t->trace( 0 );
2337
2338       $t->parser( $db )
2339         or $self->throw_exception ($t->error);
2340
2341       my $out = $t->translate( $filename )
2342         or $self->throw_exception ($t->error);
2343
2344       $dest_schema = $t->schema;
2345
2346       $dest_schema->name( $filename )
2347         unless $dest_schema->name;
2348     }
2349
2350     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2351                                                   $dest_schema,   $db,
2352                                                   $sqltargs
2353                                                  );
2354     if(!open $file, ">$difffile") {
2355       $self->throw_exception("Can't write to $difffile ($!)");
2356       next;
2357     }
2358     print $file $diff;
2359     close($file);
2360   }
2361 }
2362
2363 =head2 deployment_statements
2364
2365 =over 4
2366
2367 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2368
2369 =back
2370
2371 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2372
2373 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2374 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2375
2376 C<$directory> is used to return statements from files in a previously created
2377 L</create_ddl_dir> directory and is optional. The filenames are constructed
2378 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2379
2380 If no C<$directory> is specified then the statements are constructed on the
2381 fly using L<SQL::Translator> and C<$version> is ignored.
2382
2383 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2384
2385 =cut
2386
2387 sub deployment_statements {
2388   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2389   $type ||= $self->sqlt_type;
2390   $version ||= $schema->schema_version || '1.x';
2391   $dir ||= './';
2392   my $filename = $schema->ddl_filename($type, $version, $dir);
2393   if(-f $filename)
2394   {
2395       my $file;
2396       open($file, "<$filename")
2397         or $self->throw_exception("Can't open $filename ($!)");
2398       my @rows = <$file>;
2399       close($file);
2400       return join('', @rows);
2401   }
2402
2403   $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
2404       . $self->_check_sqlt_message . q{'})
2405           if !$self->_check_sqlt_version;
2406
2407   # sources needs to be a parser arg, but for simplicty allow at top level
2408   # coming in
2409   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2410       if exists $sqltargs->{sources};
2411
2412   my $tr = SQL::Translator->new(
2413     producer => "SQL::Translator::Producer::${type}",
2414     %$sqltargs,
2415     parser => 'SQL::Translator::Parser::DBIx::Class',
2416     data => $schema,
2417   );
2418   return $tr->translate;
2419 }
2420
2421 sub deploy {
2422   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2423   my $deploy = sub {
2424     my $line = shift;
2425     return if($line =~ /^--/);
2426     return if(!$line);
2427     # next if($line =~ /^DROP/m);
2428     return if($line =~ /^BEGIN TRANSACTION/m);
2429     return if($line =~ /^COMMIT/m);
2430     return if $line =~ /^\s+$/; # skip whitespace only
2431     $self->_query_start($line);
2432     eval {
2433       # do a dbh_do cycle here, as we need some error checking in
2434       # place (even though we will ignore errors)
2435       $self->dbh_do (sub { $_[1]->do($line) });
2436     };
2437     if ($@) {
2438       carp qq{$@ (running "${line}")};
2439     }
2440     $self->_query_end($line);
2441   };
2442   my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2443   if (@statements > 1) {
2444     foreach my $statement (@statements) {
2445       $deploy->( $statement );
2446     }
2447   }
2448   elsif (@statements == 1) {
2449     foreach my $line ( split(";\n", $statements[0])) {
2450       $deploy->( $line );
2451     }
2452   }
2453 }
2454
2455 =head2 datetime_parser
2456
2457 Returns the datetime parser class
2458
2459 =cut
2460
2461 sub datetime_parser {
2462   my $self = shift;
2463   return $self->{datetime_parser} ||= do {
2464     $self->_populate_dbh unless $self->_dbh;
2465     $self->build_datetime_parser(@_);
2466   };
2467 }
2468
2469 =head2 datetime_parser_type
2470
2471 Defines (returns) the datetime parser class - currently hardwired to
2472 L<DateTime::Format::MySQL>
2473
2474 =cut
2475
2476 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2477
2478 =head2 build_datetime_parser
2479
2480 See L</datetime_parser>
2481
2482 =cut
2483
2484 sub build_datetime_parser {
2485   my $self = shift;
2486   my $type = $self->datetime_parser_type(@_);
2487   eval "use ${type}";
2488   $self->throw_exception("Couldn't load ${type}: $@") if $@;
2489   return $type;
2490 }
2491
2492 {
2493     my $_check_sqlt_version; # private
2494     my $_check_sqlt_message; # private
2495     sub _check_sqlt_version {
2496         return $_check_sqlt_version if defined $_check_sqlt_version;
2497         eval 'use SQL::Translator "0.09003"';
2498         $_check_sqlt_message = $@ || '';
2499         $_check_sqlt_version = !$@;
2500     }
2501
2502     sub _check_sqlt_message {
2503         _check_sqlt_version if !defined $_check_sqlt_message;
2504         $_check_sqlt_message;
2505     }
2506 }
2507
2508 =head2 is_replicating
2509
2510 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2511 replicate from a master database.  Default is undef, which is the result
2512 returned by databases that don't support replication.
2513
2514 =cut
2515
2516 sub is_replicating {
2517     return;
2518
2519 }
2520
2521 =head2 lag_behind_master
2522
2523 Returns a number that represents a certain amount of lag behind a master db
2524 when a given storage is replicating.  The number is database dependent, but
2525 starts at zero and increases with the amount of lag. Default in undef
2526
2527 =cut
2528
2529 sub lag_behind_master {
2530     return;
2531 }
2532
2533 sub DESTROY {
2534   my $self = shift;
2535   $self->_verify_pid if $self->_dbh;
2536
2537   # some databases need this to stop spewing warnings
2538   if (my $dbh = $self->_dbh) {
2539     eval { $dbh->disconnect };
2540   }
2541
2542   $self->_dbh(undef);
2543 }
2544
2545 1;
2546
2547 =head1 USAGE NOTES
2548
2549 =head2 DBIx::Class and AutoCommit
2550
2551 DBIx::Class can do some wonderful magic with handling exceptions,
2552 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2553 (the default) combined with C<txn_do> for transaction support.
2554
2555 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2556 in an assumed transaction between commits, and you're telling us you'd
2557 like to manage that manually.  A lot of the magic protections offered by
2558 this module will go away.  We can't protect you from exceptions due to database
2559 disconnects because we don't know anything about how to restart your
2560 transactions.  You're on your own for handling all sorts of exceptional
2561 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2562 be with raw DBI.
2563
2564
2565 =head1 AUTHORS
2566
2567 Matt S. Trout <mst@shadowcatsystems.co.uk>
2568
2569 Andy Grundman <andy@hybridized.org>
2570
2571 =head1 LICENSE
2572
2573 You may distribute this code under the same terms as Perl itself.
2574
2575 =cut