add storage->_server_info->{dbms_ver_normalized}
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util();
15 use List::Util();
16 use Data::Dumper::Concise();
17 use Sub::Name ();
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
21      _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
22      __server_info/
23 );
24
25 # the values for these accessors are picked out (and deleted) from
26 # the attribute hashref passed to connect_info
27 my @storage_options = qw/
28   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
29   disable_sth_caching unsafe auto_savepoint
30 /;
31 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
32
33
34 # default cursor class, overridable in connect_info attributes
35 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
36
37 __PACKAGE__->mk_group_accessors('inherited' => qw/
38   sql_maker_class
39   can_insert_returning
40 /);
41 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
42
43
44 # Each of these methods need _determine_driver called before itself
45 # in order to function reliably. This is a purely DRY optimization
46 my @rdbms_specific_methods = qw/
47   deployment_statements
48   sqlt_type
49   build_datetime_parser
50   datetime_parser_type
51
52   insert
53   insert_bulk
54   update
55   delete
56   select
57   select_single
58 /;
59
60 for my $meth (@rdbms_specific_methods) {
61
62   my $orig = __PACKAGE__->can ($meth)
63     or next;
64
65   no strict qw/refs/;
66   no warnings qw/redefine/;
67   *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
68     if (not $_[0]->_driver_determined) {
69       $_[0]->_determine_driver;
70       goto $_[0]->can($meth);
71     }
72     $orig->(@_);
73   };
74 }
75
76
77 =head1 NAME
78
79 DBIx::Class::Storage::DBI - DBI storage handler
80
81 =head1 SYNOPSIS
82
83   my $schema = MySchema->connect('dbi:SQLite:my.db');
84
85   $schema->storage->debug(1);
86
87   my @stuff = $schema->storage->dbh_do(
88     sub {
89       my ($storage, $dbh, @args) = @_;
90       $dbh->do("DROP TABLE authors");
91     },
92     @column_list
93   );
94
95   $schema->resultset('Book')->search({
96      written_on => $schema->storage->datetime_parser(DateTime->now)
97   });
98
99 =head1 DESCRIPTION
100
101 This class represents the connection to an RDBMS via L<DBI>.  See
102 L<DBIx::Class::Storage> for general information.  This pod only
103 documents DBI-specific methods and behaviors.
104
105 =head1 METHODS
106
107 =cut
108
109 sub new {
110   my $new = shift->next::method(@_);
111
112   $new->transaction_depth(0);
113   $new->_sql_maker_opts({});
114   $new->{savepoints} = [];
115   $new->{_in_dbh_do} = 0;
116   $new->{_dbh_gen} = 0;
117
118   $new;
119 }
120
121 =head2 connect_info
122
123 This method is normally called by L<DBIx::Class::Schema/connection>, which
124 encapsulates its argument list in an arrayref before passing them here.
125
126 The argument list may contain:
127
128 =over
129
130 =item *
131
132 The same 4-element argument set one would normally pass to
133 L<DBI/connect>, optionally followed by
134 L<extra attributes|/DBIx::Class specific connection attributes>
135 recognized by DBIx::Class:
136
137   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
138
139 =item *
140
141 A single code reference which returns a connected
142 L<DBI database handle|DBI/connect> optionally followed by
143 L<extra attributes|/DBIx::Class specific connection attributes> recognized
144 by DBIx::Class:
145
146   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
147
148 =item *
149
150 A single hashref with all the attributes and the dsn/user/password
151 mixed together:
152
153   $connect_info_args = [{
154     dsn => $dsn,
155     user => $user,
156     password => $pass,
157     %dbi_attributes,
158     %extra_attributes,
159   }];
160
161   $connect_info_args = [{
162     dbh_maker => sub { DBI->connect (...) },
163     %dbi_attributes,
164     %extra_attributes,
165   }];
166
167 This is particularly useful for L<Catalyst> based applications, allowing the
168 following config (L<Config::General> style):
169
170   <Model::DB>
171     schema_class   App::DB
172     <connect_info>
173       dsn          dbi:mysql:database=test
174       user         testuser
175       password     TestPass
176       AutoCommit   1
177     </connect_info>
178   </Model::DB>
179
180 The C<dsn>/C<user>/C<password> combination can be substituted by the
181 C<dbh_maker> key whose value is a coderef that returns a connected
182 L<DBI database handle|DBI/connect>
183
184 =back
185
186 Please note that the L<DBI> docs recommend that you always explicitly
187 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
188 recommends that it be set to I<1>, and that you perform transactions
189 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
190 to I<1> if you do not do explicitly set it to zero.  This is the default
191 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
192
193 =head3 DBIx::Class specific connection attributes
194
195 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
196 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
197 the following connection options. These options can be mixed in with your other
198 L<DBI> connection attributes, or placed in a separate hashref
199 (C<\%extra_attributes>) as shown above.
200
201 Every time C<connect_info> is invoked, any previous settings for
202 these options will be cleared before setting the new ones, regardless of
203 whether any options are specified in the new C<connect_info>.
204
205
206 =over
207
208 =item on_connect_do
209
210 Specifies things to do immediately after connecting or re-connecting to
211 the database.  Its value may contain:
212
213 =over
214
215 =item a scalar
216
217 This contains one SQL statement to execute.
218
219 =item an array reference
220
221 This contains SQL statements to execute in order.  Each element contains
222 a string or a code reference that returns a string.
223
224 =item a code reference
225
226 This contains some code to execute.  Unlike code references within an
227 array reference, its return value is ignored.
228
229 =back
230
231 =item on_disconnect_do
232
233 Takes arguments in the same form as L</on_connect_do> and executes them
234 immediately before disconnecting from the database.
235
236 Note, this only runs if you explicitly call L</disconnect> on the
237 storage object.
238
239 =item on_connect_call
240
241 A more generalized form of L</on_connect_do> that calls the specified
242 C<connect_call_METHOD> methods in your storage driver.
243
244   on_connect_do => 'select 1'
245
246 is equivalent to:
247
248   on_connect_call => [ [ do_sql => 'select 1' ] ]
249
250 Its values may contain:
251
252 =over
253
254 =item a scalar
255
256 Will call the C<connect_call_METHOD> method.
257
258 =item a code reference
259
260 Will execute C<< $code->($storage) >>
261
262 =item an array reference
263
264 Each value can be a method name or code reference.
265
266 =item an array of arrays
267
268 For each array, the first item is taken to be the C<connect_call_> method name
269 or code reference, and the rest are parameters to it.
270
271 =back
272
273 Some predefined storage methods you may use:
274
275 =over
276
277 =item do_sql
278
279 Executes a SQL string or a code reference that returns a SQL string. This is
280 what L</on_connect_do> and L</on_disconnect_do> use.
281
282 It can take:
283
284 =over
285
286 =item a scalar
287
288 Will execute the scalar as SQL.
289
290 =item an arrayref
291
292 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
293 attributes hashref and bind values.
294
295 =item a code reference
296
297 Will execute C<< $code->($storage) >> and execute the return array refs as
298 above.
299
300 =back
301
302 =item datetime_setup
303
304 Execute any statements necessary to initialize the database session to return
305 and accept datetime/timestamp values used with
306 L<DBIx::Class::InflateColumn::DateTime>.
307
308 Only necessary for some databases, see your specific storage driver for
309 implementation details.
310
311 =back
312
313 =item on_disconnect_call
314
315 Takes arguments in the same form as L</on_connect_call> and executes them
316 immediately before disconnecting from the database.
317
318 Calls the C<disconnect_call_METHOD> methods as opposed to the
319 C<connect_call_METHOD> methods called by L</on_connect_call>.
320
321 Note, this only runs if you explicitly call L</disconnect> on the
322 storage object.
323
324 =item disable_sth_caching
325
326 If set to a true value, this option will disable the caching of
327 statement handles via L<DBI/prepare_cached>.
328
329 =item limit_dialect
330
331 Sets the limit dialect. This is useful for JDBC-bridge among others
332 where the remote SQL-dialect cannot be determined by the name of the
333 driver alone. See also L<SQL::Abstract::Limit>.
334
335 =item quote_char
336
337 Specifies what characters to use to quote table and column names. If
338 you use this you will want to specify L</name_sep> as well.
339
340 C<quote_char> expects either a single character, in which case is it
341 is placed on either side of the table/column name, or an arrayref of length
342 2 in which case the table/column name is placed between the elements.
343
344 For example under MySQL you should use C<< quote_char => '`' >>, and for
345 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
346
347 =item name_sep
348
349 This only needs to be used in conjunction with C<quote_char>, and is used to
350 specify the character that separates elements (schemas, tables, columns) from
351 each other. In most cases this is simply a C<.>.
352
353 The consequences of not supplying this value is that L<SQL::Abstract>
354 will assume DBIx::Class' uses of aliases to be complete column
355 names. The output will look like I<"me.name"> when it should actually
356 be I<"me"."name">.
357
358 =item unsafe
359
360 This Storage driver normally installs its own C<HandleError>, sets
361 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
362 all database handles, including those supplied by a coderef.  It does this
363 so that it can have consistent and useful error behavior.
364
365 If you set this option to a true value, Storage will not do its usual
366 modifications to the database handle's attributes, and instead relies on
367 the settings in your connect_info DBI options (or the values you set in
368 your connection coderef, in the case that you are connecting via coderef).
369
370 Note that your custom settings can cause Storage to malfunction,
371 especially if you set a C<HandleError> handler that suppresses exceptions
372 and/or disable C<RaiseError>.
373
374 =item auto_savepoint
375
376 If this option is true, L<DBIx::Class> will use savepoints when nesting
377 transactions, making it possible to recover from failure in the inner
378 transaction without having to abort all outer transactions.
379
380 =item cursor_class
381
382 Use this argument to supply a cursor class other than the default
383 L<DBIx::Class::Storage::DBI::Cursor>.
384
385 =back
386
387 Some real-life examples of arguments to L</connect_info> and
388 L<DBIx::Class::Schema/connect>
389
390   # Simple SQLite connection
391   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
392
393   # Connect via subref
394   ->connect_info([ sub { DBI->connect(...) } ]);
395
396   # Connect via subref in hashref
397   ->connect_info([{
398     dbh_maker => sub { DBI->connect(...) },
399     on_connect_do => 'alter session ...',
400   }]);
401
402   # A bit more complicated
403   ->connect_info(
404     [
405       'dbi:Pg:dbname=foo',
406       'postgres',
407       'my_pg_password',
408       { AutoCommit => 1 },
409       { quote_char => q{"}, name_sep => q{.} },
410     ]
411   );
412
413   # Equivalent to the previous example
414   ->connect_info(
415     [
416       'dbi:Pg:dbname=foo',
417       'postgres',
418       'my_pg_password',
419       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
420     ]
421   );
422
423   # Same, but with hashref as argument
424   # See parse_connect_info for explanation
425   ->connect_info(
426     [{
427       dsn         => 'dbi:Pg:dbname=foo',
428       user        => 'postgres',
429       password    => 'my_pg_password',
430       AutoCommit  => 1,
431       quote_char  => q{"},
432       name_sep    => q{.},
433     }]
434   );
435
436   # Subref + DBIx::Class-specific connection options
437   ->connect_info(
438     [
439       sub { DBI->connect(...) },
440       {
441           quote_char => q{`},
442           name_sep => q{@},
443           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
444           disable_sth_caching => 1,
445       },
446     ]
447   );
448
449
450
451 =cut
452
453 sub connect_info {
454   my ($self, $info) = @_;
455
456   return $self->_connect_info if !$info;
457
458   $self->_connect_info($info); # copy for _connect_info
459
460   $info = $self->_normalize_connect_info($info)
461     if ref $info eq 'ARRAY';
462
463   for my $storage_opt (keys %{ $info->{storage_options} }) {
464     my $value = $info->{storage_options}{$storage_opt};
465
466     $self->$storage_opt($value);
467   }
468
469   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
470   #  the new set of options
471   $self->_sql_maker(undef);
472   $self->_sql_maker_opts({});
473
474   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
475     my $value = $info->{sql_maker_options}{$sql_maker_opt};
476
477     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
478   }
479
480   my %attrs = (
481     %{ $self->_default_dbi_connect_attributes || {} },
482     %{ $info->{attributes} || {} },
483   );
484
485   my @args = @{ $info->{arguments} };
486
487   $self->_dbi_connect_info([@args,
488     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
489
490   return $self->_connect_info;
491 }
492
493 sub _normalize_connect_info {
494   my ($self, $info_arg) = @_;
495   my %info;
496
497   my @args = @$info_arg;  # take a shallow copy for further mutilation
498
499   # combine/pre-parse arguments depending on invocation style
500
501   my %attrs;
502   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
503     %attrs = %{ $args[1] || {} };
504     @args = $args[0];
505   }
506   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
507     %attrs = %{$args[0]};
508     @args = ();
509     if (my $code = delete $attrs{dbh_maker}) {
510       @args = $code;
511
512       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
513       if (@ignored) {
514         carp sprintf (
515             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
516           . "to the result of 'dbh_maker'",
517
518           join (', ', map { "'$_'" } (@ignored) ),
519         );
520       }
521     }
522     else {
523       @args = delete @attrs{qw/dsn user password/};
524     }
525   }
526   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
527     %attrs = (
528       % { $args[3] || {} },
529       % { $args[4] || {} },
530     );
531     @args = @args[0,1,2];
532   }
533
534   $info{arguments} = \@args;
535
536   my @storage_opts = grep exists $attrs{$_},
537     @storage_options, 'cursor_class';
538
539   @{ $info{storage_options} }{@storage_opts} =
540     delete @attrs{@storage_opts} if @storage_opts;
541
542   my @sql_maker_opts = grep exists $attrs{$_},
543     qw/limit_dialect quote_char name_sep/;
544
545   @{ $info{sql_maker_options} }{@sql_maker_opts} =
546     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
547
548   $info{attributes} = \%attrs if %attrs;
549
550   return \%info;
551 }
552
553 sub _default_dbi_connect_attributes {
554   return {
555     AutoCommit => 1,
556     RaiseError => 1,
557     PrintError => 0,
558   };
559 }
560
561 =head2 on_connect_do
562
563 This method is deprecated in favour of setting via L</connect_info>.
564
565 =cut
566
567 =head2 on_disconnect_do
568
569 This method is deprecated in favour of setting via L</connect_info>.
570
571 =cut
572
573 sub _parse_connect_do {
574   my ($self, $type) = @_;
575
576   my $val = $self->$type;
577   return () if not defined $val;
578
579   my @res;
580
581   if (not ref($val)) {
582     push @res, [ 'do_sql', $val ];
583   } elsif (ref($val) eq 'CODE') {
584     push @res, $val;
585   } elsif (ref($val) eq 'ARRAY') {
586     push @res, map { [ 'do_sql', $_ ] } @$val;
587   } else {
588     $self->throw_exception("Invalid type for $type: ".ref($val));
589   }
590
591   return \@res;
592 }
593
594 =head2 dbh_do
595
596 Arguments: ($subref | $method_name), @extra_coderef_args?
597
598 Execute the given $subref or $method_name using the new exception-based
599 connection management.
600
601 The first two arguments will be the storage object that C<dbh_do> was called
602 on and a database handle to use.  Any additional arguments will be passed
603 verbatim to the called subref as arguments 2 and onwards.
604
605 Using this (instead of $self->_dbh or $self->dbh) ensures correct
606 exception handling and reconnection (or failover in future subclasses).
607
608 Your subref should have no side-effects outside of the database, as
609 there is the potential for your subref to be partially double-executed
610 if the database connection was stale/dysfunctional.
611
612 Example:
613
614   my @stuff = $schema->storage->dbh_do(
615     sub {
616       my ($storage, $dbh, @cols) = @_;
617       my $cols = join(q{, }, @cols);
618       $dbh->selectrow_array("SELECT $cols FROM foo");
619     },
620     @column_list
621   );
622
623 =cut
624
625 sub dbh_do {
626   my $self = shift;
627   my $code = shift;
628
629   my $dbh = $self->_get_dbh;
630
631   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
632       || $self->{transaction_depth};
633
634   local $self->{_in_dbh_do} = 1;
635
636   my @result;
637   my $want_array = wantarray;
638
639   eval {
640
641     if($want_array) {
642         @result = $self->$code($dbh, @_);
643     }
644     elsif(defined $want_array) {
645         $result[0] = $self->$code($dbh, @_);
646     }
647     else {
648         $self->$code($dbh, @_);
649     }
650   };
651
652   # ->connected might unset $@ - copy
653   my $exception = $@;
654   if(!$exception) { return $want_array ? @result : $result[0] }
655
656   $self->throw_exception($exception) if $self->connected;
657
658   # We were not connected - reconnect and retry, but let any
659   #  exception fall right through this time
660   carp "Retrying $code after catching disconnected exception: $exception"
661     if $ENV{DBIC_DBIRETRY_DEBUG};
662   $self->_populate_dbh;
663   $self->$code($self->_dbh, @_);
664 }
665
666 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
667 # It also informs dbh_do to bypass itself while under the direction of txn_do,
668 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
669 sub txn_do {
670   my $self = shift;
671   my $coderef = shift;
672
673   ref $coderef eq 'CODE' or $self->throw_exception
674     ('$coderef must be a CODE reference');
675
676   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
677
678   local $self->{_in_dbh_do} = 1;
679
680   my @result;
681   my $want_array = wantarray;
682
683   my $tried = 0;
684   while(1) {
685     eval {
686       $self->_get_dbh;
687
688       $self->txn_begin;
689       if($want_array) {
690           @result = $coderef->(@_);
691       }
692       elsif(defined $want_array) {
693           $result[0] = $coderef->(@_);
694       }
695       else {
696           $coderef->(@_);
697       }
698       $self->txn_commit;
699     };
700
701     # ->connected might unset $@ - copy
702     my $exception = $@;
703     if(!$exception) { return $want_array ? @result : $result[0] }
704
705     if($tried++ || $self->connected) {
706       eval { $self->txn_rollback };
707       my $rollback_exception = $@;
708       if($rollback_exception) {
709         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
710         $self->throw_exception($exception)  # propagate nested rollback
711           if $rollback_exception =~ /$exception_class/;
712
713         $self->throw_exception(
714           "Transaction aborted: ${exception}. "
715           . "Rollback failed: ${rollback_exception}"
716         );
717       }
718       $self->throw_exception($exception)
719     }
720
721     # We were not connected, and was first try - reconnect and retry
722     # via the while loop
723     carp "Retrying $coderef after catching disconnected exception: $exception"
724       if $ENV{DBIC_DBIRETRY_DEBUG};
725     $self->_populate_dbh;
726   }
727 }
728
729 =head2 disconnect
730
731 Our C<disconnect> method also performs a rollback first if the
732 database is not in C<AutoCommit> mode.
733
734 =cut
735
736 sub disconnect {
737   my ($self) = @_;
738
739   if( $self->_dbh ) {
740     my @actions;
741
742     push @actions, ( $self->on_disconnect_call || () );
743     push @actions, $self->_parse_connect_do ('on_disconnect_do');
744
745     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
746
747     $self->_dbh_rollback unless $self->_dbh_autocommit;
748
749     %{ $self->_dbh->{CachedKids} } = ();
750     $self->_dbh->disconnect;
751     $self->_dbh(undef);
752     $self->{_dbh_gen}++;
753   }
754 }
755
756 =head2 with_deferred_fk_checks
757
758 =over 4
759
760 =item Arguments: C<$coderef>
761
762 =item Return Value: The return value of $coderef
763
764 =back
765
766 Storage specific method to run the code ref with FK checks deferred or
767 in MySQL's case disabled entirely.
768
769 =cut
770
771 # Storage subclasses should override this
772 sub with_deferred_fk_checks {
773   my ($self, $sub) = @_;
774   $sub->();
775 }
776
777 =head2 connected
778
779 =over
780
781 =item Arguments: none
782
783 =item Return Value: 1|0
784
785 =back
786
787 Verifies that the current database handle is active and ready to execute
788 an SQL statement (e.g. the connection did not get stale, server is still
789 answering, etc.) This method is used internally by L</dbh>.
790
791 =cut
792
793 sub connected {
794   my $self = shift;
795   return 0 unless $self->_seems_connected;
796
797   #be on the safe side
798   local $self->_dbh->{RaiseError} = 1;
799
800   return $self->_ping;
801 }
802
803 sub _seems_connected {
804   my $self = shift;
805
806   my $dbh = $self->_dbh
807     or return 0;
808
809   if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
810     $self->_dbh(undef);
811     $self->{_dbh_gen}++;
812     return 0;
813   }
814   else {
815     $self->_verify_pid;
816     return 0 if !$self->_dbh;
817   }
818
819   return $dbh->FETCH('Active');
820 }
821
822 sub _ping {
823   my $self = shift;
824
825   my $dbh = $self->_dbh or return 0;
826
827   return $dbh->ping;
828 }
829
830 # handle pid changes correctly
831 #  NOTE: assumes $self->_dbh is a valid $dbh
832 sub _verify_pid {
833   my ($self) = @_;
834
835   return if defined $self->_conn_pid && $self->_conn_pid == $$;
836
837   $self->_dbh->{InactiveDestroy} = 1;
838   $self->_dbh(undef);
839   $self->{_dbh_gen}++;
840
841   return;
842 }
843
844 sub ensure_connected {
845   my ($self) = @_;
846
847   unless ($self->connected) {
848     $self->_populate_dbh;
849   }
850 }
851
852 =head2 dbh
853
854 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
855 is guaranteed to be healthy by implicitly calling L</connected>, and if
856 necessary performing a reconnection before returning. Keep in mind that this
857 is very B<expensive> on some database engines. Consider using L</dbh_do>
858 instead.
859
860 =cut
861
862 sub dbh {
863   my ($self) = @_;
864
865   if (not $self->_dbh) {
866     $self->_populate_dbh;
867   } else {
868     $self->ensure_connected;
869   }
870   return $self->_dbh;
871 }
872
873 # this is the internal "get dbh or connect (don't check)" method
874 sub _get_dbh {
875   my $self = shift;
876   $self->_verify_pid if $self->_dbh;
877   $self->_populate_dbh unless $self->_dbh;
878   return $self->_dbh;
879 }
880
881 sub _sql_maker_args {
882     my ($self) = @_;
883
884     return (
885       bindtype=>'columns',
886       array_datatypes => 1,
887       limit_dialect => $self->_get_dbh,
888       %{$self->_sql_maker_opts}
889     );
890 }
891
892 sub sql_maker {
893   my ($self) = @_;
894   unless ($self->_sql_maker) {
895     my $sql_maker_class = $self->sql_maker_class;
896     $self->ensure_class_loaded ($sql_maker_class);
897     $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
898   }
899   return $self->_sql_maker;
900 }
901
902 # nothing to do by default
903 sub _rebless {}
904 sub _init {}
905
906 sub _populate_dbh {
907   my ($self) = @_;
908
909   my @info = @{$self->_dbi_connect_info || []};
910   $self->_dbh(undef); # in case ->connected failed we might get sent here
911   $self->_dbh($self->_connect(@info));
912
913   $self->_conn_pid($$);
914   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
915
916   $self->_determine_driver;
917
918   # Always set the transaction depth on connect, since
919   #  there is no transaction in progress by definition
920   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
921
922   $self->_run_connection_actions unless $self->{_in_determine_driver};
923
924   $self->_populate_server_info;
925 }
926
927 sub _run_connection_actions {
928   my $self = shift;
929   my @actions;
930
931   push @actions, ( $self->on_connect_call || () );
932   push @actions, $self->_parse_connect_do ('on_connect_do');
933
934   $self->_do_connection_actions(connect_call_ => $_) for @actions;
935 }
936
937 sub _populate_server_info {
938   my $self = shift;
939   my %info;
940
941   my $dbms_ver = eval { local $@; $self->_get_dbh->get_info(18) };
942
943   if (defined $dbms_ver) {
944     $info{dbms_ver} = $dbms_ver;
945
946     my @verparts = split /\./, $dbms_ver;
947     $info{dbms_ver_normalized} = sprintf "%d.%03d%03d", @verparts;
948   }
949
950   $self->__server_info(\%info);
951
952   return \%info;
953 }
954
955 sub _server_info {
956   my $self = shift;
957
958   $self->_get_dbh;
959
960   return $self->__server_info(@_);
961 }
962
963 sub _determine_driver {
964   my ($self) = @_;
965
966   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
967     my $started_connected = 0;
968     local $self->{_in_determine_driver} = 1;
969
970     if (ref($self) eq __PACKAGE__) {
971       my $driver;
972       if ($self->_dbh) { # we are connected
973         $driver = $self->_dbh->{Driver}{Name};
974         $started_connected = 1;
975       } else {
976         # if connect_info is a CODEREF, we have no choice but to connect
977         if (ref $self->_dbi_connect_info->[0] &&
978             Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
979           $self->_populate_dbh;
980           $driver = $self->_dbh->{Driver}{Name};
981         }
982         else {
983           # try to use dsn to not require being connected, the driver may still
984           # force a connection in _rebless to determine version
985           # (dsn may not be supplied at all if all we do is make a mock-schema)
986           my $dsn = $self->_dbi_connect_info->[0] || '';
987           ($driver) = $dsn =~ /dbi:([^:]+):/i;
988         }
989       }
990
991       if ($driver) {
992         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
993         if ($self->load_optional_class($storage_class)) {
994           mro::set_mro($storage_class, 'c3');
995           bless $self, $storage_class;
996           $self->_rebless();
997         }
998       }
999     }
1000
1001     $self->_driver_determined(1);
1002
1003     $self->_init; # run driver-specific initializations
1004
1005     $self->_run_connection_actions
1006         if !$started_connected && defined $self->_dbh;
1007   }
1008 }
1009
1010 sub _do_connection_actions {
1011   my $self          = shift;
1012   my $method_prefix = shift;
1013   my $call          = shift;
1014
1015   if (not ref($call)) {
1016     my $method = $method_prefix . $call;
1017     $self->$method(@_);
1018   } elsif (ref($call) eq 'CODE') {
1019     $self->$call(@_);
1020   } elsif (ref($call) eq 'ARRAY') {
1021     if (ref($call->[0]) ne 'ARRAY') {
1022       $self->_do_connection_actions($method_prefix, $_) for @$call;
1023     } else {
1024       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1025     }
1026   } else {
1027     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1028   }
1029
1030   return $self;
1031 }
1032
1033 sub connect_call_do_sql {
1034   my $self = shift;
1035   $self->_do_query(@_);
1036 }
1037
1038 sub disconnect_call_do_sql {
1039   my $self = shift;
1040   $self->_do_query(@_);
1041 }
1042
1043 # override in db-specific backend when necessary
1044 sub connect_call_datetime_setup { 1 }
1045
1046 sub _do_query {
1047   my ($self, $action) = @_;
1048
1049   if (ref $action eq 'CODE') {
1050     $action = $action->($self);
1051     $self->_do_query($_) foreach @$action;
1052   }
1053   else {
1054     # Most debuggers expect ($sql, @bind), so we need to exclude
1055     # the attribute hash which is the second argument to $dbh->do
1056     # furthermore the bind values are usually to be presented
1057     # as named arrayref pairs, so wrap those here too
1058     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1059     my $sql = shift @do_args;
1060     my $attrs = shift @do_args;
1061     my @bind = map { [ undef, $_ ] } @do_args;
1062
1063     $self->_query_start($sql, @bind);
1064     $self->_get_dbh->do($sql, $attrs, @do_args);
1065     $self->_query_end($sql, @bind);
1066   }
1067
1068   return $self;
1069 }
1070
1071 sub _connect {
1072   my ($self, @info) = @_;
1073
1074   $self->throw_exception("You failed to provide any connection info")
1075     if !@info;
1076
1077   my ($old_connect_via, $dbh);
1078
1079   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1080     $old_connect_via = $DBI::connect_via;
1081     $DBI::connect_via = 'connect';
1082   }
1083
1084   eval {
1085     if(ref $info[0] eq 'CODE') {
1086        $dbh = $info[0]->();
1087     }
1088     else {
1089        $dbh = DBI->connect(@info);
1090     }
1091
1092     if($dbh && !$self->unsafe) {
1093       my $weak_self = $self;
1094       Scalar::Util::weaken($weak_self);
1095       $dbh->{HandleError} = sub {
1096           if ($weak_self) {
1097             $weak_self->throw_exception("DBI Exception: $_[0]");
1098           }
1099           else {
1100             # the handler may be invoked by something totally out of
1101             # the scope of DBIC
1102             croak ("DBI Exception: $_[0]");
1103           }
1104       };
1105       $dbh->{ShowErrorStatement} = 1;
1106       $dbh->{RaiseError} = 1;
1107       $dbh->{PrintError} = 0;
1108     }
1109   };
1110
1111   $DBI::connect_via = $old_connect_via if $old_connect_via;
1112
1113   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
1114     if !$dbh || $@;
1115
1116   $self->_dbh_autocommit($dbh->{AutoCommit});
1117
1118   $dbh;
1119 }
1120
1121 sub svp_begin {
1122   my ($self, $name) = @_;
1123
1124   $name = $self->_svp_generate_name
1125     unless defined $name;
1126
1127   $self->throw_exception ("You can't use savepoints outside a transaction")
1128     if $self->{transaction_depth} == 0;
1129
1130   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1131     unless $self->can('_svp_begin');
1132
1133   push @{ $self->{savepoints} }, $name;
1134
1135   $self->debugobj->svp_begin($name) if $self->debug;
1136
1137   return $self->_svp_begin($name);
1138 }
1139
1140 sub svp_release {
1141   my ($self, $name) = @_;
1142
1143   $self->throw_exception ("You can't use savepoints outside a transaction")
1144     if $self->{transaction_depth} == 0;
1145
1146   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1147     unless $self->can('_svp_release');
1148
1149   if (defined $name) {
1150     $self->throw_exception ("Savepoint '$name' does not exist")
1151       unless grep { $_ eq $name } @{ $self->{savepoints} };
1152
1153     # Dig through the stack until we find the one we are releasing.  This keeps
1154     # the stack up to date.
1155     my $svp;
1156
1157     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1158   } else {
1159     $name = pop @{ $self->{savepoints} };
1160   }
1161
1162   $self->debugobj->svp_release($name) if $self->debug;
1163
1164   return $self->_svp_release($name);
1165 }
1166
1167 sub svp_rollback {
1168   my ($self, $name) = @_;
1169
1170   $self->throw_exception ("You can't use savepoints outside a transaction")
1171     if $self->{transaction_depth} == 0;
1172
1173   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1174     unless $self->can('_svp_rollback');
1175
1176   if (defined $name) {
1177       # If they passed us a name, verify that it exists in the stack
1178       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1179           $self->throw_exception("Savepoint '$name' does not exist!");
1180       }
1181
1182       # Dig through the stack until we find the one we are releasing.  This keeps
1183       # the stack up to date.
1184       while(my $s = pop(@{ $self->{savepoints} })) {
1185           last if($s eq $name);
1186       }
1187       # Add the savepoint back to the stack, as a rollback doesn't remove the
1188       # named savepoint, only everything after it.
1189       push(@{ $self->{savepoints} }, $name);
1190   } else {
1191       # We'll assume they want to rollback to the last savepoint
1192       $name = $self->{savepoints}->[-1];
1193   }
1194
1195   $self->debugobj->svp_rollback($name) if $self->debug;
1196
1197   return $self->_svp_rollback($name);
1198 }
1199
1200 sub _svp_generate_name {
1201     my ($self) = @_;
1202
1203     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1204 }
1205
1206 sub txn_begin {
1207   my $self = shift;
1208
1209   # this means we have not yet connected and do not know the AC status
1210   # (e.g. coderef $dbh)
1211   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1212
1213   if($self->{transaction_depth} == 0) {
1214     $self->debugobj->txn_begin()
1215       if $self->debug;
1216     $self->_dbh_begin_work;
1217   }
1218   elsif ($self->auto_savepoint) {
1219     $self->svp_begin;
1220   }
1221   $self->{transaction_depth}++;
1222 }
1223
1224 sub _dbh_begin_work {
1225   my $self = shift;
1226
1227   # if the user is utilizing txn_do - good for him, otherwise we need to
1228   # ensure that the $dbh is healthy on BEGIN.
1229   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1230   # will be replaced by a failure of begin_work itself (which will be
1231   # then retried on reconnect)
1232   if ($self->{_in_dbh_do}) {
1233     $self->_dbh->begin_work;
1234   } else {
1235     $self->dbh_do(sub { $_[1]->begin_work });
1236   }
1237 }
1238
1239 sub txn_commit {
1240   my $self = shift;
1241   if ($self->{transaction_depth} == 1) {
1242     $self->debugobj->txn_commit()
1243       if ($self->debug);
1244     $self->_dbh_commit;
1245     $self->{transaction_depth} = 0
1246       if $self->_dbh_autocommit;
1247   }
1248   elsif($self->{transaction_depth} > 1) {
1249     $self->{transaction_depth}--;
1250     $self->svp_release
1251       if $self->auto_savepoint;
1252   }
1253 }
1254
1255 sub _dbh_commit {
1256   my $self = shift;
1257   my $dbh  = $self->_dbh
1258     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1259   $dbh->commit;
1260 }
1261
1262 sub txn_rollback {
1263   my $self = shift;
1264   my $dbh = $self->_dbh;
1265   eval {
1266     if ($self->{transaction_depth} == 1) {
1267       $self->debugobj->txn_rollback()
1268         if ($self->debug);
1269       $self->{transaction_depth} = 0
1270         if $self->_dbh_autocommit;
1271       $self->_dbh_rollback;
1272     }
1273     elsif($self->{transaction_depth} > 1) {
1274       $self->{transaction_depth}--;
1275       if ($self->auto_savepoint) {
1276         $self->svp_rollback;
1277         $self->svp_release;
1278       }
1279     }
1280     else {
1281       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1282     }
1283   };
1284   if ($@) {
1285     my $error = $@;
1286     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1287     $error =~ /$exception_class/ and $self->throw_exception($error);
1288     # ensure that a failed rollback resets the transaction depth
1289     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1290     $self->throw_exception($error);
1291   }
1292 }
1293
1294 sub _dbh_rollback {
1295   my $self = shift;
1296   my $dbh  = $self->_dbh
1297     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1298   $dbh->rollback;
1299 }
1300
1301 # This used to be the top-half of _execute.  It was split out to make it
1302 #  easier to override in NoBindVars without duping the rest.  It takes up
1303 #  all of _execute's args, and emits $sql, @bind.
1304 sub _prep_for_execute {
1305   my ($self, $op, $extra_bind, $ident, $args) = @_;
1306
1307   if( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
1308     $ident = $ident->from();
1309   }
1310
1311   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1312
1313   unshift(@bind,
1314     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1315       if $extra_bind;
1316   return ($sql, \@bind);
1317 }
1318
1319
1320 sub _fix_bind_params {
1321     my ($self, @bind) = @_;
1322
1323     ### Turn @bind from something like this:
1324     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1325     ### to this:
1326     ###   ( "'1'", "'1'", "'3'" )
1327     return
1328         map {
1329             if ( defined( $_ && $_->[1] ) ) {
1330                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1331             }
1332             else { q{'NULL'}; }
1333         } @bind;
1334 }
1335
1336 sub _query_start {
1337     my ( $self, $sql, @bind ) = @_;
1338
1339     if ( $self->debug ) {
1340         @bind = $self->_fix_bind_params(@bind);
1341
1342         $self->debugobj->query_start( $sql, @bind );
1343     }
1344 }
1345
1346 sub _query_end {
1347     my ( $self, $sql, @bind ) = @_;
1348
1349     if ( $self->debug ) {
1350         @bind = $self->_fix_bind_params(@bind);
1351         $self->debugobj->query_end( $sql, @bind );
1352     }
1353 }
1354
1355 sub _dbh_execute {
1356   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1357
1358   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1359
1360   $self->_query_start( $sql, @$bind );
1361
1362   my $sth = $self->sth($sql,$op);
1363
1364   my $placeholder_index = 1;
1365
1366   foreach my $bound (@$bind) {
1367     my $attributes = {};
1368     my($column_name, @data) = @$bound;
1369
1370     if ($bind_attributes) {
1371       $attributes = $bind_attributes->{$column_name}
1372       if defined $bind_attributes->{$column_name};
1373     }
1374
1375     foreach my $data (@data) {
1376       my $ref = ref $data;
1377       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1378
1379       $sth->bind_param($placeholder_index, $data, $attributes);
1380       $placeholder_index++;
1381     }
1382   }
1383
1384   # Can this fail without throwing an exception anyways???
1385   my $rv = $sth->execute();
1386   $self->throw_exception($sth->errstr) if !$rv;
1387
1388   $self->_query_end( $sql, @$bind );
1389
1390   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1391 }
1392
1393 sub _execute {
1394     my $self = shift;
1395     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1396 }
1397
1398 sub _prefetch_insert_auto_nextvals {
1399   my ($self, $source, $to_insert) = @_;
1400
1401   my $upd = {};
1402
1403   foreach my $col ( $source->columns ) {
1404     if ( !defined $to_insert->{$col} ) {
1405       my $col_info = $source->column_info($col);
1406
1407       if ( $col_info->{auto_nextval} ) {
1408         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1409           'nextval',
1410           $col_info->{sequence} ||=
1411             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1412         );
1413       }
1414     }
1415   }
1416
1417   return $upd;
1418 }
1419
1420 sub insert {
1421   my $self = shift;
1422   my ($source, $to_insert, $opts) = @_;
1423
1424   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1425
1426   my $bind_attributes = $self->source_bind_attributes($source);
1427
1428   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1429
1430   if ($opts->{returning}) {
1431     my @ret_cols = @{$opts->{returning}};
1432
1433     my @ret_vals = eval {
1434       local $SIG{__WARN__} = sub {};
1435       my @r = $sth->fetchrow_array;
1436       $sth->finish;
1437       @r;
1438     };
1439
1440     my %ret;
1441     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1442
1443     $updated_cols = {
1444       %$updated_cols,
1445       %ret,
1446     };
1447   }
1448
1449   return $updated_cols;
1450 }
1451
1452 ## Currently it is assumed that all values passed will be "normal", i.e. not
1453 ## scalar refs, or at least, all the same type as the first set, the statement is
1454 ## only prepped once.
1455 sub insert_bulk {
1456   my ($self, $source, $cols, $data) = @_;
1457
1458   my %colvalues;
1459   @colvalues{@$cols} = (0..$#$cols);
1460
1461   for my $i (0..$#$cols) {
1462     my $first_val = $data->[0][$i];
1463     next unless ref $first_val eq 'SCALAR';
1464
1465     $colvalues{ $cols->[$i] } = $first_val;
1466   }
1467
1468   # check for bad data and stringify stringifiable objects
1469   my $bad_slice = sub {
1470     my ($msg, $col_idx, $slice_idx) = @_;
1471     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1472       $msg,
1473       $cols->[$col_idx],
1474       do {
1475         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1476         Data::Dumper::Concise::Dumper({
1477           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1478         }),
1479       }
1480     );
1481   };
1482
1483   for my $datum_idx (0..$#$data) {
1484     my $datum = $data->[$datum_idx];
1485
1486     for my $col_idx (0..$#$cols) {
1487       my $val            = $datum->[$col_idx];
1488       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1489       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1490
1491       if ($is_literal_sql) {
1492         if (not ref $val) {
1493           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1494         }
1495         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1496           $bad_slice->("$reftype reference found where literal SQL expected",
1497             $col_idx, $datum_idx);
1498         }
1499         elsif ($$val ne $$sqla_bind){
1500           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1501             $col_idx, $datum_idx);
1502         }
1503       }
1504       elsif (my $reftype = ref $val) {
1505         require overload;
1506         if (overload::Method($val, '""')) {
1507           $datum->[$col_idx] = "".$val;
1508         }
1509         else {
1510           $bad_slice->("$reftype reference found where bind expected",
1511             $col_idx, $datum_idx);
1512         }
1513       }
1514     }
1515   }
1516
1517   my ($sql, $bind) = $self->_prep_for_execute (
1518     'insert', undef, $source, [\%colvalues]
1519   );
1520   my @bind = @$bind;
1521
1522   my $empty_bind = 1 if (not @bind) &&
1523     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1524
1525   if ((not @bind) && (not $empty_bind)) {
1526     $self->throw_exception(
1527       'Cannot insert_bulk without support for placeholders'
1528     );
1529   }
1530
1531   # neither _execute_array, nor _execute_inserts_with_no_binds are
1532   # atomic (even if _execute _array is a single call). Thus a safety
1533   # scope guard
1534   my $guard = $self->txn_scope_guard;
1535
1536   $self->_query_start( $sql, ['__BULK__'] );
1537   my $sth = $self->sth($sql);
1538   my $rv = do {
1539     if ($empty_bind) {
1540       # bind_param_array doesn't work if there are no binds
1541       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1542     }
1543     else {
1544 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1545       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1546     }
1547   };
1548
1549   $self->_query_end( $sql, ['__BULK__'] );
1550
1551   $guard->commit;
1552
1553   return (wantarray ? ($rv, $sth, @bind) : $rv);
1554 }
1555
1556 sub _execute_array {
1557   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1558
1559   ## This must be an arrayref, else nothing works!
1560   my $tuple_status = [];
1561
1562   ## Get the bind_attributes, if any exist
1563   my $bind_attributes = $self->source_bind_attributes($source);
1564
1565   ## Bind the values and execute
1566   my $placeholder_index = 1;
1567
1568   foreach my $bound (@$bind) {
1569
1570     my $attributes = {};
1571     my ($column_name, $data_index) = @$bound;
1572
1573     if( $bind_attributes ) {
1574       $attributes = $bind_attributes->{$column_name}
1575       if defined $bind_attributes->{$column_name};
1576     }
1577
1578     my @data = map { $_->[$data_index] } @$data;
1579
1580     $sth->bind_param_array(
1581       $placeholder_index,
1582       [@data],
1583       (%$attributes ?  $attributes : ()),
1584     );
1585     $placeholder_index++;
1586   }
1587
1588   my $rv = eval {
1589     $self->_dbh_execute_array($sth, $tuple_status, @extra);
1590   };
1591   my $err = $@ || $sth->errstr;
1592
1593 # Statement must finish even if there was an exception.
1594   eval { $sth->finish };
1595   $err = $@ unless $err;
1596
1597   if ($err) {
1598     my $i = 0;
1599     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1600
1601     $self->throw_exception("Unexpected populate error: $err")
1602       if ($i > $#$tuple_status);
1603
1604     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1605       ($tuple_status->[$i][1] || $err),
1606       Data::Dumper::Concise::Dumper({
1607         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
1608       }),
1609     );
1610   }
1611   return $rv;
1612 }
1613
1614 sub _dbh_execute_array {
1615     my ($self, $sth, $tuple_status, @extra) = @_;
1616
1617     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1618 }
1619
1620 sub _dbh_execute_inserts_with_no_binds {
1621   my ($self, $sth, $count) = @_;
1622
1623   eval {
1624     my $dbh = $self->_get_dbh;
1625     local $dbh->{RaiseError} = 1;
1626     local $dbh->{PrintError} = 0;
1627
1628     $sth->execute foreach 1..$count;
1629   };
1630   my $exception = $@;
1631
1632 # Make sure statement is finished even if there was an exception.
1633   eval { $sth->finish };
1634   $exception = $@ unless $exception;
1635
1636   $self->throw_exception($exception) if $exception;
1637
1638   return $count;
1639 }
1640
1641 sub update {
1642   my ($self, $source, @args) = @_;
1643
1644   my $bind_attrs = $self->source_bind_attributes($source);
1645
1646   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1647 }
1648
1649
1650 sub delete {
1651   my ($self, $source, @args) = @_;
1652
1653   my $bind_attrs = $self->source_bind_attributes($source);
1654
1655   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1656 }
1657
1658 # We were sent here because the $rs contains a complex search
1659 # which will require a subquery to select the correct rows
1660 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1661 #
1662 # Generating a single PK column subquery is trivial and supported
1663 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1664 # Look at _multipk_update_delete()
1665 sub _subq_update_delete {
1666   my $self = shift;
1667   my ($rs, $op, $values) = @_;
1668
1669   my $rsrc = $rs->result_source;
1670
1671   # quick check if we got a sane rs on our hands
1672   my @pcols = $rsrc->_pri_cols;
1673
1674   my $sel = $rs->_resolved_attrs->{select};
1675   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1676
1677   if (
1678       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1679         ne
1680       join ("\x00", sort @$sel )
1681   ) {
1682     $self->throw_exception (
1683       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1684     );
1685   }
1686
1687   if (@pcols == 1) {
1688     return $self->$op (
1689       $rsrc,
1690       $op eq 'update' ? $values : (),
1691       { $pcols[0] => { -in => $rs->as_query } },
1692     );
1693   }
1694
1695   else {
1696     return $self->_multipk_update_delete (@_);
1697   }
1698 }
1699
1700 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1701 # resultset update/delete involving subqueries. So by default resort
1702 # to simple (and inefficient) delete_all style per-row opearations,
1703 # while allowing specific storages to override this with a faster
1704 # implementation.
1705 #
1706 sub _multipk_update_delete {
1707   return shift->_per_row_update_delete (@_);
1708 }
1709
1710 # This is the default loop used to delete/update rows for multi PK
1711 # resultsets, and used by mysql exclusively (because it can't do anything
1712 # else).
1713 #
1714 # We do not use $row->$op style queries, because resultset update/delete
1715 # is not expected to cascade (this is what delete_all/update_all is for).
1716 #
1717 # There should be no race conditions as the entire operation is rolled
1718 # in a transaction.
1719 #
1720 sub _per_row_update_delete {
1721   my $self = shift;
1722   my ($rs, $op, $values) = @_;
1723
1724   my $rsrc = $rs->result_source;
1725   my @pcols = $rsrc->_pri_cols;
1726
1727   my $guard = $self->txn_scope_guard;
1728
1729   # emulate the return value of $sth->execute for non-selects
1730   my $row_cnt = '0E0';
1731
1732   my $subrs_cur = $rs->cursor;
1733   my @all_pk = $subrs_cur->all;
1734   for my $pks ( @all_pk) {
1735
1736     my $cond;
1737     for my $i (0.. $#pcols) {
1738       $cond->{$pcols[$i]} = $pks->[$i];
1739     }
1740
1741     $self->$op (
1742       $rsrc,
1743       $op eq 'update' ? $values : (),
1744       $cond,
1745     );
1746
1747     $row_cnt++;
1748   }
1749
1750   $guard->commit;
1751
1752   return $row_cnt;
1753 }
1754
1755 sub _select {
1756   my $self = shift;
1757
1758   # localization is neccessary as
1759   # 1) there is no infrastructure to pass this around before SQLA2
1760   # 2) _select_args sets it and _prep_for_execute consumes it
1761   my $sql_maker = $self->sql_maker;
1762   local $sql_maker->{_dbic_rs_attrs};
1763
1764   return $self->_execute($self->_select_args(@_));
1765 }
1766
1767 sub _select_args_to_query {
1768   my $self = shift;
1769
1770   # localization is neccessary as
1771   # 1) there is no infrastructure to pass this around before SQLA2
1772   # 2) _select_args sets it and _prep_for_execute consumes it
1773   my $sql_maker = $self->sql_maker;
1774   local $sql_maker->{_dbic_rs_attrs};
1775
1776   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
1777   #  = $self->_select_args($ident, $select, $cond, $attrs);
1778   my ($op, $bind, $ident, $bind_attrs, @args) =
1779     $self->_select_args(@_);
1780
1781   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
1782   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1783   $prepared_bind ||= [];
1784
1785   return wantarray
1786     ? ($sql, $prepared_bind, $bind_attrs)
1787     : \[ "($sql)", @$prepared_bind ]
1788   ;
1789 }
1790
1791 sub _select_args {
1792   my ($self, $ident, $select, $where, $attrs) = @_;
1793
1794   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1795
1796   my $sql_maker = $self->sql_maker;
1797   $sql_maker->{_dbic_rs_attrs} = {
1798     %$attrs,
1799     select => $select,
1800     from => $ident,
1801     where => $where,
1802     $rs_alias && $alias2source->{$rs_alias}
1803       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
1804       : ()
1805     ,
1806   };
1807
1808   # calculate bind_attrs before possible $ident mangling
1809   my $bind_attrs = {};
1810   for my $alias (keys %$alias2source) {
1811     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1812     for my $col (keys %$bindtypes) {
1813
1814       my $fqcn = join ('.', $alias, $col);
1815       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1816
1817       # Unqialified column names are nice, but at the same time can be
1818       # rather ambiguous. What we do here is basically go along with
1819       # the loop, adding an unqualified column slot to $bind_attrs,
1820       # alongside the fully qualified name. As soon as we encounter
1821       # another column by that name (which would imply another table)
1822       # we unset the unqualified slot and never add any info to it
1823       # to avoid erroneous type binding. If this happens the users
1824       # only choice will be to fully qualify his column name
1825
1826       if (exists $bind_attrs->{$col}) {
1827         $bind_attrs->{$col} = {};
1828       }
1829       else {
1830         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
1831       }
1832     }
1833   }
1834
1835   # adjust limits
1836   if (
1837     $attrs->{software_limit}
1838       ||
1839     $sql_maker->_default_limit_syntax eq "GenericSubQ"
1840   ) {
1841     $attrs->{software_limit} = 1;
1842   }
1843   else {
1844     $self->throw_exception("rows attribute must be positive if present")
1845       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1846
1847     # MySQL actually recommends this approach.  I cringe.
1848     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
1849   }
1850
1851   my @limit;
1852
1853   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
1854   # storage, unless software limit was requested
1855   if (
1856     #limited has_many
1857     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
1858        ||
1859     # limited prefetch with RNO subqueries
1860     (
1861       $attrs->{rows}
1862         &&
1863       $sql_maker->limit_dialect eq 'RowNumberOver'
1864         &&
1865       $attrs->{_prefetch_select}
1866         &&
1867       @{$attrs->{_prefetch_select}}
1868     )
1869       ||
1870     # grouped prefetch
1871     ( $attrs->{group_by}
1872         &&
1873       @{$attrs->{group_by}}
1874         &&
1875       $attrs->{_prefetch_select}
1876         &&
1877       @{$attrs->{_prefetch_select}}
1878     )
1879   ) {
1880     ($ident, $select, $where, $attrs)
1881       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
1882   }
1883
1884   elsif (
1885     ($attrs->{rows} || $attrs->{offset})
1886       &&
1887     $sql_maker->limit_dialect eq 'RowNumberOver'
1888       &&
1889     (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
1890       &&
1891     scalar $self->_parse_order_by ($attrs->{order_by})
1892   ) {
1893     # the RNO limit dialect above mangles the SQL such that the join gets lost
1894     # wrap a subquery here
1895
1896     push @limit, delete @{$attrs}{qw/rows offset/};
1897
1898     my $subq = $self->_select_args_to_query (
1899       $ident,
1900       $select,
1901       $where,
1902       $attrs,
1903     );
1904
1905     $ident = {
1906       -alias => $attrs->{alias},
1907       -source_handle => $ident->[0]{-source_handle},
1908       $attrs->{alias} => $subq,
1909     };
1910
1911     # all part of the subquery now
1912     delete @{$attrs}{qw/order_by group_by having/};
1913     $where = undef;
1914   }
1915
1916   elsif (! $attrs->{software_limit} ) {
1917     push @limit, $attrs->{rows}, $attrs->{offset};
1918   }
1919
1920   # try to simplify the joinmap further (prune unreferenced type-single joins)
1921   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
1922
1923 ###
1924   # This would be the point to deflate anything found in $where
1925   # (and leave $attrs->{bind} intact). Problem is - inflators historically
1926   # expect a row object. And all we have is a resultsource (it is trivial
1927   # to extract deflator coderefs via $alias2source above).
1928   #
1929   # I don't see a way forward other than changing the way deflators are
1930   # invoked, and that's just bad...
1931 ###
1932
1933   my $order = { map
1934     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
1935     (qw/order_by group_by having/ )
1936   };
1937
1938   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
1939 }
1940
1941 # Returns a counting SELECT for a simple count
1942 # query. Abstracted so that a storage could override
1943 # this to { count => 'firstcol' } or whatever makes
1944 # sense as a performance optimization
1945 sub _count_select {
1946   #my ($self, $source, $rs_attrs) = @_;
1947   return { count => '*' };
1948 }
1949
1950 # Returns a SELECT which will end up in the subselect
1951 # There may or may not be a group_by, as the subquery
1952 # might have been called to accomodate a limit
1953 #
1954 # Most databases would be happy with whatever ends up
1955 # here, but some choke in various ways.
1956 #
1957 sub _subq_count_select {
1958   my ($self, $source, $rs_attrs) = @_;
1959
1960   if (my $groupby = $rs_attrs->{group_by}) {
1961
1962     my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
1963
1964     my $sel_index;
1965     for my $sel (@{$rs_attrs->{select}}) {
1966       if (ref $sel eq 'HASH' and $sel->{-as}) {
1967         $sel_index->{$sel->{-as}} = $sel;
1968       }
1969     }
1970
1971     my @selection;
1972     for my $g_part (@$groupby) {
1973       if (ref $g_part or $avail_columns->{$g_part}) {
1974         push @selection, $g_part;
1975       }
1976       elsif ($sel_index->{$g_part}) {
1977         push @selection, $sel_index->{$g_part};
1978       }
1979       else {
1980         $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
1981       }
1982     }
1983
1984     return \@selection;
1985   }
1986
1987   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
1988   return @pcols ? \@pcols : [ 1 ];
1989 }
1990
1991 sub source_bind_attributes {
1992   my ($self, $source) = @_;
1993
1994   my $bind_attributes;
1995   foreach my $column ($source->columns) {
1996
1997     my $data_type = $source->column_info($column)->{data_type} || '';
1998     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1999      if $data_type;
2000   }
2001
2002   return $bind_attributes;
2003 }
2004
2005 =head2 select
2006
2007 =over 4
2008
2009 =item Arguments: $ident, $select, $condition, $attrs
2010
2011 =back
2012
2013 Handle a SQL select statement.
2014
2015 =cut
2016
2017 sub select {
2018   my $self = shift;
2019   my ($ident, $select, $condition, $attrs) = @_;
2020   return $self->cursor_class->new($self, \@_, $attrs);
2021 }
2022
2023 sub select_single {
2024   my $self = shift;
2025   my ($rv, $sth, @bind) = $self->_select(@_);
2026   my @row = $sth->fetchrow_array;
2027   my @nextrow = $sth->fetchrow_array if @row;
2028   if(@row && @nextrow) {
2029     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2030   }
2031   # Need to call finish() to work round broken DBDs
2032   $sth->finish();
2033   return @row;
2034 }
2035
2036 =head2 sth
2037
2038 =over 4
2039
2040 =item Arguments: $sql
2041
2042 =back
2043
2044 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2045
2046 =cut
2047
2048 sub _dbh_sth {
2049   my ($self, $dbh, $sql) = @_;
2050
2051   # 3 is the if_active parameter which avoids active sth re-use
2052   my $sth = $self->disable_sth_caching
2053     ? $dbh->prepare($sql)
2054     : $dbh->prepare_cached($sql, {}, 3);
2055
2056   # XXX You would think RaiseError would make this impossible,
2057   #  but apparently that's not true :(
2058   $self->throw_exception($dbh->errstr) if !$sth;
2059
2060   $sth;
2061 }
2062
2063 sub sth {
2064   my ($self, $sql) = @_;
2065   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2066 }
2067
2068 sub _dbh_columns_info_for {
2069   my ($self, $dbh, $table) = @_;
2070
2071   if ($dbh->can('column_info')) {
2072     my %result;
2073     eval {
2074       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2075       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2076       $sth->execute();
2077       while ( my $info = $sth->fetchrow_hashref() ){
2078         my %column_info;
2079         $column_info{data_type}   = $info->{TYPE_NAME};
2080         $column_info{size}      = $info->{COLUMN_SIZE};
2081         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2082         $column_info{default_value} = $info->{COLUMN_DEF};
2083         my $col_name = $info->{COLUMN_NAME};
2084         $col_name =~ s/^\"(.*)\"$/$1/;
2085
2086         $result{$col_name} = \%column_info;
2087       }
2088     };
2089     return \%result if !$@ && scalar keys %result;
2090   }
2091
2092   my %result;
2093   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2094   $sth->execute;
2095   my @columns = @{$sth->{NAME_lc}};
2096   for my $i ( 0 .. $#columns ){
2097     my %column_info;
2098     $column_info{data_type} = $sth->{TYPE}->[$i];
2099     $column_info{size} = $sth->{PRECISION}->[$i];
2100     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2101
2102     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2103       $column_info{data_type} = $1;
2104       $column_info{size}    = $2;
2105     }
2106
2107     $result{$columns[$i]} = \%column_info;
2108   }
2109   $sth->finish;
2110
2111   foreach my $col (keys %result) {
2112     my $colinfo = $result{$col};
2113     my $type_num = $colinfo->{data_type};
2114     my $type_name;
2115     if(defined $type_num && $dbh->can('type_info')) {
2116       my $type_info = $dbh->type_info($type_num);
2117       $type_name = $type_info->{TYPE_NAME} if $type_info;
2118       $colinfo->{data_type} = $type_name if $type_name;
2119     }
2120   }
2121
2122   return \%result;
2123 }
2124
2125 sub columns_info_for {
2126   my ($self, $table) = @_;
2127   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2128 }
2129
2130 =head2 last_insert_id
2131
2132 Return the row id of the last insert.
2133
2134 =cut
2135
2136 sub _dbh_last_insert_id {
2137     my ($self, $dbh, $source, $col) = @_;
2138
2139     my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2140
2141     return $id if defined $id;
2142
2143     my $class = ref $self;
2144     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2145 }
2146
2147 sub last_insert_id {
2148   my $self = shift;
2149   $self->_dbh_last_insert_id ($self->_dbh, @_);
2150 }
2151
2152 =head2 _native_data_type
2153
2154 =over 4
2155
2156 =item Arguments: $type_name
2157
2158 =back
2159
2160 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2161 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2162 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2163
2164 The default implementation returns C<undef>, implement in your Storage driver if
2165 you need this functionality.
2166
2167 Should map types from other databases to the native RDBMS type, for example
2168 C<VARCHAR2> to C<VARCHAR>.
2169
2170 Types with modifiers should map to the underlying data type. For example,
2171 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2172
2173 Composite types should map to the container type, for example
2174 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2175
2176 =cut
2177
2178 sub _native_data_type {
2179   #my ($self, $data_type) = @_;
2180   return undef
2181 }
2182
2183 # Check if placeholders are supported at all
2184 sub _placeholders_supported {
2185   my $self = shift;
2186   my $dbh  = $self->_get_dbh;
2187
2188   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2189   # but it is inaccurate more often than not
2190   eval {
2191     local $dbh->{PrintError} = 0;
2192     local $dbh->{RaiseError} = 1;
2193     $dbh->do('select ?', {}, 1);
2194   };
2195   return $@ ? 0 : 1;
2196 }
2197
2198 # Check if placeholders bound to non-string types throw exceptions
2199 #
2200 sub _typeless_placeholders_supported {
2201   my $self = shift;
2202   my $dbh  = $self->_get_dbh;
2203
2204   eval {
2205     local $dbh->{PrintError} = 0;
2206     local $dbh->{RaiseError} = 1;
2207     # this specifically tests a bind that is NOT a string
2208     $dbh->do('select 1 where 1 = ?', {}, 1);
2209   };
2210   return $@ ? 0 : 1;
2211 }
2212
2213 =head2 sqlt_type
2214
2215 Returns the database driver name.
2216
2217 =cut
2218
2219 sub sqlt_type {
2220   shift->_get_dbh->{Driver}->{Name};
2221 }
2222
2223 =head2 bind_attribute_by_data_type
2224
2225 Given a datatype from column info, returns a database specific bind
2226 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2227 let the database planner just handle it.
2228
2229 Generally only needed for special case column types, like bytea in postgres.
2230
2231 =cut
2232
2233 sub bind_attribute_by_data_type {
2234     return;
2235 }
2236
2237 =head2 is_datatype_numeric
2238
2239 Given a datatype from column_info, returns a boolean value indicating if
2240 the current RDBMS considers it a numeric value. This controls how
2241 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2242 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2243 be performed instead of the usual C<eq>.
2244
2245 =cut
2246
2247 sub is_datatype_numeric {
2248   my ($self, $dt) = @_;
2249
2250   return 0 unless $dt;
2251
2252   return $dt =~ /^ (?:
2253     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2254   ) $/ix;
2255 }
2256
2257
2258 =head2 create_ddl_dir
2259
2260 =over 4
2261
2262 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2263
2264 =back
2265
2266 Creates a SQL file based on the Schema, for each of the specified
2267 database engines in C<\@databases> in the given directory.
2268 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2269
2270 Given a previous version number, this will also create a file containing
2271 the ALTER TABLE statements to transform the previous schema into the
2272 current one. Note that these statements may contain C<DROP TABLE> or
2273 C<DROP COLUMN> statements that can potentially destroy data.
2274
2275 The file names are created using the C<ddl_filename> method below, please
2276 override this method in your schema if you would like a different file
2277 name format. For the ALTER file, the same format is used, replacing
2278 $version in the name with "$preversion-$version".
2279
2280 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2281 The most common value for this would be C<< { add_drop_table => 1 } >>
2282 to have the SQL produced include a C<DROP TABLE> statement for each table
2283 created. For quoting purposes supply C<quote_table_names> and
2284 C<quote_field_names>.
2285
2286 If no arguments are passed, then the following default values are assumed:
2287
2288 =over 4
2289
2290 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2291
2292 =item version    - $schema->schema_version
2293
2294 =item directory  - './'
2295
2296 =item preversion - <none>
2297
2298 =back
2299
2300 By default, C<\%sqlt_args> will have
2301
2302  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2303
2304 merged with the hash passed in. To disable any of those features, pass in a
2305 hashref like the following
2306
2307  { ignore_constraint_names => 0, # ... other options }
2308
2309
2310 WARNING: You are strongly advised to check all SQL files created, before applying
2311 them.
2312
2313 =cut
2314
2315 sub create_ddl_dir {
2316   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2317
2318   unless ($dir) {
2319     carp "No directory given, using ./\n";
2320     $dir = './';
2321   }
2322
2323   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2324
2325   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2326   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2327
2328   my $schema_version = $schema->schema_version || '1.x';
2329   $version ||= $schema_version;
2330
2331   $sqltargs = {
2332     add_drop_table => 1,
2333     ignore_constraint_names => 1,
2334     ignore_index_names => 1,
2335     %{$sqltargs || {}}
2336   };
2337
2338   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2339     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2340   }
2341
2342   my $sqlt = SQL::Translator->new( $sqltargs );
2343
2344   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2345   my $sqlt_schema = $sqlt->translate({ data => $schema })
2346     or $self->throw_exception ($sqlt->error);
2347
2348   foreach my $db (@$databases) {
2349     $sqlt->reset();
2350     $sqlt->{schema} = $sqlt_schema;
2351     $sqlt->producer($db);
2352
2353     my $file;
2354     my $filename = $schema->ddl_filename($db, $version, $dir);
2355     if (-e $filename && ($version eq $schema_version )) {
2356       # if we are dumping the current version, overwrite the DDL
2357       carp "Overwriting existing DDL file - $filename";
2358       unlink($filename);
2359     }
2360
2361     my $output = $sqlt->translate;
2362     if(!$output) {
2363       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2364       next;
2365     }
2366     if(!open($file, ">$filename")) {
2367       $self->throw_exception("Can't open $filename for writing ($!)");
2368       next;
2369     }
2370     print $file $output;
2371     close($file);
2372
2373     next unless ($preversion);
2374
2375     require SQL::Translator::Diff;
2376
2377     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2378     if(!-e $prefilename) {
2379       carp("No previous schema file found ($prefilename)");
2380       next;
2381     }
2382
2383     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2384     if(-e $difffile) {
2385       carp("Overwriting existing diff file - $difffile");
2386       unlink($difffile);
2387     }
2388
2389     my $source_schema;
2390     {
2391       my $t = SQL::Translator->new($sqltargs);
2392       $t->debug( 0 );
2393       $t->trace( 0 );
2394
2395       $t->parser( $db )
2396         or $self->throw_exception ($t->error);
2397
2398       my $out = $t->translate( $prefilename )
2399         or $self->throw_exception ($t->error);
2400
2401       $source_schema = $t->schema;
2402
2403       $source_schema->name( $prefilename )
2404         unless ( $source_schema->name );
2405     }
2406
2407     # The "new" style of producers have sane normalization and can support
2408     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2409     # And we have to diff parsed SQL against parsed SQL.
2410     my $dest_schema = $sqlt_schema;
2411
2412     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2413       my $t = SQL::Translator->new($sqltargs);
2414       $t->debug( 0 );
2415       $t->trace( 0 );
2416
2417       $t->parser( $db )
2418         or $self->throw_exception ($t->error);
2419
2420       my $out = $t->translate( $filename )
2421         or $self->throw_exception ($t->error);
2422
2423       $dest_schema = $t->schema;
2424
2425       $dest_schema->name( $filename )
2426         unless $dest_schema->name;
2427     }
2428
2429     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2430                                                   $dest_schema,   $db,
2431                                                   $sqltargs
2432                                                  );
2433     if(!open $file, ">$difffile") {
2434       $self->throw_exception("Can't write to $difffile ($!)");
2435       next;
2436     }
2437     print $file $diff;
2438     close($file);
2439   }
2440 }
2441
2442 =head2 deployment_statements
2443
2444 =over 4
2445
2446 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2447
2448 =back
2449
2450 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2451
2452 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2453 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2454
2455 C<$directory> is used to return statements from files in a previously created
2456 L</create_ddl_dir> directory and is optional. The filenames are constructed
2457 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2458
2459 If no C<$directory> is specified then the statements are constructed on the
2460 fly using L<SQL::Translator> and C<$version> is ignored.
2461
2462 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2463
2464 =cut
2465
2466 sub deployment_statements {
2467   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2468   $type ||= $self->sqlt_type;
2469   $version ||= $schema->schema_version || '1.x';
2470   $dir ||= './';
2471   my $filename = $schema->ddl_filename($type, $version, $dir);
2472   if(-f $filename)
2473   {
2474       my $file;
2475       open($file, "<$filename")
2476         or $self->throw_exception("Can't open $filename ($!)");
2477       my @rows = <$file>;
2478       close($file);
2479       return join('', @rows);
2480   }
2481
2482   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2483     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2484   }
2485
2486   # sources needs to be a parser arg, but for simplicty allow at top level
2487   # coming in
2488   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2489       if exists $sqltargs->{sources};
2490
2491   my $tr = SQL::Translator->new(
2492     producer => "SQL::Translator::Producer::${type}",
2493     %$sqltargs,
2494     parser => 'SQL::Translator::Parser::DBIx::Class',
2495     data => $schema,
2496   );
2497
2498   my @ret;
2499   my $wa = wantarray;
2500   if ($wa) {
2501     @ret = $tr->translate;
2502   }
2503   else {
2504     $ret[0] = $tr->translate;
2505   }
2506
2507   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2508     unless (@ret && defined $ret[0]);
2509
2510   return $wa ? @ret : $ret[0];
2511 }
2512
2513 sub deploy {
2514   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2515   my $deploy = sub {
2516     my $line = shift;
2517     return if($line =~ /^--/);
2518     return if(!$line);
2519     # next if($line =~ /^DROP/m);
2520     return if($line =~ /^BEGIN TRANSACTION/m);
2521     return if($line =~ /^COMMIT/m);
2522     return if $line =~ /^\s+$/; # skip whitespace only
2523     $self->_query_start($line);
2524     eval {
2525       # do a dbh_do cycle here, as we need some error checking in
2526       # place (even though we will ignore errors)
2527       $self->dbh_do (sub { $_[1]->do($line) });
2528     };
2529     if ($@) {
2530       carp qq{$@ (running "${line}")};
2531     }
2532     $self->_query_end($line);
2533   };
2534   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2535   if (@statements > 1) {
2536     foreach my $statement (@statements) {
2537       $deploy->( $statement );
2538     }
2539   }
2540   elsif (@statements == 1) {
2541     foreach my $line ( split(";\n", $statements[0])) {
2542       $deploy->( $line );
2543     }
2544   }
2545 }
2546
2547 =head2 datetime_parser
2548
2549 Returns the datetime parser class
2550
2551 =cut
2552
2553 sub datetime_parser {
2554   my $self = shift;
2555   return $self->{datetime_parser} ||= do {
2556     $self->build_datetime_parser(@_);
2557   };
2558 }
2559
2560 =head2 datetime_parser_type
2561
2562 Defines (returns) the datetime parser class - currently hardwired to
2563 L<DateTime::Format::MySQL>
2564
2565 =cut
2566
2567 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2568
2569 =head2 build_datetime_parser
2570
2571 See L</datetime_parser>
2572
2573 =cut
2574
2575 sub build_datetime_parser {
2576   my $self = shift;
2577   my $type = $self->datetime_parser_type(@_);
2578   $self->ensure_class_loaded ($type);
2579   return $type;
2580 }
2581
2582
2583 =head2 is_replicating
2584
2585 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2586 replicate from a master database.  Default is undef, which is the result
2587 returned by databases that don't support replication.
2588
2589 =cut
2590
2591 sub is_replicating {
2592     return;
2593
2594 }
2595
2596 =head2 lag_behind_master
2597
2598 Returns a number that represents a certain amount of lag behind a master db
2599 when a given storage is replicating.  The number is database dependent, but
2600 starts at zero and increases with the amount of lag. Default in undef
2601
2602 =cut
2603
2604 sub lag_behind_master {
2605     return;
2606 }
2607
2608 =head2 relname_to_table_alias
2609
2610 =over 4
2611
2612 =item Arguments: $relname, $join_count
2613
2614 =back
2615
2616 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2617 queries.
2618
2619 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2620 way these aliases are named.
2621
2622 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2623 otherwise C<"$relname">.
2624
2625 =cut
2626
2627 sub relname_to_table_alias {
2628   my ($self, $relname, $join_count) = @_;
2629
2630   my $alias = ($join_count && $join_count > 1 ?
2631     join('_', $relname, $join_count) : $relname);
2632
2633   return $alias;
2634 }
2635
2636 sub DESTROY {
2637   my $self = shift;
2638
2639   $self->_verify_pid if $self->_dbh;
2640
2641   # some databases need this to stop spewing warnings
2642   if (my $dbh = $self->_dbh) {
2643     local $@;
2644     eval {
2645       %{ $dbh->{CachedKids} } = ();
2646       $dbh->disconnect;
2647     };
2648   }
2649
2650   $self->_dbh(undef);
2651 }
2652
2653 1;
2654
2655 =head1 USAGE NOTES
2656
2657 =head2 DBIx::Class and AutoCommit
2658
2659 DBIx::Class can do some wonderful magic with handling exceptions,
2660 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2661 (the default) combined with C<txn_do> for transaction support.
2662
2663 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2664 in an assumed transaction between commits, and you're telling us you'd
2665 like to manage that manually.  A lot of the magic protections offered by
2666 this module will go away.  We can't protect you from exceptions due to database
2667 disconnects because we don't know anything about how to restart your
2668 transactions.  You're on your own for handling all sorts of exceptional
2669 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2670 be with raw DBI.
2671
2672
2673 =head1 AUTHORS
2674
2675 Matt S. Trout <mst@shadowcatsystems.co.uk>
2676
2677 Andy Grundman <andy@hybridized.org>
2678
2679 =head1 LICENSE
2680
2681 You may distribute this code under the same terms as Perl itself.
2682
2683 =cut