Ditch Carp::Clan for our own thing
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use namespace::clean;
18
19
20 # default cursor class, overridable in connect_info attributes
21 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
22
23 __PACKAGE__->mk_group_accessors('inherited' => qw/
24   sql_limit_dialect sql_quote_char sql_name_sep
25 /);
26
27 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
28
29 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
30 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
31
32 __PACKAGE__->sql_name_sep('.');
33
34 __PACKAGE__->mk_group_accessors('simple' => qw/
35   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
36   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
37   transaction_depth _dbh_autocommit  savepoints
38 /);
39
40 # the values for these accessors are picked out (and deleted) from
41 # the attribute hashref passed to connect_info
42 my @storage_options = qw/
43   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
44   disable_sth_caching unsafe auto_savepoint
45 /;
46 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
47
48
49 # capability definitions, using a 2-tiered accessor system
50 # The rationale is:
51 #
52 # A driver/user may define _use_X, which blindly without any checks says:
53 # "(do not) use this capability", (use_dbms_capability is an "inherited"
54 # type accessor)
55 #
56 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
57 # accessor, which in turn calls _determine_supports_X, and stores the return
58 # in a special slot on the storage object, which is wiped every time a $dbh
59 # reconnection takes place (it is not guaranteed that upon reconnection we
60 # will get the same rdbms version). _determine_supports_X does not need to
61 # exist on a driver, as we ->can for it before calling.
62
63 my @capabilities = (qw/
64   insert_returning
65   insert_returning_bound
66   placeholders
67   typeless_placeholders
68   join_optimizer
69 /);
70 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
71 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
72
73 # on by default, not strictly a capability (pending rewrite)
74 __PACKAGE__->_use_join_optimizer (1);
75 sub _determine_supports_join_optimizer { 1 };
76
77 # Each of these methods need _determine_driver called before itself
78 # in order to function reliably. This is a purely DRY optimization
79 #
80 # get_(use)_dbms_capability need to be called on the correct Storage
81 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
82 # _determine_supports_X which obv. needs a correct driver as well
83 my @rdbms_specific_methods = qw/
84   deployment_statements
85   sqlt_type
86   sql_maker
87   build_datetime_parser
88   datetime_parser_type
89
90   insert
91   insert_bulk
92   update
93   delete
94   select
95   select_single
96
97   get_use_dbms_capability
98   get_dbms_capability
99
100   _server_info
101   _get_server_version
102 /;
103
104 for my $meth (@rdbms_specific_methods) {
105
106   my $orig = __PACKAGE__->can ($meth)
107     or die "$meth is not a ::Storage::DBI method!";
108
109   no strict qw/refs/;
110   no warnings qw/redefine/;
111   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
112     if (
113       # only fire when invoked on an instance, a valid class-based invocation
114       # would e.g. be setting a default for an inherited accessor
115       ref $_[0]
116         and
117       ! $_[0]->_driver_determined
118         and
119       ! $_[0]->{_in_determine_driver}
120     ) {
121       $_[0]->_determine_driver;
122
123       # This for some reason crashes and burns on perl 5.8.1
124       # IFF the method ends up throwing an exception
125       #goto $_[0]->can ($meth);
126
127       my $cref = $_[0]->can ($meth);
128       goto $cref;
129     }
130
131     goto $orig;
132   };
133 }
134
135
136 =head1 NAME
137
138 DBIx::Class::Storage::DBI - DBI storage handler
139
140 =head1 SYNOPSIS
141
142   my $schema = MySchema->connect('dbi:SQLite:my.db');
143
144   $schema->storage->debug(1);
145
146   my @stuff = $schema->storage->dbh_do(
147     sub {
148       my ($storage, $dbh, @args) = @_;
149       $dbh->do("DROP TABLE authors");
150     },
151     @column_list
152   );
153
154   $schema->resultset('Book')->search({
155      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
156   });
157
158 =head1 DESCRIPTION
159
160 This class represents the connection to an RDBMS via L<DBI>.  See
161 L<DBIx::Class::Storage> for general information.  This pod only
162 documents DBI-specific methods and behaviors.
163
164 =head1 METHODS
165
166 =cut
167
168 sub new {
169   my $new = shift->next::method(@_);
170
171   $new->transaction_depth(0);
172   $new->_sql_maker_opts({});
173   $new->_dbh_details({});
174   $new->{savepoints} = [];
175   $new->{_in_dbh_do} = 0;
176   $new->{_dbh_gen} = 0;
177
178   # read below to see what this does
179   $new->_arm_global_destructor;
180
181   $new;
182 }
183
184 # This is hack to work around perl shooting stuff in random
185 # order on exit(). If we do not walk the remaining storage
186 # objects in an END block, there is a *small but real* chance
187 # of a fork()ed child to kill the parent's shared DBI handle,
188 # *before perl reaches the DESTROY in this package*
189 # Yes, it is ugly and effective.
190 # Additionally this registry is used by the CLONE method to
191 # make sure no handles are shared between threads
192 {
193   my %seek_and_destroy;
194
195   sub _arm_global_destructor {
196     my $self = shift;
197     my $key = refaddr ($self);
198     $seek_and_destroy{$key} = $self;
199     weaken ($seek_and_destroy{$key});
200   }
201
202   END {
203     local $?; # just in case the DBI destructor changes it somehow
204
205     # destroy just the object if not native to this process/thread
206     $_->_verify_pid for (grep
207       { defined $_ }
208       values %seek_and_destroy
209     );
210   }
211
212   sub CLONE {
213     # As per DBI's recommendation, DBIC disconnects all handles as
214     # soon as possible (DBIC will reconnect only on demand from within
215     # the thread)
216     for (values %seek_and_destroy) {
217       next unless $_;
218       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
219       $_->_dbh(undef);
220     }
221   }
222 }
223
224 sub DESTROY {
225   my $self = shift;
226
227   # some databases spew warnings on implicit disconnect
228   local $SIG{__WARN__} = sub {};
229   $self->_dbh(undef);
230
231   # this op is necessary, since the very last perl runtime statement
232   # triggers a global destruction shootout, and the $SIG localization
233   # may very well be destroyed before perl actually gets to do the
234   # $dbh undef
235   1;
236 }
237
238 # handle pid changes correctly - do not destroy parent's connection
239 sub _verify_pid {
240   my $self = shift;
241
242   my $pid = $self->_conn_pid;
243   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
244     $dbh->{InactiveDestroy} = 1;
245     $self->{_dbh_gen}++;
246     $self->_dbh(undef);
247   }
248
249   return;
250 }
251
252 =head2 connect_info
253
254 This method is normally called by L<DBIx::Class::Schema/connection>, which
255 encapsulates its argument list in an arrayref before passing them here.
256
257 The argument list may contain:
258
259 =over
260
261 =item *
262
263 The same 4-element argument set one would normally pass to
264 L<DBI/connect>, optionally followed by
265 L<extra attributes|/DBIx::Class specific connection attributes>
266 recognized by DBIx::Class:
267
268   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
269
270 =item *
271
272 A single code reference which returns a connected
273 L<DBI database handle|DBI/connect> optionally followed by
274 L<extra attributes|/DBIx::Class specific connection attributes> recognized
275 by DBIx::Class:
276
277   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
278
279 =item *
280
281 A single hashref with all the attributes and the dsn/user/password
282 mixed together:
283
284   $connect_info_args = [{
285     dsn => $dsn,
286     user => $user,
287     password => $pass,
288     %dbi_attributes,
289     %extra_attributes,
290   }];
291
292   $connect_info_args = [{
293     dbh_maker => sub { DBI->connect (...) },
294     %dbi_attributes,
295     %extra_attributes,
296   }];
297
298 This is particularly useful for L<Catalyst> based applications, allowing the
299 following config (L<Config::General> style):
300
301   <Model::DB>
302     schema_class   App::DB
303     <connect_info>
304       dsn          dbi:mysql:database=test
305       user         testuser
306       password     TestPass
307       AutoCommit   1
308     </connect_info>
309   </Model::DB>
310
311 The C<dsn>/C<user>/C<password> combination can be substituted by the
312 C<dbh_maker> key whose value is a coderef that returns a connected
313 L<DBI database handle|DBI/connect>
314
315 =back
316
317 Please note that the L<DBI> docs recommend that you always explicitly
318 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
319 recommends that it be set to I<1>, and that you perform transactions
320 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
321 to I<1> if you do not do explicitly set it to zero.  This is the default
322 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
323
324 =head3 DBIx::Class specific connection attributes
325
326 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
327 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
328 the following connection options. These options can be mixed in with your other
329 L<DBI> connection attributes, or placed in a separate hashref
330 (C<\%extra_attributes>) as shown above.
331
332 Every time C<connect_info> is invoked, any previous settings for
333 these options will be cleared before setting the new ones, regardless of
334 whether any options are specified in the new C<connect_info>.
335
336
337 =over
338
339 =item on_connect_do
340
341 Specifies things to do immediately after connecting or re-connecting to
342 the database.  Its value may contain:
343
344 =over
345
346 =item a scalar
347
348 This contains one SQL statement to execute.
349
350 =item an array reference
351
352 This contains SQL statements to execute in order.  Each element contains
353 a string or a code reference that returns a string.
354
355 =item a code reference
356
357 This contains some code to execute.  Unlike code references within an
358 array reference, its return value is ignored.
359
360 =back
361
362 =item on_disconnect_do
363
364 Takes arguments in the same form as L</on_connect_do> and executes them
365 immediately before disconnecting from the database.
366
367 Note, this only runs if you explicitly call L</disconnect> on the
368 storage object.
369
370 =item on_connect_call
371
372 A more generalized form of L</on_connect_do> that calls the specified
373 C<connect_call_METHOD> methods in your storage driver.
374
375   on_connect_do => 'select 1'
376
377 is equivalent to:
378
379   on_connect_call => [ [ do_sql => 'select 1' ] ]
380
381 Its values may contain:
382
383 =over
384
385 =item a scalar
386
387 Will call the C<connect_call_METHOD> method.
388
389 =item a code reference
390
391 Will execute C<< $code->($storage) >>
392
393 =item an array reference
394
395 Each value can be a method name or code reference.
396
397 =item an array of arrays
398
399 For each array, the first item is taken to be the C<connect_call_> method name
400 or code reference, and the rest are parameters to it.
401
402 =back
403
404 Some predefined storage methods you may use:
405
406 =over
407
408 =item do_sql
409
410 Executes a SQL string or a code reference that returns a SQL string. This is
411 what L</on_connect_do> and L</on_disconnect_do> use.
412
413 It can take:
414
415 =over
416
417 =item a scalar
418
419 Will execute the scalar as SQL.
420
421 =item an arrayref
422
423 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
424 attributes hashref and bind values.
425
426 =item a code reference
427
428 Will execute C<< $code->($storage) >> and execute the return array refs as
429 above.
430
431 =back
432
433 =item datetime_setup
434
435 Execute any statements necessary to initialize the database session to return
436 and accept datetime/timestamp values used with
437 L<DBIx::Class::InflateColumn::DateTime>.
438
439 Only necessary for some databases, see your specific storage driver for
440 implementation details.
441
442 =back
443
444 =item on_disconnect_call
445
446 Takes arguments in the same form as L</on_connect_call> and executes them
447 immediately before disconnecting from the database.
448
449 Calls the C<disconnect_call_METHOD> methods as opposed to the
450 C<connect_call_METHOD> methods called by L</on_connect_call>.
451
452 Note, this only runs if you explicitly call L</disconnect> on the
453 storage object.
454
455 =item disable_sth_caching
456
457 If set to a true value, this option will disable the caching of
458 statement handles via L<DBI/prepare_cached>.
459
460 =item limit_dialect
461
462 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
463 default L</sql_limit_dialect> setting of the storage (if any). For a list
464 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
465
466 =item quote_names
467
468 When true automatically sets L</quote_char> and L</name_sep> to the characters
469 appropriate for your particular RDBMS. This option is preferred over specifying
470 L</quote_char> directly.
471
472 =item quote_char
473
474 Specifies what characters to use to quote table and column names.
475
476 C<quote_char> expects either a single character, in which case is it
477 is placed on either side of the table/column name, or an arrayref of length
478 2 in which case the table/column name is placed between the elements.
479
480 For example under MySQL you should use C<< quote_char => '`' >>, and for
481 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
482
483 =item name_sep
484
485 This parameter is only useful in conjunction with C<quote_char>, and is used to
486 specify the character that separates elements (schemas, tables, columns) from
487 each other. If unspecified it defaults to the most commonly used C<.>.
488
489 =item unsafe
490
491 This Storage driver normally installs its own C<HandleError>, sets
492 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
493 all database handles, including those supplied by a coderef.  It does this
494 so that it can have consistent and useful error behavior.
495
496 If you set this option to a true value, Storage will not do its usual
497 modifications to the database handle's attributes, and instead relies on
498 the settings in your connect_info DBI options (or the values you set in
499 your connection coderef, in the case that you are connecting via coderef).
500
501 Note that your custom settings can cause Storage to malfunction,
502 especially if you set a C<HandleError> handler that suppresses exceptions
503 and/or disable C<RaiseError>.
504
505 =item auto_savepoint
506
507 If this option is true, L<DBIx::Class> will use savepoints when nesting
508 transactions, making it possible to recover from failure in the inner
509 transaction without having to abort all outer transactions.
510
511 =item cursor_class
512
513 Use this argument to supply a cursor class other than the default
514 L<DBIx::Class::Storage::DBI::Cursor>.
515
516 =back
517
518 Some real-life examples of arguments to L</connect_info> and
519 L<DBIx::Class::Schema/connect>
520
521   # Simple SQLite connection
522   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
523
524   # Connect via subref
525   ->connect_info([ sub { DBI->connect(...) } ]);
526
527   # Connect via subref in hashref
528   ->connect_info([{
529     dbh_maker => sub { DBI->connect(...) },
530     on_connect_do => 'alter session ...',
531   }]);
532
533   # A bit more complicated
534   ->connect_info(
535     [
536       'dbi:Pg:dbname=foo',
537       'postgres',
538       'my_pg_password',
539       { AutoCommit => 1 },
540       { quote_char => q{"} },
541     ]
542   );
543
544   # Equivalent to the previous example
545   ->connect_info(
546     [
547       'dbi:Pg:dbname=foo',
548       'postgres',
549       'my_pg_password',
550       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
551     ]
552   );
553
554   # Same, but with hashref as argument
555   # See parse_connect_info for explanation
556   ->connect_info(
557     [{
558       dsn         => 'dbi:Pg:dbname=foo',
559       user        => 'postgres',
560       password    => 'my_pg_password',
561       AutoCommit  => 1,
562       quote_char  => q{"},
563       name_sep    => q{.},
564     }]
565   );
566
567   # Subref + DBIx::Class-specific connection options
568   ->connect_info(
569     [
570       sub { DBI->connect(...) },
571       {
572           quote_char => q{`},
573           name_sep => q{@},
574           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
575           disable_sth_caching => 1,
576       },
577     ]
578   );
579
580
581
582 =cut
583
584 sub connect_info {
585   my ($self, $info) = @_;
586
587   return $self->_connect_info if !$info;
588
589   $self->_connect_info($info); # copy for _connect_info
590
591   $info = $self->_normalize_connect_info($info)
592     if ref $info eq 'ARRAY';
593
594   for my $storage_opt (keys %{ $info->{storage_options} }) {
595     my $value = $info->{storage_options}{$storage_opt};
596
597     $self->$storage_opt($value);
598   }
599
600   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
601   #  the new set of options
602   $self->_sql_maker(undef);
603   $self->_sql_maker_opts({});
604
605   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
606     my $value = $info->{sql_maker_options}{$sql_maker_opt};
607
608     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
609   }
610
611   my %attrs = (
612     %{ $self->_default_dbi_connect_attributes || {} },
613     %{ $info->{attributes} || {} },
614   );
615
616   my @args = @{ $info->{arguments} };
617
618   if (keys %attrs and ref $args[0] ne 'CODE') {
619     carp
620         'You provided explicit AutoCommit => 0 in your connection_info. '
621       . 'This is almost universally a bad idea (see the footnotes of '
622       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
623       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
624       . 'this warning.'
625       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
626
627     push @args, \%attrs if keys %attrs;
628   }
629   $self->_dbi_connect_info(\@args);
630
631   # FIXME - dirty:
632   # save attributes them in a separate accessor so they are always
633   # introspectable, even in case of a CODE $dbhmaker
634   $self->_dbic_connect_attributes (\%attrs);
635
636   return $self->_connect_info;
637 }
638
639 sub _normalize_connect_info {
640   my ($self, $info_arg) = @_;
641   my %info;
642
643   my @args = @$info_arg;  # take a shallow copy for further mutilation
644
645   # combine/pre-parse arguments depending on invocation style
646
647   my %attrs;
648   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
649     %attrs = %{ $args[1] || {} };
650     @args = $args[0];
651   }
652   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
653     %attrs = %{$args[0]};
654     @args = ();
655     if (my $code = delete $attrs{dbh_maker}) {
656       @args = $code;
657
658       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
659       if (@ignored) {
660         carp sprintf (
661             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
662           . "to the result of 'dbh_maker'",
663
664           join (', ', map { "'$_'" } (@ignored) ),
665         );
666       }
667     }
668     else {
669       @args = delete @attrs{qw/dsn user password/};
670     }
671   }
672   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
673     %attrs = (
674       % { $args[3] || {} },
675       % { $args[4] || {} },
676     );
677     @args = @args[0,1,2];
678   }
679
680   $info{arguments} = \@args;
681
682   my @storage_opts = grep exists $attrs{$_},
683     @storage_options, 'cursor_class';
684
685   @{ $info{storage_options} }{@storage_opts} =
686     delete @attrs{@storage_opts} if @storage_opts;
687
688   my @sql_maker_opts = grep exists $attrs{$_},
689     qw/limit_dialect quote_char name_sep quote_names/;
690
691   @{ $info{sql_maker_options} }{@sql_maker_opts} =
692     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
693
694   $info{attributes} = \%attrs if %attrs;
695
696   return \%info;
697 }
698
699 sub _default_dbi_connect_attributes () {
700   +{
701     AutoCommit => 1,
702     PrintError => 0,
703     RaiseError => 1,
704     ShowErrorStatement => 1,
705   };
706 }
707
708 =head2 on_connect_do
709
710 This method is deprecated in favour of setting via L</connect_info>.
711
712 =cut
713
714 =head2 on_disconnect_do
715
716 This method is deprecated in favour of setting via L</connect_info>.
717
718 =cut
719
720 sub _parse_connect_do {
721   my ($self, $type) = @_;
722
723   my $val = $self->$type;
724   return () if not defined $val;
725
726   my @res;
727
728   if (not ref($val)) {
729     push @res, [ 'do_sql', $val ];
730   } elsif (ref($val) eq 'CODE') {
731     push @res, $val;
732   } elsif (ref($val) eq 'ARRAY') {
733     push @res, map { [ 'do_sql', $_ ] } @$val;
734   } else {
735     $self->throw_exception("Invalid type for $type: ".ref($val));
736   }
737
738   return \@res;
739 }
740
741 =head2 dbh_do
742
743 Arguments: ($subref | $method_name), @extra_coderef_args?
744
745 Execute the given $subref or $method_name using the new exception-based
746 connection management.
747
748 The first two arguments will be the storage object that C<dbh_do> was called
749 on and a database handle to use.  Any additional arguments will be passed
750 verbatim to the called subref as arguments 2 and onwards.
751
752 Using this (instead of $self->_dbh or $self->dbh) ensures correct
753 exception handling and reconnection (or failover in future subclasses).
754
755 Your subref should have no side-effects outside of the database, as
756 there is the potential for your subref to be partially double-executed
757 if the database connection was stale/dysfunctional.
758
759 Example:
760
761   my @stuff = $schema->storage->dbh_do(
762     sub {
763       my ($storage, $dbh, @cols) = @_;
764       my $cols = join(q{, }, @cols);
765       $dbh->selectrow_array("SELECT $cols FROM foo");
766     },
767     @column_list
768   );
769
770 =cut
771
772 sub dbh_do {
773   my $self = shift;
774   my $code = shift;
775
776   my $dbh = $self->_get_dbh;
777
778   return $self->$code($dbh, @_)
779     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
780
781   local $self->{_in_dbh_do} = 1;
782
783   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
784   my $args = \@_;
785   return try {
786     $self->$code ($dbh, @$args);
787   } catch {
788     $self->throw_exception($_) if $self->connected;
789
790     # We were not connected - reconnect and retry, but let any
791     #  exception fall right through this time
792     carp "Retrying $code after catching disconnected exception: $_"
793       if $ENV{DBIC_DBIRETRY_DEBUG};
794
795     $self->_populate_dbh;
796     $self->$code($self->_dbh, @$args);
797   };
798 }
799
800 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
801 # It also informs dbh_do to bypass itself while under the direction of txn_do,
802 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
803 sub txn_do {
804   my $self = shift;
805   my $coderef = shift;
806
807   ref $coderef eq 'CODE' or $self->throw_exception
808     ('$coderef must be a CODE reference');
809
810   local $self->{_in_dbh_do} = 1;
811
812   my @result;
813   my $want = wantarray;
814
815   my $tried = 0;
816   while(1) {
817     my $exception;
818
819     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
820     my $args = \@_;
821
822     try {
823       $self->txn_begin;
824       my $txn_start_depth = $self->transaction_depth;
825       if($want) {
826           @result = $coderef->(@$args);
827       }
828       elsif(defined $want) {
829           $result[0] = $coderef->(@$args);
830       }
831       else {
832           $coderef->(@$args);
833       }
834
835       my $delta_txn = $txn_start_depth - $self->transaction_depth;
836       if ($delta_txn == 0) {
837         $self->txn_commit;
838       }
839       elsif ($delta_txn != 1) {
840         # an off-by-one would mean we fired a rollback
841         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
842       }
843     } catch {
844       $exception = $_;
845     };
846
847     if(! defined $exception) { return wantarray ? @result : $result[0] }
848
849     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
850       my $rollback_exception;
851       try { $self->txn_rollback } catch { $rollback_exception = shift };
852       if(defined $rollback_exception) {
853         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
854         $self->throw_exception($exception)  # propagate nested rollback
855           if $rollback_exception =~ /$exception_class/;
856
857         $self->throw_exception(
858           "Transaction aborted: ${exception}. "
859           . "Rollback failed: ${rollback_exception}"
860         );
861       }
862       $self->throw_exception($exception)
863     }
864
865     # We were not connected, and was first try - reconnect and retry
866     # via the while loop
867     carp "Retrying $coderef after catching disconnected exception: $exception"
868       if $ENV{DBIC_TXNRETRY_DEBUG};
869     $self->_populate_dbh;
870   }
871 }
872
873 =head2 disconnect
874
875 Our C<disconnect> method also performs a rollback first if the
876 database is not in C<AutoCommit> mode.
877
878 =cut
879
880 sub disconnect {
881   my ($self) = @_;
882
883   if( $self->_dbh ) {
884     my @actions;
885
886     push @actions, ( $self->on_disconnect_call || () );
887     push @actions, $self->_parse_connect_do ('on_disconnect_do');
888
889     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
890
891     $self->_dbh_rollback unless $self->_dbh_autocommit;
892
893     %{ $self->_dbh->{CachedKids} } = ();
894     $self->_dbh->disconnect;
895     $self->_dbh(undef);
896     $self->{_dbh_gen}++;
897   }
898 }
899
900 =head2 with_deferred_fk_checks
901
902 =over 4
903
904 =item Arguments: C<$coderef>
905
906 =item Return Value: The return value of $coderef
907
908 =back
909
910 Storage specific method to run the code ref with FK checks deferred or
911 in MySQL's case disabled entirely.
912
913 =cut
914
915 # Storage subclasses should override this
916 sub with_deferred_fk_checks {
917   my ($self, $sub) = @_;
918   $sub->();
919 }
920
921 =head2 connected
922
923 =over
924
925 =item Arguments: none
926
927 =item Return Value: 1|0
928
929 =back
930
931 Verifies that the current database handle is active and ready to execute
932 an SQL statement (e.g. the connection did not get stale, server is still
933 answering, etc.) This method is used internally by L</dbh>.
934
935 =cut
936
937 sub connected {
938   my $self = shift;
939   return 0 unless $self->_seems_connected;
940
941   #be on the safe side
942   local $self->_dbh->{RaiseError} = 1;
943
944   return $self->_ping;
945 }
946
947 sub _seems_connected {
948   my $self = shift;
949
950   $self->_verify_pid;
951
952   my $dbh = $self->_dbh
953     or return 0;
954
955   return $dbh->FETCH('Active');
956 }
957
958 sub _ping {
959   my $self = shift;
960
961   my $dbh = $self->_dbh or return 0;
962
963   return $dbh->ping;
964 }
965
966 sub ensure_connected {
967   my ($self) = @_;
968
969   unless ($self->connected) {
970     $self->_populate_dbh;
971   }
972 }
973
974 =head2 dbh
975
976 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
977 is guaranteed to be healthy by implicitly calling L</connected>, and if
978 necessary performing a reconnection before returning. Keep in mind that this
979 is very B<expensive> on some database engines. Consider using L</dbh_do>
980 instead.
981
982 =cut
983
984 sub dbh {
985   my ($self) = @_;
986
987   if (not $self->_dbh) {
988     $self->_populate_dbh;
989   } else {
990     $self->ensure_connected;
991   }
992   return $self->_dbh;
993 }
994
995 # this is the internal "get dbh or connect (don't check)" method
996 sub _get_dbh {
997   my $self = shift;
998   $self->_verify_pid;
999   $self->_populate_dbh unless $self->_dbh;
1000   return $self->_dbh;
1001 }
1002
1003 sub sql_maker {
1004   my ($self) = @_;
1005   unless ($self->_sql_maker) {
1006     my $sql_maker_class = $self->sql_maker_class;
1007
1008     my %opts = %{$self->_sql_maker_opts||{}};
1009     my $dialect =
1010       $opts{limit_dialect}
1011         ||
1012       $self->sql_limit_dialect
1013         ||
1014       do {
1015         my $s_class = (ref $self) || $self;
1016         carp (
1017           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1018         . 'have not supplied an explicit limit_dialect in your connection_info. '
1019         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1020         . 'databases but can be (and often is) painfully slow. '
1021         . "Please file an RT ticket against '$s_class' ."
1022         );
1023
1024         'GenericSubQ';
1025       }
1026     ;
1027
1028     my ($quote_char, $name_sep);
1029
1030     if ($opts{quote_names}) {
1031       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1032         my $s_class = (ref $self) || $self;
1033         carp (
1034           "You requested 'quote_names' but your storage class ($s_class) does "
1035         . 'not explicitly define a default sql_quote_char and you have not '
1036         . 'supplied a quote_char as part of your connection_info. DBIC will '
1037         .q{default to the ANSI SQL standard quote '"', which works most of }
1038         . "the time. Please file an RT ticket against '$s_class'."
1039         );
1040
1041         '"'; # RV
1042       };
1043
1044       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1045     }
1046
1047     $self->_sql_maker($sql_maker_class->new(
1048       bindtype=>'columns',
1049       array_datatypes => 1,
1050       limit_dialect => $dialect,
1051       ($quote_char ? (quote_char => $quote_char) : ()),
1052       name_sep => ($name_sep || '.'),
1053       %opts,
1054     ));
1055   }
1056   return $self->_sql_maker;
1057 }
1058
1059 # nothing to do by default
1060 sub _rebless {}
1061 sub _init {}
1062
1063 sub _populate_dbh {
1064   my ($self) = @_;
1065
1066   my @info = @{$self->_dbi_connect_info || []};
1067   $self->_dbh(undef); # in case ->connected failed we might get sent here
1068   $self->_dbh_details({}); # reset everything we know
1069
1070   $self->_dbh($self->_connect(@info));
1071
1072   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1073
1074   $self->_determine_driver;
1075
1076   # Always set the transaction depth on connect, since
1077   #  there is no transaction in progress by definition
1078   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1079
1080   $self->_run_connection_actions unless $self->{_in_determine_driver};
1081 }
1082
1083 sub _run_connection_actions {
1084   my $self = shift;
1085   my @actions;
1086
1087   push @actions, ( $self->on_connect_call || () );
1088   push @actions, $self->_parse_connect_do ('on_connect_do');
1089
1090   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1091 }
1092
1093
1094
1095 sub set_use_dbms_capability {
1096   $_[0]->set_inherited ($_[1], $_[2]);
1097 }
1098
1099 sub get_use_dbms_capability {
1100   my ($self, $capname) = @_;
1101
1102   my $use = $self->get_inherited ($capname);
1103   return defined $use
1104     ? $use
1105     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1106   ;
1107 }
1108
1109 sub set_dbms_capability {
1110   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1111 }
1112
1113 sub get_dbms_capability {
1114   my ($self, $capname) = @_;
1115
1116   my $cap = $self->_dbh_details->{capability}{$capname};
1117
1118   unless (defined $cap) {
1119     if (my $meth = $self->can ("_determine$capname")) {
1120       $cap = $self->$meth ? 1 : 0;
1121     }
1122     else {
1123       $cap = 0;
1124     }
1125
1126     $self->set_dbms_capability ($capname, $cap);
1127   }
1128
1129   return $cap;
1130 }
1131
1132 sub _server_info {
1133   my $self = shift;
1134
1135   my $info;
1136   unless ($info = $self->_dbh_details->{info}) {
1137
1138     $info = {};
1139
1140     my $server_version = try { $self->_get_server_version };
1141
1142     if (defined $server_version) {
1143       $info->{dbms_version} = $server_version;
1144
1145       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1146       my @verparts = split (/\./, $numeric_version);
1147       if (
1148         @verparts
1149           &&
1150         $verparts[0] <= 999
1151       ) {
1152         # consider only up to 3 version parts, iff not more than 3 digits
1153         my @use_parts;
1154         while (@verparts && @use_parts < 3) {
1155           my $p = shift @verparts;
1156           last if $p > 999;
1157           push @use_parts, $p;
1158         }
1159         push @use_parts, 0 while @use_parts < 3;
1160
1161         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1162       }
1163     }
1164
1165     $self->_dbh_details->{info} = $info;
1166   }
1167
1168   return $info;
1169 }
1170
1171 sub _get_server_version {
1172   shift->_dbh_get_info(18);
1173 }
1174
1175 sub _dbh_get_info {
1176   my ($self, $info) = @_;
1177
1178   return try { $self->_get_dbh->get_info($info) } || undef;
1179 }
1180
1181 sub _determine_driver {
1182   my ($self) = @_;
1183
1184   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1185     my $started_connected = 0;
1186     local $self->{_in_determine_driver} = 1;
1187
1188     if (ref($self) eq __PACKAGE__) {
1189       my $driver;
1190       if ($self->_dbh) { # we are connected
1191         $driver = $self->_dbh->{Driver}{Name};
1192         $started_connected = 1;
1193       } else {
1194         # if connect_info is a CODEREF, we have no choice but to connect
1195         if (ref $self->_dbi_connect_info->[0] &&
1196             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1197           $self->_populate_dbh;
1198           $driver = $self->_dbh->{Driver}{Name};
1199         }
1200         else {
1201           # try to use dsn to not require being connected, the driver may still
1202           # force a connection in _rebless to determine version
1203           # (dsn may not be supplied at all if all we do is make a mock-schema)
1204           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1205           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1206           $driver ||= $ENV{DBI_DRIVER};
1207         }
1208       }
1209
1210       if ($driver) {
1211         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1212         if ($self->load_optional_class($storage_class)) {
1213           mro::set_mro($storage_class, 'c3');
1214           bless $self, $storage_class;
1215           $self->_rebless();
1216         }
1217       }
1218     }
1219
1220     $self->_driver_determined(1);
1221
1222     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1223
1224     $self->_init; # run driver-specific initializations
1225
1226     $self->_run_connection_actions
1227         if !$started_connected && defined $self->_dbh;
1228   }
1229 }
1230
1231 sub _do_connection_actions {
1232   my $self          = shift;
1233   my $method_prefix = shift;
1234   my $call          = shift;
1235
1236   if (not ref($call)) {
1237     my $method = $method_prefix . $call;
1238     $self->$method(@_);
1239   } elsif (ref($call) eq 'CODE') {
1240     $self->$call(@_);
1241   } elsif (ref($call) eq 'ARRAY') {
1242     if (ref($call->[0]) ne 'ARRAY') {
1243       $self->_do_connection_actions($method_prefix, $_) for @$call;
1244     } else {
1245       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1246     }
1247   } else {
1248     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1249   }
1250
1251   return $self;
1252 }
1253
1254 sub connect_call_do_sql {
1255   my $self = shift;
1256   $self->_do_query(@_);
1257 }
1258
1259 sub disconnect_call_do_sql {
1260   my $self = shift;
1261   $self->_do_query(@_);
1262 }
1263
1264 # override in db-specific backend when necessary
1265 sub connect_call_datetime_setup { 1 }
1266
1267 sub _do_query {
1268   my ($self, $action) = @_;
1269
1270   if (ref $action eq 'CODE') {
1271     $action = $action->($self);
1272     $self->_do_query($_) foreach @$action;
1273   }
1274   else {
1275     # Most debuggers expect ($sql, @bind), so we need to exclude
1276     # the attribute hash which is the second argument to $dbh->do
1277     # furthermore the bind values are usually to be presented
1278     # as named arrayref pairs, so wrap those here too
1279     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1280     my $sql = shift @do_args;
1281     my $attrs = shift @do_args;
1282     my @bind = map { [ undef, $_ ] } @do_args;
1283
1284     $self->_query_start($sql, @bind);
1285     $self->_get_dbh->do($sql, $attrs, @do_args);
1286     $self->_query_end($sql, @bind);
1287   }
1288
1289   return $self;
1290 }
1291
1292 sub _connect {
1293   my ($self, @info) = @_;
1294
1295   $self->throw_exception("You failed to provide any connection info")
1296     if !@info;
1297
1298   my ($old_connect_via, $dbh);
1299
1300   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1301     $old_connect_via = $DBI::connect_via;
1302     $DBI::connect_via = 'connect';
1303   }
1304
1305   try {
1306     if(ref $info[0] eq 'CODE') {
1307       $dbh = $info[0]->();
1308     }
1309     else {
1310       require DBI;
1311       $dbh = DBI->connect(@info);
1312     }
1313
1314     if (!$dbh) {
1315       die $DBI::errstr;
1316     }
1317
1318     unless ($self->unsafe) {
1319
1320       $self->throw_exception(
1321         'Refusing clobbering of {HandleError} installed on externally supplied '
1322        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1323       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1324
1325       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1326       # request, or an external handle. Complain and set anyway
1327       unless ($dbh->{RaiseError}) {
1328         carp( ref $info[0] eq 'CODE'
1329
1330           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1331            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1332            .'attribute has been supplied'
1333
1334           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1335            .'unsafe => 1. Toggling RaiseError back to true'
1336         );
1337
1338         $dbh->{RaiseError} = 1;
1339       }
1340
1341       # this odd anonymous coderef dereference is in fact really
1342       # necessary to avoid the unwanted effect described in perl5
1343       # RT#75792
1344       sub {
1345         my $weak_self = $_[0];
1346         weaken $weak_self;
1347
1348         # the coderef is blessed so we can distinguish it from externally
1349         # supplied handles (which must be preserved)
1350         $_[1]->{HandleError} = bless sub {
1351           if ($weak_self) {
1352             $weak_self->throw_exception("DBI Exception: $_[0]");
1353           }
1354           else {
1355             # the handler may be invoked by something totally out of
1356             # the scope of DBIC
1357             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1358           }
1359         }, '__DBIC__DBH__ERROR__HANDLER__';
1360       }->($self, $dbh);
1361     }
1362   }
1363   catch {
1364     $self->throw_exception("DBI Connection failed: $_")
1365   }
1366   finally {
1367     $DBI::connect_via = $old_connect_via if $old_connect_via;
1368   };
1369
1370   $self->_dbh_autocommit($dbh->{AutoCommit});
1371   $dbh;
1372 }
1373
1374 sub svp_begin {
1375   my ($self, $name) = @_;
1376
1377   $name = $self->_svp_generate_name
1378     unless defined $name;
1379
1380   $self->throw_exception ("You can't use savepoints outside a transaction")
1381     if $self->{transaction_depth} == 0;
1382
1383   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1384     unless $self->can('_svp_begin');
1385
1386   push @{ $self->{savepoints} }, $name;
1387
1388   $self->debugobj->svp_begin($name) if $self->debug;
1389
1390   return $self->_svp_begin($name);
1391 }
1392
1393 sub svp_release {
1394   my ($self, $name) = @_;
1395
1396   $self->throw_exception ("You can't use savepoints outside a transaction")
1397     if $self->{transaction_depth} == 0;
1398
1399   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1400     unless $self->can('_svp_release');
1401
1402   if (defined $name) {
1403     $self->throw_exception ("Savepoint '$name' does not exist")
1404       unless grep { $_ eq $name } @{ $self->{savepoints} };
1405
1406     # Dig through the stack until we find the one we are releasing.  This keeps
1407     # the stack up to date.
1408     my $svp;
1409
1410     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1411   } else {
1412     $name = pop @{ $self->{savepoints} };
1413   }
1414
1415   $self->debugobj->svp_release($name) if $self->debug;
1416
1417   return $self->_svp_release($name);
1418 }
1419
1420 sub svp_rollback {
1421   my ($self, $name) = @_;
1422
1423   $self->throw_exception ("You can't use savepoints outside a transaction")
1424     if $self->{transaction_depth} == 0;
1425
1426   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1427     unless $self->can('_svp_rollback');
1428
1429   if (defined $name) {
1430       # If they passed us a name, verify that it exists in the stack
1431       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1432           $self->throw_exception("Savepoint '$name' does not exist!");
1433       }
1434
1435       # Dig through the stack until we find the one we are releasing.  This keeps
1436       # the stack up to date.
1437       while(my $s = pop(@{ $self->{savepoints} })) {
1438           last if($s eq $name);
1439       }
1440       # Add the savepoint back to the stack, as a rollback doesn't remove the
1441       # named savepoint, only everything after it.
1442       push(@{ $self->{savepoints} }, $name);
1443   } else {
1444       # We'll assume they want to rollback to the last savepoint
1445       $name = $self->{savepoints}->[-1];
1446   }
1447
1448   $self->debugobj->svp_rollback($name) if $self->debug;
1449
1450   return $self->_svp_rollback($name);
1451 }
1452
1453 sub _svp_generate_name {
1454   my ($self) = @_;
1455   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1456 }
1457
1458 sub txn_begin {
1459   my $self = shift;
1460
1461   # this means we have not yet connected and do not know the AC status
1462   # (e.g. coderef $dbh)
1463   if (! defined $self->_dbh_autocommit) {
1464     $self->ensure_connected;
1465   }
1466   # otherwise re-connect on pid changes, so
1467   # that the txn_depth is adjusted properly
1468   # the lightweight _get_dbh is good enoug here
1469   # (only superficial handle check, no pings)
1470   else {
1471     $self->_get_dbh;
1472   }
1473
1474   if($self->transaction_depth == 0) {
1475     $self->debugobj->txn_begin()
1476       if $self->debug;
1477     $self->_dbh_begin_work;
1478   }
1479   elsif ($self->auto_savepoint) {
1480     $self->svp_begin;
1481   }
1482   $self->{transaction_depth}++;
1483 }
1484
1485 sub _dbh_begin_work {
1486   my $self = shift;
1487
1488   # if the user is utilizing txn_do - good for him, otherwise we need to
1489   # ensure that the $dbh is healthy on BEGIN.
1490   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1491   # will be replaced by a failure of begin_work itself (which will be
1492   # then retried on reconnect)
1493   if ($self->{_in_dbh_do}) {
1494     $self->_dbh->begin_work;
1495   } else {
1496     $self->dbh_do(sub { $_[1]->begin_work });
1497   }
1498 }
1499
1500 sub txn_commit {
1501   my $self = shift;
1502   if (! $self->_dbh) {
1503     $self->throw_exception('cannot COMMIT on a disconnected handle');
1504   }
1505   elsif ($self->{transaction_depth} == 1) {
1506     $self->debugobj->txn_commit()
1507       if ($self->debug);
1508     $self->_dbh_commit;
1509     $self->{transaction_depth} = 0
1510       if $self->_dbh_autocommit;
1511   }
1512   elsif($self->{transaction_depth} > 1) {
1513     $self->{transaction_depth}--;
1514     $self->svp_release
1515       if $self->auto_savepoint;
1516   }
1517   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1518
1519     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1520         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1521
1522     $self->debugobj->txn_commit()
1523       if ($self->debug);
1524     $self->_dbh_commit;
1525     $self->{transaction_depth} = 0
1526       if $self->_dbh_autocommit;
1527   }
1528   else {
1529     $self->throw_exception( 'Refusing to commit without a started transaction' );
1530   }
1531 }
1532
1533 sub _dbh_commit {
1534   my $self = shift;
1535   my $dbh  = $self->_dbh
1536     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1537   $dbh->commit;
1538 }
1539
1540 sub txn_rollback {
1541   my $self = shift;
1542   my $dbh = $self->_dbh;
1543   try {
1544     if ($self->{transaction_depth} == 1) {
1545       $self->debugobj->txn_rollback()
1546         if ($self->debug);
1547       $self->{transaction_depth} = 0
1548         if $self->_dbh_autocommit;
1549       $self->_dbh_rollback;
1550     }
1551     elsif($self->{transaction_depth} > 1) {
1552       $self->{transaction_depth}--;
1553       if ($self->auto_savepoint) {
1554         $self->svp_rollback;
1555         $self->svp_release;
1556       }
1557     }
1558     else {
1559       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1560     }
1561   }
1562   catch {
1563     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1564
1565     if ($_ !~ /$exception_class/) {
1566       # ensure that a failed rollback resets the transaction depth
1567       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1568     }
1569
1570     $self->throw_exception($_)
1571   };
1572 }
1573
1574 sub _dbh_rollback {
1575   my $self = shift;
1576   my $dbh  = $self->_dbh
1577     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1578   $dbh->rollback;
1579 }
1580
1581 # This used to be the top-half of _execute.  It was split out to make it
1582 #  easier to override in NoBindVars without duping the rest.  It takes up
1583 #  all of _execute's args, and emits $sql, @bind.
1584 sub _prep_for_execute {
1585   my ($self, $op, $extra_bind, $ident, $args) = @_;
1586
1587   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1588     $ident = $ident->from();
1589   }
1590
1591   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1592
1593   unshift(@bind,
1594     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1595       if $extra_bind;
1596   return ($sql, \@bind);
1597 }
1598
1599
1600 sub _fix_bind_params {
1601     my ($self, @bind) = @_;
1602
1603     ### Turn @bind from something like this:
1604     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1605     ### to this:
1606     ###   ( "'1'", "'1'", "'3'" )
1607     return
1608         map {
1609             if ( defined( $_ && $_->[1] ) ) {
1610                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1611             }
1612             else { q{NULL}; }
1613         } @bind;
1614 }
1615
1616 sub _query_start {
1617     my ( $self, $sql, @bind ) = @_;
1618
1619     if ( $self->debug ) {
1620         @bind = $self->_fix_bind_params(@bind);
1621
1622         $self->debugobj->query_start( $sql, @bind );
1623     }
1624 }
1625
1626 sub _query_end {
1627     my ( $self, $sql, @bind ) = @_;
1628
1629     if ( $self->debug ) {
1630         @bind = $self->_fix_bind_params(@bind);
1631         $self->debugobj->query_end( $sql, @bind );
1632     }
1633 }
1634
1635 sub _dbh_execute {
1636   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1637
1638   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1639
1640   $self->_query_start( $sql, @$bind );
1641
1642   my $sth = $self->sth($sql,$op);
1643
1644   my $placeholder_index = 1;
1645
1646   foreach my $bound (@$bind) {
1647     my $attributes = {};
1648     my($column_name, @data) = @$bound;
1649
1650     if ($bind_attributes) {
1651       $attributes = $bind_attributes->{$column_name}
1652       if defined $bind_attributes->{$column_name};
1653     }
1654
1655     foreach my $data (@data) {
1656       my $ref = ref $data;
1657
1658       if ($ref and overload::Method($data, '""') ) {
1659         $data = "$data";
1660       }
1661       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1662         $sth->bind_param_inout(
1663           $placeholder_index++,
1664           $data,
1665           $self->_max_column_bytesize($ident, $column_name),
1666           $attributes
1667         );
1668         next;
1669       }
1670
1671       $sth->bind_param($placeholder_index++, $data, $attributes);
1672     }
1673   }
1674
1675   # Can this fail without throwing an exception anyways???
1676   my $rv = $sth->execute();
1677   $self->throw_exception(
1678     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1679   ) if !$rv;
1680
1681   $self->_query_end( $sql, @$bind );
1682
1683   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1684 }
1685
1686 sub _execute {
1687     my $self = shift;
1688     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1689 }
1690
1691 sub _prefetch_autovalues {
1692   my ($self, $source, $to_insert) = @_;
1693
1694   my $colinfo = $source->columns_info;
1695
1696   my %values;
1697   for my $col (keys %$colinfo) {
1698     if (
1699       $colinfo->{$col}{auto_nextval}
1700         and
1701       (
1702         ! exists $to_insert->{$col}
1703           or
1704         ref $to_insert->{$col} eq 'SCALAR'
1705       )
1706     ) {
1707       $values{$col} = $self->_sequence_fetch(
1708         'NEXTVAL',
1709         ( $colinfo->{$col}{sequence} ||=
1710             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1711         ),
1712       );
1713     }
1714   }
1715
1716   \%values;
1717 }
1718
1719 sub insert {
1720   my ($self, $source, $to_insert) = @_;
1721
1722   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1723
1724   # fuse the values
1725   $to_insert = { %$to_insert, %$prefetched_values };
1726
1727   # list of primary keys we try to fetch from the database
1728   # both not-exsists and scalarrefs are considered
1729   my %fetch_pks;
1730   for ($source->primary_columns) {
1731     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1732       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1733   }
1734
1735   my ($sqla_opts, @ir_container);
1736   if ($self->_use_insert_returning) {
1737
1738     # retain order as declared in the resultsource
1739     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1740       push @{$sqla_opts->{returning}}, $_;
1741       $sqla_opts->{returning_container} = \@ir_container
1742         if $self->_use_insert_returning_bound;
1743     }
1744   }
1745
1746   my $bind_attributes = $self->source_bind_attributes($source);
1747
1748   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1749
1750   my %returned_cols;
1751
1752   if (my $retlist = $sqla_opts->{returning}) {
1753     @ir_container = try {
1754       local $SIG{__WARN__} = sub {};
1755       my @r = $sth->fetchrow_array;
1756       $sth->finish;
1757       @r;
1758     } unless @ir_container;
1759
1760     @returned_cols{@$retlist} = @ir_container if @ir_container;
1761   }
1762
1763   return { %$prefetched_values, %returned_cols };
1764 }
1765
1766
1767 ## Currently it is assumed that all values passed will be "normal", i.e. not
1768 ## scalar refs, or at least, all the same type as the first set, the statement is
1769 ## only prepped once.
1770 sub insert_bulk {
1771   my ($self, $source, $cols, $data) = @_;
1772
1773   my %colvalues;
1774   @colvalues{@$cols} = (0..$#$cols);
1775
1776   for my $i (0..$#$cols) {
1777     my $first_val = $data->[0][$i];
1778     next unless ref $first_val eq 'SCALAR';
1779
1780     $colvalues{ $cols->[$i] } = $first_val;
1781   }
1782
1783   # check for bad data and stringify stringifiable objects
1784   my $bad_slice = sub {
1785     my ($msg, $col_idx, $slice_idx) = @_;
1786     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1787       $msg,
1788       $cols->[$col_idx],
1789       do {
1790         require Data::Dumper::Concise;
1791         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1792         Data::Dumper::Concise::Dumper ({
1793           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1794         }),
1795       }
1796     );
1797   };
1798
1799   for my $datum_idx (0..$#$data) {
1800     my $datum = $data->[$datum_idx];
1801
1802     for my $col_idx (0..$#$cols) {
1803       my $val            = $datum->[$col_idx];
1804       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1805       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1806
1807       if ($is_literal_sql) {
1808         if (not ref $val) {
1809           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1810         }
1811         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1812           $bad_slice->("$reftype reference found where literal SQL expected",
1813             $col_idx, $datum_idx);
1814         }
1815         elsif ($$val ne $$sqla_bind){
1816           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1817             $col_idx, $datum_idx);
1818         }
1819       }
1820       elsif (my $reftype = ref $val) {
1821         require overload;
1822         if (overload::Method($val, '""')) {
1823           $datum->[$col_idx] = "".$val;
1824         }
1825         else {
1826           $bad_slice->("$reftype reference found where bind expected",
1827             $col_idx, $datum_idx);
1828         }
1829       }
1830     }
1831   }
1832
1833   my ($sql, $bind) = $self->_prep_for_execute (
1834     'insert', undef, $source, [\%colvalues]
1835   );
1836
1837   if (! @$bind) {
1838     # if the bindlist is empty - make sure all "values" are in fact
1839     # literal scalarrefs. If not the case this means the storage ate
1840     # them away (e.g. the NoBindVars component) and interpolated them
1841     # directly into the SQL. This obviosly can't be good for multi-inserts
1842
1843     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1844       if first { ref $_ ne 'SCALAR' } values %colvalues;
1845   }
1846
1847   # neither _execute_array, nor _execute_inserts_with_no_binds are
1848   # atomic (even if _execute _array is a single call). Thus a safety
1849   # scope guard
1850   my $guard = $self->txn_scope_guard;
1851
1852   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1853   my $sth = $self->sth($sql);
1854   my $rv = do {
1855     if (@$bind) {
1856       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1857       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1858     }
1859     else {
1860       # bind_param_array doesn't work if there are no binds
1861       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1862     }
1863   };
1864
1865   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1866
1867   $guard->commit;
1868
1869   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1870 }
1871
1872 sub _execute_array {
1873   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1874
1875   ## This must be an arrayref, else nothing works!
1876   my $tuple_status = [];
1877
1878   ## Get the bind_attributes, if any exist
1879   my $bind_attributes = $self->source_bind_attributes($source);
1880
1881   ## Bind the values and execute
1882   my $placeholder_index = 1;
1883
1884   foreach my $bound (@$bind) {
1885
1886     my $attributes = {};
1887     my ($column_name, $data_index) = @$bound;
1888
1889     if( $bind_attributes ) {
1890       $attributes = $bind_attributes->{$column_name}
1891       if defined $bind_attributes->{$column_name};
1892     }
1893
1894     my @data = map { $_->[$data_index] } @$data;
1895
1896     $sth->bind_param_array(
1897       $placeholder_index,
1898       [@data],
1899       (%$attributes ?  $attributes : ()),
1900     );
1901     $placeholder_index++;
1902   }
1903
1904   my ($rv, $err);
1905   try {
1906     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1907   }
1908   catch {
1909     $err = shift;
1910   };
1911
1912   # Not all DBDs are create equal. Some throw on error, some return
1913   # an undef $rv, and some set $sth->err - try whatever we can
1914   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1915     ! defined $err
1916       and
1917     ( !defined $rv or $sth->err )
1918   );
1919
1920   # Statement must finish even if there was an exception.
1921   try {
1922     $sth->finish
1923   }
1924   catch {
1925     $err = shift unless defined $err
1926   };
1927
1928   if (defined $err) {
1929     my $i = 0;
1930     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1931
1932     $self->throw_exception("Unexpected populate error: $err")
1933       if ($i > $#$tuple_status);
1934
1935     require Data::Dumper::Concise;
1936     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1937       ($tuple_status->[$i][1] || $err),
1938       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1939     );
1940   }
1941
1942   return $rv;
1943 }
1944
1945 sub _dbh_execute_array {
1946     my ($self, $sth, $tuple_status, @extra) = @_;
1947
1948     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1949 }
1950
1951 sub _dbh_execute_inserts_with_no_binds {
1952   my ($self, $sth, $count) = @_;
1953
1954   my $err;
1955   try {
1956     my $dbh = $self->_get_dbh;
1957     local $dbh->{RaiseError} = 1;
1958     local $dbh->{PrintError} = 0;
1959
1960     $sth->execute foreach 1..$count;
1961   }
1962   catch {
1963     $err = shift;
1964   };
1965
1966   # Make sure statement is finished even if there was an exception.
1967   try {
1968     $sth->finish
1969   }
1970   catch {
1971     $err = shift unless defined $err;
1972   };
1973
1974   $self->throw_exception($err) if defined $err;
1975
1976   return $count;
1977 }
1978
1979 sub update {
1980   my ($self, $source, @args) = @_;
1981
1982   my $bind_attrs = $self->source_bind_attributes($source);
1983
1984   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1985 }
1986
1987
1988 sub delete {
1989   my ($self, $source, @args) = @_;
1990
1991   my $bind_attrs = $self->source_bind_attributes($source);
1992
1993   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1994 }
1995
1996 # We were sent here because the $rs contains a complex search
1997 # which will require a subquery to select the correct rows
1998 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1999 #
2000 # Generating a single PK column subquery is trivial and supported
2001 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
2002 # Look at _multipk_update_delete()
2003 sub _subq_update_delete {
2004   my $self = shift;
2005   my ($rs, $op, $values) = @_;
2006
2007   my $rsrc = $rs->result_source;
2008
2009   # quick check if we got a sane rs on our hands
2010   my @pcols = $rsrc->_pri_cols;
2011
2012   my $sel = $rs->_resolved_attrs->{select};
2013   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2014
2015   if (
2016       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2017         ne
2018       join ("\x00", sort @$sel )
2019   ) {
2020     $self->throw_exception (
2021       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2022     );
2023   }
2024
2025   if (@pcols == 1) {
2026     return $self->$op (
2027       $rsrc,
2028       $op eq 'update' ? $values : (),
2029       { $pcols[0] => { -in => $rs->as_query } },
2030     );
2031   }
2032
2033   else {
2034     return $self->_multipk_update_delete (@_);
2035   }
2036 }
2037
2038 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2039 # resultset update/delete involving subqueries. So by default resort
2040 # to simple (and inefficient) delete_all style per-row opearations,
2041 # while allowing specific storages to override this with a faster
2042 # implementation.
2043 #
2044 sub _multipk_update_delete {
2045   return shift->_per_row_update_delete (@_);
2046 }
2047
2048 # This is the default loop used to delete/update rows for multi PK
2049 # resultsets, and used by mysql exclusively (because it can't do anything
2050 # else).
2051 #
2052 # We do not use $row->$op style queries, because resultset update/delete
2053 # is not expected to cascade (this is what delete_all/update_all is for).
2054 #
2055 # There should be no race conditions as the entire operation is rolled
2056 # in a transaction.
2057 #
2058 sub _per_row_update_delete {
2059   my $self = shift;
2060   my ($rs, $op, $values) = @_;
2061
2062   my $rsrc = $rs->result_source;
2063   my @pcols = $rsrc->_pri_cols;
2064
2065   my $guard = $self->txn_scope_guard;
2066
2067   # emulate the return value of $sth->execute for non-selects
2068   my $row_cnt = '0E0';
2069
2070   my $subrs_cur = $rs->cursor;
2071   my @all_pk = $subrs_cur->all;
2072   for my $pks ( @all_pk) {
2073
2074     my $cond;
2075     for my $i (0.. $#pcols) {
2076       $cond->{$pcols[$i]} = $pks->[$i];
2077     }
2078
2079     $self->$op (
2080       $rsrc,
2081       $op eq 'update' ? $values : (),
2082       $cond,
2083     );
2084
2085     $row_cnt++;
2086   }
2087
2088   $guard->commit;
2089
2090   return $row_cnt;
2091 }
2092
2093 sub _select {
2094   my $self = shift;
2095   $self->_execute($self->_select_args(@_));
2096 }
2097
2098 sub _select_args_to_query {
2099   my $self = shift;
2100
2101   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2102   #  = $self->_select_args($ident, $select, $cond, $attrs);
2103   my ($op, $bind, $ident, $bind_attrs, @args) =
2104     $self->_select_args(@_);
2105
2106   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2107   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2108   $prepared_bind ||= [];
2109
2110   return wantarray
2111     ? ($sql, $prepared_bind, $bind_attrs)
2112     : \[ "($sql)", @$prepared_bind ]
2113   ;
2114 }
2115
2116 sub _select_args {
2117   my ($self, $ident, $select, $where, $attrs) = @_;
2118
2119   my $sql_maker = $self->sql_maker;
2120   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2121
2122   $attrs = {
2123     %$attrs,
2124     select => $select,
2125     from => $ident,
2126     where => $where,
2127     $rs_alias && $alias2source->{$rs_alias}
2128       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2129       : ()
2130     ,
2131   };
2132
2133   # calculate bind_attrs before possible $ident mangling
2134   my $bind_attrs = {};
2135   for my $alias (keys %$alias2source) {
2136     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2137     for my $col (keys %$bindtypes) {
2138
2139       my $fqcn = join ('.', $alias, $col);
2140       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2141
2142       # Unqialified column names are nice, but at the same time can be
2143       # rather ambiguous. What we do here is basically go along with
2144       # the loop, adding an unqualified column slot to $bind_attrs,
2145       # alongside the fully qualified name. As soon as we encounter
2146       # another column by that name (which would imply another table)
2147       # we unset the unqualified slot and never add any info to it
2148       # to avoid erroneous type binding. If this happens the users
2149       # only choice will be to fully qualify his column name
2150
2151       if (exists $bind_attrs->{$col}) {
2152         $bind_attrs->{$col} = {};
2153       }
2154       else {
2155         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2156       }
2157     }
2158   }
2159
2160   # Sanity check the attributes (SQLMaker does it too, but
2161   # in case of a software_limit we'll never reach there)
2162   if (defined $attrs->{offset}) {
2163     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2164       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2165   }
2166   $attrs->{offset} ||= 0;
2167
2168   if (defined $attrs->{rows}) {
2169     $self->throw_exception("The rows attribute must be a positive integer if present")
2170       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2171   }
2172   elsif ($attrs->{offset}) {
2173     # MySQL actually recommends this approach.  I cringe.
2174     $attrs->{rows} = $sql_maker->__max_int;
2175   }
2176
2177   my @limit;
2178
2179   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2180   # storage, unless software limit was requested
2181   if (
2182     #limited has_many
2183     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2184        ||
2185     # grouped prefetch (to satisfy group_by == select)
2186     ( $attrs->{group_by}
2187         &&
2188       @{$attrs->{group_by}}
2189         &&
2190       $attrs->{_prefetch_selector_range}
2191     )
2192   ) {
2193     ($ident, $select, $where, $attrs)
2194       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2195   }
2196   elsif (! $attrs->{software_limit} ) {
2197     push @limit, $attrs->{rows}, $attrs->{offset};
2198   }
2199
2200   # try to simplify the joinmap further (prune unreferenced type-single joins)
2201   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2202
2203 ###
2204   # This would be the point to deflate anything found in $where
2205   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2206   # expect a row object. And all we have is a resultsource (it is trivial
2207   # to extract deflator coderefs via $alias2source above).
2208   #
2209   # I don't see a way forward other than changing the way deflators are
2210   # invoked, and that's just bad...
2211 ###
2212
2213   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2214 }
2215
2216 # Returns a counting SELECT for a simple count
2217 # query. Abstracted so that a storage could override
2218 # this to { count => 'firstcol' } or whatever makes
2219 # sense as a performance optimization
2220 sub _count_select {
2221   #my ($self, $source, $rs_attrs) = @_;
2222   return { count => '*' };
2223 }
2224
2225
2226 sub source_bind_attributes {
2227   my ($self, $source) = @_;
2228
2229   my $bind_attributes;
2230
2231   my $colinfo = $source->columns_info;
2232
2233   for my $col (keys %$colinfo) {
2234     if (my $dt = $colinfo->{$col}{data_type} ) {
2235       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2236     }
2237   }
2238
2239   return $bind_attributes;
2240 }
2241
2242 =head2 select
2243
2244 =over 4
2245
2246 =item Arguments: $ident, $select, $condition, $attrs
2247
2248 =back
2249
2250 Handle a SQL select statement.
2251
2252 =cut
2253
2254 sub select {
2255   my $self = shift;
2256   my ($ident, $select, $condition, $attrs) = @_;
2257   return $self->cursor_class->new($self, \@_, $attrs);
2258 }
2259
2260 sub select_single {
2261   my $self = shift;
2262   my ($rv, $sth, @bind) = $self->_select(@_);
2263   my @row = $sth->fetchrow_array;
2264   my @nextrow = $sth->fetchrow_array if @row;
2265   if(@row && @nextrow) {
2266     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2267   }
2268   # Need to call finish() to work round broken DBDs
2269   $sth->finish();
2270   return @row;
2271 }
2272
2273 =head2 sql_limit_dialect
2274
2275 This is an accessor for the default SQL limit dialect used by a particular
2276 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2277 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2278 see L<DBIx::Class::SQLMaker::LimitDialects>.
2279
2280 =head2 sth
2281
2282 =over 4
2283
2284 =item Arguments: $sql
2285
2286 =back
2287
2288 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2289
2290 =cut
2291
2292 sub _dbh_sth {
2293   my ($self, $dbh, $sql) = @_;
2294
2295   # 3 is the if_active parameter which avoids active sth re-use
2296   my $sth = $self->disable_sth_caching
2297     ? $dbh->prepare($sql)
2298     : $dbh->prepare_cached($sql, {}, 3);
2299
2300   # XXX You would think RaiseError would make this impossible,
2301   #  but apparently that's not true :(
2302   $self->throw_exception(
2303     $dbh->errstr
2304       ||
2305     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2306             .'an exception and/or setting $dbh->errstr',
2307       length ($sql) > 20
2308         ? substr($sql, 0, 20) . '...'
2309         : $sql
2310       ,
2311       'DBD::' . $dbh->{Driver}{Name},
2312     )
2313   ) if !$sth;
2314
2315   $sth;
2316 }
2317
2318 sub sth {
2319   my ($self, $sql) = @_;
2320   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2321 }
2322
2323 sub _dbh_columns_info_for {
2324   my ($self, $dbh, $table) = @_;
2325
2326   if ($dbh->can('column_info')) {
2327     my %result;
2328     my $caught;
2329     try {
2330       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2331       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2332       $sth->execute();
2333       while ( my $info = $sth->fetchrow_hashref() ){
2334         my %column_info;
2335         $column_info{data_type}   = $info->{TYPE_NAME};
2336         $column_info{size}      = $info->{COLUMN_SIZE};
2337         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2338         $column_info{default_value} = $info->{COLUMN_DEF};
2339         my $col_name = $info->{COLUMN_NAME};
2340         $col_name =~ s/^\"(.*)\"$/$1/;
2341
2342         $result{$col_name} = \%column_info;
2343       }
2344     } catch {
2345       $caught = 1;
2346     };
2347     return \%result if !$caught && scalar keys %result;
2348   }
2349
2350   my %result;
2351   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2352   $sth->execute;
2353   my @columns = @{$sth->{NAME_lc}};
2354   for my $i ( 0 .. $#columns ){
2355     my %column_info;
2356     $column_info{data_type} = $sth->{TYPE}->[$i];
2357     $column_info{size} = $sth->{PRECISION}->[$i];
2358     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2359
2360     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2361       $column_info{data_type} = $1;
2362       $column_info{size}    = $2;
2363     }
2364
2365     $result{$columns[$i]} = \%column_info;
2366   }
2367   $sth->finish;
2368
2369   foreach my $col (keys %result) {
2370     my $colinfo = $result{$col};
2371     my $type_num = $colinfo->{data_type};
2372     my $type_name;
2373     if(defined $type_num && $dbh->can('type_info')) {
2374       my $type_info = $dbh->type_info($type_num);
2375       $type_name = $type_info->{TYPE_NAME} if $type_info;
2376       $colinfo->{data_type} = $type_name if $type_name;
2377     }
2378   }
2379
2380   return \%result;
2381 }
2382
2383 sub columns_info_for {
2384   my ($self, $table) = @_;
2385   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2386 }
2387
2388 =head2 last_insert_id
2389
2390 Return the row id of the last insert.
2391
2392 =cut
2393
2394 sub _dbh_last_insert_id {
2395     my ($self, $dbh, $source, $col) = @_;
2396
2397     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2398
2399     return $id if defined $id;
2400
2401     my $class = ref $self;
2402     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2403 }
2404
2405 sub last_insert_id {
2406   my $self = shift;
2407   $self->_dbh_last_insert_id ($self->_dbh, @_);
2408 }
2409
2410 =head2 _native_data_type
2411
2412 =over 4
2413
2414 =item Arguments: $type_name
2415
2416 =back
2417
2418 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2419 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2420 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2421
2422 The default implementation returns C<undef>, implement in your Storage driver if
2423 you need this functionality.
2424
2425 Should map types from other databases to the native RDBMS type, for example
2426 C<VARCHAR2> to C<VARCHAR>.
2427
2428 Types with modifiers should map to the underlying data type. For example,
2429 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2430
2431 Composite types should map to the container type, for example
2432 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2433
2434 =cut
2435
2436 sub _native_data_type {
2437   #my ($self, $data_type) = @_;
2438   return undef
2439 }
2440
2441 # Check if placeholders are supported at all
2442 sub _determine_supports_placeholders {
2443   my $self = shift;
2444   my $dbh  = $self->_get_dbh;
2445
2446   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2447   # but it is inaccurate more often than not
2448   return try {
2449     local $dbh->{PrintError} = 0;
2450     local $dbh->{RaiseError} = 1;
2451     $dbh->do('select ?', {}, 1);
2452     1;
2453   }
2454   catch {
2455     0;
2456   };
2457 }
2458
2459 # Check if placeholders bound to non-string types throw exceptions
2460 #
2461 sub _determine_supports_typeless_placeholders {
2462   my $self = shift;
2463   my $dbh  = $self->_get_dbh;
2464
2465   return try {
2466     local $dbh->{PrintError} = 0;
2467     local $dbh->{RaiseError} = 1;
2468     # this specifically tests a bind that is NOT a string
2469     $dbh->do('select 1 where 1 = ?', {}, 1);
2470     1;
2471   }
2472   catch {
2473     0;
2474   };
2475 }
2476
2477 =head2 sqlt_type
2478
2479 Returns the database driver name.
2480
2481 =cut
2482
2483 sub sqlt_type {
2484   shift->_get_dbh->{Driver}->{Name};
2485 }
2486
2487 =head2 bind_attribute_by_data_type
2488
2489 Given a datatype from column info, returns a database specific bind
2490 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2491 let the database planner just handle it.
2492
2493 Generally only needed for special case column types, like bytea in postgres.
2494
2495 =cut
2496
2497 sub bind_attribute_by_data_type {
2498     return;
2499 }
2500
2501 =head2 is_datatype_numeric
2502
2503 Given a datatype from column_info, returns a boolean value indicating if
2504 the current RDBMS considers it a numeric value. This controls how
2505 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2506 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2507 be performed instead of the usual C<eq>.
2508
2509 =cut
2510
2511 sub is_datatype_numeric {
2512   my ($self, $dt) = @_;
2513
2514   return 0 unless $dt;
2515
2516   return $dt =~ /^ (?:
2517     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2518   ) $/ix;
2519 }
2520
2521
2522 =head2 create_ddl_dir
2523
2524 =over 4
2525
2526 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2527
2528 =back
2529
2530 Creates a SQL file based on the Schema, for each of the specified
2531 database engines in C<\@databases> in the given directory.
2532 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2533
2534 Given a previous version number, this will also create a file containing
2535 the ALTER TABLE statements to transform the previous schema into the
2536 current one. Note that these statements may contain C<DROP TABLE> or
2537 C<DROP COLUMN> statements that can potentially destroy data.
2538
2539 The file names are created using the C<ddl_filename> method below, please
2540 override this method in your schema if you would like a different file
2541 name format. For the ALTER file, the same format is used, replacing
2542 $version in the name with "$preversion-$version".
2543
2544 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2545 The most common value for this would be C<< { add_drop_table => 1 } >>
2546 to have the SQL produced include a C<DROP TABLE> statement for each table
2547 created. For quoting purposes supply C<quote_table_names> and
2548 C<quote_field_names>.
2549
2550 If no arguments are passed, then the following default values are assumed:
2551
2552 =over 4
2553
2554 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2555
2556 =item version    - $schema->schema_version
2557
2558 =item directory  - './'
2559
2560 =item preversion - <none>
2561
2562 =back
2563
2564 By default, C<\%sqlt_args> will have
2565
2566  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2567
2568 merged with the hash passed in. To disable any of those features, pass in a
2569 hashref like the following
2570
2571  { ignore_constraint_names => 0, # ... other options }
2572
2573
2574 WARNING: You are strongly advised to check all SQL files created, before applying
2575 them.
2576
2577 =cut
2578
2579 sub create_ddl_dir {
2580   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2581
2582   unless ($dir) {
2583     carp "No directory given, using ./\n";
2584     $dir = './';
2585   } else {
2586       -d $dir
2587         or
2588       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2589         or
2590       $self->throw_exception(
2591         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2592       );
2593   }
2594
2595   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2596
2597   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2598   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2599
2600   my $schema_version = $schema->schema_version || '1.x';
2601   $version ||= $schema_version;
2602
2603   $sqltargs = {
2604     add_drop_table => 1,
2605     ignore_constraint_names => 1,
2606     ignore_index_names => 1,
2607     %{$sqltargs || {}}
2608   };
2609
2610   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2611     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2612   }
2613
2614   my $sqlt = SQL::Translator->new( $sqltargs );
2615
2616   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2617   my $sqlt_schema = $sqlt->translate({ data => $schema })
2618     or $self->throw_exception ($sqlt->error);
2619
2620   foreach my $db (@$databases) {
2621     $sqlt->reset();
2622     $sqlt->{schema} = $sqlt_schema;
2623     $sqlt->producer($db);
2624
2625     my $file;
2626     my $filename = $schema->ddl_filename($db, $version, $dir);
2627     if (-e $filename && ($version eq $schema_version )) {
2628       # if we are dumping the current version, overwrite the DDL
2629       carp "Overwriting existing DDL file - $filename";
2630       unlink($filename);
2631     }
2632
2633     my $output = $sqlt->translate;
2634     if(!$output) {
2635       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2636       next;
2637     }
2638     if(!open($file, ">$filename")) {
2639       $self->throw_exception("Can't open $filename for writing ($!)");
2640       next;
2641     }
2642     print $file $output;
2643     close($file);
2644
2645     next unless ($preversion);
2646
2647     require SQL::Translator::Diff;
2648
2649     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2650     if(!-e $prefilename) {
2651       carp("No previous schema file found ($prefilename)");
2652       next;
2653     }
2654
2655     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2656     if(-e $difffile) {
2657       carp("Overwriting existing diff file - $difffile");
2658       unlink($difffile);
2659     }
2660
2661     my $source_schema;
2662     {
2663       my $t = SQL::Translator->new($sqltargs);
2664       $t->debug( 0 );
2665       $t->trace( 0 );
2666
2667       $t->parser( $db )
2668         or $self->throw_exception ($t->error);
2669
2670       my $out = $t->translate( $prefilename )
2671         or $self->throw_exception ($t->error);
2672
2673       $source_schema = $t->schema;
2674
2675       $source_schema->name( $prefilename )
2676         unless ( $source_schema->name );
2677     }
2678
2679     # The "new" style of producers have sane normalization and can support
2680     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2681     # And we have to diff parsed SQL against parsed SQL.
2682     my $dest_schema = $sqlt_schema;
2683
2684     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2685       my $t = SQL::Translator->new($sqltargs);
2686       $t->debug( 0 );
2687       $t->trace( 0 );
2688
2689       $t->parser( $db )
2690         or $self->throw_exception ($t->error);
2691
2692       my $out = $t->translate( $filename )
2693         or $self->throw_exception ($t->error);
2694
2695       $dest_schema = $t->schema;
2696
2697       $dest_schema->name( $filename )
2698         unless $dest_schema->name;
2699     }
2700
2701     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2702                                                   $dest_schema,   $db,
2703                                                   $sqltargs
2704                                                  );
2705     if(!open $file, ">$difffile") {
2706       $self->throw_exception("Can't write to $difffile ($!)");
2707       next;
2708     }
2709     print $file $diff;
2710     close($file);
2711   }
2712 }
2713
2714 =head2 deployment_statements
2715
2716 =over 4
2717
2718 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2719
2720 =back
2721
2722 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2723
2724 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2725 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2726
2727 C<$directory> is used to return statements from files in a previously created
2728 L</create_ddl_dir> directory and is optional. The filenames are constructed
2729 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2730
2731 If no C<$directory> is specified then the statements are constructed on the
2732 fly using L<SQL::Translator> and C<$version> is ignored.
2733
2734 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2735
2736 =cut
2737
2738 sub deployment_statements {
2739   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2740   $type ||= $self->sqlt_type;
2741   $version ||= $schema->schema_version || '1.x';
2742   $dir ||= './';
2743   my $filename = $schema->ddl_filename($type, $version, $dir);
2744   if(-f $filename)
2745   {
2746       # FIXME replace this block when a proper sane sql parser is available
2747       my $file;
2748       open($file, "<$filename")
2749         or $self->throw_exception("Can't open $filename ($!)");
2750       my @rows = <$file>;
2751       close($file);
2752       return join('', @rows);
2753   }
2754
2755   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2756     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2757   }
2758
2759   # sources needs to be a parser arg, but for simplicty allow at top level
2760   # coming in
2761   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2762       if exists $sqltargs->{sources};
2763
2764   my $tr = SQL::Translator->new(
2765     producer => "SQL::Translator::Producer::${type}",
2766     %$sqltargs,
2767     parser => 'SQL::Translator::Parser::DBIx::Class',
2768     data => $schema,
2769   );
2770
2771   my @ret;
2772   if (wantarray) {
2773     @ret = $tr->translate;
2774   }
2775   else {
2776     $ret[0] = $tr->translate;
2777   }
2778
2779   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2780     unless (@ret && defined $ret[0]);
2781
2782   return wantarray ? @ret : $ret[0];
2783 }
2784
2785 # FIXME deploy() currently does not accurately report sql errors
2786 # Will always return true while errors are warned
2787 sub deploy {
2788   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2789   my $deploy = sub {
2790     my $line = shift;
2791     return if(!$line);
2792     return if($line =~ /^--/);
2793     # next if($line =~ /^DROP/m);
2794     return if($line =~ /^BEGIN TRANSACTION/m);
2795     return if($line =~ /^COMMIT/m);
2796     return if $line =~ /^\s+$/; # skip whitespace only
2797     $self->_query_start($line);
2798     try {
2799       # do a dbh_do cycle here, as we need some error checking in
2800       # place (even though we will ignore errors)
2801       $self->dbh_do (sub { $_[1]->do($line) });
2802     } catch {
2803       carp qq{$_ (running "${line}")};
2804     };
2805     $self->_query_end($line);
2806   };
2807   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2808   if (@statements > 1) {
2809     foreach my $statement (@statements) {
2810       $deploy->( $statement );
2811     }
2812   }
2813   elsif (@statements == 1) {
2814     # split on single line comments and end of statements
2815     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2816       $deploy->( $line );
2817     }
2818   }
2819 }
2820
2821 =head2 datetime_parser
2822
2823 Returns the datetime parser class
2824
2825 =cut
2826
2827 sub datetime_parser {
2828   my $self = shift;
2829   return $self->{datetime_parser} ||= do {
2830     $self->build_datetime_parser(@_);
2831   };
2832 }
2833
2834 =head2 datetime_parser_type
2835
2836 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2837
2838 =head2 build_datetime_parser
2839
2840 See L</datetime_parser>
2841
2842 =cut
2843
2844 sub build_datetime_parser {
2845   my $self = shift;
2846   my $type = $self->datetime_parser_type(@_);
2847   return $type;
2848 }
2849
2850
2851 =head2 is_replicating
2852
2853 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2854 replicate from a master database.  Default is undef, which is the result
2855 returned by databases that don't support replication.
2856
2857 =cut
2858
2859 sub is_replicating {
2860     return;
2861
2862 }
2863
2864 =head2 lag_behind_master
2865
2866 Returns a number that represents a certain amount of lag behind a master db
2867 when a given storage is replicating.  The number is database dependent, but
2868 starts at zero and increases with the amount of lag. Default in undef
2869
2870 =cut
2871
2872 sub lag_behind_master {
2873     return;
2874 }
2875
2876 =head2 relname_to_table_alias
2877
2878 =over 4
2879
2880 =item Arguments: $relname, $join_count
2881
2882 =back
2883
2884 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2885 queries.
2886
2887 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2888 way these aliases are named.
2889
2890 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2891 otherwise C<"$relname">.
2892
2893 =cut
2894
2895 sub relname_to_table_alias {
2896   my ($self, $relname, $join_count) = @_;
2897
2898   my $alias = ($join_count && $join_count > 1 ?
2899     join('_', $relname, $join_count) : $relname);
2900
2901   return $alias;
2902 }
2903
2904 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2905 # version and it may be necessary to amend or override it for a specific storage
2906 # if such binds are necessary.
2907 sub _max_column_bytesize {
2908   my ($self, $source, $col) = @_;
2909
2910   my $inf = $source->column_info($col);
2911   return $inf->{_max_bytesize} ||= do {
2912
2913     my $max_size;
2914
2915     if (my $data_type = $inf->{data_type}) {
2916       $data_type = lc($data_type);
2917
2918       # String/sized-binary types
2919       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2920                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2921       ) {
2922         $max_size = $inf->{size};
2923       }
2924       # Other charset/unicode types, assume scale of 4
2925       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2926                               |univarchar
2927                               |nvarchar)\b/x
2928       ) {
2929         $max_size = $inf->{size} * 4 if $inf->{size};
2930       }
2931       # Blob types
2932       elsif ($self->_is_lob_type($data_type)) {
2933         # default to longreadlen
2934       }
2935       else {
2936         $max_size = 100;  # for all other (numeric?) datatypes
2937       }
2938     }
2939
2940     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2941   };
2942 }
2943
2944 # Determine if a data_type is some type of BLOB
2945 # FIXME: these regexes are expensive, result of these checks should be cached in
2946 # the column_info .
2947 sub _is_lob_type {
2948   my ($self, $data_type) = @_;
2949   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2950     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2951                                   |varchar|character\s*varying|nvarchar
2952                                   |national\s*character\s*varying))?\z/xi);
2953 }
2954
2955 sub _is_binary_lob_type {
2956   my ($self, $data_type) = @_;
2957   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2958     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2959 }
2960
2961 sub _is_text_lob_type {
2962   my ($self, $data_type) = @_;
2963   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2964     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2965                         |national\s*character\s*varying))\z/xi);
2966 }
2967
2968 1;
2969
2970 =head1 USAGE NOTES
2971
2972 =head2 DBIx::Class and AutoCommit
2973
2974 DBIx::Class can do some wonderful magic with handling exceptions,
2975 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2976 (the default) combined with C<txn_do> for transaction support.
2977
2978 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2979 in an assumed transaction between commits, and you're telling us you'd
2980 like to manage that manually.  A lot of the magic protections offered by
2981 this module will go away.  We can't protect you from exceptions due to database
2982 disconnects because we don't know anything about how to restart your
2983 transactions.  You're on your own for handling all sorts of exceptional
2984 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2985 be with raw DBI.
2986
2987
2988 =head1 AUTHORS
2989
2990 Matt S. Trout <mst@shadowcatsystems.co.uk>
2991
2992 Andy Grundman <andy@hybridized.org>
2993
2994 =head1 LICENSE
2995
2996 You may distribute this code under the same terms as Perl itself.
2997
2998 =cut