Trailing WS crusade - got to save them bits
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use namespace::clean;
18
19 # default cursor class, overridable in connect_info attributes
20 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
21
22 __PACKAGE__->mk_group_accessors('inherited' => qw/
23   sql_limit_dialect sql_quote_char sql_name_sep
24 /);
25
26 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
27
28 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
29 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
30
31 __PACKAGE__->sql_name_sep('.');
32
33 __PACKAGE__->mk_group_accessors('simple' => qw/
34   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
35   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
36 /);
37
38 # the values for these accessors are picked out (and deleted) from
39 # the attribute hashref passed to connect_info
40 my @storage_options = qw/
41   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
42   disable_sth_caching unsafe auto_savepoint
43 /;
44 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
45
46
47 # capability definitions, using a 2-tiered accessor system
48 # The rationale is:
49 #
50 # A driver/user may define _use_X, which blindly without any checks says:
51 # "(do not) use this capability", (use_dbms_capability is an "inherited"
52 # type accessor)
53 #
54 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
55 # accessor, which in turn calls _determine_supports_X, and stores the return
56 # in a special slot on the storage object, which is wiped every time a $dbh
57 # reconnection takes place (it is not guaranteed that upon reconnection we
58 # will get the same rdbms version). _determine_supports_X does not need to
59 # exist on a driver, as we ->can for it before calling.
60
61 my @capabilities = (qw/
62   insert_returning
63   insert_returning_bound
64   placeholders
65   typeless_placeholders
66   join_optimizer
67 /);
68 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
69 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
70
71 # on by default, not strictly a capability (pending rewrite)
72 __PACKAGE__->_use_join_optimizer (1);
73 sub _determine_supports_join_optimizer { 1 };
74
75 # Each of these methods need _determine_driver called before itself
76 # in order to function reliably. This is a purely DRY optimization
77 #
78 # get_(use)_dbms_capability need to be called on the correct Storage
79 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
80 # _determine_supports_X which obv. needs a correct driver as well
81 my @rdbms_specific_methods = qw/
82   deployment_statements
83   sqlt_type
84   sql_maker
85   build_datetime_parser
86   datetime_parser_type
87
88   txn_begin
89   insert
90   insert_bulk
91   update
92   delete
93   select
94   select_single
95   with_deferred_fk_checks
96
97   get_use_dbms_capability
98   get_dbms_capability
99
100   _server_info
101   _get_server_version
102 /;
103
104 for my $meth (@rdbms_specific_methods) {
105
106   my $orig = __PACKAGE__->can ($meth)
107     or die "$meth is not a ::Storage::DBI method!";
108
109   no strict qw/refs/;
110   no warnings qw/redefine/;
111   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
112     if (
113       # only fire when invoked on an instance, a valid class-based invocation
114       # would e.g. be setting a default for an inherited accessor
115       ref $_[0]
116         and
117       ! $_[0]->_driver_determined
118         and
119       ! $_[0]->{_in_determine_driver}
120     ) {
121       $_[0]->_determine_driver;
122
123       # This for some reason crashes and burns on perl 5.8.1
124       # IFF the method ends up throwing an exception
125       #goto $_[0]->can ($meth);
126
127       my $cref = $_[0]->can ($meth);
128       goto $cref;
129     }
130
131     goto $orig;
132   };
133 }
134
135 =head1 NAME
136
137 DBIx::Class::Storage::DBI - DBI storage handler
138
139 =head1 SYNOPSIS
140
141   my $schema = MySchema->connect('dbi:SQLite:my.db');
142
143   $schema->storage->debug(1);
144
145   my @stuff = $schema->storage->dbh_do(
146     sub {
147       my ($storage, $dbh, @args) = @_;
148       $dbh->do("DROP TABLE authors");
149     },
150     @column_list
151   );
152
153   $schema->resultset('Book')->search({
154      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
155   });
156
157 =head1 DESCRIPTION
158
159 This class represents the connection to an RDBMS via L<DBI>.  See
160 L<DBIx::Class::Storage> for general information.  This pod only
161 documents DBI-specific methods and behaviors.
162
163 =head1 METHODS
164
165 =cut
166
167 sub new {
168   my $new = shift->next::method(@_);
169
170   $new->_sql_maker_opts({});
171   $new->_dbh_details({});
172   $new->{_in_do_block} = 0;
173   $new->{_dbh_gen} = 0;
174
175   # read below to see what this does
176   $new->_arm_global_destructor;
177
178   $new;
179 }
180
181 # This is hack to work around perl shooting stuff in random
182 # order on exit(). If we do not walk the remaining storage
183 # objects in an END block, there is a *small but real* chance
184 # of a fork()ed child to kill the parent's shared DBI handle,
185 # *before perl reaches the DESTROY in this package*
186 # Yes, it is ugly and effective.
187 # Additionally this registry is used by the CLONE method to
188 # make sure no handles are shared between threads
189 {
190   my %seek_and_destroy;
191
192   sub _arm_global_destructor {
193     my $self = shift;
194     my $key = refaddr ($self);
195     $seek_and_destroy{$key} = $self;
196     weaken ($seek_and_destroy{$key});
197   }
198
199   END {
200     local $?; # just in case the DBI destructor changes it somehow
201
202     # destroy just the object if not native to this process/thread
203     $_->_verify_pid for (grep
204       { defined $_ }
205       values %seek_and_destroy
206     );
207   }
208
209   sub CLONE {
210     # As per DBI's recommendation, DBIC disconnects all handles as
211     # soon as possible (DBIC will reconnect only on demand from within
212     # the thread)
213     for (values %seek_and_destroy) {
214       next unless $_;
215       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
216       $_->_dbh(undef);
217
218       $_->transaction_depth(0);
219       $_->savepoints([]);
220     }
221   }
222 }
223
224 sub DESTROY {
225   my $self = shift;
226
227   # some databases spew warnings on implicit disconnect
228   local $SIG{__WARN__} = sub {};
229   $self->_dbh(undef);
230
231   # this op is necessary, since the very last perl runtime statement
232   # triggers a global destruction shootout, and the $SIG localization
233   # may very well be destroyed before perl actually gets to do the
234   # $dbh undef
235   1;
236 }
237
238 # handle pid changes correctly - do not destroy parent's connection
239 sub _verify_pid {
240   my $self = shift;
241
242   my $pid = $self->_conn_pid;
243   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
244     $dbh->{InactiveDestroy} = 1;
245     $self->{_dbh_gen}++;
246     $self->_dbh(undef);
247     $self->transaction_depth(0);
248     $self->savepoints([]);
249   }
250
251   return;
252 }
253
254 =head2 connect_info
255
256 This method is normally called by L<DBIx::Class::Schema/connection>, which
257 encapsulates its argument list in an arrayref before passing them here.
258
259 The argument list may contain:
260
261 =over
262
263 =item *
264
265 The same 4-element argument set one would normally pass to
266 L<DBI/connect>, optionally followed by
267 L<extra attributes|/DBIx::Class specific connection attributes>
268 recognized by DBIx::Class:
269
270   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
271
272 =item *
273
274 A single code reference which returns a connected
275 L<DBI database handle|DBI/connect> optionally followed by
276 L<extra attributes|/DBIx::Class specific connection attributes> recognized
277 by DBIx::Class:
278
279   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
280
281 =item *
282
283 A single hashref with all the attributes and the dsn/user/password
284 mixed together:
285
286   $connect_info_args = [{
287     dsn => $dsn,
288     user => $user,
289     password => $pass,
290     %dbi_attributes,
291     %extra_attributes,
292   }];
293
294   $connect_info_args = [{
295     dbh_maker => sub { DBI->connect (...) },
296     %dbi_attributes,
297     %extra_attributes,
298   }];
299
300 This is particularly useful for L<Catalyst> based applications, allowing the
301 following config (L<Config::General> style):
302
303   <Model::DB>
304     schema_class   App::DB
305     <connect_info>
306       dsn          dbi:mysql:database=test
307       user         testuser
308       password     TestPass
309       AutoCommit   1
310     </connect_info>
311   </Model::DB>
312
313 The C<dsn>/C<user>/C<password> combination can be substituted by the
314 C<dbh_maker> key whose value is a coderef that returns a connected
315 L<DBI database handle|DBI/connect>
316
317 =back
318
319 Please note that the L<DBI> docs recommend that you always explicitly
320 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
321 recommends that it be set to I<1>, and that you perform transactions
322 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
323 to I<1> if you do not do explicitly set it to zero.  This is the default
324 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
325
326 =head3 DBIx::Class specific connection attributes
327
328 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
329 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
330 the following connection options. These options can be mixed in with your other
331 L<DBI> connection attributes, or placed in a separate hashref
332 (C<\%extra_attributes>) as shown above.
333
334 Every time C<connect_info> is invoked, any previous settings for
335 these options will be cleared before setting the new ones, regardless of
336 whether any options are specified in the new C<connect_info>.
337
338
339 =over
340
341 =item on_connect_do
342
343 Specifies things to do immediately after connecting or re-connecting to
344 the database.  Its value may contain:
345
346 =over
347
348 =item a scalar
349
350 This contains one SQL statement to execute.
351
352 =item an array reference
353
354 This contains SQL statements to execute in order.  Each element contains
355 a string or a code reference that returns a string.
356
357 =item a code reference
358
359 This contains some code to execute.  Unlike code references within an
360 array reference, its return value is ignored.
361
362 =back
363
364 =item on_disconnect_do
365
366 Takes arguments in the same form as L</on_connect_do> and executes them
367 immediately before disconnecting from the database.
368
369 Note, this only runs if you explicitly call L</disconnect> on the
370 storage object.
371
372 =item on_connect_call
373
374 A more generalized form of L</on_connect_do> that calls the specified
375 C<connect_call_METHOD> methods in your storage driver.
376
377   on_connect_do => 'select 1'
378
379 is equivalent to:
380
381   on_connect_call => [ [ do_sql => 'select 1' ] ]
382
383 Its values may contain:
384
385 =over
386
387 =item a scalar
388
389 Will call the C<connect_call_METHOD> method.
390
391 =item a code reference
392
393 Will execute C<< $code->($storage) >>
394
395 =item an array reference
396
397 Each value can be a method name or code reference.
398
399 =item an array of arrays
400
401 For each array, the first item is taken to be the C<connect_call_> method name
402 or code reference, and the rest are parameters to it.
403
404 =back
405
406 Some predefined storage methods you may use:
407
408 =over
409
410 =item do_sql
411
412 Executes a SQL string or a code reference that returns a SQL string. This is
413 what L</on_connect_do> and L</on_disconnect_do> use.
414
415 It can take:
416
417 =over
418
419 =item a scalar
420
421 Will execute the scalar as SQL.
422
423 =item an arrayref
424
425 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
426 attributes hashref and bind values.
427
428 =item a code reference
429
430 Will execute C<< $code->($storage) >> and execute the return array refs as
431 above.
432
433 =back
434
435 =item datetime_setup
436
437 Execute any statements necessary to initialize the database session to return
438 and accept datetime/timestamp values used with
439 L<DBIx::Class::InflateColumn::DateTime>.
440
441 Only necessary for some databases, see your specific storage driver for
442 implementation details.
443
444 =back
445
446 =item on_disconnect_call
447
448 Takes arguments in the same form as L</on_connect_call> and executes them
449 immediately before disconnecting from the database.
450
451 Calls the C<disconnect_call_METHOD> methods as opposed to the
452 C<connect_call_METHOD> methods called by L</on_connect_call>.
453
454 Note, this only runs if you explicitly call L</disconnect> on the
455 storage object.
456
457 =item disable_sth_caching
458
459 If set to a true value, this option will disable the caching of
460 statement handles via L<DBI/prepare_cached>.
461
462 =item limit_dialect
463
464 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
465 default L</sql_limit_dialect> setting of the storage (if any). For a list
466 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
467
468 =item quote_names
469
470 When true automatically sets L</quote_char> and L</name_sep> to the characters
471 appropriate for your particular RDBMS. This option is preferred over specifying
472 L</quote_char> directly.
473
474 =item quote_char
475
476 Specifies what characters to use to quote table and column names.
477
478 C<quote_char> expects either a single character, in which case is it
479 is placed on either side of the table/column name, or an arrayref of length
480 2 in which case the table/column name is placed between the elements.
481
482 For example under MySQL you should use C<< quote_char => '`' >>, and for
483 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
484
485 =item name_sep
486
487 This parameter is only useful in conjunction with C<quote_char>, and is used to
488 specify the character that separates elements (schemas, tables, columns) from
489 each other. If unspecified it defaults to the most commonly used C<.>.
490
491 =item unsafe
492
493 This Storage driver normally installs its own C<HandleError>, sets
494 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
495 all database handles, including those supplied by a coderef.  It does this
496 so that it can have consistent and useful error behavior.
497
498 If you set this option to a true value, Storage will not do its usual
499 modifications to the database handle's attributes, and instead relies on
500 the settings in your connect_info DBI options (or the values you set in
501 your connection coderef, in the case that you are connecting via coderef).
502
503 Note that your custom settings can cause Storage to malfunction,
504 especially if you set a C<HandleError> handler that suppresses exceptions
505 and/or disable C<RaiseError>.
506
507 =item auto_savepoint
508
509 If this option is true, L<DBIx::Class> will use savepoints when nesting
510 transactions, making it possible to recover from failure in the inner
511 transaction without having to abort all outer transactions.
512
513 =item cursor_class
514
515 Use this argument to supply a cursor class other than the default
516 L<DBIx::Class::Storage::DBI::Cursor>.
517
518 =back
519
520 Some real-life examples of arguments to L</connect_info> and
521 L<DBIx::Class::Schema/connect>
522
523   # Simple SQLite connection
524   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
525
526   # Connect via subref
527   ->connect_info([ sub { DBI->connect(...) } ]);
528
529   # Connect via subref in hashref
530   ->connect_info([{
531     dbh_maker => sub { DBI->connect(...) },
532     on_connect_do => 'alter session ...',
533   }]);
534
535   # A bit more complicated
536   ->connect_info(
537     [
538       'dbi:Pg:dbname=foo',
539       'postgres',
540       'my_pg_password',
541       { AutoCommit => 1 },
542       { quote_char => q{"} },
543     ]
544   );
545
546   # Equivalent to the previous example
547   ->connect_info(
548     [
549       'dbi:Pg:dbname=foo',
550       'postgres',
551       'my_pg_password',
552       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
553     ]
554   );
555
556   # Same, but with hashref as argument
557   # See parse_connect_info for explanation
558   ->connect_info(
559     [{
560       dsn         => 'dbi:Pg:dbname=foo',
561       user        => 'postgres',
562       password    => 'my_pg_password',
563       AutoCommit  => 1,
564       quote_char  => q{"},
565       name_sep    => q{.},
566     }]
567   );
568
569   # Subref + DBIx::Class-specific connection options
570   ->connect_info(
571     [
572       sub { DBI->connect(...) },
573       {
574           quote_char => q{`},
575           name_sep => q{@},
576           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
577           disable_sth_caching => 1,
578       },
579     ]
580   );
581
582
583
584 =cut
585
586 sub connect_info {
587   my ($self, $info) = @_;
588
589   return $self->_connect_info if !$info;
590
591   $self->_connect_info($info); # copy for _connect_info
592
593   $info = $self->_normalize_connect_info($info)
594     if ref $info eq 'ARRAY';
595
596   for my $storage_opt (keys %{ $info->{storage_options} }) {
597     my $value = $info->{storage_options}{$storage_opt};
598
599     $self->$storage_opt($value);
600   }
601
602   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
603   #  the new set of options
604   $self->_sql_maker(undef);
605   $self->_sql_maker_opts({});
606
607   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
608     my $value = $info->{sql_maker_options}{$sql_maker_opt};
609
610     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
611   }
612
613   my %attrs = (
614     %{ $self->_default_dbi_connect_attributes || {} },
615     %{ $info->{attributes} || {} },
616   );
617
618   my @args = @{ $info->{arguments} };
619
620   if (keys %attrs and ref $args[0] ne 'CODE') {
621     carp
622         'You provided explicit AutoCommit => 0 in your connection_info. '
623       . 'This is almost universally a bad idea (see the footnotes of '
624       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
625       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
626       . 'this warning.'
627       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
628
629     push @args, \%attrs if keys %attrs;
630   }
631   $self->_dbi_connect_info(\@args);
632
633   # FIXME - dirty:
634   # save attributes them in a separate accessor so they are always
635   # introspectable, even in case of a CODE $dbhmaker
636   $self->_dbic_connect_attributes (\%attrs);
637
638   return $self->_connect_info;
639 }
640
641 sub _normalize_connect_info {
642   my ($self, $info_arg) = @_;
643   my %info;
644
645   my @args = @$info_arg;  # take a shallow copy for further mutilation
646
647   # combine/pre-parse arguments depending on invocation style
648
649   my %attrs;
650   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
651     %attrs = %{ $args[1] || {} };
652     @args = $args[0];
653   }
654   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
655     %attrs = %{$args[0]};
656     @args = ();
657     if (my $code = delete $attrs{dbh_maker}) {
658       @args = $code;
659
660       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
661       if (@ignored) {
662         carp sprintf (
663             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
664           . "to the result of 'dbh_maker'",
665
666           join (', ', map { "'$_'" } (@ignored) ),
667         );
668       }
669     }
670     else {
671       @args = delete @attrs{qw/dsn user password/};
672     }
673   }
674   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
675     %attrs = (
676       % { $args[3] || {} },
677       % { $args[4] || {} },
678     );
679     @args = @args[0,1,2];
680   }
681
682   $info{arguments} = \@args;
683
684   my @storage_opts = grep exists $attrs{$_},
685     @storage_options, 'cursor_class';
686
687   @{ $info{storage_options} }{@storage_opts} =
688     delete @attrs{@storage_opts} if @storage_opts;
689
690   my @sql_maker_opts = grep exists $attrs{$_},
691     qw/limit_dialect quote_char name_sep quote_names/;
692
693   @{ $info{sql_maker_options} }{@sql_maker_opts} =
694     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
695
696   $info{attributes} = \%attrs if %attrs;
697
698   return \%info;
699 }
700
701 sub _default_dbi_connect_attributes () {
702   +{
703     AutoCommit => 1,
704     PrintError => 0,
705     RaiseError => 1,
706     ShowErrorStatement => 1,
707   };
708 }
709
710 =head2 on_connect_do
711
712 This method is deprecated in favour of setting via L</connect_info>.
713
714 =cut
715
716 =head2 on_disconnect_do
717
718 This method is deprecated in favour of setting via L</connect_info>.
719
720 =cut
721
722 sub _parse_connect_do {
723   my ($self, $type) = @_;
724
725   my $val = $self->$type;
726   return () if not defined $val;
727
728   my @res;
729
730   if (not ref($val)) {
731     push @res, [ 'do_sql', $val ];
732   } elsif (ref($val) eq 'CODE') {
733     push @res, $val;
734   } elsif (ref($val) eq 'ARRAY') {
735     push @res, map { [ 'do_sql', $_ ] } @$val;
736   } else {
737     $self->throw_exception("Invalid type for $type: ".ref($val));
738   }
739
740   return \@res;
741 }
742
743 =head2 dbh_do
744
745 Arguments: ($subref | $method_name), @extra_coderef_args?
746
747 Execute the given $subref or $method_name using the new exception-based
748 connection management.
749
750 The first two arguments will be the storage object that C<dbh_do> was called
751 on and a database handle to use.  Any additional arguments will be passed
752 verbatim to the called subref as arguments 2 and onwards.
753
754 Using this (instead of $self->_dbh or $self->dbh) ensures correct
755 exception handling and reconnection (or failover in future subclasses).
756
757 Your subref should have no side-effects outside of the database, as
758 there is the potential for your subref to be partially double-executed
759 if the database connection was stale/dysfunctional.
760
761 Example:
762
763   my @stuff = $schema->storage->dbh_do(
764     sub {
765       my ($storage, $dbh, @cols) = @_;
766       my $cols = join(q{, }, @cols);
767       $dbh->selectrow_array("SELECT $cols FROM foo");
768     },
769     @column_list
770   );
771
772 =cut
773
774 sub dbh_do {
775   my $self = shift;
776   my $code = shift;
777
778   my $dbh = $self->_get_dbh;
779
780   return $self->$code($dbh, @_)
781     if ( $self->{_in_do_block} || $self->{transaction_depth} );
782
783   local $self->{_in_do_block} = 1;
784
785   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
786   my $args = \@_;
787
788   try {
789     $self->$code ($dbh, @$args);
790   } catch {
791     $self->throw_exception($_) if $self->connected;
792
793     # We were not connected - reconnect and retry, but let any
794     #  exception fall right through this time
795     carp "Retrying dbh_do($code) after catching disconnected exception: $_"
796       if $ENV{DBIC_STORAGE_RETRY_DEBUG};
797
798     $self->_populate_dbh;
799     $self->$code($self->_dbh, @$args);
800   };
801 }
802
803 sub txn_do {
804   # connects or reconnects on pid change, necessary to grab correct txn_depth
805   $_[0]->_get_dbh;
806   local $_[0]->{_in_do_block} = 1;
807   shift->next::method(@_);
808 }
809
810 =head2 disconnect
811
812 Our C<disconnect> method also performs a rollback first if the
813 database is not in C<AutoCommit> mode.
814
815 =cut
816
817 sub disconnect {
818   my ($self) = @_;
819
820   if( $self->_dbh ) {
821     my @actions;
822
823     push @actions, ( $self->on_disconnect_call || () );
824     push @actions, $self->_parse_connect_do ('on_disconnect_do');
825
826     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
827
828     # stops the "implicit rollback on disconnect" warning
829     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
830
831     %{ $self->_dbh->{CachedKids} } = ();
832     $self->_dbh->disconnect;
833     $self->_dbh(undef);
834     $self->{_dbh_gen}++;
835   }
836 }
837
838 =head2 with_deferred_fk_checks
839
840 =over 4
841
842 =item Arguments: C<$coderef>
843
844 =item Return Value: The return value of $coderef
845
846 =back
847
848 Storage specific method to run the code ref with FK checks deferred or
849 in MySQL's case disabled entirely.
850
851 =cut
852
853 # Storage subclasses should override this
854 sub with_deferred_fk_checks {
855   my ($self, $sub) = @_;
856   $sub->();
857 }
858
859 =head2 connected
860
861 =over
862
863 =item Arguments: none
864
865 =item Return Value: 1|0
866
867 =back
868
869 Verifies that the current database handle is active and ready to execute
870 an SQL statement (e.g. the connection did not get stale, server is still
871 answering, etc.) This method is used internally by L</dbh>.
872
873 =cut
874
875 sub connected {
876   my $self = shift;
877   return 0 unless $self->_seems_connected;
878
879   #be on the safe side
880   local $self->_dbh->{RaiseError} = 1;
881
882   return $self->_ping;
883 }
884
885 sub _seems_connected {
886   my $self = shift;
887
888   $self->_verify_pid;
889
890   my $dbh = $self->_dbh
891     or return 0;
892
893   return $dbh->FETCH('Active');
894 }
895
896 sub _ping {
897   my $self = shift;
898
899   my $dbh = $self->_dbh or return 0;
900
901   return $dbh->ping;
902 }
903
904 sub ensure_connected {
905   my ($self) = @_;
906
907   unless ($self->connected) {
908     $self->_populate_dbh;
909   }
910 }
911
912 =head2 dbh
913
914 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
915 is guaranteed to be healthy by implicitly calling L</connected>, and if
916 necessary performing a reconnection before returning. Keep in mind that this
917 is very B<expensive> on some database engines. Consider using L</dbh_do>
918 instead.
919
920 =cut
921
922 sub dbh {
923   my ($self) = @_;
924
925   if (not $self->_dbh) {
926     $self->_populate_dbh;
927   } else {
928     $self->ensure_connected;
929   }
930   return $self->_dbh;
931 }
932
933 # this is the internal "get dbh or connect (don't check)" method
934 sub _get_dbh {
935   my $self = shift;
936   $self->_verify_pid;
937   $self->_populate_dbh unless $self->_dbh;
938   return $self->_dbh;
939 }
940
941 sub sql_maker {
942   my ($self) = @_;
943   unless ($self->_sql_maker) {
944     my $sql_maker_class = $self->sql_maker_class;
945
946     my %opts = %{$self->_sql_maker_opts||{}};
947     my $dialect =
948       $opts{limit_dialect}
949         ||
950       $self->sql_limit_dialect
951         ||
952       do {
953         my $s_class = (ref $self) || $self;
954         carp (
955           "Your storage class ($s_class) does not set sql_limit_dialect and you "
956         . 'have not supplied an explicit limit_dialect in your connection_info. '
957         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
958         . 'databases but can be (and often is) painfully slow. '
959         . "Please file an RT ticket against '$s_class' ."
960         );
961
962         'GenericSubQ';
963       }
964     ;
965
966     my ($quote_char, $name_sep);
967
968     if ($opts{quote_names}) {
969       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
970         my $s_class = (ref $self) || $self;
971         carp (
972           "You requested 'quote_names' but your storage class ($s_class) does "
973         . 'not explicitly define a default sql_quote_char and you have not '
974         . 'supplied a quote_char as part of your connection_info. DBIC will '
975         .q{default to the ANSI SQL standard quote '"', which works most of }
976         . "the time. Please file an RT ticket against '$s_class'."
977         );
978
979         '"'; # RV
980       };
981
982       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
983     }
984
985     $self->_sql_maker($sql_maker_class->new(
986       bindtype=>'columns',
987       array_datatypes => 1,
988       limit_dialect => $dialect,
989       ($quote_char ? (quote_char => $quote_char) : ()),
990       name_sep => ($name_sep || '.'),
991       %opts,
992     ));
993   }
994   return $self->_sql_maker;
995 }
996
997 # nothing to do by default
998 sub _rebless {}
999 sub _init {}
1000
1001 sub _populate_dbh {
1002   my ($self) = @_;
1003
1004   my @info = @{$self->_dbi_connect_info || []};
1005   $self->_dbh(undef); # in case ->connected failed we might get sent here
1006   $self->_dbh_details({}); # reset everything we know
1007
1008   $self->_dbh($self->_connect(@info));
1009
1010   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1011
1012   $self->_determine_driver;
1013
1014   # Always set the transaction depth on connect, since
1015   #  there is no transaction in progress by definition
1016   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1017
1018   $self->_run_connection_actions unless $self->{_in_determine_driver};
1019 }
1020
1021 sub _run_connection_actions {
1022   my $self = shift;
1023   my @actions;
1024
1025   push @actions, ( $self->on_connect_call || () );
1026   push @actions, $self->_parse_connect_do ('on_connect_do');
1027
1028   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1029 }
1030
1031
1032
1033 sub set_use_dbms_capability {
1034   $_[0]->set_inherited ($_[1], $_[2]);
1035 }
1036
1037 sub get_use_dbms_capability {
1038   my ($self, $capname) = @_;
1039
1040   my $use = $self->get_inherited ($capname);
1041   return defined $use
1042     ? $use
1043     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1044   ;
1045 }
1046
1047 sub set_dbms_capability {
1048   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1049 }
1050
1051 sub get_dbms_capability {
1052   my ($self, $capname) = @_;
1053
1054   my $cap = $self->_dbh_details->{capability}{$capname};
1055
1056   unless (defined $cap) {
1057     if (my $meth = $self->can ("_determine$capname")) {
1058       $cap = $self->$meth ? 1 : 0;
1059     }
1060     else {
1061       $cap = 0;
1062     }
1063
1064     $self->set_dbms_capability ($capname, $cap);
1065   }
1066
1067   return $cap;
1068 }
1069
1070 sub _server_info {
1071   my $self = shift;
1072
1073   my $info;
1074   unless ($info = $self->_dbh_details->{info}) {
1075
1076     $info = {};
1077
1078     my $server_version = try { $self->_get_server_version };
1079
1080     if (defined $server_version) {
1081       $info->{dbms_version} = $server_version;
1082
1083       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1084       my @verparts = split (/\./, $numeric_version);
1085       if (
1086         @verparts
1087           &&
1088         $verparts[0] <= 999
1089       ) {
1090         # consider only up to 3 version parts, iff not more than 3 digits
1091         my @use_parts;
1092         while (@verparts && @use_parts < 3) {
1093           my $p = shift @verparts;
1094           last if $p > 999;
1095           push @use_parts, $p;
1096         }
1097         push @use_parts, 0 while @use_parts < 3;
1098
1099         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1100       }
1101     }
1102
1103     $self->_dbh_details->{info} = $info;
1104   }
1105
1106   return $info;
1107 }
1108
1109 sub _get_server_version {
1110   shift->_dbh_get_info(18);
1111 }
1112
1113 sub _dbh_get_info {
1114   my ($self, $info) = @_;
1115
1116   return try { $self->_get_dbh->get_info($info) } || undef;
1117 }
1118
1119 sub _determine_driver {
1120   my ($self) = @_;
1121
1122   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1123     my $started_connected = 0;
1124     local $self->{_in_determine_driver} = 1;
1125
1126     if (ref($self) eq __PACKAGE__) {
1127       my $driver;
1128       if ($self->_dbh) { # we are connected
1129         $driver = $self->_dbh->{Driver}{Name};
1130         $started_connected = 1;
1131       } else {
1132         # if connect_info is a CODEREF, we have no choice but to connect
1133         if (ref $self->_dbi_connect_info->[0] &&
1134             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1135           $self->_populate_dbh;
1136           $driver = $self->_dbh->{Driver}{Name};
1137         }
1138         else {
1139           # try to use dsn to not require being connected, the driver may still
1140           # force a connection in _rebless to determine version
1141           # (dsn may not be supplied at all if all we do is make a mock-schema)
1142           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1143           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1144           $driver ||= $ENV{DBI_DRIVER};
1145         }
1146       }
1147
1148       if ($driver) {
1149         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1150         if ($self->load_optional_class($storage_class)) {
1151           mro::set_mro($storage_class, 'c3');
1152           bless $self, $storage_class;
1153           $self->_rebless();
1154         }
1155       }
1156     }
1157
1158     $self->_driver_determined(1);
1159
1160     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1161
1162     $self->_init; # run driver-specific initializations
1163
1164     $self->_run_connection_actions
1165         if !$started_connected && defined $self->_dbh;
1166   }
1167 }
1168
1169 sub _do_connection_actions {
1170   my $self          = shift;
1171   my $method_prefix = shift;
1172   my $call          = shift;
1173
1174   if (not ref($call)) {
1175     my $method = $method_prefix . $call;
1176     $self->$method(@_);
1177   } elsif (ref($call) eq 'CODE') {
1178     $self->$call(@_);
1179   } elsif (ref($call) eq 'ARRAY') {
1180     if (ref($call->[0]) ne 'ARRAY') {
1181       $self->_do_connection_actions($method_prefix, $_) for @$call;
1182     } else {
1183       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1184     }
1185   } else {
1186     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1187   }
1188
1189   return $self;
1190 }
1191
1192 sub connect_call_do_sql {
1193   my $self = shift;
1194   $self->_do_query(@_);
1195 }
1196
1197 sub disconnect_call_do_sql {
1198   my $self = shift;
1199   $self->_do_query(@_);
1200 }
1201
1202 # override in db-specific backend when necessary
1203 sub connect_call_datetime_setup { 1 }
1204
1205 sub _do_query {
1206   my ($self, $action) = @_;
1207
1208   if (ref $action eq 'CODE') {
1209     $action = $action->($self);
1210     $self->_do_query($_) foreach @$action;
1211   }
1212   else {
1213     # Most debuggers expect ($sql, @bind), so we need to exclude
1214     # the attribute hash which is the second argument to $dbh->do
1215     # furthermore the bind values are usually to be presented
1216     # as named arrayref pairs, so wrap those here too
1217     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1218     my $sql = shift @do_args;
1219     my $attrs = shift @do_args;
1220     my @bind = map { [ undef, $_ ] } @do_args;
1221
1222     $self->_query_start($sql, \@bind);
1223     $self->_get_dbh->do($sql, $attrs, @do_args);
1224     $self->_query_end($sql, \@bind);
1225   }
1226
1227   return $self;
1228 }
1229
1230 sub _connect {
1231   my ($self, @info) = @_;
1232
1233   $self->throw_exception("You failed to provide any connection info")
1234     if !@info;
1235
1236   my ($old_connect_via, $dbh);
1237
1238   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1239     $old_connect_via = $DBI::connect_via;
1240     $DBI::connect_via = 'connect';
1241   }
1242
1243   try {
1244     if(ref $info[0] eq 'CODE') {
1245       $dbh = $info[0]->();
1246     }
1247     else {
1248       require DBI;
1249       $dbh = DBI->connect(@info);
1250     }
1251
1252     if (!$dbh) {
1253       die $DBI::errstr;
1254     }
1255
1256     unless ($self->unsafe) {
1257
1258       $self->throw_exception(
1259         'Refusing clobbering of {HandleError} installed on externally supplied '
1260        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1261       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1262
1263       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1264       # request, or an external handle. Complain and set anyway
1265       unless ($dbh->{RaiseError}) {
1266         carp( ref $info[0] eq 'CODE'
1267
1268           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1269            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1270            .'attribute has been supplied'
1271
1272           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1273            .'unsafe => 1. Toggling RaiseError back to true'
1274         );
1275
1276         $dbh->{RaiseError} = 1;
1277       }
1278
1279       # this odd anonymous coderef dereference is in fact really
1280       # necessary to avoid the unwanted effect described in perl5
1281       # RT#75792
1282       sub {
1283         my $weak_self = $_[0];
1284         weaken $weak_self;
1285
1286         # the coderef is blessed so we can distinguish it from externally
1287         # supplied handles (which must be preserved)
1288         $_[1]->{HandleError} = bless sub {
1289           if ($weak_self) {
1290             $weak_self->throw_exception("DBI Exception: $_[0]");
1291           }
1292           else {
1293             # the handler may be invoked by something totally out of
1294             # the scope of DBIC
1295             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1296           }
1297         }, '__DBIC__DBH__ERROR__HANDLER__';
1298       }->($self, $dbh);
1299     }
1300   }
1301   catch {
1302     $self->throw_exception("DBI Connection failed: $_")
1303   }
1304   finally {
1305     $DBI::connect_via = $old_connect_via if $old_connect_via;
1306   };
1307
1308   $self->_dbh_autocommit($dbh->{AutoCommit});
1309   $dbh;
1310 }
1311
1312 sub txn_begin {
1313   my $self = shift;
1314
1315   # this means we have not yet connected and do not know the AC status
1316   # (e.g. coderef $dbh), need a full-fledged connection check
1317   if (! defined $self->_dbh_autocommit) {
1318     $self->ensure_connected;
1319   }
1320   # Otherwise simply connect or re-connect on pid changes
1321   else {
1322     $self->_get_dbh;
1323   }
1324
1325   $self->next::method(@_);
1326 }
1327
1328 sub _exec_txn_begin {
1329   my $self = shift;
1330
1331   # if the user is utilizing txn_do - good for him, otherwise we need to
1332   # ensure that the $dbh is healthy on BEGIN.
1333   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1334   # will be replaced by a failure of begin_work itself (which will be
1335   # then retried on reconnect)
1336   if ($self->{_in_do_block}) {
1337     $self->_dbh->begin_work;
1338   } else {
1339     $self->dbh_do(sub { $_[1]->begin_work });
1340   }
1341 }
1342
1343 sub txn_commit {
1344   my $self = shift;
1345
1346   $self->_verify_pid if $self->_dbh;
1347   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1348     unless $self->_dbh;
1349
1350   # esoteric case for folks using external $dbh handles
1351   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1352     carp "Storage transaction_depth 0 does not match "
1353         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1354     $self->transaction_depth(1);
1355   }
1356
1357   $self->next::method(@_);
1358
1359   # if AutoCommit is disabled txn_depth never goes to 0
1360   # as a new txn is started immediately on commit
1361   $self->transaction_depth(1) if (
1362     !$self->transaction_depth
1363       and
1364     defined $self->_dbh_autocommit
1365       and
1366     ! $self->_dbh_autocommit
1367   );
1368 }
1369
1370 sub _exec_txn_commit {
1371   shift->_dbh->commit;
1372 }
1373
1374 sub txn_rollback {
1375   my $self = shift;
1376
1377   $self->_verify_pid if $self->_dbh;
1378   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1379     unless $self->_dbh;
1380
1381   # esoteric case for folks using external $dbh handles
1382   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1383     carp "Storage transaction_depth 0 does not match "
1384         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1385     $self->transaction_depth(1);
1386   }
1387
1388   $self->next::method(@_);
1389
1390   # if AutoCommit is disabled txn_depth never goes to 0
1391   # as a new txn is started immediately on commit
1392   $self->transaction_depth(1) if (
1393     !$self->transaction_depth
1394       and
1395     defined $self->_dbh_autocommit
1396       and
1397     ! $self->_dbh_autocommit
1398   );
1399 }
1400
1401 sub _exec_txn_rollback {
1402   shift->_dbh->rollback;
1403 }
1404
1405 # generate some identical methods
1406 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1407   no strict qw/refs/;
1408   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1409     my $self = shift;
1410     $self->_verify_pid if $self->_dbh;
1411     $self->throw_exception("Unable to $meth() on a disconnected storage")
1412       unless $self->_dbh;
1413     $self->next::method(@_);
1414   };
1415 }
1416
1417 # This used to be the top-half of _execute.  It was split out to make it
1418 #  easier to override in NoBindVars without duping the rest.  It takes up
1419 #  all of _execute's args, and emits $sql, @bind.
1420 sub _prep_for_execute {
1421   #my ($self, $op, $ident, $args) = @_;
1422   return shift->_gen_sql_bind(@_)
1423 }
1424
1425 sub _gen_sql_bind {
1426   my ($self, $op, $ident, $args) = @_;
1427
1428   my ($sql, @bind) = $self->sql_maker->$op(
1429     blessed($ident) ? $ident->from : $ident,
1430     @$args,
1431   );
1432
1433   my (@final_bind, $colinfos);
1434   my $resolve_bindinfo = sub {
1435     $colinfos ||= $self->_resolve_column_info($ident);
1436     if (my $col = $_[1]->{dbic_colname}) {
1437       $_[1]->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1438         if $colinfos->{$col}{data_type};
1439       $_[1]->{sqlt_size} ||= $colinfos->{$col}{size}
1440         if $colinfos->{$col}{size};
1441     }
1442     $_[1];
1443   };
1444
1445   for my $e (@{$args->[2]{bind}||[]}, @bind) {
1446     push @final_bind, [ do {
1447       if (ref $e ne 'ARRAY') {
1448         ({}, $e)
1449       }
1450       elsif (! defined $e->[0]) {
1451         ({}, $e->[1])
1452       }
1453       elsif (ref $e->[0] eq 'HASH') {
1454         (
1455           (first { $e->[0]{$_} } qw/dbd_attrs sqlt_datatype/) ? $e->[0] : $self->$resolve_bindinfo($e->[0]),
1456           $e->[1]
1457         )
1458       }
1459       elsif (ref $e->[0] eq 'SCALAR') {
1460         ( { sqlt_datatype => ${$e->[0]} }, $e->[1] )
1461       }
1462       else {
1463         ( $self->$resolve_bindinfo({ dbic_colname => $e->[0] }), $e->[1] )
1464       }
1465     }];
1466   }
1467
1468   ($sql, \@final_bind);
1469 }
1470
1471 sub _format_for_trace {
1472   #my ($self, $bind) = @_;
1473
1474   ### Turn @bind from something like this:
1475   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1476   ### to this:
1477   ###   ( "'1'", "'3'" )
1478
1479   map {
1480     defined( $_ && $_->[1] )
1481       ? qq{'$_->[1]'}
1482       : q{NULL}
1483   } @{$_[1] || []};
1484 }
1485
1486 sub _query_start {
1487   my ( $self, $sql, $bind ) = @_;
1488
1489   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1490     if $self->debug;
1491 }
1492
1493 sub _query_end {
1494   my ( $self, $sql, $bind ) = @_;
1495
1496   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1497     if $self->debug;
1498 }
1499
1500 my $sba_compat;
1501 sub _dbi_attrs_for_bind {
1502   my ($self, $ident, $bind) = @_;
1503
1504   if (! defined $sba_compat) {
1505     $self->_determine_driver;
1506     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1507       ? 0
1508       : 1
1509     ;
1510   }
1511
1512   my $sba_attrs;
1513   if ($sba_compat) {
1514     my $class = ref $self;
1515     carp_unique (
1516       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1517      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1518      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1519     );
1520
1521     my $sba_attrs = $self->source_bind_attributes
1522   }
1523
1524   my @attrs;
1525
1526   for (map { $_->[0] } @$bind) {
1527     push @attrs, do {
1528       if (exists $_->{dbd_attrs}) {
1529         $_->{dbd_attrs}
1530       }
1531       elsif($_->{sqlt_datatype}) {
1532         # cache the result in the dbh_details hash, as it can not change unless
1533         # we connect to something else
1534         my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
1535         if (not exists $cache->{$_->{sqlt_datatype}}) {
1536           $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1537         }
1538         $cache->{$_->{sqlt_datatype}};
1539       }
1540       elsif ($sba_attrs and $_->{dbic_colname}) {
1541         $sba_attrs->{$_->{dbic_colname}} || undef;
1542       }
1543       else {
1544         undef;  # always push something at this position
1545       }
1546     }
1547   }
1548
1549   return \@attrs;
1550 }
1551
1552 sub _execute {
1553   my ($self, $op, $ident, @args) = @_;
1554
1555   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1556
1557   shift->dbh_do(    # retry over disconnects
1558     '_dbh_execute',
1559     $sql,
1560     $bind,
1561     $self->_dbi_attrs_for_bind($ident, $bind)
1562   );
1563 }
1564
1565 sub _dbh_execute {
1566   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1567
1568   $self->_query_start( $sql, $bind );
1569   my $sth = $self->_sth($sql);
1570
1571   for my $i (0 .. $#$bind) {
1572     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1573       $sth->bind_param_inout(
1574         $i + 1, # bind params counts are 1-based
1575         $bind->[$i][1],
1576         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1577         $bind_attrs->[$i],
1578       );
1579     }
1580     else {
1581       $sth->bind_param(
1582         $i + 1,
1583         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1584           ? "$bind->[$i][1]"
1585           : $bind->[$i][1]
1586         ,
1587         $bind_attrs->[$i],
1588       );
1589     }
1590   }
1591
1592   # Can this fail without throwing an exception anyways???
1593   my $rv = $sth->execute();
1594   $self->throw_exception(
1595     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1596   ) if !$rv;
1597
1598   $self->_query_end( $sql, $bind );
1599
1600   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1601 }
1602
1603 sub _prefetch_autovalues {
1604   my ($self, $source, $to_insert) = @_;
1605
1606   my $colinfo = $source->columns_info;
1607
1608   my %values;
1609   for my $col (keys %$colinfo) {
1610     if (
1611       $colinfo->{$col}{auto_nextval}
1612         and
1613       (
1614         ! exists $to_insert->{$col}
1615           or
1616         ref $to_insert->{$col} eq 'SCALAR'
1617           or
1618         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1619       )
1620     ) {
1621       $values{$col} = $self->_sequence_fetch(
1622         'NEXTVAL',
1623         ( $colinfo->{$col}{sequence} ||=
1624             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1625         ),
1626       );
1627     }
1628   }
1629
1630   \%values;
1631 }
1632
1633 sub insert {
1634   my ($self, $source, $to_insert) = @_;
1635
1636   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1637
1638   # fuse the values, but keep a separate list of prefetched_values so that
1639   # they can be fused once again with the final return
1640   $to_insert = { %$to_insert, %$prefetched_values };
1641
1642   my $col_infos = $source->columns_info;
1643   my %pcols = map { $_ => 1 } $source->primary_columns;
1644   my %retrieve_cols;
1645   for my $col ($source->columns) {
1646     # nothing to retrieve when explicit values are supplied
1647     next if (defined $to_insert->{$col} and ! (
1648       ref $to_insert->{$col} eq 'SCALAR'
1649         or
1650       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1651     ));
1652
1653     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1654     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1655       $pcols{$col}
1656         or
1657       $col_infos->{$col}{retrieve_on_insert}
1658     );
1659   };
1660
1661   my ($sqla_opts, @ir_container);
1662   if (%retrieve_cols and $self->_use_insert_returning) {
1663     $sqla_opts->{returning_container} = \@ir_container
1664       if $self->_use_insert_returning_bound;
1665
1666     $sqla_opts->{returning} = [
1667       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1668     ];
1669   }
1670
1671   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1672
1673   my %returned_cols = %$to_insert;
1674   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1675     @ir_container = try {
1676       local $SIG{__WARN__} = sub {};
1677       my @r = $sth->fetchrow_array;
1678       $sth->finish;
1679       @r;
1680     } unless @ir_container;
1681
1682     @returned_cols{@$retlist} = @ir_container if @ir_container;
1683   }
1684   else {
1685     # pull in PK if needed and then everything else
1686     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1687
1688       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1689         unless $self->can('last_insert_id');
1690
1691       my @pri_values = $self->last_insert_id($source, @missing_pri);
1692
1693       $self->throw_exception( "Can't get last insert id" )
1694         unless (@pri_values == @missing_pri);
1695
1696       @returned_cols{@missing_pri} = @pri_values;
1697       delete $retrieve_cols{$_} for @missing_pri;
1698     }
1699
1700     # if there is more left to pull
1701     if (%retrieve_cols) {
1702       $self->throw_exception(
1703         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1704       ) unless %pcols;
1705
1706       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1707
1708       my $cur = DBIx::Class::ResultSet->new($source, {
1709         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1710         select => \@left_to_fetch,
1711       })->cursor;
1712
1713       @returned_cols{@left_to_fetch} = $cur->next;
1714
1715       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1716         if scalar $cur->next;
1717     }
1718   }
1719
1720   return { %$prefetched_values, %returned_cols };
1721 }
1722
1723 sub insert_bulk {
1724   my ($self, $source, $cols, $data) = @_;
1725
1726   # FIXME - perhaps this is not even needed? does DBI stringify?
1727   #
1728   # forcibly stringify whatever is stringifiable
1729   for my $r (0 .. $#$data) {
1730     for my $c (0 .. $#{$data->[$r]}) {
1731       $data->[$r][$c] = "$data->[$r][$c]"
1732         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1733     }
1734   }
1735
1736   # check the data for consistency
1737   # report a sensible error on bad data
1738   #
1739   # also create a list of dynamic binds (ones that will be changing
1740   # for each row)
1741   my $dyn_bind_idx;
1742   for my $col_idx (0..$#$cols) {
1743
1744     # the first "row" is used as a point of reference
1745     my $reference_val = $data->[0][$col_idx];
1746     my $is_literal = ref $reference_val eq 'SCALAR';
1747     my $is_literal_bind = ( !$is_literal and (
1748       ref $reference_val eq 'REF'
1749         and
1750       ref $$reference_val eq 'ARRAY'
1751     ) );
1752
1753     $dyn_bind_idx->{$col_idx} = 1
1754       if (!$is_literal and !$is_literal_bind);
1755
1756     # use a closure for convenience (less to pass)
1757     my $bad_slice = sub {
1758       my ($msg, $slice_idx) = @_;
1759       $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1760         $msg,
1761         $cols->[$col_idx],
1762         do {
1763           require Data::Dumper::Concise;
1764           local $Data::Dumper::Maxdepth = 2;
1765           Data::Dumper::Concise::Dumper ({
1766             map { $cols->[$_] =>
1767               $data->[$slice_idx][$_]
1768             } (0 .. $#$cols)
1769           }),
1770         }
1771       );
1772     };
1773
1774     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1775       my $val = $data->[$row_idx][$col_idx];
1776
1777       if ($is_literal) {
1778         if (ref $val ne 'SCALAR') {
1779           $bad_slice->(
1780             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1781             $row_idx
1782           );
1783         }
1784         elsif ($$val ne $$reference_val) {
1785           $bad_slice->(
1786             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1787             $row_idx
1788           );
1789         }
1790       }
1791       elsif ($is_literal_bind) {
1792         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1793           $bad_slice->(
1794             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1795             $row_idx
1796           );
1797         }
1798         elsif (${$val}->[0] ne ${$reference_val}->[0]) {
1799           $bad_slice->(
1800             "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
1801             $row_idx
1802           );
1803         }
1804       }
1805       elsif (ref $val) {
1806         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1807           $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
1808         }
1809         else {
1810           $bad_slice->("$val reference found where bind expected", $row_idx);
1811         }
1812       }
1813     }
1814   }
1815
1816   # Get the sql with bind values interpolated where necessary. For dynamic
1817   # binds convert the values of the first row into a literal+bind combo, with
1818   # extra positional info in the bind attr hashref. This will allow us to match
1819   # the order properly, and is so contrived because a user-supplied literal
1820   # bind (or something else specific to a resultsource and/or storage driver)
1821   # can inject extra binds along the way, so one can't rely on "shift
1822   # positions" ordering at all. Also we can't just hand SQLA a set of some
1823   # known "values" (e.g. hashrefs that can be later matched up by address),
1824   # because we want to supply a real value on which perhaps e.g. datatype
1825   # checks will be performed
1826   my ($sql, $proto_bind) = $self->_prep_for_execute (
1827     'insert',
1828     $source,
1829     [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
1830       ? \[ '?', [
1831           { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
1832             =>
1833           $data->[0][$_]
1834         ] ]
1835       : $data->[0][$_]
1836     } (0..$#$cols) } ],
1837   );
1838
1839   if (! @$proto_bind and keys %$dyn_bind_idx) {
1840     # if the bindlist is empty and we had some dynamic binds, this means the
1841     # storage ate them away (e.g. the NoBindVars component) and interpolated
1842     # them directly into the SQL. This obviosly can't be good for multi-inserts
1843     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1844   }
1845
1846   # neither _execute_array, nor _execute_inserts_with_no_binds are
1847   # atomic (even if _execute _array is a single call). Thus a safety
1848   # scope guard
1849   my $guard = $self->txn_scope_guard;
1850
1851   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1852   my $sth = $self->_sth($sql);
1853   my $rv = do {
1854     if (@$proto_bind) {
1855       # proto bind contains the information on which pieces of $data to pull
1856       # $cols is passed in only for prettier error-reporting
1857       $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
1858     }
1859     else {
1860       # bind_param_array doesn't work if there are no binds
1861       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1862     }
1863   };
1864
1865   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1866
1867   $guard->commit;
1868
1869   return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
1870 }
1871
1872 sub _execute_array {
1873   my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
1874
1875   ## This must be an arrayref, else nothing works!
1876   my $tuple_status = [];
1877
1878   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1879
1880   # Bind the values by column slices
1881   for my $i (0 .. $#$proto_bind) {
1882     my $data_slice_idx = (
1883       ref $proto_bind->[$i][0] eq 'HASH'
1884         and
1885       exists $proto_bind->[$i][0]{_bind_data_slice_idx}
1886     ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
1887
1888     $sth->bind_param_array(
1889       $i+1, # DBI bind indexes are 1-based
1890       defined $data_slice_idx
1891         # either get a "column" of dynamic values, or just repeat the same
1892         # bind over and over
1893         ? [ map { $_->[$data_slice_idx] } @$data ]
1894         : [ ($proto_bind->[$i][1]) x @$data ]
1895       ,
1896       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
1897     );
1898   }
1899
1900   my ($rv, $err);
1901   try {
1902     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1903   }
1904   catch {
1905     $err = shift;
1906   };
1907
1908   # Not all DBDs are create equal. Some throw on error, some return
1909   # an undef $rv, and some set $sth->err - try whatever we can
1910   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1911     ! defined $err
1912       and
1913     ( !defined $rv or $sth->err )
1914   );
1915
1916   # Statement must finish even if there was an exception.
1917   try {
1918     $sth->finish
1919   }
1920   catch {
1921     $err = shift unless defined $err
1922   };
1923
1924   if (defined $err) {
1925     my $i = 0;
1926     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1927
1928     $self->throw_exception("Unexpected populate error: $err")
1929       if ($i > $#$tuple_status);
1930
1931     require Data::Dumper::Concise;
1932     $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
1933       ($tuple_status->[$i][1] || $err),
1934       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1935     );
1936   }
1937
1938   return $rv;
1939 }
1940
1941 sub _dbh_execute_array {
1942   #my ($self, $sth, $tuple_status, @extra) = @_;
1943   return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
1944 }
1945
1946 sub _dbh_execute_inserts_with_no_binds {
1947   my ($self, $sth, $count) = @_;
1948
1949   my $err;
1950   try {
1951     my $dbh = $self->_get_dbh;
1952     local $dbh->{RaiseError} = 1;
1953     local $dbh->{PrintError} = 0;
1954
1955     $sth->execute foreach 1..$count;
1956   }
1957   catch {
1958     $err = shift;
1959   };
1960
1961   # Make sure statement is finished even if there was an exception.
1962   try {
1963     $sth->finish
1964   }
1965   catch {
1966     $err = shift unless defined $err;
1967   };
1968
1969   $self->throw_exception($err) if defined $err;
1970
1971   return $count;
1972 }
1973
1974 sub update {
1975   #my ($self, $source, @args) = @_;
1976   shift->_execute('update', @_);
1977 }
1978
1979
1980 sub delete {
1981   #my ($self, $source, @args) = @_;
1982   shift->_execute('delete', @_);
1983 }
1984
1985 # We were sent here because the $rs contains a complex search
1986 # which will require a subquery to select the correct rows
1987 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1988 #
1989 # Generating a single PK column subquery is trivial and supported
1990 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1991 # Look at _multipk_update_delete()
1992 sub _subq_update_delete {
1993   my $self = shift;
1994   my ($rs, $op, $values) = @_;
1995
1996   my $rsrc = $rs->result_source;
1997
1998   # quick check if we got a sane rs on our hands
1999   my @pcols = $rsrc->_pri_cols;
2000
2001   my $sel = $rs->_resolved_attrs->{select};
2002   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2003
2004   if (
2005       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2006         ne
2007       join ("\x00", sort @$sel )
2008   ) {
2009     $self->throw_exception (
2010       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2011     );
2012   }
2013
2014   if (@pcols == 1) {
2015     return $self->$op (
2016       $rsrc,
2017       $op eq 'update' ? $values : (),
2018       { $pcols[0] => { -in => $rs->as_query } },
2019     );
2020   }
2021
2022   else {
2023     return $self->_multipk_update_delete (@_);
2024   }
2025 }
2026
2027 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2028 # resultset update/delete involving subqueries. So by default resort
2029 # to simple (and inefficient) delete_all style per-row opearations,
2030 # while allowing specific storages to override this with a faster
2031 # implementation.
2032 #
2033 sub _multipk_update_delete {
2034   return shift->_per_row_update_delete (@_);
2035 }
2036
2037 # This is the default loop used to delete/update rows for multi PK
2038 # resultsets, and used by mysql exclusively (because it can't do anything
2039 # else).
2040 #
2041 # We do not use $row->$op style queries, because resultset update/delete
2042 # is not expected to cascade (this is what delete_all/update_all is for).
2043 #
2044 # There should be no race conditions as the entire operation is rolled
2045 # in a transaction.
2046 #
2047 sub _per_row_update_delete {
2048   my $self = shift;
2049   my ($rs, $op, $values) = @_;
2050
2051   my $rsrc = $rs->result_source;
2052   my @pcols = $rsrc->_pri_cols;
2053
2054   my $guard = $self->txn_scope_guard;
2055
2056   # emulate the return value of $sth->execute for non-selects
2057   my $row_cnt = '0E0';
2058
2059   my $subrs_cur = $rs->cursor;
2060   my @all_pk = $subrs_cur->all;
2061   for my $pks ( @all_pk) {
2062
2063     my $cond;
2064     for my $i (0.. $#pcols) {
2065       $cond->{$pcols[$i]} = $pks->[$i];
2066     }
2067
2068     $self->$op (
2069       $rsrc,
2070       $op eq 'update' ? $values : (),
2071       $cond,
2072     );
2073
2074     $row_cnt++;
2075   }
2076
2077   $guard->commit;
2078
2079   return $row_cnt;
2080 }
2081
2082 sub _select {
2083   my $self = shift;
2084   $self->_execute($self->_select_args(@_));
2085 }
2086
2087 sub _select_args_to_query {
2088   my $self = shift;
2089
2090   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2091   #  = $self->_select_args($ident, $select, $cond, $attrs);
2092   my ($op, $ident, @args) =
2093     $self->_select_args(@_);
2094
2095   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2096   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2097   $prepared_bind ||= [];
2098
2099   return wantarray
2100     ? ($sql, $prepared_bind)
2101     : \[ "($sql)", @$prepared_bind ]
2102   ;
2103 }
2104
2105 sub _select_args {
2106   my ($self, $ident, $select, $where, $attrs) = @_;
2107
2108   my $sql_maker = $self->sql_maker;
2109   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2110
2111   $attrs = {
2112     %$attrs,
2113     select => $select,
2114     from => $ident,
2115     where => $where,
2116     $rs_alias && $alias2source->{$rs_alias}
2117       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2118       : ()
2119     ,
2120   };
2121
2122   # Sanity check the attributes (SQLMaker does it too, but
2123   # in case of a software_limit we'll never reach there)
2124   if (defined $attrs->{offset}) {
2125     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2126       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2127   }
2128
2129   if (defined $attrs->{rows}) {
2130     $self->throw_exception("The rows attribute must be a positive integer if present")
2131       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2132   }
2133   elsif ($attrs->{offset}) {
2134     # MySQL actually recommends this approach.  I cringe.
2135     $attrs->{rows} = $sql_maker->__max_int;
2136   }
2137
2138   my @limit;
2139
2140   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2141   # storage, unless software limit was requested
2142   if (
2143     #limited has_many
2144     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2145        ||
2146     # grouped prefetch (to satisfy group_by == select)
2147     ( $attrs->{group_by}
2148         &&
2149       @{$attrs->{group_by}}
2150         &&
2151       $attrs->{_prefetch_selector_range}
2152     )
2153   ) {
2154     ($ident, $select, $where, $attrs)
2155       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2156   }
2157   elsif (! $attrs->{software_limit} ) {
2158     push @limit, (
2159       $attrs->{rows} || (),
2160       $attrs->{offset} || (),
2161     );
2162   }
2163
2164   # try to simplify the joinmap further (prune unreferenced type-single joins)
2165   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2166
2167 ###
2168   # This would be the point to deflate anything found in $where
2169   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2170   # expect a row object. And all we have is a resultsource (it is trivial
2171   # to extract deflator coderefs via $alias2source above).
2172   #
2173   # I don't see a way forward other than changing the way deflators are
2174   # invoked, and that's just bad...
2175 ###
2176
2177   return ('select', $ident, $select, $where, $attrs, @limit);
2178 }
2179
2180 # Returns a counting SELECT for a simple count
2181 # query. Abstracted so that a storage could override
2182 # this to { count => 'firstcol' } or whatever makes
2183 # sense as a performance optimization
2184 sub _count_select {
2185   #my ($self, $source, $rs_attrs) = @_;
2186   return { count => '*' };
2187 }
2188
2189 sub source_bind_attributes {
2190   shift->throw_exception(
2191     'source_bind_attributes() was never meant to be a callable public method - '
2192    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2193    .'solution can be provided'
2194    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2195   );
2196 }
2197
2198 =head2 select
2199
2200 =over 4
2201
2202 =item Arguments: $ident, $select, $condition, $attrs
2203
2204 =back
2205
2206 Handle a SQL select statement.
2207
2208 =cut
2209
2210 sub select {
2211   my $self = shift;
2212   my ($ident, $select, $condition, $attrs) = @_;
2213   return $self->cursor_class->new($self, \@_, $attrs);
2214 }
2215
2216 sub select_single {
2217   my $self = shift;
2218   my ($rv, $sth, @bind) = $self->_select(@_);
2219   my @row = $sth->fetchrow_array;
2220   my @nextrow = $sth->fetchrow_array if @row;
2221   if(@row && @nextrow) {
2222     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2223   }
2224   # Need to call finish() to work round broken DBDs
2225   $sth->finish();
2226   return @row;
2227 }
2228
2229 =head2 sql_limit_dialect
2230
2231 This is an accessor for the default SQL limit dialect used by a particular
2232 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2233 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2234 see L<DBIx::Class::SQLMaker::LimitDialects>.
2235
2236 =head2 sth
2237
2238 =over 4
2239
2240 =item Arguments: $sql
2241
2242 =back
2243
2244 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2245
2246 =cut
2247
2248 sub _dbh_sth {
2249   my ($self, $dbh, $sql) = @_;
2250
2251   # 3 is the if_active parameter which avoids active sth re-use
2252   my $sth = $self->disable_sth_caching
2253     ? $dbh->prepare($sql)
2254     : $dbh->prepare_cached($sql, {}, 3);
2255
2256   # XXX You would think RaiseError would make this impossible,
2257   #  but apparently that's not true :(
2258   $self->throw_exception(
2259     $dbh->errstr
2260       ||
2261     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2262             .'an exception and/or setting $dbh->errstr',
2263       length ($sql) > 20
2264         ? substr($sql, 0, 20) . '...'
2265         : $sql
2266       ,
2267       'DBD::' . $dbh->{Driver}{Name},
2268     )
2269   ) if !$sth;
2270
2271   $sth;
2272 }
2273
2274 sub sth {
2275   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2276   shift->_sth(@_);
2277 }
2278
2279 sub _sth {
2280   my ($self, $sql) = @_;
2281   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2282 }
2283
2284 sub _dbh_columns_info_for {
2285   my ($self, $dbh, $table) = @_;
2286
2287   if ($dbh->can('column_info')) {
2288     my %result;
2289     my $caught;
2290     try {
2291       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2292       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2293       $sth->execute();
2294       while ( my $info = $sth->fetchrow_hashref() ){
2295         my %column_info;
2296         $column_info{data_type}   = $info->{TYPE_NAME};
2297         $column_info{size}      = $info->{COLUMN_SIZE};
2298         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2299         $column_info{default_value} = $info->{COLUMN_DEF};
2300         my $col_name = $info->{COLUMN_NAME};
2301         $col_name =~ s/^\"(.*)\"$/$1/;
2302
2303         $result{$col_name} = \%column_info;
2304       }
2305     } catch {
2306       $caught = 1;
2307     };
2308     return \%result if !$caught && scalar keys %result;
2309   }
2310
2311   my %result;
2312   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2313   $sth->execute;
2314   my @columns = @{$sth->{NAME_lc}};
2315   for my $i ( 0 .. $#columns ){
2316     my %column_info;
2317     $column_info{data_type} = $sth->{TYPE}->[$i];
2318     $column_info{size} = $sth->{PRECISION}->[$i];
2319     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2320
2321     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2322       $column_info{data_type} = $1;
2323       $column_info{size}    = $2;
2324     }
2325
2326     $result{$columns[$i]} = \%column_info;
2327   }
2328   $sth->finish;
2329
2330   foreach my $col (keys %result) {
2331     my $colinfo = $result{$col};
2332     my $type_num = $colinfo->{data_type};
2333     my $type_name;
2334     if(defined $type_num && $dbh->can('type_info')) {
2335       my $type_info = $dbh->type_info($type_num);
2336       $type_name = $type_info->{TYPE_NAME} if $type_info;
2337       $colinfo->{data_type} = $type_name if $type_name;
2338     }
2339   }
2340
2341   return \%result;
2342 }
2343
2344 sub columns_info_for {
2345   my ($self, $table) = @_;
2346   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2347 }
2348
2349 =head2 last_insert_id
2350
2351 Return the row id of the last insert.
2352
2353 =cut
2354
2355 sub _dbh_last_insert_id {
2356     my ($self, $dbh, $source, $col) = @_;
2357
2358     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2359
2360     return $id if defined $id;
2361
2362     my $class = ref $self;
2363     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2364 }
2365
2366 sub last_insert_id {
2367   my $self = shift;
2368   $self->_dbh_last_insert_id ($self->_dbh, @_);
2369 }
2370
2371 =head2 _native_data_type
2372
2373 =over 4
2374
2375 =item Arguments: $type_name
2376
2377 =back
2378
2379 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2380 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2381 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2382
2383 The default implementation returns C<undef>, implement in your Storage driver if
2384 you need this functionality.
2385
2386 Should map types from other databases to the native RDBMS type, for example
2387 C<VARCHAR2> to C<VARCHAR>.
2388
2389 Types with modifiers should map to the underlying data type. For example,
2390 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2391
2392 Composite types should map to the container type, for example
2393 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2394
2395 =cut
2396
2397 sub _native_data_type {
2398   #my ($self, $data_type) = @_;
2399   return undef
2400 }
2401
2402 # Check if placeholders are supported at all
2403 sub _determine_supports_placeholders {
2404   my $self = shift;
2405   my $dbh  = $self->_get_dbh;
2406
2407   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2408   # but it is inaccurate more often than not
2409   return try {
2410     local $dbh->{PrintError} = 0;
2411     local $dbh->{RaiseError} = 1;
2412     $dbh->do('select ?', {}, 1);
2413     1;
2414   }
2415   catch {
2416     0;
2417   };
2418 }
2419
2420 # Check if placeholders bound to non-string types throw exceptions
2421 #
2422 sub _determine_supports_typeless_placeholders {
2423   my $self = shift;
2424   my $dbh  = $self->_get_dbh;
2425
2426   return try {
2427     local $dbh->{PrintError} = 0;
2428     local $dbh->{RaiseError} = 1;
2429     # this specifically tests a bind that is NOT a string
2430     $dbh->do('select 1 where 1 = ?', {}, 1);
2431     1;
2432   }
2433   catch {
2434     0;
2435   };
2436 }
2437
2438 =head2 sqlt_type
2439
2440 Returns the database driver name.
2441
2442 =cut
2443
2444 sub sqlt_type {
2445   shift->_get_dbh->{Driver}->{Name};
2446 }
2447
2448 =head2 bind_attribute_by_data_type
2449
2450 Given a datatype from column info, returns a database specific bind
2451 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2452 let the database planner just handle it.
2453
2454 Generally only needed for special case column types, like bytea in postgres.
2455
2456 =cut
2457
2458 sub bind_attribute_by_data_type {
2459     return;
2460 }
2461
2462 =head2 is_datatype_numeric
2463
2464 Given a datatype from column_info, returns a boolean value indicating if
2465 the current RDBMS considers it a numeric value. This controls how
2466 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2467 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2468 be performed instead of the usual C<eq>.
2469
2470 =cut
2471
2472 sub is_datatype_numeric {
2473   #my ($self, $dt) = @_;
2474
2475   return 0 unless $_[1];
2476
2477   $_[1] =~ /^ (?:
2478     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2479   ) $/ix;
2480 }
2481
2482
2483 =head2 create_ddl_dir
2484
2485 =over 4
2486
2487 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2488
2489 =back
2490
2491 Creates a SQL file based on the Schema, for each of the specified
2492 database engines in C<\@databases> in the given directory.
2493 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2494
2495 Given a previous version number, this will also create a file containing
2496 the ALTER TABLE statements to transform the previous schema into the
2497 current one. Note that these statements may contain C<DROP TABLE> or
2498 C<DROP COLUMN> statements that can potentially destroy data.
2499
2500 The file names are created using the C<ddl_filename> method below, please
2501 override this method in your schema if you would like a different file
2502 name format. For the ALTER file, the same format is used, replacing
2503 $version in the name with "$preversion-$version".
2504
2505 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2506 The most common value for this would be C<< { add_drop_table => 1 } >>
2507 to have the SQL produced include a C<DROP TABLE> statement for each table
2508 created. For quoting purposes supply C<quote_table_names> and
2509 C<quote_field_names>.
2510
2511 If no arguments are passed, then the following default values are assumed:
2512
2513 =over 4
2514
2515 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2516
2517 =item version    - $schema->schema_version
2518
2519 =item directory  - './'
2520
2521 =item preversion - <none>
2522
2523 =back
2524
2525 By default, C<\%sqlt_args> will have
2526
2527  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2528
2529 merged with the hash passed in. To disable any of those features, pass in a
2530 hashref like the following
2531
2532  { ignore_constraint_names => 0, # ... other options }
2533
2534
2535 WARNING: You are strongly advised to check all SQL files created, before applying
2536 them.
2537
2538 =cut
2539
2540 sub create_ddl_dir {
2541   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2542
2543   unless ($dir) {
2544     carp "No directory given, using ./\n";
2545     $dir = './';
2546   } else {
2547       -d $dir
2548         or
2549       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2550         or
2551       $self->throw_exception(
2552         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2553       );
2554   }
2555
2556   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2557
2558   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2559   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2560
2561   my $schema_version = $schema->schema_version || '1.x';
2562   $version ||= $schema_version;
2563
2564   $sqltargs = {
2565     add_drop_table => 1,
2566     ignore_constraint_names => 1,
2567     ignore_index_names => 1,
2568     %{$sqltargs || {}}
2569   };
2570
2571   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2572     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2573   }
2574
2575   my $sqlt = SQL::Translator->new( $sqltargs );
2576
2577   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2578   my $sqlt_schema = $sqlt->translate({ data => $schema })
2579     or $self->throw_exception ($sqlt->error);
2580
2581   foreach my $db (@$databases) {
2582     $sqlt->reset();
2583     $sqlt->{schema} = $sqlt_schema;
2584     $sqlt->producer($db);
2585
2586     my $file;
2587     my $filename = $schema->ddl_filename($db, $version, $dir);
2588     if (-e $filename && ($version eq $schema_version )) {
2589       # if we are dumping the current version, overwrite the DDL
2590       carp "Overwriting existing DDL file - $filename";
2591       unlink($filename);
2592     }
2593
2594     my $output = $sqlt->translate;
2595     if(!$output) {
2596       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2597       next;
2598     }
2599     if(!open($file, ">$filename")) {
2600       $self->throw_exception("Can't open $filename for writing ($!)");
2601       next;
2602     }
2603     print $file $output;
2604     close($file);
2605
2606     next unless ($preversion);
2607
2608     require SQL::Translator::Diff;
2609
2610     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2611     if(!-e $prefilename) {
2612       carp("No previous schema file found ($prefilename)");
2613       next;
2614     }
2615
2616     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2617     if(-e $difffile) {
2618       carp("Overwriting existing diff file - $difffile");
2619       unlink($difffile);
2620     }
2621
2622     my $source_schema;
2623     {
2624       my $t = SQL::Translator->new($sqltargs);
2625       $t->debug( 0 );
2626       $t->trace( 0 );
2627
2628       $t->parser( $db )
2629         or $self->throw_exception ($t->error);
2630
2631       my $out = $t->translate( $prefilename )
2632         or $self->throw_exception ($t->error);
2633
2634       $source_schema = $t->schema;
2635
2636       $source_schema->name( $prefilename )
2637         unless ( $source_schema->name );
2638     }
2639
2640     # The "new" style of producers have sane normalization and can support
2641     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2642     # And we have to diff parsed SQL against parsed SQL.
2643     my $dest_schema = $sqlt_schema;
2644
2645     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2646       my $t = SQL::Translator->new($sqltargs);
2647       $t->debug( 0 );
2648       $t->trace( 0 );
2649
2650       $t->parser( $db )
2651         or $self->throw_exception ($t->error);
2652
2653       my $out = $t->translate( $filename )
2654         or $self->throw_exception ($t->error);
2655
2656       $dest_schema = $t->schema;
2657
2658       $dest_schema->name( $filename )
2659         unless $dest_schema->name;
2660     }
2661
2662     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2663                                                   $dest_schema,   $db,
2664                                                   $sqltargs
2665                                                  );
2666     if(!open $file, ">$difffile") {
2667       $self->throw_exception("Can't write to $difffile ($!)");
2668       next;
2669     }
2670     print $file $diff;
2671     close($file);
2672   }
2673 }
2674
2675 =head2 deployment_statements
2676
2677 =over 4
2678
2679 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2680
2681 =back
2682
2683 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2684
2685 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2686 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2687
2688 C<$directory> is used to return statements from files in a previously created
2689 L</create_ddl_dir> directory and is optional. The filenames are constructed
2690 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2691
2692 If no C<$directory> is specified then the statements are constructed on the
2693 fly using L<SQL::Translator> and C<$version> is ignored.
2694
2695 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2696
2697 =cut
2698
2699 sub deployment_statements {
2700   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2701   $type ||= $self->sqlt_type;
2702   $version ||= $schema->schema_version || '1.x';
2703   $dir ||= './';
2704   my $filename = $schema->ddl_filename($type, $version, $dir);
2705   if(-f $filename)
2706   {
2707       # FIXME replace this block when a proper sane sql parser is available
2708       my $file;
2709       open($file, "<$filename")
2710         or $self->throw_exception("Can't open $filename ($!)");
2711       my @rows = <$file>;
2712       close($file);
2713       return join('', @rows);
2714   }
2715
2716   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2717     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2718   }
2719
2720   # sources needs to be a parser arg, but for simplicty allow at top level
2721   # coming in
2722   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2723       if exists $sqltargs->{sources};
2724
2725   my $tr = SQL::Translator->new(
2726     producer => "SQL::Translator::Producer::${type}",
2727     %$sqltargs,
2728     parser => 'SQL::Translator::Parser::DBIx::Class',
2729     data => $schema,
2730   );
2731
2732   my @ret;
2733   if (wantarray) {
2734     @ret = $tr->translate;
2735   }
2736   else {
2737     $ret[0] = $tr->translate;
2738   }
2739
2740   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2741     unless (@ret && defined $ret[0]);
2742
2743   return wantarray ? @ret : $ret[0];
2744 }
2745
2746 # FIXME deploy() currently does not accurately report sql errors
2747 # Will always return true while errors are warned
2748 sub deploy {
2749   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2750   my $deploy = sub {
2751     my $line = shift;
2752     return if(!$line);
2753     return if($line =~ /^--/);
2754     # next if($line =~ /^DROP/m);
2755     return if($line =~ /^BEGIN TRANSACTION/m);
2756     return if($line =~ /^COMMIT/m);
2757     return if $line =~ /^\s+$/; # skip whitespace only
2758     $self->_query_start($line);
2759     try {
2760       # do a dbh_do cycle here, as we need some error checking in
2761       # place (even though we will ignore errors)
2762       $self->dbh_do (sub { $_[1]->do($line) });
2763     } catch {
2764       carp qq{$_ (running "${line}")};
2765     };
2766     $self->_query_end($line);
2767   };
2768   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2769   if (@statements > 1) {
2770     foreach my $statement (@statements) {
2771       $deploy->( $statement );
2772     }
2773   }
2774   elsif (@statements == 1) {
2775     # split on single line comments and end of statements
2776     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2777       $deploy->( $line );
2778     }
2779   }
2780 }
2781
2782 =head2 datetime_parser
2783
2784 Returns the datetime parser class
2785
2786 =cut
2787
2788 sub datetime_parser {
2789   my $self = shift;
2790   return $self->{datetime_parser} ||= do {
2791     $self->build_datetime_parser(@_);
2792   };
2793 }
2794
2795 =head2 datetime_parser_type
2796
2797 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2798
2799 =head2 build_datetime_parser
2800
2801 See L</datetime_parser>
2802
2803 =cut
2804
2805 sub build_datetime_parser {
2806   my $self = shift;
2807   my $type = $self->datetime_parser_type(@_);
2808   return $type;
2809 }
2810
2811
2812 =head2 is_replicating
2813
2814 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2815 replicate from a master database.  Default is undef, which is the result
2816 returned by databases that don't support replication.
2817
2818 =cut
2819
2820 sub is_replicating {
2821     return;
2822
2823 }
2824
2825 =head2 lag_behind_master
2826
2827 Returns a number that represents a certain amount of lag behind a master db
2828 when a given storage is replicating.  The number is database dependent, but
2829 starts at zero and increases with the amount of lag. Default in undef
2830
2831 =cut
2832
2833 sub lag_behind_master {
2834     return;
2835 }
2836
2837 =head2 relname_to_table_alias
2838
2839 =over 4
2840
2841 =item Arguments: $relname, $join_count
2842
2843 =back
2844
2845 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2846 queries.
2847
2848 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2849 way these aliases are named.
2850
2851 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2852 otherwise C<"$relname">.
2853
2854 =cut
2855
2856 sub relname_to_table_alias {
2857   my ($self, $relname, $join_count) = @_;
2858
2859   my $alias = ($join_count && $join_count > 1 ?
2860     join('_', $relname, $join_count) : $relname);
2861
2862   return $alias;
2863 }
2864
2865 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2866 # version and it may be necessary to amend or override it for a specific storage
2867 # if such binds are necessary.
2868 sub _max_column_bytesize {
2869   my ($self, $attr) = @_;
2870
2871   my $max_size;
2872
2873   if ($attr->{sqlt_datatype}) {
2874     my $data_type = lc($attr->{sqlt_datatype});
2875
2876     if ($attr->{sqlt_size}) {
2877
2878       # String/sized-binary types
2879       if ($data_type =~ /^(?:
2880           l? (?:var)? char(?:acter)? (?:\s*varying)?
2881             |
2882           (?:var)? binary (?:\s*varying)?
2883             |
2884           raw
2885         )\b/x
2886       ) {
2887         $max_size = $attr->{sqlt_size};
2888       }
2889       # Other charset/unicode types, assume scale of 4
2890       elsif ($data_type =~ /^(?:
2891           national \s* character (?:\s*varying)?
2892             |
2893           nchar
2894             |
2895           univarchar
2896             |
2897           nvarchar
2898         )\b/x
2899       ) {
2900         $max_size = $attr->{sqlt_size} * 4;
2901       }
2902     }
2903
2904     if (!$max_size and !$self->_is_lob_type($data_type)) {
2905       $max_size = 100 # for all other (numeric?) datatypes
2906     }
2907   }
2908
2909   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
2910 }
2911
2912 # Determine if a data_type is some type of BLOB
2913 sub _is_lob_type {
2914   my ($self, $data_type) = @_;
2915   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2916     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2917                                   |varchar|character\s*varying|nvarchar
2918                                   |national\s*character\s*varying))?\z/xi);
2919 }
2920
2921 sub _is_binary_lob_type {
2922   my ($self, $data_type) = @_;
2923   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2924     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2925 }
2926
2927 sub _is_text_lob_type {
2928   my ($self, $data_type) = @_;
2929   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2930     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2931                         |national\s*character\s*varying))\z/xi);
2932 }
2933
2934 1;
2935
2936 =head1 USAGE NOTES
2937
2938 =head2 DBIx::Class and AutoCommit
2939
2940 DBIx::Class can do some wonderful magic with handling exceptions,
2941 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2942 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
2943 transaction support.
2944
2945 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2946 in an assumed transaction between commits, and you're telling us you'd
2947 like to manage that manually.  A lot of the magic protections offered by
2948 this module will go away.  We can't protect you from exceptions due to database
2949 disconnects because we don't know anything about how to restart your
2950 transactions.  You're on your own for handling all sorts of exceptional
2951 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2952 be with raw DBI.
2953
2954
2955 =head1 AUTHORS
2956
2957 Matt S. Trout <mst@shadowcatsystems.co.uk>
2958
2959 Andy Grundman <andy@hybridized.org>
2960
2961 =head1 LICENSE
2962
2963 You may distribute this code under the same terms as Perl itself.
2964
2965 =cut