6a85b2c635f2fea805b648d376c75389ab08726a
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use DBIx::Class::Storage::Statistics;
14 use Scalar::Util qw/refaddr weaken reftype blessed/;
15 use Data::Dumper::Concise 'Dumper';
16 use Sub::Name 'subname';
17 use Try::Tiny;
18 use File::Path 'make_path';
19 use namespace::clean;
20
21
22 # default cursor class, overridable in connect_info attributes
23 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
24
25 __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class sql_limit_dialect/);
26 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/
29   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
30   _dbh _dbh_details _conn_pid _conn_tid _sql_maker _sql_maker_opts
31   transaction_depth _dbh_autocommit  savepoints
32 /);
33
34 # the values for these accessors are picked out (and deleted) from
35 # the attribute hashref passed to connect_info
36 my @storage_options = qw/
37   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
38   disable_sth_caching unsafe auto_savepoint
39 /;
40 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
41
42
43 # capability definitions, using a 2-tiered accessor system
44 # The rationale is:
45 #
46 # A driver/user may define _use_X, which blindly without any checks says:
47 # "(do not) use this capability", (use_dbms_capability is an "inherited"
48 # type accessor)
49 #
50 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
51 # accessor, which in turn calls _determine_supports_X, and stores the return
52 # in a special slot on the storage object, which is wiped every time a $dbh
53 # reconnection takes place (it is not guaranteed that upon reconnection we
54 # will get the same rdbms version). _determine_supports_X does not need to
55 # exist on a driver, as we ->can for it before calling.
56
57 my @capabilities = (qw/insert_returning placeholders typeless_placeholders/);
58 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
59 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } @capabilities );
60
61
62 # Each of these methods need _determine_driver called before itself
63 # in order to function reliably. This is a purely DRY optimization
64 #
65 # get_(use)_dbms_capability need to be called on the correct Storage
66 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
67 # _determine_supports_X which obv. needs a correct driver as well
68 my @rdbms_specific_methods = qw/
69   deployment_statements
70   sqlt_type
71   sql_maker
72   build_datetime_parser
73   datetime_parser_type
74
75   insert
76   insert_bulk
77   update
78   delete
79   select
80   select_single
81
82   get_use_dbms_capability
83   get_dbms_capability
84
85   _server_info
86   _get_server_version
87 /;
88
89 for my $meth (@rdbms_specific_methods) {
90
91   my $orig = __PACKAGE__->can ($meth)
92     or die "$meth is not a ::Storage::DBI method!";
93
94   no strict qw/refs/;
95   no warnings qw/redefine/;
96   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
97     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
98       $_[0]->_determine_driver;
99
100       # This for some reason crashes and burns on perl 5.8.1
101       # IFF the method ends up throwing an exception
102       #goto $_[0]->can ($meth);
103
104       my $cref = $_[0]->can ($meth);
105       goto $cref;
106     }
107     goto $orig;
108   };
109 }
110
111
112 =head1 NAME
113
114 DBIx::Class::Storage::DBI - DBI storage handler
115
116 =head1 SYNOPSIS
117
118   my $schema = MySchema->connect('dbi:SQLite:my.db');
119
120   $schema->storage->debug(1);
121
122   my @stuff = $schema->storage->dbh_do(
123     sub {
124       my ($storage, $dbh, @args) = @_;
125       $dbh->do("DROP TABLE authors");
126     },
127     @column_list
128   );
129
130   $schema->resultset('Book')->search({
131      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
132   });
133
134 =head1 DESCRIPTION
135
136 This class represents the connection to an RDBMS via L<DBI>.  See
137 L<DBIx::Class::Storage> for general information.  This pod only
138 documents DBI-specific methods and behaviors.
139
140 =head1 METHODS
141
142 =cut
143
144 sub new {
145   my $new = shift->next::method(@_);
146
147   $new->transaction_depth(0);
148   $new->_sql_maker_opts({});
149   $new->_dbh_details({});
150   $new->{savepoints} = [];
151   $new->{_in_dbh_do} = 0;
152   $new->{_dbh_gen} = 0;
153
154   # read below to see what this does
155   $new->_arm_global_destructor;
156
157   $new;
158 }
159
160 # This is hack to work around perl shooting stuff in random
161 # order on exit(). If we do not walk the remaining storage
162 # objects in an END block, there is a *small but real* chance
163 # of a fork()ed child to kill the parent's shared DBI handle,
164 # *before perl reaches the DESTROY in this package*
165 # Yes, it is ugly and effective.
166 {
167   my %seek_and_destroy;
168
169   sub _arm_global_destructor {
170     my $self = shift;
171     my $key = Scalar::Util::refaddr ($self);
172     $seek_and_destroy{$key} = $self;
173     Scalar::Util::weaken ($seek_and_destroy{$key});
174   }
175
176   END {
177     local $?; # just in case the DBI destructor changes it somehow
178
179     # destroy just the object if not native to this process/thread
180     $_->_preserve_foreign_dbh for (grep
181       { defined $_ }
182       values %seek_and_destroy
183     );
184   }
185 }
186
187 sub DESTROY {
188   my $self = shift;
189
190   # some databases spew warnings on implicit disconnect
191   local $SIG{__WARN__} = sub {};
192   $self->_dbh(undef);
193 }
194
195 sub _preserve_foreign_dbh {
196   my $self = shift;
197
198   return unless $self->_dbh;
199
200   $self->_verify_tid;
201
202   return unless $self->_dbh;
203
204   $self->_verify_pid;
205
206 }
207
208 # handle pid changes correctly - do not destroy parent's connection
209 sub _verify_pid {
210   my $self = shift;
211
212   return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
213
214   $self->_dbh->{InactiveDestroy} = 1;
215   $self->_dbh(undef);
216   $self->{_dbh_gen}++;
217
218   return;
219 }
220
221 # very similar to above, but seems to FAIL if I set InactiveDestroy
222 sub _verify_tid {
223   my $self = shift;
224
225   if ( ! defined $self->_conn_tid ) {
226     return; # no threads
227   }
228   elsif ( $self->_conn_tid == threads->tid ) {
229     return; # same thread
230   }
231
232   #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
233   $self->_dbh(undef);
234   $self->{_dbh_gen}++;
235
236   return;
237 }
238
239
240 =head2 connect_info
241
242 This method is normally called by L<DBIx::Class::Schema/connection>, which
243 encapsulates its argument list in an arrayref before passing them here.
244
245 The argument list may contain:
246
247 =over
248
249 =item *
250
251 The same 4-element argument set one would normally pass to
252 L<DBI/connect>, optionally followed by
253 L<extra attributes|/DBIx::Class specific connection attributes>
254 recognized by DBIx::Class:
255
256   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
257
258 =item *
259
260 A single code reference which returns a connected
261 L<DBI database handle|DBI/connect> optionally followed by
262 L<extra attributes|/DBIx::Class specific connection attributes> recognized
263 by DBIx::Class:
264
265   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
266
267 =item *
268
269 A single hashref with all the attributes and the dsn/user/password
270 mixed together:
271
272   $connect_info_args = [{
273     dsn => $dsn,
274     user => $user,
275     password => $pass,
276     %dbi_attributes,
277     %extra_attributes,
278   }];
279
280   $connect_info_args = [{
281     dbh_maker => sub { DBI->connect (...) },
282     %dbi_attributes,
283     %extra_attributes,
284   }];
285
286 This is particularly useful for L<Catalyst> based applications, allowing the
287 following config (L<Config::General> style):
288
289   <Model::DB>
290     schema_class   App::DB
291     <connect_info>
292       dsn          dbi:mysql:database=test
293       user         testuser
294       password     TestPass
295       AutoCommit   1
296     </connect_info>
297   </Model::DB>
298
299 The C<dsn>/C<user>/C<password> combination can be substituted by the
300 C<dbh_maker> key whose value is a coderef that returns a connected
301 L<DBI database handle|DBI/connect>
302
303 =back
304
305 Please note that the L<DBI> docs recommend that you always explicitly
306 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
307 recommends that it be set to I<1>, and that you perform transactions
308 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
309 to I<1> if you do not do explicitly set it to zero.  This is the default
310 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
311
312 =head3 DBIx::Class specific connection attributes
313
314 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
315 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
316 the following connection options. These options can be mixed in with your other
317 L<DBI> connection attributes, or placed in a separate hashref
318 (C<\%extra_attributes>) as shown above.
319
320 Every time C<connect_info> is invoked, any previous settings for
321 these options will be cleared before setting the new ones, regardless of
322 whether any options are specified in the new C<connect_info>.
323
324
325 =over
326
327 =item on_connect_do
328
329 Specifies things to do immediately after connecting or re-connecting to
330 the database.  Its value may contain:
331
332 =over
333
334 =item a scalar
335
336 This contains one SQL statement to execute.
337
338 =item an array reference
339
340 This contains SQL statements to execute in order.  Each element contains
341 a string or a code reference that returns a string.
342
343 =item a code reference
344
345 This contains some code to execute.  Unlike code references within an
346 array reference, its return value is ignored.
347
348 =back
349
350 =item on_disconnect_do
351
352 Takes arguments in the same form as L</on_connect_do> and executes them
353 immediately before disconnecting from the database.
354
355 Note, this only runs if you explicitly call L</disconnect> on the
356 storage object.
357
358 =item on_connect_call
359
360 A more generalized form of L</on_connect_do> that calls the specified
361 C<connect_call_METHOD> methods in your storage driver.
362
363   on_connect_do => 'select 1'
364
365 is equivalent to:
366
367   on_connect_call => [ [ do_sql => 'select 1' ] ]
368
369 Its values may contain:
370
371 =over
372
373 =item a scalar
374
375 Will call the C<connect_call_METHOD> method.
376
377 =item a code reference
378
379 Will execute C<< $code->($storage) >>
380
381 =item an array reference
382
383 Each value can be a method name or code reference.
384
385 =item an array of arrays
386
387 For each array, the first item is taken to be the C<connect_call_> method name
388 or code reference, and the rest are parameters to it.
389
390 =back
391
392 Some predefined storage methods you may use:
393
394 =over
395
396 =item do_sql
397
398 Executes a SQL string or a code reference that returns a SQL string. This is
399 what L</on_connect_do> and L</on_disconnect_do> use.
400
401 It can take:
402
403 =over
404
405 =item a scalar
406
407 Will execute the scalar as SQL.
408
409 =item an arrayref
410
411 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
412 attributes hashref and bind values.
413
414 =item a code reference
415
416 Will execute C<< $code->($storage) >> and execute the return array refs as
417 above.
418
419 =back
420
421 =item datetime_setup
422
423 Execute any statements necessary to initialize the database session to return
424 and accept datetime/timestamp values used with
425 L<DBIx::Class::InflateColumn::DateTime>.
426
427 Only necessary for some databases, see your specific storage driver for
428 implementation details.
429
430 =back
431
432 =item on_disconnect_call
433
434 Takes arguments in the same form as L</on_connect_call> and executes them
435 immediately before disconnecting from the database.
436
437 Calls the C<disconnect_call_METHOD> methods as opposed to the
438 C<connect_call_METHOD> methods called by L</on_connect_call>.
439
440 Note, this only runs if you explicitly call L</disconnect> on the
441 storage object.
442
443 =item disable_sth_caching
444
445 If set to a true value, this option will disable the caching of
446 statement handles via L<DBI/prepare_cached>.
447
448 =item limit_dialect
449
450 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
451 default L</sql_limit_dialect> setting of the storage (if any). For a list
452 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
453
454 =item quote_char
455
456 Specifies what characters to use to quote table and column names.
457
458 C<quote_char> expects either a single character, in which case is it
459 is placed on either side of the table/column name, or an arrayref of length
460 2 in which case the table/column name is placed between the elements.
461
462 For example under MySQL you should use C<< quote_char => '`' >>, and for
463 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
464
465 =item name_sep
466
467 This parameter is only useful in conjunction with C<quote_char>, and is used to
468 specify the character that separates elements (schemas, tables, columns) from
469 each other. If unspecified it defaults to the most commonly used C<.>.
470
471 =item unsafe
472
473 This Storage driver normally installs its own C<HandleError>, sets
474 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
475 all database handles, including those supplied by a coderef.  It does this
476 so that it can have consistent and useful error behavior.
477
478 If you set this option to a true value, Storage will not do its usual
479 modifications to the database handle's attributes, and instead relies on
480 the settings in your connect_info DBI options (or the values you set in
481 your connection coderef, in the case that you are connecting via coderef).
482
483 Note that your custom settings can cause Storage to malfunction,
484 especially if you set a C<HandleError> handler that suppresses exceptions
485 and/or disable C<RaiseError>.
486
487 =item auto_savepoint
488
489 If this option is true, L<DBIx::Class> will use savepoints when nesting
490 transactions, making it possible to recover from failure in the inner
491 transaction without having to abort all outer transactions.
492
493 =item cursor_class
494
495 Use this argument to supply a cursor class other than the default
496 L<DBIx::Class::Storage::DBI::Cursor>.
497
498 =back
499
500 Some real-life examples of arguments to L</connect_info> and
501 L<DBIx::Class::Schema/connect>
502
503   # Simple SQLite connection
504   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
505
506   # Connect via subref
507   ->connect_info([ sub { DBI->connect(...) } ]);
508
509   # Connect via subref in hashref
510   ->connect_info([{
511     dbh_maker => sub { DBI->connect(...) },
512     on_connect_do => 'alter session ...',
513   }]);
514
515   # A bit more complicated
516   ->connect_info(
517     [
518       'dbi:Pg:dbname=foo',
519       'postgres',
520       'my_pg_password',
521       { AutoCommit => 1 },
522       { quote_char => q{"} },
523     ]
524   );
525
526   # Equivalent to the previous example
527   ->connect_info(
528     [
529       'dbi:Pg:dbname=foo',
530       'postgres',
531       'my_pg_password',
532       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
533     ]
534   );
535
536   # Same, but with hashref as argument
537   # See parse_connect_info for explanation
538   ->connect_info(
539     [{
540       dsn         => 'dbi:Pg:dbname=foo',
541       user        => 'postgres',
542       password    => 'my_pg_password',
543       AutoCommit  => 1,
544       quote_char  => q{"},
545       name_sep    => q{.},
546     }]
547   );
548
549   # Subref + DBIx::Class-specific connection options
550   ->connect_info(
551     [
552       sub { DBI->connect(...) },
553       {
554           quote_char => q{`},
555           name_sep => q{@},
556           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
557           disable_sth_caching => 1,
558       },
559     ]
560   );
561
562
563
564 =cut
565
566 sub connect_info {
567   my ($self, $info) = @_;
568
569   return $self->_connect_info if !$info;
570
571   $self->_connect_info($info); # copy for _connect_info
572
573   $info = $self->_normalize_connect_info($info)
574     if ref $info eq 'ARRAY';
575
576   for my $storage_opt (keys %{ $info->{storage_options} }) {
577     my $value = $info->{storage_options}{$storage_opt};
578
579     $self->$storage_opt($value);
580   }
581
582   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
583   #  the new set of options
584   $self->_sql_maker(undef);
585   $self->_sql_maker_opts({});
586
587   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
588     my $value = $info->{sql_maker_options}{$sql_maker_opt};
589
590     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
591   }
592
593   my %attrs = (
594     %{ $self->_default_dbi_connect_attributes || {} },
595     %{ $info->{attributes} || {} },
596   );
597
598   my @args = @{ $info->{arguments} };
599
600   $self->_dbi_connect_info([@args,
601     %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
602
603   # FIXME - dirty:
604   # save attributes them in a separate accessor so they are always
605   # introspectable, even in case of a CODE $dbhmaker
606   $self->_dbic_connect_attributes (\%attrs);
607
608   return $self->_connect_info;
609 }
610
611 sub _normalize_connect_info {
612   my ($self, $info_arg) = @_;
613   my %info;
614
615   my @args = @$info_arg;  # take a shallow copy for further mutilation
616
617   # combine/pre-parse arguments depending on invocation style
618
619   my %attrs;
620   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
621     %attrs = %{ $args[1] || {} };
622     @args = $args[0];
623   }
624   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
625     %attrs = %{$args[0]};
626     @args = ();
627     if (my $code = delete $attrs{dbh_maker}) {
628       @args = $code;
629
630       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
631       if (@ignored) {
632         carp sprintf (
633             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
634           . "to the result of 'dbh_maker'",
635
636           join (', ', map { "'$_'" } (@ignored) ),
637         );
638       }
639     }
640     else {
641       @args = delete @attrs{qw/dsn user password/};
642     }
643   }
644   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
645     %attrs = (
646       % { $args[3] || {} },
647       % { $args[4] || {} },
648     );
649     @args = @args[0,1,2];
650   }
651
652   $info{arguments} = \@args;
653
654   my @storage_opts = grep exists $attrs{$_},
655     @storage_options, 'cursor_class';
656
657   @{ $info{storage_options} }{@storage_opts} =
658     delete @attrs{@storage_opts} if @storage_opts;
659
660   my @sql_maker_opts = grep exists $attrs{$_},
661     qw/limit_dialect quote_char name_sep/;
662
663   @{ $info{sql_maker_options} }{@sql_maker_opts} =
664     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
665
666   $info{attributes} = \%attrs if %attrs;
667
668   return \%info;
669 }
670
671 sub _default_dbi_connect_attributes {
672   return {
673     AutoCommit => 1,
674     RaiseError => 1,
675     PrintError => 0,
676   };
677 }
678
679 =head2 on_connect_do
680
681 This method is deprecated in favour of setting via L</connect_info>.
682
683 =cut
684
685 =head2 on_disconnect_do
686
687 This method is deprecated in favour of setting via L</connect_info>.
688
689 =cut
690
691 sub _parse_connect_do {
692   my ($self, $type) = @_;
693
694   my $val = $self->$type;
695   return () if not defined $val;
696
697   my @res;
698
699   if (not ref($val)) {
700     push @res, [ 'do_sql', $val ];
701   } elsif (ref($val) eq 'CODE') {
702     push @res, $val;
703   } elsif (ref($val) eq 'ARRAY') {
704     push @res, map { [ 'do_sql', $_ ] } @$val;
705   } else {
706     $self->throw_exception("Invalid type for $type: ".ref($val));
707   }
708
709   return \@res;
710 }
711
712 =head2 dbh_do
713
714 Arguments: ($subref | $method_name), @extra_coderef_args?
715
716 Execute the given $subref or $method_name using the new exception-based
717 connection management.
718
719 The first two arguments will be the storage object that C<dbh_do> was called
720 on and a database handle to use.  Any additional arguments will be passed
721 verbatim to the called subref as arguments 2 and onwards.
722
723 Using this (instead of $self->_dbh or $self->dbh) ensures correct
724 exception handling and reconnection (or failover in future subclasses).
725
726 Your subref should have no side-effects outside of the database, as
727 there is the potential for your subref to be partially double-executed
728 if the database connection was stale/dysfunctional.
729
730 Example:
731
732   my @stuff = $schema->storage->dbh_do(
733     sub {
734       my ($storage, $dbh, @cols) = @_;
735       my $cols = join(q{, }, @cols);
736       $dbh->selectrow_array("SELECT $cols FROM foo");
737     },
738     @column_list
739   );
740
741 =cut
742
743 sub dbh_do {
744   my $self = shift;
745   my $code = shift;
746
747   my $dbh = $self->_get_dbh;
748
749   return $self->$code($dbh, @_)
750     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
751
752   local $self->{_in_dbh_do} = 1;
753
754   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
755   my $args = \@_;
756   return try {
757     $self->$code ($dbh, @$args);
758   } catch {
759     $self->throw_exception($_) if $self->connected;
760
761     # We were not connected - reconnect and retry, but let any
762     #  exception fall right through this time
763     carp "Retrying $code after catching disconnected exception: $_"
764       if $ENV{DBIC_DBIRETRY_DEBUG};
765
766     $self->_populate_dbh;
767     $self->$code($self->_dbh, @$args);
768   };
769 }
770
771 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
772 # It also informs dbh_do to bypass itself while under the direction of txn_do,
773 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
774 sub txn_do {
775   my $self = shift;
776   my $coderef = shift;
777
778   ref $coderef eq 'CODE' or $self->throw_exception
779     ('$coderef must be a CODE reference');
780
781   return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
782
783   local $self->{_in_dbh_do} = 1;
784
785   my @result;
786   my $want_array = wantarray;
787
788   my $tried = 0;
789   while(1) {
790     my $exception;
791
792     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
793     my $args = \@_;
794
795     try {
796       $self->txn_begin;
797       if($want_array) {
798           @result = $coderef->(@$args);
799       }
800       elsif(defined $want_array) {
801           $result[0] = $coderef->(@$args);
802       }
803       else {
804           $coderef->(@$args);
805       }
806       $self->txn_commit;
807     } catch {
808       $exception = $_;
809     };
810
811     if(! defined $exception) { return $want_array ? @result : $result[0] }
812
813     if($tried++ || $self->connected) {
814       my $rollback_exception;
815       try { $self->txn_rollback } catch { $rollback_exception = shift };
816       if(defined $rollback_exception) {
817         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
818         $self->throw_exception($exception)  # propagate nested rollback
819           if $rollback_exception =~ /$exception_class/;
820
821         $self->throw_exception(
822           "Transaction aborted: ${exception}. "
823           . "Rollback failed: ${rollback_exception}"
824         );
825       }
826       $self->throw_exception($exception)
827     }
828
829     # We were not connected, and was first try - reconnect and retry
830     # via the while loop
831     carp "Retrying $coderef after catching disconnected exception: $exception"
832       if $ENV{DBIC_TXNRETRY_DEBUG};
833     $self->_populate_dbh;
834   }
835 }
836
837 =head2 disconnect
838
839 Our C<disconnect> method also performs a rollback first if the
840 database is not in C<AutoCommit> mode.
841
842 =cut
843
844 sub disconnect {
845   my ($self) = @_;
846
847   if( $self->_dbh ) {
848     my @actions;
849
850     push @actions, ( $self->on_disconnect_call || () );
851     push @actions, $self->_parse_connect_do ('on_disconnect_do');
852
853     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
854
855     $self->_dbh_rollback unless $self->_dbh_autocommit;
856
857     %{ $self->_dbh->{CachedKids} } = ();
858     $self->_dbh->disconnect;
859     $self->_dbh(undef);
860     $self->{_dbh_gen}++;
861   }
862 }
863
864 =head2 with_deferred_fk_checks
865
866 =over 4
867
868 =item Arguments: C<$coderef>
869
870 =item Return Value: The return value of $coderef
871
872 =back
873
874 Storage specific method to run the code ref with FK checks deferred or
875 in MySQL's case disabled entirely.
876
877 =cut
878
879 # Storage subclasses should override this
880 sub with_deferred_fk_checks {
881   my ($self, $sub) = @_;
882   $sub->();
883 }
884
885 =head2 connected
886
887 =over
888
889 =item Arguments: none
890
891 =item Return Value: 1|0
892
893 =back
894
895 Verifies that the current database handle is active and ready to execute
896 an SQL statement (e.g. the connection did not get stale, server is still
897 answering, etc.) This method is used internally by L</dbh>.
898
899 =cut
900
901 sub connected {
902   my $self = shift;
903   return 0 unless $self->_seems_connected;
904
905   #be on the safe side
906   local $self->_dbh->{RaiseError} = 1;
907
908   return $self->_ping;
909 }
910
911 sub _seems_connected {
912   my $self = shift;
913
914   $self->_preserve_foreign_dbh;
915
916   my $dbh = $self->_dbh
917     or return 0;
918
919   return $dbh->FETCH('Active');
920 }
921
922 sub _ping {
923   my $self = shift;
924
925   my $dbh = $self->_dbh or return 0;
926
927   return $dbh->ping;
928 }
929
930 sub ensure_connected {
931   my ($self) = @_;
932
933   unless ($self->connected) {
934     $self->_populate_dbh;
935   }
936 }
937
938 =head2 dbh
939
940 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
941 is guaranteed to be healthy by implicitly calling L</connected>, and if
942 necessary performing a reconnection before returning. Keep in mind that this
943 is very B<expensive> on some database engines. Consider using L</dbh_do>
944 instead.
945
946 =cut
947
948 sub dbh {
949   my ($self) = @_;
950
951   if (not $self->_dbh) {
952     $self->_populate_dbh;
953   } else {
954     $self->ensure_connected;
955   }
956   return $self->_dbh;
957 }
958
959 # this is the internal "get dbh or connect (don't check)" method
960 sub _get_dbh {
961   my $self = shift;
962   $self->_preserve_foreign_dbh;
963   $self->_populate_dbh unless $self->_dbh;
964   return $self->_dbh;
965 }
966
967 sub sql_maker {
968   my ($self) = @_;
969   unless ($self->_sql_maker) {
970     my $sql_maker_class = $self->sql_maker_class;
971     $self->ensure_class_loaded ($sql_maker_class);
972
973     my %opts = %{$self->_sql_maker_opts||{}};
974     my $dialect =
975       $opts{limit_dialect}
976         ||
977       $self->sql_limit_dialect
978         ||
979       do {
980         my $s_class = (ref $self) || $self;
981         carp (
982           "Your storage class ($s_class) does not set sql_limit_dialect and you "
983         . 'have not supplied an explicit limit_dialect in your connection_info. '
984         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
985         . 'databases but can be (and often is) painfully slow.'
986         );
987
988         'GenericSubQ';
989       }
990     ;
991
992     $self->_sql_maker($sql_maker_class->new(
993       bindtype=>'columns',
994       array_datatypes => 1,
995       limit_dialect => $dialect,
996       name_sep => '.',
997       %opts,
998     ));
999   }
1000   return $self->_sql_maker;
1001 }
1002
1003 # nothing to do by default
1004 sub _rebless {}
1005 sub _init {}
1006
1007 sub _populate_dbh {
1008   my ($self) = @_;
1009
1010   my @info = @{$self->_dbi_connect_info || []};
1011   $self->_dbh(undef); # in case ->connected failed we might get sent here
1012   $self->_dbh_details({}); # reset everything we know
1013
1014   $self->_dbh($self->_connect(@info));
1015
1016   $self->_conn_pid($$);
1017   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
1018
1019   $self->_determine_driver;
1020
1021   # Always set the transaction depth on connect, since
1022   #  there is no transaction in progress by definition
1023   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1024
1025   $self->_run_connection_actions unless $self->{_in_determine_driver};
1026 }
1027
1028 sub _run_connection_actions {
1029   my $self = shift;
1030   my @actions;
1031
1032   push @actions, ( $self->on_connect_call || () );
1033   push @actions, $self->_parse_connect_do ('on_connect_do');
1034
1035   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1036 }
1037
1038
1039
1040 sub set_use_dbms_capability {
1041   $_[0]->set_inherited ($_[1], $_[2]);
1042 }
1043
1044 sub get_use_dbms_capability {
1045   my ($self, $capname) = @_;
1046
1047   my $use = $self->get_inherited ($capname);
1048   return defined $use
1049     ? $use
1050     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1051   ;
1052 }
1053
1054 sub set_dbms_capability {
1055   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1056 }
1057
1058 sub get_dbms_capability {
1059   my ($self, $capname) = @_;
1060
1061   my $cap = $self->_dbh_details->{capability}{$capname};
1062
1063   unless (defined $cap) {
1064     if (my $meth = $self->can ("_determine$capname")) {
1065       $cap = $self->$meth ? 1 : 0;
1066     }
1067     else {
1068       $cap = 0;
1069     }
1070
1071     $self->set_dbms_capability ($capname, $cap);
1072   }
1073
1074   return $cap;
1075 }
1076
1077 sub _server_info {
1078   my $self = shift;
1079
1080   my $info;
1081   unless ($info = $self->_dbh_details->{info}) {
1082
1083     $info = {};
1084
1085     my $server_version = try { $self->_get_server_version };
1086
1087     if (defined $server_version) {
1088       $info->{dbms_version} = $server_version;
1089
1090       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1091       my @verparts = split (/\./, $numeric_version);
1092       if (
1093         @verparts
1094           &&
1095         $verparts[0] <= 999
1096       ) {
1097         # consider only up to 3 version parts, iff not more than 3 digits
1098         my @use_parts;
1099         while (@verparts && @use_parts < 3) {
1100           my $p = shift @verparts;
1101           last if $p > 999;
1102           push @use_parts, $p;
1103         }
1104         push @use_parts, 0 while @use_parts < 3;
1105
1106         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1107       }
1108     }
1109
1110     $self->_dbh_details->{info} = $info;
1111   }
1112
1113   return $info;
1114 }
1115
1116 sub _get_server_version {
1117   shift->_get_dbh->get_info(18);
1118 }
1119
1120 sub _determine_driver {
1121   my ($self) = @_;
1122
1123   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1124     my $started_connected = 0;
1125     local $self->{_in_determine_driver} = 1;
1126
1127     if (ref($self) eq __PACKAGE__) {
1128       my $driver;
1129       if ($self->_dbh) { # we are connected
1130         $driver = $self->_dbh->{Driver}{Name};
1131         $started_connected = 1;
1132       } else {
1133         # if connect_info is a CODEREF, we have no choice but to connect
1134         if (ref $self->_dbi_connect_info->[0] &&
1135             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1136           $self->_populate_dbh;
1137           $driver = $self->_dbh->{Driver}{Name};
1138         }
1139         else {
1140           # try to use dsn to not require being connected, the driver may still
1141           # force a connection in _rebless to determine version
1142           # (dsn may not be supplied at all if all we do is make a mock-schema)
1143           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1144           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1145           $driver ||= $ENV{DBI_DRIVER};
1146         }
1147       }
1148
1149       if ($driver) {
1150         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1151         if ($self->load_optional_class($storage_class)) {
1152           mro::set_mro($storage_class, 'c3');
1153           bless $self, $storage_class;
1154           $self->_rebless();
1155         }
1156       }
1157     }
1158
1159     $self->_driver_determined(1);
1160
1161     $self->_init; # run driver-specific initializations
1162
1163     $self->_run_connection_actions
1164         if !$started_connected && defined $self->_dbh;
1165   }
1166 }
1167
1168 sub _do_connection_actions {
1169   my $self          = shift;
1170   my $method_prefix = shift;
1171   my $call          = shift;
1172
1173   if (not ref($call)) {
1174     my $method = $method_prefix . $call;
1175     $self->$method(@_);
1176   } elsif (ref($call) eq 'CODE') {
1177     $self->$call(@_);
1178   } elsif (ref($call) eq 'ARRAY') {
1179     if (ref($call->[0]) ne 'ARRAY') {
1180       $self->_do_connection_actions($method_prefix, $_) for @$call;
1181     } else {
1182       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1183     }
1184   } else {
1185     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1186   }
1187
1188   return $self;
1189 }
1190
1191 sub connect_call_do_sql {
1192   my $self = shift;
1193   $self->_do_query(@_);
1194 }
1195
1196 sub disconnect_call_do_sql {
1197   my $self = shift;
1198   $self->_do_query(@_);
1199 }
1200
1201 # override in db-specific backend when necessary
1202 sub connect_call_datetime_setup { 1 }
1203
1204 sub _do_query {
1205   my ($self, $action) = @_;
1206
1207   if (ref $action eq 'CODE') {
1208     $action = $action->($self);
1209     $self->_do_query($_) foreach @$action;
1210   }
1211   else {
1212     # Most debuggers expect ($sql, @bind), so we need to exclude
1213     # the attribute hash which is the second argument to $dbh->do
1214     # furthermore the bind values are usually to be presented
1215     # as named arrayref pairs, so wrap those here too
1216     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1217     my $sql = shift @do_args;
1218     my $attrs = shift @do_args;
1219     my @bind = map { [ undef, $_ ] } @do_args;
1220
1221     $self->_query_start($sql, @bind);
1222     $self->_get_dbh->do($sql, $attrs, @do_args);
1223     $self->_query_end($sql, @bind);
1224   }
1225
1226   return $self;
1227 }
1228
1229 sub _connect {
1230   my ($self, @info) = @_;
1231
1232   $self->throw_exception("You failed to provide any connection info")
1233     if !@info;
1234
1235   my ($old_connect_via, $dbh);
1236
1237   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1238     $old_connect_via = $DBI::connect_via;
1239     $DBI::connect_via = 'connect';
1240   }
1241
1242   try {
1243     if(ref $info[0] eq 'CODE') {
1244        $dbh = $info[0]->();
1245     }
1246     else {
1247        $dbh = DBI->connect(@info);
1248     }
1249
1250     if (!$dbh) {
1251       die $DBI::errstr;
1252     }
1253
1254     unless ($self->unsafe) {
1255
1256       # this odd anonymous coderef dereference is in fact really
1257       # necessary to avoid the unwanted effect described in perl5
1258       # RT#75792
1259       sub {
1260         my $weak_self = $_[0];
1261         weaken $weak_self;
1262
1263         $_[1]->{HandleError} = sub {
1264           if ($weak_self) {
1265             $weak_self->throw_exception("DBI Exception: $_[0]");
1266           }
1267           else {
1268             # the handler may be invoked by something totally out of
1269             # the scope of DBIC
1270             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1271           }
1272         };
1273       }->($self, $dbh);
1274
1275       $dbh->{ShowErrorStatement} = 1;
1276       $dbh->{RaiseError} = 1;
1277       $dbh->{PrintError} = 0;
1278     }
1279   }
1280   catch {
1281     $self->throw_exception("DBI Connection failed: $_")
1282   }
1283   finally {
1284     $DBI::connect_via = $old_connect_via if $old_connect_via;
1285   };
1286
1287   $self->_dbh_autocommit($dbh->{AutoCommit});
1288   $dbh;
1289 }
1290
1291 sub svp_begin {
1292   my ($self, $name) = @_;
1293
1294   $name = $self->_svp_generate_name
1295     unless defined $name;
1296
1297   $self->throw_exception ("You can't use savepoints outside a transaction")
1298     if $self->{transaction_depth} == 0;
1299
1300   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1301     unless $self->can('_svp_begin');
1302
1303   push @{ $self->{savepoints} }, $name;
1304
1305   $self->debugobj->svp_begin($name) if $self->debug;
1306
1307   return $self->_svp_begin($name);
1308 }
1309
1310 sub svp_release {
1311   my ($self, $name) = @_;
1312
1313   $self->throw_exception ("You can't use savepoints outside a transaction")
1314     if $self->{transaction_depth} == 0;
1315
1316   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1317     unless $self->can('_svp_release');
1318
1319   if (defined $name) {
1320     $self->throw_exception ("Savepoint '$name' does not exist")
1321       unless grep { $_ eq $name } @{ $self->{savepoints} };
1322
1323     # Dig through the stack until we find the one we are releasing.  This keeps
1324     # the stack up to date.
1325     my $svp;
1326
1327     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1328   } else {
1329     $name = pop @{ $self->{savepoints} };
1330   }
1331
1332   $self->debugobj->svp_release($name) if $self->debug;
1333
1334   return $self->_svp_release($name);
1335 }
1336
1337 sub svp_rollback {
1338   my ($self, $name) = @_;
1339
1340   $self->throw_exception ("You can't use savepoints outside a transaction")
1341     if $self->{transaction_depth} == 0;
1342
1343   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1344     unless $self->can('_svp_rollback');
1345
1346   if (defined $name) {
1347       # If they passed us a name, verify that it exists in the stack
1348       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1349           $self->throw_exception("Savepoint '$name' does not exist!");
1350       }
1351
1352       # Dig through the stack until we find the one we are releasing.  This keeps
1353       # the stack up to date.
1354       while(my $s = pop(@{ $self->{savepoints} })) {
1355           last if($s eq $name);
1356       }
1357       # Add the savepoint back to the stack, as a rollback doesn't remove the
1358       # named savepoint, only everything after it.
1359       push(@{ $self->{savepoints} }, $name);
1360   } else {
1361       # We'll assume they want to rollback to the last savepoint
1362       $name = $self->{savepoints}->[-1];
1363   }
1364
1365   $self->debugobj->svp_rollback($name) if $self->debug;
1366
1367   return $self->_svp_rollback($name);
1368 }
1369
1370 sub _svp_generate_name {
1371     my ($self) = @_;
1372
1373     return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1374 }
1375
1376 sub txn_begin {
1377   my $self = shift;
1378
1379   # this means we have not yet connected and do not know the AC status
1380   # (e.g. coderef $dbh)
1381   $self->ensure_connected if (! defined $self->_dbh_autocommit);
1382
1383   if($self->{transaction_depth} == 0) {
1384     $self->debugobj->txn_begin()
1385       if $self->debug;
1386     $self->_dbh_begin_work;
1387   }
1388   elsif ($self->auto_savepoint) {
1389     $self->svp_begin;
1390   }
1391   $self->{transaction_depth}++;
1392 }
1393
1394 sub _dbh_begin_work {
1395   my $self = shift;
1396
1397   # if the user is utilizing txn_do - good for him, otherwise we need to
1398   # ensure that the $dbh is healthy on BEGIN.
1399   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1400   # will be replaced by a failure of begin_work itself (which will be
1401   # then retried on reconnect)
1402   if ($self->{_in_dbh_do}) {
1403     $self->_dbh->begin_work;
1404   } else {
1405     $self->dbh_do(sub { $_[1]->begin_work });
1406   }
1407 }
1408
1409 sub txn_commit {
1410   my $self = shift;
1411   if ($self->{transaction_depth} == 1) {
1412     $self->debugobj->txn_commit()
1413       if ($self->debug);
1414     $self->_dbh_commit;
1415     $self->{transaction_depth} = 0
1416       if $self->_dbh_autocommit;
1417   }
1418   elsif($self->{transaction_depth} > 1) {
1419     $self->{transaction_depth}--;
1420     $self->svp_release
1421       if $self->auto_savepoint;
1422   }
1423 }
1424
1425 sub _dbh_commit {
1426   my $self = shift;
1427   my $dbh  = $self->_dbh
1428     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1429   $dbh->commit;
1430 }
1431
1432 sub txn_rollback {
1433   my $self = shift;
1434   my $dbh = $self->_dbh;
1435   try {
1436     if ($self->{transaction_depth} == 1) {
1437       $self->debugobj->txn_rollback()
1438         if ($self->debug);
1439       $self->{transaction_depth} = 0
1440         if $self->_dbh_autocommit;
1441       $self->_dbh_rollback;
1442     }
1443     elsif($self->{transaction_depth} > 1) {
1444       $self->{transaction_depth}--;
1445       if ($self->auto_savepoint) {
1446         $self->svp_rollback;
1447         $self->svp_release;
1448       }
1449     }
1450     else {
1451       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1452     }
1453   }
1454   catch {
1455     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1456
1457     if ($_ !~ /$exception_class/) {
1458       # ensure that a failed rollback resets the transaction depth
1459       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1460     }
1461
1462     $self->throw_exception($_)
1463   };
1464 }
1465
1466 sub _dbh_rollback {
1467   my $self = shift;
1468   my $dbh  = $self->_dbh
1469     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1470   $dbh->rollback;
1471 }
1472
1473 # This used to be the top-half of _execute.  It was split out to make it
1474 #  easier to override in NoBindVars without duping the rest.  It takes up
1475 #  all of _execute's args, and emits $sql, @bind.
1476 sub _prep_for_execute {
1477   my ($self, $op, $extra_bind, $ident, $args) = @_;
1478
1479   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1480     $ident = $ident->from();
1481   }
1482
1483   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1484
1485   unshift(@bind,
1486     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1487       if $extra_bind;
1488   return ($sql, \@bind);
1489 }
1490
1491
1492 sub _fix_bind_params {
1493     my ($self, @bind) = @_;
1494
1495     ### Turn @bind from something like this:
1496     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1497     ### to this:
1498     ###   ( "'1'", "'1'", "'3'" )
1499     return
1500         map {
1501             if ( defined( $_ && $_->[1] ) ) {
1502                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1503             }
1504             else { q{'NULL'}; }
1505         } @bind;
1506 }
1507
1508 sub _query_start {
1509     my ( $self, $sql, @bind ) = @_;
1510
1511     if ( $self->debug ) {
1512         @bind = $self->_fix_bind_params(@bind);
1513
1514         $self->debugobj->query_start( $sql, @bind );
1515     }
1516 }
1517
1518 sub _query_end {
1519     my ( $self, $sql, @bind ) = @_;
1520
1521     if ( $self->debug ) {
1522         @bind = $self->_fix_bind_params(@bind);
1523         $self->debugobj->query_end( $sql, @bind );
1524     }
1525 }
1526
1527 sub _dbh_execute {
1528   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1529
1530   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1531
1532   $self->_query_start( $sql, @$bind );
1533
1534   my $sth = $self->sth($sql,$op);
1535
1536   my $placeholder_index = 1;
1537
1538   foreach my $bound (@$bind) {
1539     my $attributes = {};
1540     my($column_name, @data) = @$bound;
1541
1542     if ($bind_attributes) {
1543       $attributes = $bind_attributes->{$column_name}
1544       if defined $bind_attributes->{$column_name};
1545     }
1546
1547     foreach my $data (@data) {
1548       my $ref = ref $data;
1549       $data = $ref && $ref ne 'ARRAY' ? ''.$data : $data; # stringify args (except arrayrefs)
1550
1551       $sth->bind_param($placeholder_index, $data, $attributes);
1552       $placeholder_index++;
1553     }
1554   }
1555
1556   # Can this fail without throwing an exception anyways???
1557   my $rv = $sth->execute();
1558   $self->throw_exception(
1559     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1560   ) if !$rv;
1561
1562   $self->_query_end( $sql, @$bind );
1563
1564   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1565 }
1566
1567 sub _execute {
1568     my $self = shift;
1569     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1570 }
1571
1572 sub _prefetch_insert_auto_nextvals {
1573   my ($self, $source, $to_insert) = @_;
1574
1575   my $upd = {};
1576
1577   foreach my $col ( $source->columns ) {
1578     if ( !defined $to_insert->{$col} ) {
1579       my $col_info = $source->column_info($col);
1580
1581       if ( $col_info->{auto_nextval} ) {
1582         $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
1583           'nextval',
1584           $col_info->{sequence} ||=
1585             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1586         );
1587       }
1588     }
1589   }
1590
1591   return $upd;
1592 }
1593
1594 sub insert {
1595   my $self = shift;
1596   my ($source, $to_insert, $opts) = @_;
1597
1598   my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
1599
1600   my $bind_attributes = $self->source_bind_attributes($source);
1601
1602   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
1603
1604   if ($opts->{returning}) {
1605     my @ret_cols = @{$opts->{returning}};
1606
1607     my @ret_vals = try {
1608       local $SIG{__WARN__} = sub {};
1609       my @r = $sth->fetchrow_array;
1610       $sth->finish;
1611       @r;
1612     };
1613
1614     my %ret;
1615     @ret{@ret_cols} = @ret_vals if (@ret_vals);
1616
1617     $updated_cols = {
1618       %$updated_cols,
1619       %ret,
1620     };
1621   }
1622
1623   return $updated_cols;
1624 }
1625
1626 ## Currently it is assumed that all values passed will be "normal", i.e. not
1627 ## scalar refs, or at least, all the same type as the first set, the statement is
1628 ## only prepped once.
1629 sub insert_bulk {
1630   my ($self, $source, $cols, $data) = @_;
1631
1632   my %colvalues;
1633   @colvalues{@$cols} = (0..$#$cols);
1634
1635   for my $i (0..$#$cols) {
1636     my $first_val = $data->[0][$i];
1637     next unless ref $first_val eq 'SCALAR';
1638
1639     $colvalues{ $cols->[$i] } = $first_val;
1640   }
1641
1642   # check for bad data and stringify stringifiable objects
1643   my $bad_slice = sub {
1644     my ($msg, $col_idx, $slice_idx) = @_;
1645     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1646       $msg,
1647       $cols->[$col_idx],
1648       do {
1649         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1650         Dumper {
1651           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1652         },
1653       }
1654     );
1655   };
1656
1657   for my $datum_idx (0..$#$data) {
1658     my $datum = $data->[$datum_idx];
1659
1660     for my $col_idx (0..$#$cols) {
1661       my $val            = $datum->[$col_idx];
1662       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1663       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1664
1665       if ($is_literal_sql) {
1666         if (not ref $val) {
1667           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1668         }
1669         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1670           $bad_slice->("$reftype reference found where literal SQL expected",
1671             $col_idx, $datum_idx);
1672         }
1673         elsif ($$val ne $$sqla_bind){
1674           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1675             $col_idx, $datum_idx);
1676         }
1677       }
1678       elsif (my $reftype = ref $val) {
1679         require overload;
1680         if (overload::Method($val, '""')) {
1681           $datum->[$col_idx] = "".$val;
1682         }
1683         else {
1684           $bad_slice->("$reftype reference found where bind expected",
1685             $col_idx, $datum_idx);
1686         }
1687       }
1688     }
1689   }
1690
1691   my ($sql, $bind) = $self->_prep_for_execute (
1692     'insert', undef, $source, [\%colvalues]
1693   );
1694   my @bind = @$bind;
1695
1696   my $empty_bind = 1 if (not @bind) &&
1697     (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
1698
1699   if ((not @bind) && (not $empty_bind)) {
1700     $self->throw_exception(
1701       'Cannot insert_bulk without support for placeholders'
1702     );
1703   }
1704
1705   # neither _execute_array, nor _execute_inserts_with_no_binds are
1706   # atomic (even if _execute _array is a single call). Thus a safety
1707   # scope guard
1708   my $guard = $self->txn_scope_guard;
1709
1710   $self->_query_start( $sql, [ dummy => '__BULK_INSERT__' ] );
1711   my $sth = $self->sth($sql);
1712   my $rv = do {
1713     if ($empty_bind) {
1714       # bind_param_array doesn't work if there are no binds
1715       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1716     }
1717     else {
1718 #      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1719       $self->_execute_array( $source, $sth, \@bind, $cols, $data );
1720     }
1721   };
1722
1723   $self->_query_end( $sql, [ dummy => '__BULK_INSERT__' ] );
1724
1725   $guard->commit;
1726
1727   return (wantarray ? ($rv, $sth, @bind) : $rv);
1728 }
1729
1730 sub _execute_array {
1731   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1732
1733   ## This must be an arrayref, else nothing works!
1734   my $tuple_status = [];
1735
1736   ## Get the bind_attributes, if any exist
1737   my $bind_attributes = $self->source_bind_attributes($source);
1738
1739   ## Bind the values and execute
1740   my $placeholder_index = 1;
1741
1742   foreach my $bound (@$bind) {
1743
1744     my $attributes = {};
1745     my ($column_name, $data_index) = @$bound;
1746
1747     if( $bind_attributes ) {
1748       $attributes = $bind_attributes->{$column_name}
1749       if defined $bind_attributes->{$column_name};
1750     }
1751
1752     my @data = map { $_->[$data_index] } @$data;
1753
1754     $sth->bind_param_array(
1755       $placeholder_index,
1756       [@data],
1757       (%$attributes ?  $attributes : ()),
1758     );
1759     $placeholder_index++;
1760   }
1761
1762   my ($rv, $err);
1763   try {
1764     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1765   }
1766   catch {
1767     $err = shift;
1768   };
1769
1770   # Statement must finish even if there was an exception.
1771   try {
1772     $sth->finish
1773   }
1774   catch {
1775     $err = shift unless defined $err
1776   };
1777
1778   $err = $sth->errstr
1779     if (! defined $err and $sth->err);
1780
1781   if (defined $err) {
1782     my $i = 0;
1783     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1784
1785     $self->throw_exception("Unexpected populate error: $err")
1786       if ($i > $#$tuple_status);
1787
1788     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1789       ($tuple_status->[$i][1] || $err),
1790       Dumper { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) },
1791     );
1792   }
1793
1794   return $rv;
1795 }
1796
1797 sub _dbh_execute_array {
1798     my ($self, $sth, $tuple_status, @extra) = @_;
1799
1800     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1801 }
1802
1803 sub _dbh_execute_inserts_with_no_binds {
1804   my ($self, $sth, $count) = @_;
1805
1806   my $err;
1807   try {
1808     my $dbh = $self->_get_dbh;
1809     local $dbh->{RaiseError} = 1;
1810     local $dbh->{PrintError} = 0;
1811
1812     $sth->execute foreach 1..$count;
1813   }
1814   catch {
1815     $err = shift;
1816   }
1817   finally {
1818     # Make sure statement is finished even if there was an exception.
1819     try {
1820       $sth->finish
1821     }
1822     catch {
1823       $err = shift unless defined $err;
1824     };
1825   };
1826
1827   $self->throw_exception($err) if defined $err;
1828
1829   return $count;
1830 }
1831
1832 sub update {
1833   my ($self, $source, @args) = @_;
1834
1835   my $bind_attrs = $self->source_bind_attributes($source);
1836
1837   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1838 }
1839
1840
1841 sub delete {
1842   my ($self, $source, @args) = @_;
1843
1844   my $bind_attrs = $self->source_bind_attributes($source);
1845
1846   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1847 }
1848
1849 # We were sent here because the $rs contains a complex search
1850 # which will require a subquery to select the correct rows
1851 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1852 #
1853 # Generating a single PK column subquery is trivial and supported
1854 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1855 # Look at _multipk_update_delete()
1856 sub _subq_update_delete {
1857   my $self = shift;
1858   my ($rs, $op, $values) = @_;
1859
1860   my $rsrc = $rs->result_source;
1861
1862   # quick check if we got a sane rs on our hands
1863   my @pcols = $rsrc->_pri_cols;
1864
1865   my $sel = $rs->_resolved_attrs->{select};
1866   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1867
1868   if (
1869       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1870         ne
1871       join ("\x00", sort @$sel )
1872   ) {
1873     $self->throw_exception (
1874       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
1875     );
1876   }
1877
1878   if (@pcols == 1) {
1879     return $self->$op (
1880       $rsrc,
1881       $op eq 'update' ? $values : (),
1882       { $pcols[0] => { -in => $rs->as_query } },
1883     );
1884   }
1885
1886   else {
1887     return $self->_multipk_update_delete (@_);
1888   }
1889 }
1890
1891 # ANSI SQL does not provide a reliable way to perform a multicol-PK
1892 # resultset update/delete involving subqueries. So by default resort
1893 # to simple (and inefficient) delete_all style per-row opearations,
1894 # while allowing specific storages to override this with a faster
1895 # implementation.
1896 #
1897 sub _multipk_update_delete {
1898   return shift->_per_row_update_delete (@_);
1899 }
1900
1901 # This is the default loop used to delete/update rows for multi PK
1902 # resultsets, and used by mysql exclusively (because it can't do anything
1903 # else).
1904 #
1905 # We do not use $row->$op style queries, because resultset update/delete
1906 # is not expected to cascade (this is what delete_all/update_all is for).
1907 #
1908 # There should be no race conditions as the entire operation is rolled
1909 # in a transaction.
1910 #
1911 sub _per_row_update_delete {
1912   my $self = shift;
1913   my ($rs, $op, $values) = @_;
1914
1915   my $rsrc = $rs->result_source;
1916   my @pcols = $rsrc->_pri_cols;
1917
1918   my $guard = $self->txn_scope_guard;
1919
1920   # emulate the return value of $sth->execute for non-selects
1921   my $row_cnt = '0E0';
1922
1923   my $subrs_cur = $rs->cursor;
1924   my @all_pk = $subrs_cur->all;
1925   for my $pks ( @all_pk) {
1926
1927     my $cond;
1928     for my $i (0.. $#pcols) {
1929       $cond->{$pcols[$i]} = $pks->[$i];
1930     }
1931
1932     $self->$op (
1933       $rsrc,
1934       $op eq 'update' ? $values : (),
1935       $cond,
1936     );
1937
1938     $row_cnt++;
1939   }
1940
1941   $guard->commit;
1942
1943   return $row_cnt;
1944 }
1945
1946 sub _select {
1947   my $self = shift;
1948   $self->_execute($self->_select_args(@_));
1949 }
1950
1951 sub _select_args_to_query {
1952   my $self = shift;
1953
1954   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
1955   #  = $self->_select_args($ident, $select, $cond, $attrs);
1956   my ($op, $bind, $ident, $bind_attrs, @args) =
1957     $self->_select_args(@_);
1958
1959   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
1960   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
1961   $prepared_bind ||= [];
1962
1963   return wantarray
1964     ? ($sql, $prepared_bind, $bind_attrs)
1965     : \[ "($sql)", @$prepared_bind ]
1966   ;
1967 }
1968
1969 sub _select_args {
1970   my ($self, $ident, $select, $where, $attrs) = @_;
1971
1972   my $sql_maker = $self->sql_maker;
1973   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
1974
1975   $attrs = {
1976     %$attrs,
1977     select => $select,
1978     from => $ident,
1979     where => $where,
1980     $rs_alias && $alias2source->{$rs_alias}
1981       ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
1982       : ()
1983     ,
1984   };
1985
1986   # calculate bind_attrs before possible $ident mangling
1987   my $bind_attrs = {};
1988   for my $alias (keys %$alias2source) {
1989     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
1990     for my $col (keys %$bindtypes) {
1991
1992       my $fqcn = join ('.', $alias, $col);
1993       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
1994
1995       # Unqialified column names are nice, but at the same time can be
1996       # rather ambiguous. What we do here is basically go along with
1997       # the loop, adding an unqualified column slot to $bind_attrs,
1998       # alongside the fully qualified name. As soon as we encounter
1999       # another column by that name (which would imply another table)
2000       # we unset the unqualified slot and never add any info to it
2001       # to avoid erroneous type binding. If this happens the users
2002       # only choice will be to fully qualify his column name
2003
2004       if (exists $bind_attrs->{$col}) {
2005         $bind_attrs->{$col} = {};
2006       }
2007       else {
2008         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2009       }
2010     }
2011   }
2012
2013   # Sanity check the attributes (SQLMaker does it too, but
2014   # in case of a software_limit we'll never reach there)
2015   if (defined $attrs->{offset}) {
2016     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2017       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2018   }
2019   $attrs->{offset} ||= 0;
2020
2021   if (defined $attrs->{rows}) {
2022     $self->throw_exception("The rows attribute must be a positive integer if present")
2023       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2024   }
2025   elsif ($attrs->{offset}) {
2026     # MySQL actually recommends this approach.  I cringe.
2027     $attrs->{rows} = $sql_maker->__max_int;
2028   }
2029
2030   my @limit;
2031
2032   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2033   # storage, unless software limit was requested
2034   if (
2035     #limited has_many
2036     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2037        ||
2038     # grouped prefetch (to satisfy group_by == select)
2039     ( $attrs->{group_by}
2040         &&
2041       @{$attrs->{group_by}}
2042         &&
2043       $attrs->{_prefetch_select}
2044         &&
2045       @{$attrs->{_prefetch_select}}
2046     )
2047   ) {
2048     ($ident, $select, $where, $attrs)
2049       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2050   }
2051   elsif (! $attrs->{software_limit} ) {
2052     push @limit, $attrs->{rows}, $attrs->{offset};
2053   }
2054
2055   # try to simplify the joinmap further (prune unreferenced type-single joins)
2056   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2057
2058 ###
2059   # This would be the point to deflate anything found in $where
2060   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2061   # expect a row object. And all we have is a resultsource (it is trivial
2062   # to extract deflator coderefs via $alias2source above).
2063   #
2064   # I don't see a way forward other than changing the way deflators are
2065   # invoked, and that's just bad...
2066 ###
2067
2068   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2069 }
2070
2071 # Returns a counting SELECT for a simple count
2072 # query. Abstracted so that a storage could override
2073 # this to { count => 'firstcol' } or whatever makes
2074 # sense as a performance optimization
2075 sub _count_select {
2076   #my ($self, $source, $rs_attrs) = @_;
2077   return { count => '*' };
2078 }
2079
2080
2081 sub source_bind_attributes {
2082   my ($self, $source) = @_;
2083
2084   my $bind_attributes;
2085   foreach my $column ($source->columns) {
2086
2087     my $data_type = $source->column_info($column)->{data_type} || '';
2088     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
2089      if $data_type;
2090   }
2091
2092   return $bind_attributes;
2093 }
2094
2095 =head2 select
2096
2097 =over 4
2098
2099 =item Arguments: $ident, $select, $condition, $attrs
2100
2101 =back
2102
2103 Handle a SQL select statement.
2104
2105 =cut
2106
2107 sub select {
2108   my $self = shift;
2109   my ($ident, $select, $condition, $attrs) = @_;
2110   return $self->cursor_class->new($self, \@_, $attrs);
2111 }
2112
2113 sub select_single {
2114   my $self = shift;
2115   my ($rv, $sth, @bind) = $self->_select(@_);
2116   my @row = $sth->fetchrow_array;
2117   my @nextrow = $sth->fetchrow_array if @row;
2118   if(@row && @nextrow) {
2119     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2120   }
2121   # Need to call finish() to work round broken DBDs
2122   $sth->finish();
2123   return @row;
2124 }
2125
2126 =head2 sql_limit_dialect
2127
2128 This is an accessor for the default SQL limit dialect used by a particular
2129 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2130 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2131 see L<DBIx::Class::SQLMaker::LimitDialects>.
2132
2133 =head2 sth
2134
2135 =over 4
2136
2137 =item Arguments: $sql
2138
2139 =back
2140
2141 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2142
2143 =cut
2144
2145 sub _dbh_sth {
2146   my ($self, $dbh, $sql) = @_;
2147
2148   # 3 is the if_active parameter which avoids active sth re-use
2149   my $sth = $self->disable_sth_caching
2150     ? $dbh->prepare($sql)
2151     : $dbh->prepare_cached($sql, {}, 3);
2152
2153   # XXX You would think RaiseError would make this impossible,
2154   #  but apparently that's not true :(
2155   $self->throw_exception($dbh->errstr) if !$sth;
2156
2157   $sth;
2158 }
2159
2160 sub sth {
2161   my ($self, $sql) = @_;
2162   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2163 }
2164
2165 sub _dbh_columns_info_for {
2166   my ($self, $dbh, $table) = @_;
2167
2168   if ($dbh->can('column_info')) {
2169     my %result;
2170     my $caught;
2171     try {
2172       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2173       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2174       $sth->execute();
2175       while ( my $info = $sth->fetchrow_hashref() ){
2176         my %column_info;
2177         $column_info{data_type}   = $info->{TYPE_NAME};
2178         $column_info{size}      = $info->{COLUMN_SIZE};
2179         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2180         $column_info{default_value} = $info->{COLUMN_DEF};
2181         my $col_name = $info->{COLUMN_NAME};
2182         $col_name =~ s/^\"(.*)\"$/$1/;
2183
2184         $result{$col_name} = \%column_info;
2185       }
2186     } catch {
2187       $caught = 1;
2188     };
2189     return \%result if !$caught && scalar keys %result;
2190   }
2191
2192   my %result;
2193   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2194   $sth->execute;
2195   my @columns = @{$sth->{NAME_lc}};
2196   for my $i ( 0 .. $#columns ){
2197     my %column_info;
2198     $column_info{data_type} = $sth->{TYPE}->[$i];
2199     $column_info{size} = $sth->{PRECISION}->[$i];
2200     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2201
2202     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2203       $column_info{data_type} = $1;
2204       $column_info{size}    = $2;
2205     }
2206
2207     $result{$columns[$i]} = \%column_info;
2208   }
2209   $sth->finish;
2210
2211   foreach my $col (keys %result) {
2212     my $colinfo = $result{$col};
2213     my $type_num = $colinfo->{data_type};
2214     my $type_name;
2215     if(defined $type_num && $dbh->can('type_info')) {
2216       my $type_info = $dbh->type_info($type_num);
2217       $type_name = $type_info->{TYPE_NAME} if $type_info;
2218       $colinfo->{data_type} = $type_name if $type_name;
2219     }
2220   }
2221
2222   return \%result;
2223 }
2224
2225 sub columns_info_for {
2226   my ($self, $table) = @_;
2227   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2228 }
2229
2230 =head2 last_insert_id
2231
2232 Return the row id of the last insert.
2233
2234 =cut
2235
2236 sub _dbh_last_insert_id {
2237     my ($self, $dbh, $source, $col) = @_;
2238
2239     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2240
2241     return $id if defined $id;
2242
2243     my $class = ref $self;
2244     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2245 }
2246
2247 sub last_insert_id {
2248   my $self = shift;
2249   $self->_dbh_last_insert_id ($self->_dbh, @_);
2250 }
2251
2252 =head2 _native_data_type
2253
2254 =over 4
2255
2256 =item Arguments: $type_name
2257
2258 =back
2259
2260 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2261 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2262 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2263
2264 The default implementation returns C<undef>, implement in your Storage driver if
2265 you need this functionality.
2266
2267 Should map types from other databases to the native RDBMS type, for example
2268 C<VARCHAR2> to C<VARCHAR>.
2269
2270 Types with modifiers should map to the underlying data type. For example,
2271 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2272
2273 Composite types should map to the container type, for example
2274 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2275
2276 =cut
2277
2278 sub _native_data_type {
2279   #my ($self, $data_type) = @_;
2280   return undef
2281 }
2282
2283 # Check if placeholders are supported at all
2284 sub _determine_supports_placeholders {
2285   my $self = shift;
2286   my $dbh  = $self->_get_dbh;
2287
2288   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2289   # but it is inaccurate more often than not
2290   return try {
2291     local $dbh->{PrintError} = 0;
2292     local $dbh->{RaiseError} = 1;
2293     $dbh->do('select ?', {}, 1);
2294     1;
2295   }
2296   catch {
2297     0;
2298   };
2299 }
2300
2301 # Check if placeholders bound to non-string types throw exceptions
2302 #
2303 sub _determine_supports_typeless_placeholders {
2304   my $self = shift;
2305   my $dbh  = $self->_get_dbh;
2306
2307   return try {
2308     local $dbh->{PrintError} = 0;
2309     local $dbh->{RaiseError} = 1;
2310     # this specifically tests a bind that is NOT a string
2311     $dbh->do('select 1 where 1 = ?', {}, 1);
2312     1;
2313   }
2314   catch {
2315     0;
2316   };
2317 }
2318
2319 =head2 sqlt_type
2320
2321 Returns the database driver name.
2322
2323 =cut
2324
2325 sub sqlt_type {
2326   shift->_get_dbh->{Driver}->{Name};
2327 }
2328
2329 =head2 bind_attribute_by_data_type
2330
2331 Given a datatype from column info, returns a database specific bind
2332 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2333 let the database planner just handle it.
2334
2335 Generally only needed for special case column types, like bytea in postgres.
2336
2337 =cut
2338
2339 sub bind_attribute_by_data_type {
2340     return;
2341 }
2342
2343 =head2 is_datatype_numeric
2344
2345 Given a datatype from column_info, returns a boolean value indicating if
2346 the current RDBMS considers it a numeric value. This controls how
2347 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2348 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2349 be performed instead of the usual C<eq>.
2350
2351 =cut
2352
2353 sub is_datatype_numeric {
2354   my ($self, $dt) = @_;
2355
2356   return 0 unless $dt;
2357
2358   return $dt =~ /^ (?:
2359     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2360   ) $/ix;
2361 }
2362
2363
2364 =head2 create_ddl_dir
2365
2366 =over 4
2367
2368 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2369
2370 =back
2371
2372 Creates a SQL file based on the Schema, for each of the specified
2373 database engines in C<\@databases> in the given directory.
2374 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2375
2376 Given a previous version number, this will also create a file containing
2377 the ALTER TABLE statements to transform the previous schema into the
2378 current one. Note that these statements may contain C<DROP TABLE> or
2379 C<DROP COLUMN> statements that can potentially destroy data.
2380
2381 The file names are created using the C<ddl_filename> method below, please
2382 override this method in your schema if you would like a different file
2383 name format. For the ALTER file, the same format is used, replacing
2384 $version in the name with "$preversion-$version".
2385
2386 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2387 The most common value for this would be C<< { add_drop_table => 1 } >>
2388 to have the SQL produced include a C<DROP TABLE> statement for each table
2389 created. For quoting purposes supply C<quote_table_names> and
2390 C<quote_field_names>.
2391
2392 If no arguments are passed, then the following default values are assumed:
2393
2394 =over 4
2395
2396 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2397
2398 =item version    - $schema->schema_version
2399
2400 =item directory  - './'
2401
2402 =item preversion - <none>
2403
2404 =back
2405
2406 By default, C<\%sqlt_args> will have
2407
2408  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2409
2410 merged with the hash passed in. To disable any of those features, pass in a
2411 hashref like the following
2412
2413  { ignore_constraint_names => 0, # ... other options }
2414
2415
2416 WARNING: You are strongly advised to check all SQL files created, before applying
2417 them.
2418
2419 =cut
2420
2421 sub create_ddl_dir {
2422   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2423
2424   unless ($dir) {
2425     carp "No directory given, using ./\n";
2426     $dir = './';
2427   } else {
2428       -d $dir
2429         or
2430       make_path ("$dir")  # make_path does not like objects (i.e. Path::Class::Dir)
2431         or
2432       $self->throw_exception(
2433         "Failed to create '$dir': " . ($! || $@ || 'error unknow')
2434       );
2435   }
2436
2437   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2438
2439   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2440   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2441
2442   my $schema_version = $schema->schema_version || '1.x';
2443   $version ||= $schema_version;
2444
2445   $sqltargs = {
2446     add_drop_table => 1,
2447     ignore_constraint_names => 1,
2448     ignore_index_names => 1,
2449     %{$sqltargs || {}}
2450   };
2451
2452   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2453     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2454   }
2455
2456   my $sqlt = SQL::Translator->new( $sqltargs );
2457
2458   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2459   my $sqlt_schema = $sqlt->translate({ data => $schema })
2460     or $self->throw_exception ($sqlt->error);
2461
2462   foreach my $db (@$databases) {
2463     $sqlt->reset();
2464     $sqlt->{schema} = $sqlt_schema;
2465     $sqlt->producer($db);
2466
2467     my $file;
2468     my $filename = $schema->ddl_filename($db, $version, $dir);
2469     if (-e $filename && ($version eq $schema_version )) {
2470       # if we are dumping the current version, overwrite the DDL
2471       carp "Overwriting existing DDL file - $filename";
2472       unlink($filename);
2473     }
2474
2475     my $output = $sqlt->translate;
2476     if(!$output) {
2477       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2478       next;
2479     }
2480     if(!open($file, ">$filename")) {
2481       $self->throw_exception("Can't open $filename for writing ($!)");
2482       next;
2483     }
2484     print $file $output;
2485     close($file);
2486
2487     next unless ($preversion);
2488
2489     require SQL::Translator::Diff;
2490
2491     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2492     if(!-e $prefilename) {
2493       carp("No previous schema file found ($prefilename)");
2494       next;
2495     }
2496
2497     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2498     if(-e $difffile) {
2499       carp("Overwriting existing diff file - $difffile");
2500       unlink($difffile);
2501     }
2502
2503     my $source_schema;
2504     {
2505       my $t = SQL::Translator->new($sqltargs);
2506       $t->debug( 0 );
2507       $t->trace( 0 );
2508
2509       $t->parser( $db )
2510         or $self->throw_exception ($t->error);
2511
2512       my $out = $t->translate( $prefilename )
2513         or $self->throw_exception ($t->error);
2514
2515       $source_schema = $t->schema;
2516
2517       $source_schema->name( $prefilename )
2518         unless ( $source_schema->name );
2519     }
2520
2521     # The "new" style of producers have sane normalization and can support
2522     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2523     # And we have to diff parsed SQL against parsed SQL.
2524     my $dest_schema = $sqlt_schema;
2525
2526     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2527       my $t = SQL::Translator->new($sqltargs);
2528       $t->debug( 0 );
2529       $t->trace( 0 );
2530
2531       $t->parser( $db )
2532         or $self->throw_exception ($t->error);
2533
2534       my $out = $t->translate( $filename )
2535         or $self->throw_exception ($t->error);
2536
2537       $dest_schema = $t->schema;
2538
2539       $dest_schema->name( $filename )
2540         unless $dest_schema->name;
2541     }
2542
2543     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2544                                                   $dest_schema,   $db,
2545                                                   $sqltargs
2546                                                  );
2547     if(!open $file, ">$difffile") {
2548       $self->throw_exception("Can't write to $difffile ($!)");
2549       next;
2550     }
2551     print $file $diff;
2552     close($file);
2553   }
2554 }
2555
2556 =head2 deployment_statements
2557
2558 =over 4
2559
2560 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2561
2562 =back
2563
2564 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2565
2566 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2567 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2568
2569 C<$directory> is used to return statements from files in a previously created
2570 L</create_ddl_dir> directory and is optional. The filenames are constructed
2571 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2572
2573 If no C<$directory> is specified then the statements are constructed on the
2574 fly using L<SQL::Translator> and C<$version> is ignored.
2575
2576 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2577
2578 =cut
2579
2580 sub deployment_statements {
2581   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2582   $type ||= $self->sqlt_type;
2583   $version ||= $schema->schema_version || '1.x';
2584   $dir ||= './';
2585   my $filename = $schema->ddl_filename($type, $version, $dir);
2586   if(-f $filename)
2587   {
2588       my $file;
2589       open($file, "<$filename")
2590         or $self->throw_exception("Can't open $filename ($!)");
2591       my @rows = <$file>;
2592       close($file);
2593       return join('', @rows);
2594   }
2595
2596   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2597     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2598   }
2599
2600   # sources needs to be a parser arg, but for simplicty allow at top level
2601   # coming in
2602   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2603       if exists $sqltargs->{sources};
2604
2605   my $tr = SQL::Translator->new(
2606     producer => "SQL::Translator::Producer::${type}",
2607     %$sqltargs,
2608     parser => 'SQL::Translator::Parser::DBIx::Class',
2609     data => $schema,
2610   );
2611
2612   my @ret;
2613   my $wa = wantarray;
2614   if ($wa) {
2615     @ret = $tr->translate;
2616   }
2617   else {
2618     $ret[0] = $tr->translate;
2619   }
2620
2621   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2622     unless (@ret && defined $ret[0]);
2623
2624   return $wa ? @ret : $ret[0];
2625 }
2626
2627 sub deploy {
2628   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2629   my $deploy = sub {
2630     my $line = shift;
2631     return if($line =~ /^--/);
2632     return if(!$line);
2633     # next if($line =~ /^DROP/m);
2634     return if($line =~ /^BEGIN TRANSACTION/m);
2635     return if($line =~ /^COMMIT/m);
2636     return if $line =~ /^\s+$/; # skip whitespace only
2637     $self->_query_start($line);
2638     try {
2639       # do a dbh_do cycle here, as we need some error checking in
2640       # place (even though we will ignore errors)
2641       $self->dbh_do (sub { $_[1]->do($line) });
2642     } catch {
2643       carp qq{$_ (running "${line}")};
2644     };
2645     $self->_query_end($line);
2646   };
2647   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2648   if (@statements > 1) {
2649     foreach my $statement (@statements) {
2650       $deploy->( $statement );
2651     }
2652   }
2653   elsif (@statements == 1) {
2654     foreach my $line ( split(";\n", $statements[0])) {
2655       $deploy->( $line );
2656     }
2657   }
2658 }
2659
2660 =head2 datetime_parser
2661
2662 Returns the datetime parser class
2663
2664 =cut
2665
2666 sub datetime_parser {
2667   my $self = shift;
2668   return $self->{datetime_parser} ||= do {
2669     $self->build_datetime_parser(@_);
2670   };
2671 }
2672
2673 =head2 datetime_parser_type
2674
2675 Defines (returns) the datetime parser class - currently hardwired to
2676 L<DateTime::Format::MySQL>
2677
2678 =cut
2679
2680 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2681
2682 =head2 build_datetime_parser
2683
2684 See L</datetime_parser>
2685
2686 =cut
2687
2688 sub build_datetime_parser {
2689   my $self = shift;
2690   my $type = $self->datetime_parser_type(@_);
2691   $self->ensure_class_loaded ($type);
2692   return $type;
2693 }
2694
2695
2696 =head2 is_replicating
2697
2698 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2699 replicate from a master database.  Default is undef, which is the result
2700 returned by databases that don't support replication.
2701
2702 =cut
2703
2704 sub is_replicating {
2705     return;
2706
2707 }
2708
2709 =head2 lag_behind_master
2710
2711 Returns a number that represents a certain amount of lag behind a master db
2712 when a given storage is replicating.  The number is database dependent, but
2713 starts at zero and increases with the amount of lag. Default in undef
2714
2715 =cut
2716
2717 sub lag_behind_master {
2718     return;
2719 }
2720
2721 =head2 relname_to_table_alias
2722
2723 =over 4
2724
2725 =item Arguments: $relname, $join_count
2726
2727 =back
2728
2729 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2730 queries.
2731
2732 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2733 way these aliases are named.
2734
2735 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2736 otherwise C<"$relname">.
2737
2738 =cut
2739
2740 sub relname_to_table_alias {
2741   my ($self, $relname, $join_count) = @_;
2742
2743   my $alias = ($join_count && $join_count > 1 ?
2744     join('_', $relname, $join_count) : $relname);
2745
2746   return $alias;
2747 }
2748
2749 1;
2750
2751 =head1 USAGE NOTES
2752
2753 =head2 DBIx::Class and AutoCommit
2754
2755 DBIx::Class can do some wonderful magic with handling exceptions,
2756 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2757 (the default) combined with C<txn_do> for transaction support.
2758
2759 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2760 in an assumed transaction between commits, and you're telling us you'd
2761 like to manage that manually.  A lot of the magic protections offered by
2762 this module will go away.  We can't protect you from exceptions due to database
2763 disconnects because we don't know anything about how to restart your
2764 transactions.  You're on your own for handling all sorts of exceptional
2765 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2766 be with raw DBI.
2767
2768
2769 =head1 AUTHORS
2770
2771 Matt S. Trout <mst@shadowcatsystems.co.uk>
2772
2773 Andy Grundman <andy@hybridized.org>
2774
2775 =head1 LICENSE
2776
2777 You may distribute this code under the same terms as Perl itself.
2778
2779 =cut