create_ddl_dir mkpaths its dir if necessary. also, added storage/deploy.t as place...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 use File::Path ();
20
21 __PACKAGE__->mk_group_accessors('simple' =>
22   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
23      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
24      _server_info_hash/
25 );
26
27 # the values for these accessors are picked out (and deleted) from
28 # the attribute hashref passed to connect_info
29 my @storage_options = qw/
30   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
31   disable_sth_caching unsafe auto_savepoint
32 /;
33 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
34
35
36 # default cursor class, overridable in connect_info attributes
37 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
38
39 __PACKAGE__->mk_group_accessors('inherited' => qw/
40   sql_maker_class
41   _supports_insert_returning
42 /);
43 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
44
45
46 # Each of these methods need _determine_driver called before itself
47 # in order to function reliably. This is a purely DRY optimization
48 my @rdbms_specific_methods = qw/
49   deployment_statements
50   sqlt_type
51   build_datetime_parser
52   datetime_parser_type
53
54   insert
55   insert_bulk
56   update
57   delete
58   select
59   select_single
60 /;
61
62 for my $meth (@rdbms_specific_methods) {
63
64   my $orig = __PACKAGE__->can ($meth)
65     or next;
66
67   no strict qw/refs/;
68   no warnings qw/redefine/;
69   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
70     if (not $_[0]->_driver_determined) {
71       $_[0]->_determine_driver;
72       goto $_[0]->can($meth);
73     }
74     $orig->(@_);
75   };
76 }
77
78
79 =head1 NAME
80
81 DBIx::Class::Storage::DBI - DBI storage handler
82
83 =head1 SYNOPSIS
84
85   my $schema = MySchema->connect('dbi:SQLite:my.db');
86
87   $schema->storage->debug(1);
88
89   my @stuff = $schema->storage->dbh_do(
90     sub {
91       my ($storage, $dbh, @args) = @_;
92       $dbh->do("DROP TABLE authors");
93     },
94     @column_list
95   );
96
97   $schema->resultset('Book')->search({
98      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
99   });
100
101 =head1 DESCRIPTION
102
103 This class represents the connection to an RDBMS via L<DBI>.  See
104 L<DBIx::Class::Storage> for general information.  This pod only
105 documents DBI-specific methods and behaviors.
106
107 =head1 METHODS
108
109 =cut
110
111 sub new {
112   my $new = shift->next::method(@_);
113
114   $new->transaction_depth(0);
115   $new->_sql_maker_opts({});
116   $new->{savepoints} = [];
117   $new->{_in_dbh_do} = 0;
118   $new->{_dbh_gen} = 0;
119
120   $new;
121 }
122
123 =head2 connect_info
124
125 This method is normally called by L<DBIx::Class::Schema/connection>, which
126 encapsulates its argument list in an arrayref before passing them here.
127
128 The argument list may contain:
129
130 =over
131
132 =item *
133
134 The same 4-element argument set one would normally pass to
135 L<DBI/connect>, optionally followed by
136 L<extra attributes|/DBIx::Class specific connection attributes>
137 recognized by DBIx::Class:
138
139   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
140
141 =item *
142
143 A single code reference which returns a connected
144 L<DBI database handle|DBI/connect> optionally followed by
145 L<extra attributes|/DBIx::Class specific connection attributes> recognized
146 by DBIx::Class:
147
148   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
149
150 =item *
151
152 A single hashref with all the attributes and the dsn/user/password
153 mixed together:
154
155   $connect_info_args = [{
156     dsn => $dsn,
157     user => $user,
158     password => $pass,
159     %dbi_attributes,
160     %extra_attributes,
161   }];
162
163   $connect_info_args = [{
164     dbh_maker => sub { DBI->connect (...) },
165     %dbi_attributes,
166     %extra_attributes,
167   }];
168
169 This is particularly useful for L<Catalyst> based applications, allowing the
170 following config (L<Config::General> style):
171
172   <Model::DB>
173     schema_class   App::DB
174     <connect_info>
175       dsn          dbi:mysql:database=test
176       user         testuser
177       password     TestPass
178       AutoCommit   1
179     </connect_info>
180   </Model::DB>
181
182 The C<dsn>/C<user>/C<password> combination can be substituted by the
183 C<dbh_maker> key whose value is a coderef that returns a connected
184 L<DBI database handle|DBI/connect>
185
186 =back
187
188 Please note that the L<DBI> docs recommend that you always explicitly
189 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
190 recommends that it be set to I<1>, and that you perform transactions
191 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
192 to I<1> if you do not do explicitly set it to zero.  This is the default
193 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
194
195 =head3 DBIx::Class specific connection attributes
196
197 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
198 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
199 the following connection options. These options can be mixed in with your other
200 L<DBI> connection attributes, or placed in a separate hashref
201 (C<\%extra_attributes>) as shown above.
202
203 Every time C<connect_info> is invoked, any previous settings for
204 these options will be cleared before setting the new ones, regardless of
205 whether any options are specified in the new C<connect_info>.
206
207
208 =over
209
210 =item on_connect_do
211
212 Specifies things to do immediately after connecting or re-connecting to
213 the database.  Its value may contain:
214
215 =over
216
217 =item a scalar
218
219 This contains one SQL statement to execute.
220
221 =item an array reference
222
223 This contains SQL statements to execute in order.  Each element contains
224 a string or a code reference that returns a string.
225
226 =item a code reference
227
228 This contains some code to execute.  Unlike code references within an
229 array reference, its return value is ignored.
230
231 =back
232
233 =item on_disconnect_do
234
235 Takes arguments in the same form as L</on_connect_do> and executes them
236 immediately before disconnecting from the database.
237
238 Note, this only runs if you explicitly call L</disconnect> on the
239 storage object.
240
241 =item on_connect_call
242
243 A more generalized form of L</on_connect_do> that calls the specified
244 C<connect_call_METHOD> methods in your storage driver.
245
246   on_connect_do => 'select 1'
247
248 is equivalent to:
249
250   on_connect_call => [ [ do_sql => 'select 1' ] ]
251
252 Its values may contain:
253
254 =over
255
256 =item a scalar
257
258 Will call the C<connect_call_METHOD> method.
259
260 =item a code reference
261
262 Will execute C<< $code->($storage) >>
263
264 =item an array reference
265
266 Each value can be a method name or code reference.
267
268 =item an array of arrays
269
270 For each array, the first item is taken to be the C<connect_call_> method name
271 or code reference, and the rest are parameters to it.
272
273 =back
274
275 Some predefined storage methods you may use:
276
277 =over
278
279 =item do_sql
280
281 Executes a SQL string or a code reference that returns a SQL string. This is
282 what L</on_connect_do> and L</on_disconnect_do> use.
283
284 It can take:
285
286 =over
287
288 =item a scalar
289
290 Will execute the scalar as SQL.
291
292 =item an arrayref
293
294 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
295 attributes hashref and bind values.
296
297 =item a code reference
298
299 Will execute C<< $code->($storage) >> and execute the return array refs as
300 above.
301
302 =back
303
304 =item datetime_setup
305
306 Execute any statements necessary to initialize the database session to return
307 and accept datetime/timestamp values used with
308 L<DBIx::Class::InflateColumn::DateTime>.
309
310 Only necessary for some databases, see your specific storage driver for
311 implementation details.
312
313 =back
314
315 =item on_disconnect_call
316
317 Takes arguments in the same form as L</on_connect_call> and executes them
318 immediately before disconnecting from the database.
319
320 Calls the C<disconnect_call_METHOD> methods as opposed to the
321 C<connect_call_METHOD> methods called by L</on_connect_call>.
322
323 Note, this only runs if you explicitly call L</disconnect> on the
324 storage object.
325
326 =item disable_sth_caching
327
328 If set to a true value, this option will disable the caching of
329 statement handles via L<DBI/prepare_cached>.
330
331 =item limit_dialect
332
333 Sets the limit dialect. This is useful for JDBC-bridge among others
334 where the remote SQL-dialect cannot be determined by the name of the
335 driver alone. See also L<SQL::Abstract::Limit>.
336
337 =item quote_char
338
339 Specifies what characters to use to quote table and column names. If
340 you use this you will want to specify L</name_sep> as well.
341
342 C<quote_char> expects either a single character, in which case is it
343 is placed on either side of the table/column name, or an arrayref of length
344 2 in which case the table/column name is placed between the elements.
345
346 For example under MySQL you should use C<< quote_char => '`' >>, and for
347 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
348
349 =item name_sep
350
351 This only needs to be used in conjunction with C<quote_char>, and is used to
352 specify the character that separates elements (schemas, tables, columns) from
353 each other. In most cases this is simply a C<.>.
354
355 The consequences of not supplying this value is that L<SQL::Abstract>
356 will assume DBIx::Class' uses of aliases to be complete column
357 names. The output will look like I<"me.name"> when it should actually
358 be I<"me"."name">.
359
360 =item unsafe
361
362 This Storage driver normally installs its own C<HandleError>, sets
363 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
364 all database handles, including those supplied by a coderef.  It does this
365 so that it can have consistent and useful error behavior.
366
367 If you set this option to a true value, Storage will not do its usual
368 modifications to the database handle's attributes, and instead relies on
369 the settings in your connect_info DBI options (or the values you set in
370 your connection coderef, in the case that you are connecting via coderef).
371
372 Note that your custom settings can cause Storage to malfunction,
373 especially if you set a C<HandleError> handler that suppresses exceptions
374 and/or disable C<RaiseError>.
375
376 =item auto_savepoint
377
378 If this option is true, L<DBIx::Class> will use savepoints when nesting
379 transactions, making it possible to recover from failure in the inner
380 transaction without having to abort all outer transactions.
381
382 =item cursor_class
383
384 Use this argument to supply a cursor class other than the default
385 L<DBIx::Class::Storage::DBI::Cursor>.
386
387 =back
388
389 Some real-life examples of arguments to L</connect_info> and
390 L<DBIx::Class::Schema/connect>
391
392   # Simple SQLite connection
393   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
394
395   # Connect via subref
396   ->connect_info([ sub { DBI->connect(...) } ]);
397
398   # Connect via subref in hashref
399   ->connect_info([{
400     dbh_maker => sub { DBI->connect(...) },
401     on_connect_do => 'alter session ...',
402   }]);
403
404   # A bit more complicated
405   ->connect_info(
406     [
407       'dbi:Pg:dbname=foo',
408       'postgres',
409       'my_pg_password',
410       { AutoCommit => 1 },
411       { quote_char => q{"}, name_sep => q{.} },
412     ]
413   );
414
415   # Equivalent to the previous example
416   ->connect_info(
417     [
418       'dbi:Pg:dbname=foo',
419       'postgres',
420       'my_pg_password',
421       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
422     ]
423   );
424
425   # Same, but with hashref as argument
426   # See parse_connect_info for explanation
427   ->connect_info(
428     [{
429       dsn         => 'dbi:Pg:dbname=foo',
430       user        => 'postgres',
431       password    => 'my_pg_password',
432       AutoCommit  => 1,
433       quote_char  => q{"},
434       name_sep    => q{.},
435     }]
436   );
437
438   # Subref + DBIx::Class-specific connection options
439   ->connect_info(
440     [
441       sub { DBI->connect(...) },
442       {
443           quote_char => q{`},
444           name_sep => q{@},
445           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
446           disable_sth_caching => 1,
447       },
448     ]
449   );
450
451
452
453 =cut
454
455 sub connect_info {
456   my ($self, $info) = @_;
457
458   return $self->_connect_info if !$info;
459
460   $self->_connect_info($info); # copy for _connect_info
461
462   $info = $self->_normalize_connect_info($info)
463     if ref $info eq 'ARRAY';
464
465   for my $storage_opt (keys %{ $info->{storage_options} }) {
466     my $value = $info->{storage_options}{$storage_opt};
467
468     $self->$storage_opt($value);
469   }
470
471   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
472   #  the new set of options
473   $self->_sql_maker(undef);
474   $self->_sql_maker_opts({});
475
476   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
477     my $value = $info->{sql_maker_options}{$sql_maker_opt};
478
479     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
480   }
481
482   my %attrs = (
483     %{ $self->_default_dbi_connect_attributes || {} },
484     %{ $info->{attributes} || {} },
485   );
486
487   my @args = @{ $info->{arguments} };
488
489   $self->_dbi_connect_info([@args,
490     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
491
492   return $self->_connect_info;
493 }
494
495 sub _normalize_connect_info {
496   my ($self, $info_arg) = @_;
497   my %info;
498
499   my @args = @$info_arg;  # take a shallow copy for further mutilation
500
501   # combine/pre-parse arguments depending on invocation style
502
503   my %attrs;
504   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
505     %attrs = %{ $args[1] || {} };
506     @args = $args[0];
507   }
508   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
509     %attrs = %{$args[0]};
510     @args = ();
511     if (my $code = delete $attrs{dbh_maker}) {
512       @args = $code;
513
514       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
515       if (@ignored) {
516         carp sprintf (
517             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
518           . "to the result of 'dbh_maker'",
519
520           join (', ', map { "'$_'" } (@ignored) ),
521         );
522       }
523     }
524     else {
525       @args = delete @attrs{qw/dsn user password/};
526     }
527   }
528   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
529     %attrs = (
530       % { $args[3] || {} },
531       % { $args[4] || {} },
532     );
533     @args = @args[0,1,2];
534   }
535
536   $info{arguments} = \@args;
537
538   my @storage_opts = grep exists $attrs{$_},
539     @storage_options, 'cursor_class';
540
541   @{ $info{storage_options} }{@storage_opts} =
542     delete @attrs{@storage_opts} if @storage_opts;
543
544   my @sql_maker_opts = grep exists $attrs{$_},
545     qw/limit_dialect quote_char name_sep/;
546
547   @{ $info{sql_maker_options} }{@sql_maker_opts} =
548     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
549
550   $info{attributes} = \%attrs if %attrs;
551
552   return \%info;
553 }
554
555 sub _default_dbi_connect_attributes {
556   return {
557     AutoCommit => 1,
558     RaiseError => 1,
559     PrintError => 0,
560   };
561 }
562
563 =head2 on_connect_do
564
565 This method is deprecated in favour of setting via L</connect_info>.
566
567 =cut
568
569 =head2 on_disconnect_do
570
571 This method is deprecated in favour of setting via L</connect_info>.
572
573 =cut
574
575 sub _parse_connect_do {
576   my ($self, $type) = @_;
577
578   my $val = $self->$type;
579   return () if not defined $val;
580
581   my @res;
582
583   if (not ref($val)) {
584     push @res, [ 'do_sql', $val ];
585   } elsif (ref($val) eq 'CODE') {
586     push @res, $val;
587   } elsif (ref($val) eq 'ARRAY') {
588     push @res, map { [ 'do_sql', $_ ] } @$val;
589   } else {
590     $self->throw_exception("Invalid type for $type: ".ref($val));
591   }
592
593   return \@res;
594 }
595
596 =head2 dbh_do
597
598 Arguments: ($subref | $method_name), @extra_coderef_args?
599
600 Execute the given $subref or $method_name using the new exception-based
601 connection management.
602
603 The first two arguments will be the storage object that C<dbh_do> was called
604 on and a database handle to use.  Any additional arguments will be passed
605 verbatim to the called subref as arguments 2 and onwards.
606
607 Using this (instead of $self->_dbh or $self->dbh) ensures correct
608 exception handling and reconnection (or failover in future subclasses).
609
610 Your subref should have no side-effects outside of the database, as
611 there is the potential for your subref to be partially double-executed
612 if the database connection was stale/dysfunctional.
613
614 Example:
615
616   my @stuff = $schema->storage->dbh_do(
617     sub {
618       my ($storage, $dbh, @cols) = @_;
619       my $cols = join(q{, }, @cols);
620       $dbh->selectrow_array("SELECT $cols FROM foo");
621     },
622     @column_list
623   );
624
625 =cut
626
627 sub dbh_do {
628   my $self = shift;
629   my $code = shift;
630
631   my $dbh = $self->_get_dbh;
632
633   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
634       || $self->{transaction_depth};
635
636   local $self->{_in_dbh_do} = 1;
637
638   my @result;
639   my $want_array = wantarray;
640
641   eval {
642
643     if($want_array) {
644         @result = $self->$code($dbh, @_);
645     }
646     elsif(defined $want_array) {
647         $result[0] = $self->$code($dbh, @_);
648     }
649     else {
650         $self->$code($dbh, @_);
651     }
652   };
653
654   # ->connected might unset $@ - copy
655   my $exception = $@;
656   if(!$exception) { return $want_array ? @result : $result[0] }
657
658   $self->throw_exception($exception) if $self->connected;
659
660   # We were not connected - reconnect and retry, but let any
661   #  exception fall right through this time
662   carp "Retrying $code after catching disconnected exception: $exception"
663     if $ENV{DBIC_DBIRETRY_DEBUG};
664   $self->_populate_dbh;
665   $self->$code($self->_dbh, @_);
666 }
667
668 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
669 # It also informs dbh_do to bypass itself while under the direction of txn_do,
670 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
671 sub txn_do {
672   my $self = shift;
673   my $coderef = shift;
674
675   ref $coderef eq 'CODE' or $self->throw_exception
676     ('$coderef must be a CODE reference');
677
678   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
679
680   local $self->{_in_dbh_do} = 1;
681
682   my @result;
683   my $want_array = wantarray;
684
685   my $tried = 0;
686   while(1) {
687     eval {
688       $self->_get_dbh;
689
690       $self->txn_begin;
691       if($want_array) {
692           @result = $coderef->(@_);
693       }
694       elsif(defined $want_array) {
695           $result[0] = $coderef->(@_);
696       }
697       else {
698           $coderef->(@_);
699       }
700       $self->txn_commit;
701     };
702
703     # ->connected might unset $@ - copy
704     my $exception = $@;
705     if(!$exception) { return $want_array ? @result : $result[0] }
706
707     if($tried++ || $self->connected) {
708       eval { $self->txn_rollback };
709       my $rollback_exception = $@;
710       if($rollback_exception) {
711         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
712         $self->throw_exception($exception)  # propagate nested rollback
713           if $rollback_exception =~ /$exception_class/;
714
715         $self->throw_exception(
716           "Transaction aborted: ${exception}. "
717           . "Rollback failed: ${rollback_exception}"
718         );
719       }
720       $self->throw_exception($exception)
721     }
722
723     # We were not connected, and was first try - reconnect and retry
724     # via the while loop
725     carp "Retrying $coderef after catching disconnected exception: $exception"
726       if $ENV{DBIC_DBIRETRY_DEBUG};
727     $self->_populate_dbh;
728   }
729 }
730
731 =head2 disconnect
732
733 Our C<disconnect> method also performs a rollback first if the
734 database is not in C<AutoCommit> mode.
735
736 =cut
737
738 sub disconnect {
739   my ($self) = @_;
740
741   if( $self->_dbh ) {
742     my @actions;
743
744     push @actions, ( $self->on_disconnect_call || () );
745     push @actions, $self->_parse_connect_do ('on_disconnect_do');
746
747     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
748
749     $self->_dbh_rollback unless $self->_dbh_autocommit;
750
751     %{ $self->_dbh->{CachedKids} } = ();
752     $self->_dbh->disconnect;
753     $self->_dbh(undef);
754     $self->{_dbh_gen}++;
755   }
756 }
757
758 =head2 with_deferred_fk_checks
759
760 =over 4
761
762 =item Arguments: C<$coderef>
763
764 =item Return Value: The return value of $coderef
765
766 =back
767
768 Storage specific method to run the code ref with FK checks deferred or
769 in MySQL's case disabled entirely.
770
771 =cut
772
773 # Storage subclasses should override this
774 sub with_deferred_fk_checks {
775   my ($self, $sub) = @_;
776   $sub->();
777 }
778
779 =head2 connected
780
781 =over
782
783 =item Arguments: none
784
785 =item Return Value: 1|0
786
787 =back
788
789 Verifies that the current database handle is active and ready to execute
790 an SQL statement (e.g. the connection did not get stale, server is still
791 answering, etc.) This method is used internally by L</dbh>.
792
793 =cut
794
795 sub connected {
796   my $self = shift;
797   return 0 unless $self->_seems_connected;
798
799   #be on the safe side
800   local $self->_dbh->{RaiseError} = 1;
801
802   return $self->_ping;
803 }
804
805 sub _seems_connected {
806   my $self = shift;
807
808   my $dbh = $self->_dbh
809     or return 0;
810
811   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
812     $self->_dbh(undef);
813     $self->{_dbh_gen}++;
814     return 0;
815   }
816   else {
817     $self->_verify_pid;
818     return 0 if !$self->_dbh;
819   }
820
821   return $dbh->FETCH('Active');
822 }
823
824 sub _ping {
825   my $self = shift;
826
827   my $dbh = $self->_dbh or return 0;
828
829   return $dbh->ping;
830 }
831
832 # handle pid changes correctly
833 #  NOTE: assumes $self->_dbh is a valid $dbh
834 sub _verify_pid {
835   my ($self) = @_;
836
837   return if defined $self->_conn_pid && $self->_conn_pid == $$;
838
839   $self->_dbh->{InactiveDestroy} = 1;
840   $self->_dbh(undef);
841   $self->{_dbh_gen}++;
842
843   return;
844 }
845
846 sub ensure_connected {
847   my ($self) = @_;
848
849   unless ($self->connected) {
850     $self->_populate_dbh;
851   }
852 }
853
854 =head2 dbh
855
856 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
857 is guaranteed to be healthy by implicitly calling L</connected>, and if
858 necessary performing a reconnection before returning. Keep in mind that this
859 is very B<expensive> on some database engines. Consider using L</dbh_do>
860 instead.
861
862 =cut
863
864 sub dbh {
865   my ($self) = @_;
866
867   if (not $self->_dbh) {
868     $self->_populate_dbh;
869   } else {
870     $self->ensure_connected;
871   }
872   return $self->_dbh;
873 }
874
875 # this is the internal "get dbh or connect (don't check)" method
876 sub _get_dbh {
877   my $self = shift;
878   $self->_verify_pid if $self->_dbh;
879   $self->_populate_dbh unless $self->_dbh;
880   return $self->_dbh;
881 }
882
883 sub _sql_maker_args {
884     my ($self) = @_;
885
886     return (
887       bindtype=>'columns',
888       array_datatypes => 1,
889       limit_dialect => $self->_get_dbh,
890       %{$self->_sql_maker_opts}
891     );
892 }
893
894 sub sql_maker {
895   my ($self) = @_;
896   unless ($self->_sql_maker) {
897     my $sql_maker_class = $self->sql_maker_class;
898     $self->ensure_class_loaded ($sql_maker_class);
899     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
900   }
901   return $self->_sql_maker;
902 }
903
904 # nothing to do by default
905 sub _rebless {}
906 sub _init {}
907
908 sub _populate_dbh {
909   my ($self) = @_;
910
911   my @info = @{$self->_dbi_connect_info || []};
912   $self->_dbh(undef); # in case ->connected failed we might get sent here
913   $self->_server_info_hash (undef);
914   $self->_dbh($self->_connect(@info));
915
916   $self->_conn_pid($$);
917   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
918
919   $self->_determine_driver;
920
921   # Always set the transaction depth on connect, since
922   #  there is no transaction in progress by definition
923   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
924
925   $self->_run_connection_actions unless $self->{_in_determine_driver};
926 }
927
928 sub _run_connection_actions {
929   my $self = shift;
930   my @actions;
931
932   push @actions, ( $self->on_connect_call || () );
933   push @actions, $self->_parse_connect_do ('on_connect_do');
934
935   $self->_do_connection_actions(connect_call_ => $_) for @actions;
936 }
937
938 sub _server_info {
939   my $self = shift;
940
941   unless ($self->_server_info_hash) {
942
943     my %info;
944
945     my $server_version = $self->_get_server_version;
946
947     if (defined $server_version) {
948       $info{dbms_version} = $server_version;
949
950       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
951       my @verparts = split (/\./, $numeric_version);
952       if (
953         @verparts
954           &&
955         $verparts[0] <= 999
956       ) {
957         # consider only up to 3 version parts, iff not more than 3 digits
958         my @use_parts;
959         while (@verparts && @use_parts < 3) {
960           my $p = shift @verparts;
961           last if $p > 999;
962           push @use_parts, $p;
963         }
964         push @use_parts, 0 while @use_parts < 3;
965
966         $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
967       }
968     }
969
970     $self->_server_info_hash(\%info);
971   }
972
973   return $self->_server_info_hash
974 }
975
976 sub _get_server_version {
977   eval { shift->_get_dbh->get_info(18) };
978 }
979
980 sub _determine_driver {
981   my ($self) = @_;
982
983   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
984     my $started_connected = 0;
985     local $self->{_in_determine_driver} = 1;
986
987     if (ref($self) eq __PACKAGE__) {
988       my $driver;
989       if ($self->_dbh) { # we are connected
990         $driver = $self->_dbh->{Driver}{Name};
991         $started_connected = 1;
992       } else {
993         # if connect_info is a CODEREF, we have no choice but to connect
994         if (ref $self->_dbi_connect_info->[0] &&
995             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
996           $self->_populate_dbh;
997           $driver = $self->_dbh->{Driver}{Name};
998         }
999         else {
1000           # try to use dsn to not require being connected, the driver may still
1001           # force a connection in _rebless to determine version
1002           # (dsn may not be supplied at all if all we do is make a mock-schema)
1003           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1004           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1005           $driver ||= $ENV{DBI_DRIVER};
1006         }
1007       }
1008
1009       if ($driver) {
1010         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1011         if ($self->load_optional_class($storage_class)) {
1012           mro::set_mro($storage_class, 'c3');
1013           bless $self, $storage_class;
1014           $self->_rebless();
1015         }
1016       }
1017     }
1018
1019     $self->_driver_determined(1);
1020
1021     $self->_init; # run driver-specific initializations
1022
1023     $self->_run_connection_actions
1024         if !$started_connected && defined $self->_dbh;
1025   }
1026 }
1027
1028 sub _do_connection_actions {
1029   my $self          = shift;
1030   my $method_prefix = shift;
1031   my $call          = shift;
1032
1033   if (not ref($call)) {
1034     my $method = $method_prefix . $call;
1035     $self->$method(@_);
1036   } elsif (ref($call) eq 'CODE') {
1037     $self->$call(@_);
1038   } elsif (ref($call) eq 'ARRAY') {
1039     if (ref($call->[0]) ne 'ARRAY') {
1040       $self->_do_connection_actions($method_prefix, $_) for @$call;
1041     } else {
1042       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1043     }
1044   } else {
1045     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1046   }
1047
1048   return $self;
1049 }
1050
1051 sub connect_call_do_sql {
1052   my $self = shift;
1053   $self->_do_query(@_);
1054 }
1055
1056 sub disconnect_call_do_sql {
1057   my $self = shift;
1058   $self->_do_query(@_);
1059 }
1060
1061 # override in db-specific backend when necessary
1062 sub connect_call_datetime_setup { 1 }
1063
1064 sub _do_query {
1065   my ($self, $action) = @_;
1066
1067   if (ref $action eq 'CODE') {
1068     $action = $action->($self);
1069     $self->_do_query($_) foreach @$action;
1070   }
1071   else {
1072     # Most debuggers expect ($sql, @bind), so we need to exclude
1073     # the attribute hash which is the second argument to $dbh->do
1074     # furthermore the bind values are usually to be presented
1075     # as named arrayref pairs, so wrap those here too
1076     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1077     my $sql = shift @do_args;
1078     my $attrs = shift @do_args;
1079     my @bind = map { [ undef, $_ ] } @do_args;
1080
1081     $self->_query_start($sql, @bind);
1082     $self->_get_dbh->do($sql, $attrs, @do_args);
1083     $self->_query_end($sql, @bind);
1084   }
1085
1086   return $self;
1087 }
1088
1089 sub _connect {
1090   my ($self, @info) = @_;
1091
1092   $self->throw_exception("You failed to provide any connection info")
1093     if !@info;
1094
1095   my ($old_connect_via, $dbh);
1096
1097   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1098     $old_connect_via = $DBI::connect_via;
1099     $DBI::connect_via = 'connect';
1100   }
1101
1102   eval {
1103     if(ref $info[0] eq 'CODE') {
1104        $dbh = $info[0]->();
1105     }
1106     else {
1107        $dbh = DBI->connect(@info);
1108     }
1109
1110     if($dbh && !$self->unsafe) {
1111       my $weak_self = $self;
1112       Scalar::Util::weaken($weak_self);
1113       $dbh->{HandleError} = sub {
1114           if ($weak_self) {
1115             $weak_self->throw_exception("DBI Exception: $_[0]");
1116           }
1117           else {
1118             # the handler may be invoked by something totally out of
1119             # the scope of DBIC
1120             croak ("DBI Exception: $_[0]");
1121           }
1122       };
1123       $dbh->{ShowErrorStatement} = 1;
1124       $dbh->{RaiseError} = 1;
1125       $dbh->{PrintError} = 0;
1126     }
1127   };
1128
1129   $DBI::connect_via = $old_connect_via if $old_connect_via;
1130
1131   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1132     if !$dbh || $@;
1133
1134   $self->_dbh_autocommit($dbh->{AutoCommit});
1135
1136   $dbh;
1137 }
1138
1139 sub svp_begin {
1140   my ($self, $name) = @_;
1141
1142   $name = $self->_svp_generate_name
1143     unless defined $name;
1144
1145   $self->throw_exception ("You can't use savepoints outside a transaction")
1146     if $self->{transaction_depth} == 0;
1147
1148   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1149     unless $self->can('_svp_begin');
1150
1151   push @{ $self->{savepoints} }, $name;
1152
1153   $self->debugobj->svp_begin($name) if $self->debug;
1154
1155   return $self->_svp_begin($name);
1156 }
1157
1158 sub svp_release {
1159   my ($self, $name) = @_;
1160
1161   $self->throw_exception ("You can't use savepoints outside a transaction")
1162     if $self->{transaction_depth} == 0;
1163
1164   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1165     unless $self->can('_svp_release');
1166
1167   if (defined $name) {
1168     $self->throw_exception ("Savepoint '$name' does not exist")
1169       unless grep { $_ eq $name } @{ $self->{savepoints} };
1170
1171     # Dig through the stack until we find the one we are releasing.  This keeps
1172     # the stack up to date.
1173     my $svp;
1174
1175     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1176   } else {
1177     $name = pop @{ $self->{savepoints} };
1178   }
1179
1180   $self->debugobj->svp_release($name) if $self->debug;
1181
1182   return $self->_svp_release($name);
1183 }
1184
1185 sub svp_rollback {
1186   my ($self, $name) = @_;
1187
1188   $self->throw_exception ("You can't use savepoints outside a transaction")
1189     if $self->{transaction_depth} == 0;
1190
1191   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1192     unless $self->can('_svp_rollback');
1193
1194   if (defined $name) {
1195       # If they passed us a name, verify that it exists in the stack
1196       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1197           $self->throw_exception("Savepoint '$name' does not exist!");
1198       }
1199
1200       # Dig through the stack until we find the one we are releasing.  This keeps
1201       # the stack up to date.
1202       while(my $s = pop(@{ $self->{savepoints} })) {
1203           last if($s eq $name);
1204       }
1205       # Add the savepoint back to the stack, as a rollback doesn't remove the
1206       # named savepoint, only everything after it.
1207       push(@{ $self->{savepoints} }, $name);
1208   } else {
1209       # We'll assume they want to rollback to the last savepoint
1210       $name = $self->{savepoints}->[-1];
1211   }
1212
1213   $self->debugobj->svp_rollback($name) if $self->debug;
1214
1215   return $self->_svp_rollback($name);
1216 }
1217
1218 sub _svp_generate_name {
1219     my ($self) = @_;
1220
1221     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1222 }
1223
1224 sub txn_begin {
1225   my $self = shift;
1226
1227   # this means we have not yet connected and do not know the AC status
1228   # (e.g. coderef $dbh)
1229   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1230
1231   if($self->{transaction_depth} == 0) {
1232     $self->debugobj->txn_begin()
1233       if $self->debug;
1234     $self->_dbh_begin_work;
1235   }
1236   elsif ($self->auto_savepoint) {
1237     $self->svp_begin;
1238   }
1239   $self->{transaction_depth}++;
1240 }
1241
1242 sub _dbh_begin_work {
1243   my $self = shift;
1244
1245   # if the user is utilizing txn_do - good for him, otherwise we need to
1246   # ensure that the $dbh is healthy on BEGIN.
1247   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1248   # will be replaced by a failure of begin_work itself (which will be
1249   # then retried on reconnect)
1250   if ($self->{_in_dbh_do}) {
1251     $self->_dbh->begin_work;
1252   } else {
1253     $self->dbh_do(sub { $_[1]->begin_work });
1254   }
1255 }
1256
1257 sub txn_commit {
1258   my $self = shift;
1259   if ($self->{transaction_depth} == 1) {
1260     $self->debugobj->txn_commit()
1261       if ($self->debug);
1262     $self->_dbh_commit;
1263     $self->{transaction_depth} = 0
1264       if $self->_dbh_autocommit;
1265   }
1266   elsif($self->{transaction_depth} > 1) {
1267     $self->{transaction_depth}--;
1268     $self->svp_release
1269       if $self->auto_savepoint;
1270   }
1271 }
1272
1273 sub _dbh_commit {
1274   my $self = shift;
1275   my $dbh  = $self->_dbh
1276     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1277   $dbh->commit;
1278 }
1279
1280 sub txn_rollback {
1281   my $self = shift;
1282   my $dbh = $self->_dbh;
1283   eval {
1284     if ($self->{transaction_depth} == 1) {
1285       $self->debugobj->txn_rollback()
1286         if ($self->debug);
1287       $self->{transaction_depth} = 0
1288         if $self->_dbh_autocommit;
1289       $self->_dbh_rollback;
1290     }
1291     elsif($self->{transaction_depth} > 1) {
1292       $self->{transaction_depth}--;
1293       if ($self->auto_savepoint) {
1294         $self->svp_rollback;
1295         $self->svp_release;
1296       }
1297     }
1298     else {
1299       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1300     }
1301   };
1302   if ($@) {
1303     my $error = $@;
1304     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1305     $error =~ /$exception_class/ and $self->throw_exception($error);
1306     # ensure that a failed rollback resets the transaction depth
1307     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1308     $self->throw_exception($error);
1309   }
1310 }
1311
1312 sub _dbh_rollback {
1313   my $self = shift;
1314   my $dbh  = $self->_dbh
1315     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1316   $dbh->rollback;
1317 }
1318
1319 # This used to be the top-half of _execute.  It was split out to make it
1320 #  easier to override in NoBindVars without duping the rest.  It takes up
1321 #  all of _execute's args, and emits $sql, @bind.
1322 sub _prep_for_execute {
1323   my ($self, $op, $extra_bind, $ident, $args) = @_;
1324
1325   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1326     $ident = $ident->from();
1327   }
1328
1329   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1330
1331   unshift(@bind,
1332     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1333       if $extra_bind;
1334   return ($sql, \@bind);
1335 }
1336
1337
1338 sub _fix_bind_params {
1339     my ($self, @bind) = @_;
1340
1341     ### Turn @bind from something like this:
1342     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1343     ### to this:
1344     ###   ( "'1'", "'1'", "'3'" )
1345     return
1346         map {
1347             if ( defined( $_ && $_->[1] ) ) {
1348                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1349             }
1350             else { q{'NULL'}; }
1351         } @bind;
1352 }
1353
1354 sub _query_start {
1355     my ( $self, $sql, @bind ) = @_;
1356
1357     if ( $self->debug ) {
1358         @bind = $self->_fix_bind_params(@bind);
1359
1360         $self->debugobj->query_start( $sql, @bind );
1361     }
1362 }
1363
1364 sub _query_end {
1365     my ( $self, $sql, @bind ) = @_;
1366
1367     if ( $self->debug ) {
1368         @bind = $self->_fix_bind_params(@bind);
1369         $self->debugobj->query_end( $sql, @bind );
1370     }
1371 }
1372
1373 sub _dbh_execute {
1374   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1375
1376   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1377
1378   $self->_query_start( $sql, @$bind );
1379
1380   my $sth = $self->sth($sql,$op);
1381
1382   my $placeholder_index = 1;
1383
1384   foreach my $bound (@$bind) {
1385     my $attributes = {};
1386     my($column_name, @data) = @$bound;
1387
1388     if ($bind_attributes) {
1389       $attributes = $bind_attributes->{$column_name}
1390       if defined $bind_attributes->{$column_name};
1391     }
1392
1393     foreach my $data (@data) {
1394       my $ref = ref $data;
1395       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1396
1397       $sth->bind_param($placeholder_index, $data, $attributes);
1398       $placeholder_index++;
1399     }
1400   }
1401
1402   # Can this fail without throwing an exception anyways???
1403   my $rv = $sth->execute();
1404   $self->throw_exception($sth->errstr) if !$rv;
1405
1406   $self->_query_end( $sql, @$bind );
1407
1408   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1409 }
1410
1411 sub _execute {
1412     my $self = shift;
1413     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1414 }
1415
1416 sub _prefetch_insert_auto_nextvals {
1417   my ($self, $source, $to_insert) = @_;
1418
1419   my $upd = {};
1420
1421   foreach my $col ( $source->columns ) {
1422     if ( !defined $to_insert->{$col} ) {
1423       my $col_info = $source->column_info($col);
1424
1425       if ( $col_info->{auto_nextval} ) {
1426         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1427           'nextval',
1428           $col_info->{sequence} ||=
1429             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1430         );
1431       }
1432     }
1433   }
1434
1435   return $upd;
1436 }
1437
1438 sub insert {
1439   my $self = shift;
1440   my ($source, $to_insert, $opts) = @_;
1441
1442   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1443
1444   my $bind_attributes = $self->source_bind_attributes($source);
1445
1446   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1447
1448   if ($opts->{returning}) {
1449     my @ret_cols = @{$opts->{returning}};
1450
1451     my @ret_vals = eval {
1452       local $SIG{__WARN__} = sub {};
1453       my @r = $sth->fetchrow_array;
1454       $sth->finish;
1455       @r;
1456     };
1457
1458     my %ret;
1459     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1460
1461     $updated_cols = {
1462       %$updated_cols,
1463       %ret,
1464     };
1465   }
1466
1467   return $updated_cols;
1468 }
1469
1470 ## Currently it is assumed that all values passed will be "normal", i.e. not
1471 ## scalar refs, or at least, all the same type as the first set, the statement is
1472 ## only prepped once.
1473 sub insert_bulk {
1474   my ($self, $source, $cols, $data) = @_;
1475
1476   my %colvalues;
1477   @colvalues{@$cols} = (0..$#$cols);
1478
1479   for my $i (0..$#$cols) {
1480     my $first_val = $data->[0][$i];
1481     next unless ref $first_val eq 'SCALAR';
1482
1483     $colvalues{ $cols->[$i] } = $first_val;
1484   }
1485
1486   # check for bad data and stringify stringifiable objects
1487   my $bad_slice = sub {
1488     my ($msg, $col_idx, $slice_idx) = @_;
1489     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1490       $msg,
1491       $cols->[$col_idx],
1492       do {
1493         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1494         Data::Dumper::Concise::Dumper({
1495           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1496         }),
1497       }
1498     );
1499   };
1500
1501   for my $datum_idx (0..$#$data) {
1502     my $datum = $data->[$datum_idx];
1503
1504     for my $col_idx (0..$#$cols) {
1505       my $val            = $datum->[$col_idx];
1506       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1507       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1508
1509       if ($is_literal_sql) {
1510         if (not ref $val) {
1511           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1512         }
1513         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1514           $bad_slice->("$reftype reference found where literal SQL expected",
1515             $col_idx, $datum_idx);
1516         }
1517         elsif ($$val ne $$sqla_bind){
1518           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1519             $col_idx, $datum_idx);
1520         }
1521       }
1522       elsif (my $reftype = ref $val) {
1523         require overload;
1524         if (overload::Method($val, '""')) {
1525           $datum->[$col_idx] = "".$val;
1526         }
1527         else {
1528           $bad_slice->("$reftype reference found where bind expected",
1529             $col_idx, $datum_idx);
1530         }
1531       }
1532     }
1533   }
1534
1535   my ($sql, $bind) = $self->_prep_for_execute (
1536     'insert', undef, $source, [\%colvalues]
1537   );
1538   my @bind = @$bind;
1539
1540   my $empty_bind = 1 if (not @bind) &&
1541     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1542
1543   if ((not @bind) && (not $empty_bind)) {
1544     $self->throw_exception(
1545       'Cannot insert_bulk without support for placeholders'
1546     );
1547   }
1548
1549   # neither _execute_array, nor _execute_inserts_with_no_binds are
1550   # atomic (even if _execute _array is a single call). Thus a safety
1551   # scope guard
1552   my $guard = $self->txn_scope_guard;
1553
1554   $self->_query_start( $sql, ['__BULK__'] );
1555   my $sth = $self->sth($sql);
1556   my $rv = do {
1557     if ($empty_bind) {
1558       # bind_param_array doesn't work if there are no binds
1559       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1560     }
1561     else {
1562 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1563       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1564     }
1565   };
1566
1567   $self->_query_end( $sql, ['__BULK__'] );
1568
1569   $guard->commit;
1570
1571   return (wantarray ? ($rv, $sth, @bind) : $rv);
1572 }
1573
1574 sub _execute_array {
1575   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1576
1577   ## This must be an arrayref, else nothing works!
1578   my $tuple_status = [];
1579
1580   ## Get the bind_attributes, if any exist
1581   my $bind_attributes = $self->source_bind_attributes($source);
1582
1583   ## Bind the values and execute
1584   my $placeholder_index = 1;
1585
1586   foreach my $bound (@$bind) {
1587
1588     my $attributes = {};
1589     my ($column_name, $data_index) = @$bound;
1590
1591     if( $bind_attributes ) {
1592       $attributes = $bind_attributes->{$column_name}
1593       if defined $bind_attributes->{$column_name};
1594     }
1595
1596     my @data = map { $_->[$data_index] } @$data;
1597
1598     $sth->bind_param_array(
1599       $placeholder_index,
1600       [@data],
1601       (%$attributes ?  $attributes : ()),
1602     );
1603     $placeholder_index++;
1604   }
1605
1606   my $rv = eval {
1607     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1608   };
1609   my $err = $@ || $sth->errstr;
1610
1611 # Statement must finish even if there was an exception.
1612   eval { $sth->finish };
1613   $err = $@ unless $err;
1614
1615   if ($err) {
1616     my $i = 0;
1617     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1618
1619     $self->throw_exception("Unexpected populate error: $err")
1620       if ($i > $#$tuple_status);
1621
1622     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1623       ($tuple_status->[$i][1] || $err),
1624       Data::Dumper::Concise::Dumper({
1625         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1626       }),
1627     );
1628   }
1629   return $rv;
1630 }
1631
1632 sub _dbh_execute_array {
1633     my ($self, $sth, $tuple_status, @extra) = @_;
1634
1635     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1636 }
1637
1638 sub _dbh_execute_inserts_with_no_binds {
1639   my ($self, $sth, $count) = @_;
1640
1641   eval {
1642     my $dbh = $self->_get_dbh;
1643     local $dbh->{RaiseError} = 1;
1644     local $dbh->{PrintError} = 0;
1645
1646     $sth->execute foreach 1..$count;
1647   };
1648   my $exception = $@;
1649
1650 # Make sure statement is finished even if there was an exception.
1651   eval { $sth->finish };
1652   $exception = $@ unless $exception;
1653
1654   $self->throw_exception($exception) if $exception;
1655
1656   return $count;
1657 }
1658
1659 sub update {
1660   my ($self, $source, @args) = @_;
1661
1662   my $bind_attrs = $self->source_bind_attributes($source);
1663
1664   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1665 }
1666
1667
1668 sub delete {
1669   my ($self, $source, @args) = @_;
1670
1671   my $bind_attrs = $self->source_bind_attributes($source);
1672
1673   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1674 }
1675
1676 # We were sent here because the $rs contains a complex search
1677 # which will require a subquery to select the correct rows
1678 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1679 #
1680 # Generating a single PK column subquery is trivial and supported
1681 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1682 # Look at _multipk_update_delete()
1683 sub _subq_update_delete {
1684   my $self = shift;
1685   my ($rs, $op, $values) = @_;
1686
1687   my $rsrc = $rs->result_source;
1688
1689   # quick check if we got a sane rs on our hands
1690   my @pcols = $rsrc->_pri_cols;
1691
1692   my $sel = $rs->_resolved_attrs->{select};
1693   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1694
1695   if (
1696       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1697         ne
1698       join ("\x00", sort @$sel )
1699   ) {
1700     $self->throw_exception (
1701       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1702     );
1703   }
1704
1705   if (@pcols == 1) {
1706     return $self->$op (
1707       $rsrc,
1708       $op eq 'update' ? $values : (),
1709       { $pcols[0] => { -in => $rs->as_query } },
1710     );
1711   }
1712
1713   else {
1714     return $self->_multipk_update_delete (@_);
1715   }
1716 }
1717
1718 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1719 # resultset update/delete involving subqueries. So by default resort
1720 # to simple (and inefficient) delete_all style per-row opearations,
1721 # while allowing specific storages to override this with a faster
1722 # implementation.
1723 #
1724 sub _multipk_update_delete {
1725   return shift->_per_row_update_delete (@_);
1726 }
1727
1728 # This is the default loop used to delete/update rows for multi PK
1729 # resultsets, and used by mysql exclusively (because it can't do anything
1730 # else).
1731 #
1732 # We do not use $row->$op style queries, because resultset update/delete
1733 # is not expected to cascade (this is what delete_all/update_all is for).
1734 #
1735 # There should be no race conditions as the entire operation is rolled
1736 # in a transaction.
1737 #
1738 sub _per_row_update_delete {
1739   my $self = shift;
1740   my ($rs, $op, $values) = @_;
1741
1742   my $rsrc = $rs->result_source;
1743   my @pcols = $rsrc->_pri_cols;
1744
1745   my $guard = $self->txn_scope_guard;
1746
1747   # emulate the return value of $sth->execute for non-selects
1748   my $row_cnt = '0E0';
1749
1750   my $subrs_cur = $rs->cursor;
1751   my @all_pk = $subrs_cur->all;
1752   for my $pks ( @all_pk) {
1753
1754     my $cond;
1755     for my $i (0.. $#pcols) {
1756       $cond->{$pcols[$i]} = $pks->[$i];
1757     }
1758
1759     $self->$op (
1760       $rsrc,
1761       $op eq 'update' ? $values : (),
1762       $cond,
1763     );
1764
1765     $row_cnt++;
1766   }
1767
1768   $guard->commit;
1769
1770   return $row_cnt;
1771 }
1772
1773 sub _select {
1774   my $self = shift;
1775
1776   # localization is neccessary as
1777   # 1) there is no infrastructure to pass this around before SQLA2
1778   # 2) _select_args sets it and _prep_for_execute consumes it
1779   my $sql_maker = $self->sql_maker;
1780   local $sql_maker->{_dbic_rs_attrs};
1781
1782   return $self->_execute($self->_select_args(@_));
1783 }
1784
1785 sub _select_args_to_query {
1786   my $self = shift;
1787
1788   # localization is neccessary as
1789   # 1) there is no infrastructure to pass this around before SQLA2
1790   # 2) _select_args sets it and _prep_for_execute consumes it
1791   my $sql_maker = $self->sql_maker;
1792   local $sql_maker->{_dbic_rs_attrs};
1793
1794   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1795   #  = $self->_select_args($ident, $select, $cond, $attrs);
1796   my ($op, $bind, $ident, $bind_attrs, @args) =
1797     $self->_select_args(@_);
1798
1799   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1800   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1801   $prepared_bind ||= [];
1802
1803   return wantarray
1804     ? ($sql, $prepared_bind, $bind_attrs)
1805     : \[ "($sql)", @$prepared_bind ]
1806   ;
1807 }
1808
1809 sub _select_args {
1810   my ($self, $ident, $select, $where, $attrs) = @_;
1811
1812   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1813
1814   my $sql_maker = $self->sql_maker;
1815   $sql_maker->{_dbic_rs_attrs} = {
1816     %$attrs,
1817     select => $select,
1818     from => $ident,
1819     where => $where,
1820     $rs_alias && $alias2source->{$rs_alias}
1821       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1822       : ()
1823     ,
1824   };
1825
1826   # calculate bind_attrs before possible $ident mangling
1827   my $bind_attrs = {};
1828   for my $alias (keys %$alias2source) {
1829     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1830     for my $col (keys %$bindtypes) {
1831
1832       my $fqcn = join ('.', $alias, $col);
1833       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1834
1835       # Unqialified column names are nice, but at the same time can be
1836       # rather ambiguous. What we do here is basically go along with
1837       # the loop, adding an unqualified column slot to $bind_attrs,
1838       # alongside the fully qualified name. As soon as we encounter
1839       # another column by that name (which would imply another table)
1840       # we unset the unqualified slot and never add any info to it
1841       # to avoid erroneous type binding. If this happens the users
1842       # only choice will be to fully qualify his column name
1843
1844       if (exists $bind_attrs->{$col}) {
1845         $bind_attrs->{$col} = {};
1846       }
1847       else {
1848         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1849       }
1850     }
1851   }
1852
1853   # adjust limits
1854   if (
1855     $attrs->{software_limit}
1856       ||
1857     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1858   ) {
1859     $attrs->{software_limit} = 1;
1860   }
1861   else {
1862     $self->throw_exception("rows attribute must be positive if present")
1863       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1864
1865     # MySQL actually recommends this approach.  I cringe.
1866     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1867   }
1868
1869   my @limit;
1870
1871   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1872   # storage, unless software limit was requested
1873   if (
1874     #limited has_many
1875     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1876        ||
1877     # limited prefetch with RNO subqueries
1878     (
1879       $attrs->{rows}
1880         &&
1881       $sql_maker->limit_dialect eq 'RowNumberOver'
1882         &&
1883       $attrs->{_prefetch_select}
1884         &&
1885       @{$attrs->{_prefetch_select}}
1886     )
1887       ||
1888     # grouped prefetch
1889     ( $attrs->{group_by}
1890         &&
1891       @{$attrs->{group_by}}
1892         &&
1893       $attrs->{_prefetch_select}
1894         &&
1895       @{$attrs->{_prefetch_select}}
1896     )
1897   ) {
1898     ($ident, $select, $where, $attrs)
1899       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1900   }
1901
1902   elsif (
1903     ($attrs->{rows} || $attrs->{offset})
1904       &&
1905     $sql_maker->limit_dialect eq 'RowNumberOver'
1906       &&
1907     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1908       &&
1909     scalar $self->_parse_order_by ($attrs->{order_by})
1910   ) {
1911     # the RNO limit dialect above mangles the SQL such that the join gets lost
1912     # wrap a subquery here
1913
1914     push @limit, delete @{$attrs}{qw/rows offset/};
1915
1916     my $subq = $self->_select_args_to_query (
1917       $ident,
1918       $select,
1919       $where,
1920       $attrs,
1921     );
1922
1923     $ident = {
1924       -alias => $attrs->{alias},
1925       -source_handle => $ident->[0]{-source_handle},
1926       $attrs->{alias} => $subq,
1927     };
1928
1929     # all part of the subquery now
1930     delete @{$attrs}{qw/order_by group_by having/};
1931     $where = undef;
1932   }
1933
1934   elsif (! $attrs->{software_limit} ) {
1935     push @limit, $attrs->{rows}, $attrs->{offset};
1936   }
1937
1938   # try to simplify the joinmap further (prune unreferenced type-single joins)
1939   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1940
1941 ###
1942   # This would be the point to deflate anything found in $where
1943   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1944   # expect a row object. And all we have is a resultsource (it is trivial
1945   # to extract deflator coderefs via $alias2source above).
1946   #
1947   # I don't see a way forward other than changing the way deflators are
1948   # invoked, and that's just bad...
1949 ###
1950
1951   my $order = { map
1952     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1953     (qw/order_by group_by having/ )
1954   };
1955
1956   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1957 }
1958
1959 # Returns a counting SELECT for a simple count
1960 # query. Abstracted so that a storage could override
1961 # this to { count => 'firstcol' } or whatever makes
1962 # sense as a performance optimization
1963 sub _count_select {
1964   #my ($self, $source, $rs_attrs) = @_;
1965   return { count => '*' };
1966 }
1967
1968 # Returns a SELECT which will end up in the subselect
1969 # There may or may not be a group_by, as the subquery
1970 # might have been called to accomodate a limit
1971 #
1972 # Most databases would be happy with whatever ends up
1973 # here, but some choke in various ways.
1974 #
1975 sub _subq_count_select {
1976   my ($self, $source, $rs_attrs) = @_;
1977
1978   if (my $groupby = $rs_attrs->{group_by}) {
1979
1980     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1981
1982     my $sel_index;
1983     for my $sel (@{$rs_attrs->{select}}) {
1984       if (ref $sel eq 'HASH' and $sel->{-as}) {
1985         $sel_index->{$sel->{-as}} = $sel;
1986       }
1987     }
1988
1989     my @selection;
1990     for my $g_part (@$groupby) {
1991       if (ref $g_part or $avail_columns->{$g_part}) {
1992         push @selection, $g_part;
1993       }
1994       elsif ($sel_index->{$g_part}) {
1995         push @selection, $sel_index->{$g_part};
1996       }
1997       else {
1998         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1999       }
2000     }
2001
2002     return \@selection;
2003   }
2004
2005   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
2006   return @pcols ? \@pcols : [ 1 ];
2007 }
2008
2009 sub source_bind_attributes {
2010   my ($self, $source) = @_;
2011
2012   my $bind_attributes;
2013   foreach my $column ($source->columns) {
2014
2015     my $data_type = $source->column_info($column)->{data_type} || '';
2016     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2017      if $data_type;
2018   }
2019
2020   return $bind_attributes;
2021 }
2022
2023 =head2 select
2024
2025 =over 4
2026
2027 =item Arguments: $ident, $select, $condition, $attrs
2028
2029 =back
2030
2031 Handle a SQL select statement.
2032
2033 =cut
2034
2035 sub select {
2036   my $self = shift;
2037   my ($ident, $select, $condition, $attrs) = @_;
2038   return $self->cursor_class->new($self, \@_, $attrs);
2039 }
2040
2041 sub select_single {
2042   my $self = shift;
2043   my ($rv, $sth, @bind) = $self->_select(@_);
2044   my @row = $sth->fetchrow_array;
2045   my @nextrow = $sth->fetchrow_array if @row;
2046   if(@row && @nextrow) {
2047     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2048   }
2049   # Need to call finish() to work round broken DBDs
2050   $sth->finish();
2051   return @row;
2052 }
2053
2054 =head2 sth
2055
2056 =over 4
2057
2058 =item Arguments: $sql
2059
2060 =back
2061
2062 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2063
2064 =cut
2065
2066 sub _dbh_sth {
2067   my ($self, $dbh, $sql) = @_;
2068
2069   # 3 is the if_active parameter which avoids active sth re-use
2070   my $sth = $self->disable_sth_caching
2071     ? $dbh->prepare($sql)
2072     : $dbh->prepare_cached($sql, {}, 3);
2073
2074   # XXX You would think RaiseError would make this impossible,
2075   #  but apparently that's not true :(
2076   $self->throw_exception($dbh->errstr) if !$sth;
2077
2078   $sth;
2079 }
2080
2081 sub sth {
2082   my ($self, $sql) = @_;
2083   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2084 }
2085
2086 sub _dbh_columns_info_for {
2087   my ($self, $dbh, $table) = @_;
2088
2089   if ($dbh->can('column_info')) {
2090     my %result;
2091     eval {
2092       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2093       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2094       $sth->execute();
2095       while ( my $info = $sth->fetchrow_hashref() ){
2096         my %column_info;
2097         $column_info{data_type}   = $info->{TYPE_NAME};
2098         $column_info{size}      = $info->{COLUMN_SIZE};
2099         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2100         $column_info{default_value} = $info->{COLUMN_DEF};
2101         my $col_name = $info->{COLUMN_NAME};
2102         $col_name =~ s/^\"(.*)\"$/$1/;
2103
2104         $result{$col_name} = \%column_info;
2105       }
2106     };
2107     return \%result if !$@ && scalar keys %result;
2108   }
2109
2110   my %result;
2111   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2112   $sth->execute;
2113   my @columns = @{$sth->{NAME_lc}};
2114   for my $i ( 0 .. $#columns ){
2115     my %column_info;
2116     $column_info{data_type} = $sth->{TYPE}->[$i];
2117     $column_info{size} = $sth->{PRECISION}->[$i];
2118     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2119
2120     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2121       $column_info{data_type} = $1;
2122       $column_info{size}    = $2;
2123     }
2124
2125     $result{$columns[$i]} = \%column_info;
2126   }
2127   $sth->finish;
2128
2129   foreach my $col (keys %result) {
2130     my $colinfo = $result{$col};
2131     my $type_num = $colinfo->{data_type};
2132     my $type_name;
2133     if(defined $type_num && $dbh->can('type_info')) {
2134       my $type_info = $dbh->type_info($type_num);
2135       $type_name = $type_info->{TYPE_NAME} if $type_info;
2136       $colinfo->{data_type} = $type_name if $type_name;
2137     }
2138   }
2139
2140   return \%result;
2141 }
2142
2143 sub columns_info_for {
2144   my ($self, $table) = @_;
2145   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2146 }
2147
2148 =head2 last_insert_id
2149
2150 Return the row id of the last insert.
2151
2152 =cut
2153
2154 sub _dbh_last_insert_id {
2155     my ($self, $dbh, $source, $col) = @_;
2156
2157     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2158
2159     return $id if defined $id;
2160
2161     my $class = ref $self;
2162     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2163 }
2164
2165 sub last_insert_id {
2166   my $self = shift;
2167   $self->_dbh_last_insert_id ($self->_dbh, @_);
2168 }
2169
2170 =head2 _native_data_type
2171
2172 =over 4
2173
2174 =item Arguments: $type_name
2175
2176 =back
2177
2178 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2179 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2180 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2181
2182 The default implementation returns C<undef>, implement in your Storage driver if
2183 you need this functionality.
2184
2185 Should map types from other databases to the native RDBMS type, for example
2186 C<VARCHAR2> to C<VARCHAR>.
2187
2188 Types with modifiers should map to the underlying data type. For example,
2189 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2190
2191 Composite types should map to the container type, for example
2192 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2193
2194 =cut
2195
2196 sub _native_data_type {
2197   #my ($self, $data_type) = @_;
2198   return undef
2199 }
2200
2201 # Check if placeholders are supported at all
2202 sub _placeholders_supported {
2203   my $self = shift;
2204   my $dbh  = $self->_get_dbh;
2205
2206   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2207   # but it is inaccurate more often than not
2208   eval {
2209     local $dbh->{PrintError} = 0;
2210     local $dbh->{RaiseError} = 1;
2211     $dbh->do('select ?', {}, 1);
2212   };
2213   return $@ ? 0 : 1;
2214 }
2215
2216 # Check if placeholders bound to non-string types throw exceptions
2217 #
2218 sub _typeless_placeholders_supported {
2219   my $self = shift;
2220   my $dbh  = $self->_get_dbh;
2221
2222   eval {
2223     local $dbh->{PrintError} = 0;
2224     local $dbh->{RaiseError} = 1;
2225     # this specifically tests a bind that is NOT a string
2226     $dbh->do('select 1 where 1 = ?', {}, 1);
2227   };
2228   return $@ ? 0 : 1;
2229 }
2230
2231 =head2 sqlt_type
2232
2233 Returns the database driver name.
2234
2235 =cut
2236
2237 sub sqlt_type {
2238   shift->_get_dbh->{Driver}->{Name};
2239 }
2240
2241 =head2 bind_attribute_by_data_type
2242
2243 Given a datatype from column info, returns a database specific bind
2244 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2245 let the database planner just handle it.
2246
2247 Generally only needed for special case column types, like bytea in postgres.
2248
2249 =cut
2250
2251 sub bind_attribute_by_data_type {
2252     return;
2253 }
2254
2255 =head2 is_datatype_numeric
2256
2257 Given a datatype from column_info, returns a boolean value indicating if
2258 the current RDBMS considers it a numeric value. This controls how
2259 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2260 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2261 be performed instead of the usual C<eq>.
2262
2263 =cut
2264
2265 sub is_datatype_numeric {
2266   my ($self, $dt) = @_;
2267
2268   return 0 unless $dt;
2269
2270   return $dt =~ /^ (?:
2271     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2272   ) $/ix;
2273 }
2274
2275
2276 =head2 create_ddl_dir
2277
2278 =over 4
2279
2280 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2281
2282 =back
2283
2284 Creates a SQL file based on the Schema, for each of the specified
2285 database engines in C<\@databases> in the given directory.
2286 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2287
2288 Given a previous version number, this will also create a file containing
2289 the ALTER TABLE statements to transform the previous schema into the
2290 current one. Note that these statements may contain C<DROP TABLE> or
2291 C<DROP COLUMN> statements that can potentially destroy data.
2292
2293 The file names are created using the C<ddl_filename> method below, please
2294 override this method in your schema if you would like a different file
2295 name format. For the ALTER file, the same format is used, replacing
2296 $version in the name with "$preversion-$version".
2297
2298 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2299 The most common value for this would be C<< { add_drop_table => 1 } >>
2300 to have the SQL produced include a C<DROP TABLE> statement for each table
2301 created. For quoting purposes supply C<quote_table_names> and
2302 C<quote_field_names>.
2303
2304 If no arguments are passed, then the following default values are assumed:
2305
2306 =over 4
2307
2308 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2309
2310 =item version    - $schema->schema_version
2311
2312 =item directory  - './'
2313
2314 =item preversion - <none>
2315
2316 =back
2317
2318 By default, C<\%sqlt_args> will have
2319
2320  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2321
2322 merged with the hash passed in. To disable any of those features, pass in a
2323 hashref like the following
2324
2325  { ignore_constraint_names => 0, # ... other options }
2326
2327
2328 WARNING: You are strongly advised to check all SQL files created, before applying
2329 them.
2330
2331 =cut
2332
2333 sub create_ddl_dir {
2334   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2335
2336   unless ($dir) {
2337     carp "No directory given, using ./\n";
2338     $dir = './';
2339   } else {
2340       -d $dir or File::Path::mkpath($dir)
2341           or croak "create_ddl_dir: could not create dir '$dir'";
2342   }
2343
2344   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2345
2346   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2347   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2348
2349   my $schema_version = $schema->schema_version || '1.x';
2350   $version ||= $schema_version;
2351
2352   $sqltargs = {
2353     add_drop_table => 1,
2354     ignore_constraint_names => 1,
2355     ignore_index_names => 1,
2356     %{$sqltargs || {}}
2357   };
2358
2359   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2360     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2361   }
2362
2363   my $sqlt = SQL::Translator->new( $sqltargs );
2364
2365   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2366   my $sqlt_schema = $sqlt->translate({ data => $schema })
2367     or $self->throw_exception ($sqlt->error);
2368
2369   foreach my $db (@$databases) {
2370     $sqlt->reset();
2371     $sqlt->{schema} = $sqlt_schema;
2372     $sqlt->producer($db);
2373
2374     my $file;
2375     my $filename = $schema->ddl_filename($db, $version, $dir);
2376     if (-e $filename && ($version eq $schema_version )) {
2377       # if we are dumping the current version, overwrite the DDL
2378       carp "Overwriting existing DDL file - $filename";
2379       unlink($filename);
2380     }
2381
2382     my $output = $sqlt->translate;
2383     if(!$output) {
2384       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2385       next;
2386     }
2387     if(!open($file, ">$filename")) {
2388       $self->throw_exception("Can't open $filename for writing ($!)");
2389       next;
2390     }
2391     print $file $output;
2392     close($file);
2393
2394     next unless ($preversion);
2395
2396     require SQL::Translator::Diff;
2397
2398     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2399     if(!-e $prefilename) {
2400       carp("No previous schema file found ($prefilename)");
2401       next;
2402     }
2403
2404     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2405     if(-e $difffile) {
2406       carp("Overwriting existing diff file - $difffile");
2407       unlink($difffile);
2408     }
2409
2410     my $source_schema;
2411     {
2412       my $t = SQL::Translator->new($sqltargs);
2413       $t->debug( 0 );
2414       $t->trace( 0 );
2415
2416       $t->parser( $db )
2417         or $self->throw_exception ($t->error);
2418
2419       my $out = $t->translate( $prefilename )
2420         or $self->throw_exception ($t->error);
2421
2422       $source_schema = $t->schema;
2423
2424       $source_schema->name( $prefilename )
2425         unless ( $source_schema->name );
2426     }
2427
2428     # The "new" style of producers have sane normalization and can support
2429     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2430     # And we have to diff parsed SQL against parsed SQL.
2431     my $dest_schema = $sqlt_schema;
2432
2433     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2434       my $t = SQL::Translator->new($sqltargs);
2435       $t->debug( 0 );
2436       $t->trace( 0 );
2437
2438       $t->parser( $db )
2439         or $self->throw_exception ($t->error);
2440
2441       my $out = $t->translate( $filename )
2442         or $self->throw_exception ($t->error);
2443
2444       $dest_schema = $t->schema;
2445
2446       $dest_schema->name( $filename )
2447         unless $dest_schema->name;
2448     }
2449
2450     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2451                                                   $dest_schema,   $db,
2452                                                   $sqltargs
2453                                                  );
2454     if(!open $file, ">$difffile") {
2455       $self->throw_exception("Can't write to $difffile ($!)");
2456       next;
2457     }
2458     print $file $diff;
2459     close($file);
2460   }
2461 }
2462
2463 =head2 deployment_statements
2464
2465 =over 4
2466
2467 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2468
2469 =back
2470
2471 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2472
2473 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2474 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2475
2476 C<$directory> is used to return statements from files in a previously created
2477 L</create_ddl_dir> directory and is optional. The filenames are constructed
2478 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2479
2480 If no C<$directory> is specified then the statements are constructed on the
2481 fly using L<SQL::Translator> and C<$version> is ignored.
2482
2483 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2484
2485 =cut
2486
2487 sub deployment_statements {
2488   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2489   $type ||= $self->sqlt_type;
2490   $version ||= $schema->schema_version || '1.x';
2491   $dir ||= './';
2492   my $filename = $schema->ddl_filename($type, $version, $dir);
2493   if(-f $filename)
2494   {
2495       my $file;
2496       open($file, "<$filename")
2497         or $self->throw_exception("Can't open $filename ($!)");
2498       my @rows = <$file>;
2499       close($file);
2500       return join('', @rows);
2501   }
2502
2503   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2504     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2505   }
2506
2507   # sources needs to be a parser arg, but for simplicty allow at top level
2508   # coming in
2509   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2510       if exists $sqltargs->{sources};
2511
2512   my $tr = SQL::Translator->new(
2513     producer => "SQL::Translator::Producer::${type}",
2514     %$sqltargs,
2515     parser => 'SQL::Translator::Parser::DBIx::Class',
2516     data => $schema,
2517   );
2518
2519   my @ret;
2520   my $wa = wantarray;
2521   if ($wa) {
2522     @ret = $tr->translate;
2523   }
2524   else {
2525     $ret[0] = $tr->translate;
2526   }
2527
2528   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2529     unless (@ret && defined $ret[0]);
2530
2531   return $wa ? @ret : $ret[0];
2532 }
2533
2534 sub deploy {
2535   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2536   my $deploy = sub {
2537     my $line = shift;
2538     return if($line =~ /^--/);
2539     return if(!$line);
2540     # next if($line =~ /^DROP/m);
2541     return if($line =~ /^BEGIN TRANSACTION/m);
2542     return if($line =~ /^COMMIT/m);
2543     return if $line =~ /^\s+$/; # skip whitespace only
2544     $self->_query_start($line);
2545     eval {
2546       # do a dbh_do cycle here, as we need some error checking in
2547       # place (even though we will ignore errors)
2548       $self->dbh_do (sub { $_[1]->do($line) });
2549     };
2550     if ($@) {
2551       carp qq{$@ (running "${line}")};
2552     }
2553     $self->_query_end($line);
2554   };
2555   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2556   if (@statements > 1) {
2557     foreach my $statement (@statements) {
2558       $deploy->( $statement );
2559     }
2560   }
2561   elsif (@statements == 1) {
2562     foreach my $line ( split(";\n", $statements[0])) {
2563       $deploy->( $line );
2564     }
2565   }
2566 }
2567
2568 =head2 datetime_parser
2569
2570 Returns the datetime parser class
2571
2572 =cut
2573
2574 sub datetime_parser {
2575   my $self = shift;
2576   return $self->{datetime_parser} ||= do {
2577     $self->build_datetime_parser(@_);
2578   };
2579 }
2580
2581 =head2 datetime_parser_type
2582
2583 Defines (returns) the datetime parser class - currently hardwired to
2584 L<DateTime::Format::MySQL>
2585
2586 =cut
2587
2588 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2589
2590 =head2 build_datetime_parser
2591
2592 See L</datetime_parser>
2593
2594 =cut
2595
2596 sub build_datetime_parser {
2597   my $self = shift;
2598   my $type = $self->datetime_parser_type(@_);
2599   $self->ensure_class_loaded ($type);
2600   return $type;
2601 }
2602
2603
2604 =head2 is_replicating
2605
2606 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2607 replicate from a master database.  Default is undef, which is the result
2608 returned by databases that don't support replication.
2609
2610 =cut
2611
2612 sub is_replicating {
2613     return;
2614
2615 }
2616
2617 =head2 lag_behind_master
2618
2619 Returns a number that represents a certain amount of lag behind a master db
2620 when a given storage is replicating.  The number is database dependent, but
2621 starts at zero and increases with the amount of lag. Default in undef
2622
2623 =cut
2624
2625 sub lag_behind_master {
2626     return;
2627 }
2628
2629 =head2 relname_to_table_alias
2630
2631 =over 4
2632
2633 =item Arguments: $relname, $join_count
2634
2635 =back
2636
2637 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2638 queries.
2639
2640 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2641 way these aliases are named.
2642
2643 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2644 otherwise C<"$relname">.
2645
2646 =cut
2647
2648 sub relname_to_table_alias {
2649   my ($self, $relname, $join_count) = @_;
2650
2651   my $alias = ($join_count && $join_count > 1 ?
2652     join('_', $relname, $join_count) : $relname);
2653
2654   return $alias;
2655 }
2656
2657 sub DESTROY {
2658   my $self = shift;
2659
2660   $self->_verify_pid if $self->_dbh;
2661
2662   # some databases need this to stop spewing warnings
2663   if (my $dbh = $self->_dbh) {
2664     local $@;
2665     eval {
2666       %{ $dbh->{CachedKids} } = ();
2667       $dbh->disconnect;
2668     };
2669   }
2670
2671   $self->_dbh(undef);
2672 }
2673
2674 1;
2675
2676 =head1 USAGE NOTES
2677
2678 =head2 DBIx::Class and AutoCommit
2679
2680 DBIx::Class can do some wonderful magic with handling exceptions,
2681 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2682 (the default) combined with C<txn_do> for transaction support.
2683
2684 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2685 in an assumed transaction between commits, and you're telling us you'd
2686 like to manage that manually.  A lot of the magic protections offered by
2687 this module will go away.  We can't protect you from exceptions due to database
2688 disconnects because we don't know anything about how to restart your
2689 transactions.  You're on your own for handling all sorts of exceptional
2690 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2691 be with raw DBI.
2692
2693
2694 =head1 AUTHORS
2695
2696 Matt S. Trout <mst@shadowcatsystems.co.uk>
2697
2698 Andy Grundman <andy@hybridized.org>
2699
2700 =head1 LICENSE
2701
2702 You may distribute this code under the same terms as Perl itself.
2703
2704 =cut