Lazy-load as many of the non-essential modules as possible
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
11 use DBI;
12 use DBIx::Class::Storage::DBI::Cursor;
13 use Scalar::Util qw/refaddr weaken reftype blessed/;
14 use List::Util qw/first/;
15 use Sub::Name 'subname';
16 use Try::Tiny;
17 use overload ();
18 use namespace::clean;
19
20
21 # default cursor class, overridable in connect_info attributes
22 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
23
24 __PACKAGE__->mk_group_accessors('inherited' => qw/
25   sql_maker_class sql_limit_dialect sql_quote_char sql_name_sep
26 /);
27
28 __PACKAGE__->sql_name_sep('.');
29
30 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
31
32 __PACKAGE__->mk_group_accessors('simple' => qw/
33   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
34   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts
35   transaction_depth _dbh_autocommit  savepoints
36 /);
37
38 # the values for these accessors are picked out (and deleted) from
39 # the attribute hashref passed to connect_info
40 my @storage_options = qw/
41   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
42   disable_sth_caching unsafe auto_savepoint
43 /;
44 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
45
46
47 # capability definitions, using a 2-tiered accessor system
48 # The rationale is:
49 #
50 # A driver/user may define _use_X, which blindly without any checks says:
51 # "(do not) use this capability", (use_dbms_capability is an "inherited"
52 # type accessor)
53 #
54 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
55 # accessor, which in turn calls _determine_supports_X, and stores the return
56 # in a special slot on the storage object, which is wiped every time a $dbh
57 # reconnection takes place (it is not guaranteed that upon reconnection we
58 # will get the same rdbms version). _determine_supports_X does not need to
59 # exist on a driver, as we ->can for it before calling.
60
61 my @capabilities = (qw/
62   insert_returning
63   insert_returning_bound
64   placeholders
65   typeless_placeholders
66   join_optimizer
67 /);
68 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
69 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
70
71 # on by default, not strictly a capability (pending rewrite)
72 __PACKAGE__->_use_join_optimizer (1);
73 sub _determine_supports_join_optimizer { 1 };
74
75 # Each of these methods need _determine_driver called before itself
76 # in order to function reliably. This is a purely DRY optimization
77 #
78 # get_(use)_dbms_capability need to be called on the correct Storage
79 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
80 # _determine_supports_X which obv. needs a correct driver as well
81 my @rdbms_specific_methods = qw/
82   deployment_statements
83   sqlt_type
84   sql_maker
85   build_datetime_parser
86   datetime_parser_type
87
88   insert
89   insert_bulk
90   update
91   delete
92   select
93   select_single
94
95   get_use_dbms_capability
96   get_dbms_capability
97
98   _server_info
99   _get_server_version
100 /;
101
102 for my $meth (@rdbms_specific_methods) {
103
104   my $orig = __PACKAGE__->can ($meth)
105     or die "$meth is not a ::Storage::DBI method!";
106
107   no strict qw/refs/;
108   no warnings qw/redefine/;
109   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
110     if (not $_[0]->_driver_determined and not $_[0]->{_in_determine_driver}) {
111       $_[0]->_determine_driver;
112
113       # This for some reason crashes and burns on perl 5.8.1
114       # IFF the method ends up throwing an exception
115       #goto $_[0]->can ($meth);
116
117       my $cref = $_[0]->can ($meth);
118       goto $cref;
119     }
120     goto $orig;
121   };
122 }
123
124
125 =head1 NAME
126
127 DBIx::Class::Storage::DBI - DBI storage handler
128
129 =head1 SYNOPSIS
130
131   my $schema = MySchema->connect('dbi:SQLite:my.db');
132
133   $schema->storage->debug(1);
134
135   my @stuff = $schema->storage->dbh_do(
136     sub {
137       my ($storage, $dbh, @args) = @_;
138       $dbh->do("DROP TABLE authors");
139     },
140     @column_list
141   );
142
143   $schema->resultset('Book')->search({
144      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
145   });
146
147 =head1 DESCRIPTION
148
149 This class represents the connection to an RDBMS via L<DBI>.  See
150 L<DBIx::Class::Storage> for general information.  This pod only
151 documents DBI-specific methods and behaviors.
152
153 =head1 METHODS
154
155 =cut
156
157 sub new {
158   my $new = shift->next::method(@_);
159
160   $new->transaction_depth(0);
161   $new->_sql_maker_opts({});
162   $new->_dbh_details({});
163   $new->{savepoints} = [];
164   $new->{_in_dbh_do} = 0;
165   $new->{_dbh_gen} = 0;
166
167   # read below to see what this does
168   $new->_arm_global_destructor;
169
170   $new;
171 }
172
173 # This is hack to work around perl shooting stuff in random
174 # order on exit(). If we do not walk the remaining storage
175 # objects in an END block, there is a *small but real* chance
176 # of a fork()ed child to kill the parent's shared DBI handle,
177 # *before perl reaches the DESTROY in this package*
178 # Yes, it is ugly and effective.
179 # Additionally this registry is used by the CLONE method to
180 # make sure no handles are shared between threads
181 {
182   my %seek_and_destroy;
183
184   sub _arm_global_destructor {
185     my $self = shift;
186     my $key = refaddr ($self);
187     $seek_and_destroy{$key} = $self;
188     weaken ($seek_and_destroy{$key});
189   }
190
191   END {
192     local $?; # just in case the DBI destructor changes it somehow
193
194     # destroy just the object if not native to this process/thread
195     $_->_verify_pid for (grep
196       { defined $_ }
197       values %seek_and_destroy
198     );
199   }
200
201   sub CLONE {
202     # As per DBI's recommendation, DBIC disconnects all handles as
203     # soon as possible (DBIC will reconnect only on demand from within
204     # the thread)
205     for (values %seek_and_destroy) {
206       next unless $_;
207       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
208       $_->_dbh(undef);
209     }
210   }
211 }
212
213 sub DESTROY {
214   my $self = shift;
215
216   # some databases spew warnings on implicit disconnect
217   local $SIG{__WARN__} = sub {};
218   $self->_dbh(undef);
219
220   # this op is necessary, since the very last perl runtime statement
221   # triggers a global destruction shootout, and the $SIG localization
222   # may very well be destroyed before perl actually gets to do the
223   # $dbh undef
224   1;
225 }
226
227 # handle pid changes correctly - do not destroy parent's connection
228 sub _verify_pid {
229   my $self = shift;
230
231   my $pid = $self->_conn_pid;
232   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
233     $dbh->{InactiveDestroy} = 1;
234     $self->{_dbh_gen}++;
235     $self->_dbh(undef);
236   }
237
238   return;
239 }
240
241 =head2 connect_info
242
243 This method is normally called by L<DBIx::Class::Schema/connection>, which
244 encapsulates its argument list in an arrayref before passing them here.
245
246 The argument list may contain:
247
248 =over
249
250 =item *
251
252 The same 4-element argument set one would normally pass to
253 L<DBI/connect>, optionally followed by
254 L<extra attributes|/DBIx::Class specific connection attributes>
255 recognized by DBIx::Class:
256
257   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
258
259 =item *
260
261 A single code reference which returns a connected
262 L<DBI database handle|DBI/connect> optionally followed by
263 L<extra attributes|/DBIx::Class specific connection attributes> recognized
264 by DBIx::Class:
265
266   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
267
268 =item *
269
270 A single hashref with all the attributes and the dsn/user/password
271 mixed together:
272
273   $connect_info_args = [{
274     dsn => $dsn,
275     user => $user,
276     password => $pass,
277     %dbi_attributes,
278     %extra_attributes,
279   }];
280
281   $connect_info_args = [{
282     dbh_maker => sub { DBI->connect (...) },
283     %dbi_attributes,
284     %extra_attributes,
285   }];
286
287 This is particularly useful for L<Catalyst> based applications, allowing the
288 following config (L<Config::General> style):
289
290   <Model::DB>
291     schema_class   App::DB
292     <connect_info>
293       dsn          dbi:mysql:database=test
294       user         testuser
295       password     TestPass
296       AutoCommit   1
297     </connect_info>
298   </Model::DB>
299
300 The C<dsn>/C<user>/C<password> combination can be substituted by the
301 C<dbh_maker> key whose value is a coderef that returns a connected
302 L<DBI database handle|DBI/connect>
303
304 =back
305
306 Please note that the L<DBI> docs recommend that you always explicitly
307 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
308 recommends that it be set to I<1>, and that you perform transactions
309 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
310 to I<1> if you do not do explicitly set it to zero.  This is the default
311 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
312
313 =head3 DBIx::Class specific connection attributes
314
315 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
316 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
317 the following connection options. These options can be mixed in with your other
318 L<DBI> connection attributes, or placed in a separate hashref
319 (C<\%extra_attributes>) as shown above.
320
321 Every time C<connect_info> is invoked, any previous settings for
322 these options will be cleared before setting the new ones, regardless of
323 whether any options are specified in the new C<connect_info>.
324
325
326 =over
327
328 =item on_connect_do
329
330 Specifies things to do immediately after connecting or re-connecting to
331 the database.  Its value may contain:
332
333 =over
334
335 =item a scalar
336
337 This contains one SQL statement to execute.
338
339 =item an array reference
340
341 This contains SQL statements to execute in order.  Each element contains
342 a string or a code reference that returns a string.
343
344 =item a code reference
345
346 This contains some code to execute.  Unlike code references within an
347 array reference, its return value is ignored.
348
349 =back
350
351 =item on_disconnect_do
352
353 Takes arguments in the same form as L</on_connect_do> and executes them
354 immediately before disconnecting from the database.
355
356 Note, this only runs if you explicitly call L</disconnect> on the
357 storage object.
358
359 =item on_connect_call
360
361 A more generalized form of L</on_connect_do> that calls the specified
362 C<connect_call_METHOD> methods in your storage driver.
363
364   on_connect_do => 'select 1'
365
366 is equivalent to:
367
368   on_connect_call => [ [ do_sql => 'select 1' ] ]
369
370 Its values may contain:
371
372 =over
373
374 =item a scalar
375
376 Will call the C<connect_call_METHOD> method.
377
378 =item a code reference
379
380 Will execute C<< $code->($storage) >>
381
382 =item an array reference
383
384 Each value can be a method name or code reference.
385
386 =item an array of arrays
387
388 For each array, the first item is taken to be the C<connect_call_> method name
389 or code reference, and the rest are parameters to it.
390
391 =back
392
393 Some predefined storage methods you may use:
394
395 =over
396
397 =item do_sql
398
399 Executes a SQL string or a code reference that returns a SQL string. This is
400 what L</on_connect_do> and L</on_disconnect_do> use.
401
402 It can take:
403
404 =over
405
406 =item a scalar
407
408 Will execute the scalar as SQL.
409
410 =item an arrayref
411
412 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
413 attributes hashref and bind values.
414
415 =item a code reference
416
417 Will execute C<< $code->($storage) >> and execute the return array refs as
418 above.
419
420 =back
421
422 =item datetime_setup
423
424 Execute any statements necessary to initialize the database session to return
425 and accept datetime/timestamp values used with
426 L<DBIx::Class::InflateColumn::DateTime>.
427
428 Only necessary for some databases, see your specific storage driver for
429 implementation details.
430
431 =back
432
433 =item on_disconnect_call
434
435 Takes arguments in the same form as L</on_connect_call> and executes them
436 immediately before disconnecting from the database.
437
438 Calls the C<disconnect_call_METHOD> methods as opposed to the
439 C<connect_call_METHOD> methods called by L</on_connect_call>.
440
441 Note, this only runs if you explicitly call L</disconnect> on the
442 storage object.
443
444 =item disable_sth_caching
445
446 If set to a true value, this option will disable the caching of
447 statement handles via L<DBI/prepare_cached>.
448
449 =item limit_dialect
450
451 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
452 default L</sql_limit_dialect> setting of the storage (if any). For a list
453 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
454
455 =item quote_names
456
457 When true automatically sets L</quote_char> and L</name_sep> to the characters
458 appropriate for your particular RDBMS. This option is preferred over specifying
459 L</quote_char> directly.
460
461 =item quote_char
462
463 Specifies what characters to use to quote table and column names.
464
465 C<quote_char> expects either a single character, in which case is it
466 is placed on either side of the table/column name, or an arrayref of length
467 2 in which case the table/column name is placed between the elements.
468
469 For example under MySQL you should use C<< quote_char => '`' >>, and for
470 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
471
472 =item name_sep
473
474 This parameter is only useful in conjunction with C<quote_char>, and is used to
475 specify the character that separates elements (schemas, tables, columns) from
476 each other. If unspecified it defaults to the most commonly used C<.>.
477
478 =item unsafe
479
480 This Storage driver normally installs its own C<HandleError>, sets
481 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
482 all database handles, including those supplied by a coderef.  It does this
483 so that it can have consistent and useful error behavior.
484
485 If you set this option to a true value, Storage will not do its usual
486 modifications to the database handle's attributes, and instead relies on
487 the settings in your connect_info DBI options (or the values you set in
488 your connection coderef, in the case that you are connecting via coderef).
489
490 Note that your custom settings can cause Storage to malfunction,
491 especially if you set a C<HandleError> handler that suppresses exceptions
492 and/or disable C<RaiseError>.
493
494 =item auto_savepoint
495
496 If this option is true, L<DBIx::Class> will use savepoints when nesting
497 transactions, making it possible to recover from failure in the inner
498 transaction without having to abort all outer transactions.
499
500 =item cursor_class
501
502 Use this argument to supply a cursor class other than the default
503 L<DBIx::Class::Storage::DBI::Cursor>.
504
505 =back
506
507 Some real-life examples of arguments to L</connect_info> and
508 L<DBIx::Class::Schema/connect>
509
510   # Simple SQLite connection
511   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
512
513   # Connect via subref
514   ->connect_info([ sub { DBI->connect(...) } ]);
515
516   # Connect via subref in hashref
517   ->connect_info([{
518     dbh_maker => sub { DBI->connect(...) },
519     on_connect_do => 'alter session ...',
520   }]);
521
522   # A bit more complicated
523   ->connect_info(
524     [
525       'dbi:Pg:dbname=foo',
526       'postgres',
527       'my_pg_password',
528       { AutoCommit => 1 },
529       { quote_char => q{"} },
530     ]
531   );
532
533   # Equivalent to the previous example
534   ->connect_info(
535     [
536       'dbi:Pg:dbname=foo',
537       'postgres',
538       'my_pg_password',
539       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
540     ]
541   );
542
543   # Same, but with hashref as argument
544   # See parse_connect_info for explanation
545   ->connect_info(
546     [{
547       dsn         => 'dbi:Pg:dbname=foo',
548       user        => 'postgres',
549       password    => 'my_pg_password',
550       AutoCommit  => 1,
551       quote_char  => q{"},
552       name_sep    => q{.},
553     }]
554   );
555
556   # Subref + DBIx::Class-specific connection options
557   ->connect_info(
558     [
559       sub { DBI->connect(...) },
560       {
561           quote_char => q{`},
562           name_sep => q{@},
563           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
564           disable_sth_caching => 1,
565       },
566     ]
567   );
568
569
570
571 =cut
572
573 sub connect_info {
574   my ($self, $info) = @_;
575
576   return $self->_connect_info if !$info;
577
578   $self->_connect_info($info); # copy for _connect_info
579
580   $info = $self->_normalize_connect_info($info)
581     if ref $info eq 'ARRAY';
582
583   for my $storage_opt (keys %{ $info->{storage_options} }) {
584     my $value = $info->{storage_options}{$storage_opt};
585
586     $self->$storage_opt($value);
587   }
588
589   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
590   #  the new set of options
591   $self->_sql_maker(undef);
592   $self->_sql_maker_opts({});
593
594   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
595     my $value = $info->{sql_maker_options}{$sql_maker_opt};
596
597     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
598   }
599
600   my %attrs = (
601     %{ $self->_default_dbi_connect_attributes || {} },
602     %{ $info->{attributes} || {} },
603   );
604
605   my @args = @{ $info->{arguments} };
606
607   if (keys %attrs and ref $args[0] ne 'CODE') {
608     carp
609         'You provided explicit AutoCommit => 0 in your connection_info. '
610       . 'This is almost universally a bad idea (see the footnotes of '
611       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
612       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
613       . 'this warning.'
614       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
615
616     push @args, \%attrs if keys %attrs;
617   }
618   $self->_dbi_connect_info(\@args);
619
620   # FIXME - dirty:
621   # save attributes them in a separate accessor so they are always
622   # introspectable, even in case of a CODE $dbhmaker
623   $self->_dbic_connect_attributes (\%attrs);
624
625   return $self->_connect_info;
626 }
627
628 sub _normalize_connect_info {
629   my ($self, $info_arg) = @_;
630   my %info;
631
632   my @args = @$info_arg;  # take a shallow copy for further mutilation
633
634   # combine/pre-parse arguments depending on invocation style
635
636   my %attrs;
637   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
638     %attrs = %{ $args[1] || {} };
639     @args = $args[0];
640   }
641   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
642     %attrs = %{$args[0]};
643     @args = ();
644     if (my $code = delete $attrs{dbh_maker}) {
645       @args = $code;
646
647       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
648       if (@ignored) {
649         carp sprintf (
650             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
651           . "to the result of 'dbh_maker'",
652
653           join (', ', map { "'$_'" } (@ignored) ),
654         );
655       }
656     }
657     else {
658       @args = delete @attrs{qw/dsn user password/};
659     }
660   }
661   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
662     %attrs = (
663       % { $args[3] || {} },
664       % { $args[4] || {} },
665     );
666     @args = @args[0,1,2];
667   }
668
669   $info{arguments} = \@args;
670
671   my @storage_opts = grep exists $attrs{$_},
672     @storage_options, 'cursor_class';
673
674   @{ $info{storage_options} }{@storage_opts} =
675     delete @attrs{@storage_opts} if @storage_opts;
676
677   my @sql_maker_opts = grep exists $attrs{$_},
678     qw/limit_dialect quote_char name_sep quote_names/;
679
680   @{ $info{sql_maker_options} }{@sql_maker_opts} =
681     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
682
683   $info{attributes} = \%attrs if %attrs;
684
685   return \%info;
686 }
687
688 sub _default_dbi_connect_attributes () {
689   +{
690     AutoCommit => 1,
691     PrintError => 0,
692     RaiseError => 1,
693     ShowErrorStatement => 1,
694   };
695 }
696
697 =head2 on_connect_do
698
699 This method is deprecated in favour of setting via L</connect_info>.
700
701 =cut
702
703 =head2 on_disconnect_do
704
705 This method is deprecated in favour of setting via L</connect_info>.
706
707 =cut
708
709 sub _parse_connect_do {
710   my ($self, $type) = @_;
711
712   my $val = $self->$type;
713   return () if not defined $val;
714
715   my @res;
716
717   if (not ref($val)) {
718     push @res, [ 'do_sql', $val ];
719   } elsif (ref($val) eq 'CODE') {
720     push @res, $val;
721   } elsif (ref($val) eq 'ARRAY') {
722     push @res, map { [ 'do_sql', $_ ] } @$val;
723   } else {
724     $self->throw_exception("Invalid type for $type: ".ref($val));
725   }
726
727   return \@res;
728 }
729
730 =head2 dbh_do
731
732 Arguments: ($subref | $method_name), @extra_coderef_args?
733
734 Execute the given $subref or $method_name using the new exception-based
735 connection management.
736
737 The first two arguments will be the storage object that C<dbh_do> was called
738 on and a database handle to use.  Any additional arguments will be passed
739 verbatim to the called subref as arguments 2 and onwards.
740
741 Using this (instead of $self->_dbh or $self->dbh) ensures correct
742 exception handling and reconnection (or failover in future subclasses).
743
744 Your subref should have no side-effects outside of the database, as
745 there is the potential for your subref to be partially double-executed
746 if the database connection was stale/dysfunctional.
747
748 Example:
749
750   my @stuff = $schema->storage->dbh_do(
751     sub {
752       my ($storage, $dbh, @cols) = @_;
753       my $cols = join(q{, }, @cols);
754       $dbh->selectrow_array("SELECT $cols FROM foo");
755     },
756     @column_list
757   );
758
759 =cut
760
761 sub dbh_do {
762   my $self = shift;
763   my $code = shift;
764
765   my $dbh = $self->_get_dbh;
766
767   return $self->$code($dbh, @_)
768     if ( $self->{_in_dbh_do} || $self->{transaction_depth} );
769
770   local $self->{_in_dbh_do} = 1;
771
772   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
773   my $args = \@_;
774   return try {
775     $self->$code ($dbh, @$args);
776   } catch {
777     $self->throw_exception($_) if $self->connected;
778
779     # We were not connected - reconnect and retry, but let any
780     #  exception fall right through this time
781     carp "Retrying $code after catching disconnected exception: $_"
782       if $ENV{DBIC_DBIRETRY_DEBUG};
783
784     $self->_populate_dbh;
785     $self->$code($self->_dbh, @$args);
786   };
787 }
788
789 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
790 # It also informs dbh_do to bypass itself while under the direction of txn_do,
791 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
792 sub txn_do {
793   my $self = shift;
794   my $coderef = shift;
795
796   ref $coderef eq 'CODE' or $self->throw_exception
797     ('$coderef must be a CODE reference');
798
799   local $self->{_in_dbh_do} = 1;
800
801   my @result;
802   my $want = wantarray;
803
804   my $tried = 0;
805   while(1) {
806     my $exception;
807
808     # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
809     my $args = \@_;
810
811     try {
812       $self->txn_begin;
813       my $txn_start_depth = $self->transaction_depth;
814       if($want) {
815           @result = $coderef->(@$args);
816       }
817       elsif(defined $want) {
818           $result[0] = $coderef->(@$args);
819       }
820       else {
821           $coderef->(@$args);
822       }
823
824       my $delta_txn = $txn_start_depth - $self->transaction_depth;
825       if ($delta_txn == 0) {
826         $self->txn_commit;
827       }
828       elsif ($delta_txn != 1) {
829         # an off-by-one would mean we fired a rollback
830         carp "Unexpected reduction of transaction depth by $delta_txn after execution of $coderef";
831       }
832     } catch {
833       $exception = $_;
834     };
835
836     if(! defined $exception) { return wantarray ? @result : $result[0] }
837
838     if($self->transaction_depth > 1 || $tried++ || $self->connected) {
839       my $rollback_exception;
840       try { $self->txn_rollback } catch { $rollback_exception = shift };
841       if(defined $rollback_exception) {
842         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
843         $self->throw_exception($exception)  # propagate nested rollback
844           if $rollback_exception =~ /$exception_class/;
845
846         $self->throw_exception(
847           "Transaction aborted: ${exception}. "
848           . "Rollback failed: ${rollback_exception}"
849         );
850       }
851       $self->throw_exception($exception)
852     }
853
854     # We were not connected, and was first try - reconnect and retry
855     # via the while loop
856     carp "Retrying $coderef after catching disconnected exception: $exception"
857       if $ENV{DBIC_TXNRETRY_DEBUG};
858     $self->_populate_dbh;
859   }
860 }
861
862 =head2 disconnect
863
864 Our C<disconnect> method also performs a rollback first if the
865 database is not in C<AutoCommit> mode.
866
867 =cut
868
869 sub disconnect {
870   my ($self) = @_;
871
872   if( $self->_dbh ) {
873     my @actions;
874
875     push @actions, ( $self->on_disconnect_call || () );
876     push @actions, $self->_parse_connect_do ('on_disconnect_do');
877
878     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
879
880     $self->_dbh_rollback unless $self->_dbh_autocommit;
881
882     %{ $self->_dbh->{CachedKids} } = ();
883     $self->_dbh->disconnect;
884     $self->_dbh(undef);
885     $self->{_dbh_gen}++;
886   }
887 }
888
889 =head2 with_deferred_fk_checks
890
891 =over 4
892
893 =item Arguments: C<$coderef>
894
895 =item Return Value: The return value of $coderef
896
897 =back
898
899 Storage specific method to run the code ref with FK checks deferred or
900 in MySQL's case disabled entirely.
901
902 =cut
903
904 # Storage subclasses should override this
905 sub with_deferred_fk_checks {
906   my ($self, $sub) = @_;
907   $sub->();
908 }
909
910 =head2 connected
911
912 =over
913
914 =item Arguments: none
915
916 =item Return Value: 1|0
917
918 =back
919
920 Verifies that the current database handle is active and ready to execute
921 an SQL statement (e.g. the connection did not get stale, server is still
922 answering, etc.) This method is used internally by L</dbh>.
923
924 =cut
925
926 sub connected {
927   my $self = shift;
928   return 0 unless $self->_seems_connected;
929
930   #be on the safe side
931   local $self->_dbh->{RaiseError} = 1;
932
933   return $self->_ping;
934 }
935
936 sub _seems_connected {
937   my $self = shift;
938
939   $self->_verify_pid;
940
941   my $dbh = $self->_dbh
942     or return 0;
943
944   return $dbh->FETCH('Active');
945 }
946
947 sub _ping {
948   my $self = shift;
949
950   my $dbh = $self->_dbh or return 0;
951
952   return $dbh->ping;
953 }
954
955 sub ensure_connected {
956   my ($self) = @_;
957
958   unless ($self->connected) {
959     $self->_populate_dbh;
960   }
961 }
962
963 =head2 dbh
964
965 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
966 is guaranteed to be healthy by implicitly calling L</connected>, and if
967 necessary performing a reconnection before returning. Keep in mind that this
968 is very B<expensive> on some database engines. Consider using L</dbh_do>
969 instead.
970
971 =cut
972
973 sub dbh {
974   my ($self) = @_;
975
976   if (not $self->_dbh) {
977     $self->_populate_dbh;
978   } else {
979     $self->ensure_connected;
980   }
981   return $self->_dbh;
982 }
983
984 # this is the internal "get dbh or connect (don't check)" method
985 sub _get_dbh {
986   my $self = shift;
987   $self->_verify_pid;
988   $self->_populate_dbh unless $self->_dbh;
989   return $self->_dbh;
990 }
991
992 sub sql_maker {
993   my ($self) = @_;
994   unless ($self->_sql_maker) {
995     my $sql_maker_class = $self->sql_maker_class;
996     $self->ensure_class_loaded ($sql_maker_class);
997
998     my %opts = %{$self->_sql_maker_opts||{}};
999     my $dialect =
1000       $opts{limit_dialect}
1001         ||
1002       $self->sql_limit_dialect
1003         ||
1004       do {
1005         my $s_class = (ref $self) || $self;
1006         carp (
1007           "Your storage class ($s_class) does not set sql_limit_dialect and you "
1008         . 'have not supplied an explicit limit_dialect in your connection_info. '
1009         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
1010         . 'databases but can be (and often is) painfully slow. '
1011         . "Please file an RT ticket against '$s_class' ."
1012         );
1013
1014         'GenericSubQ';
1015       }
1016     ;
1017
1018     my ($quote_char, $name_sep);
1019
1020     if ($opts{quote_names}) {
1021       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
1022         my $s_class = (ref $self) || $self;
1023         carp (
1024           "You requested 'quote_names' but your storage class ($s_class) does "
1025         . 'not explicitly define a default sql_quote_char and you have not '
1026         . 'supplied a quote_char as part of your connection_info. DBIC will '
1027         .q{default to the ANSI SQL standard quote '"', which works most of }
1028         . "the time. Please file an RT ticket against '$s_class'."
1029         );
1030
1031         '"'; # RV
1032       };
1033
1034       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
1035     }
1036
1037     $self->_sql_maker($sql_maker_class->new(
1038       bindtype=>'columns',
1039       array_datatypes => 1,
1040       limit_dialect => $dialect,
1041       ($quote_char ? (quote_char => $quote_char) : ()),
1042       name_sep => ($name_sep || '.'),
1043       %opts,
1044     ));
1045   }
1046   return $self->_sql_maker;
1047 }
1048
1049 # nothing to do by default
1050 sub _rebless {}
1051 sub _init {}
1052
1053 sub _populate_dbh {
1054   my ($self) = @_;
1055
1056   my @info = @{$self->_dbi_connect_info || []};
1057   $self->_dbh(undef); # in case ->connected failed we might get sent here
1058   $self->_dbh_details({}); # reset everything we know
1059
1060   $self->_dbh($self->_connect(@info));
1061
1062   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1063
1064   $self->_determine_driver;
1065
1066   # Always set the transaction depth on connect, since
1067   #  there is no transaction in progress by definition
1068   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1069
1070   $self->_run_connection_actions unless $self->{_in_determine_driver};
1071 }
1072
1073 sub _run_connection_actions {
1074   my $self = shift;
1075   my @actions;
1076
1077   push @actions, ( $self->on_connect_call || () );
1078   push @actions, $self->_parse_connect_do ('on_connect_do');
1079
1080   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1081 }
1082
1083
1084
1085 sub set_use_dbms_capability {
1086   $_[0]->set_inherited ($_[1], $_[2]);
1087 }
1088
1089 sub get_use_dbms_capability {
1090   my ($self, $capname) = @_;
1091
1092   my $use = $self->get_inherited ($capname);
1093   return defined $use
1094     ? $use
1095     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1096   ;
1097 }
1098
1099 sub set_dbms_capability {
1100   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1101 }
1102
1103 sub get_dbms_capability {
1104   my ($self, $capname) = @_;
1105
1106   my $cap = $self->_dbh_details->{capability}{$capname};
1107
1108   unless (defined $cap) {
1109     if (my $meth = $self->can ("_determine$capname")) {
1110       $cap = $self->$meth ? 1 : 0;
1111     }
1112     else {
1113       $cap = 0;
1114     }
1115
1116     $self->set_dbms_capability ($capname, $cap);
1117   }
1118
1119   return $cap;
1120 }
1121
1122 sub _server_info {
1123   my $self = shift;
1124
1125   my $info;
1126   unless ($info = $self->_dbh_details->{info}) {
1127
1128     $info = {};
1129
1130     my $server_version = try { $self->_get_server_version };
1131
1132     if (defined $server_version) {
1133       $info->{dbms_version} = $server_version;
1134
1135       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1136       my @verparts = split (/\./, $numeric_version);
1137       if (
1138         @verparts
1139           &&
1140         $verparts[0] <= 999
1141       ) {
1142         # consider only up to 3 version parts, iff not more than 3 digits
1143         my @use_parts;
1144         while (@verparts && @use_parts < 3) {
1145           my $p = shift @verparts;
1146           last if $p > 999;
1147           push @use_parts, $p;
1148         }
1149         push @use_parts, 0 while @use_parts < 3;
1150
1151         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1152       }
1153     }
1154
1155     $self->_dbh_details->{info} = $info;
1156   }
1157
1158   return $info;
1159 }
1160
1161 sub _get_server_version {
1162   shift->_dbh_get_info(18);
1163 }
1164
1165 sub _dbh_get_info {
1166   my ($self, $info) = @_;
1167
1168   return try { $self->_get_dbh->get_info($info) } || undef;
1169 }
1170
1171 sub _determine_driver {
1172   my ($self) = @_;
1173
1174   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1175     my $started_connected = 0;
1176     local $self->{_in_determine_driver} = 1;
1177
1178     if (ref($self) eq __PACKAGE__) {
1179       my $driver;
1180       if ($self->_dbh) { # we are connected
1181         $driver = $self->_dbh->{Driver}{Name};
1182         $started_connected = 1;
1183       } else {
1184         # if connect_info is a CODEREF, we have no choice but to connect
1185         if (ref $self->_dbi_connect_info->[0] &&
1186             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1187           $self->_populate_dbh;
1188           $driver = $self->_dbh->{Driver}{Name};
1189         }
1190         else {
1191           # try to use dsn to not require being connected, the driver may still
1192           # force a connection in _rebless to determine version
1193           # (dsn may not be supplied at all if all we do is make a mock-schema)
1194           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1195           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1196           $driver ||= $ENV{DBI_DRIVER};
1197         }
1198       }
1199
1200       if ($driver) {
1201         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1202         if ($self->load_optional_class($storage_class)) {
1203           mro::set_mro($storage_class, 'c3');
1204           bless $self, $storage_class;
1205           $self->_rebless();
1206         }
1207       }
1208     }
1209
1210     $self->_driver_determined(1);
1211
1212     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1213
1214     $self->_init; # run driver-specific initializations
1215
1216     $self->_run_connection_actions
1217         if !$started_connected && defined $self->_dbh;
1218   }
1219 }
1220
1221 sub _do_connection_actions {
1222   my $self          = shift;
1223   my $method_prefix = shift;
1224   my $call          = shift;
1225
1226   if (not ref($call)) {
1227     my $method = $method_prefix . $call;
1228     $self->$method(@_);
1229   } elsif (ref($call) eq 'CODE') {
1230     $self->$call(@_);
1231   } elsif (ref($call) eq 'ARRAY') {
1232     if (ref($call->[0]) ne 'ARRAY') {
1233       $self->_do_connection_actions($method_prefix, $_) for @$call;
1234     } else {
1235       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1236     }
1237   } else {
1238     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1239   }
1240
1241   return $self;
1242 }
1243
1244 sub connect_call_do_sql {
1245   my $self = shift;
1246   $self->_do_query(@_);
1247 }
1248
1249 sub disconnect_call_do_sql {
1250   my $self = shift;
1251   $self->_do_query(@_);
1252 }
1253
1254 # override in db-specific backend when necessary
1255 sub connect_call_datetime_setup { 1 }
1256
1257 sub _do_query {
1258   my ($self, $action) = @_;
1259
1260   if (ref $action eq 'CODE') {
1261     $action = $action->($self);
1262     $self->_do_query($_) foreach @$action;
1263   }
1264   else {
1265     # Most debuggers expect ($sql, @bind), so we need to exclude
1266     # the attribute hash which is the second argument to $dbh->do
1267     # furthermore the bind values are usually to be presented
1268     # as named arrayref pairs, so wrap those here too
1269     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1270     my $sql = shift @do_args;
1271     my $attrs = shift @do_args;
1272     my @bind = map { [ undef, $_ ] } @do_args;
1273
1274     $self->_query_start($sql, @bind);
1275     $self->_get_dbh->do($sql, $attrs, @do_args);
1276     $self->_query_end($sql, @bind);
1277   }
1278
1279   return $self;
1280 }
1281
1282 sub _connect {
1283   my ($self, @info) = @_;
1284
1285   $self->throw_exception("You failed to provide any connection info")
1286     if !@info;
1287
1288   my ($old_connect_via, $dbh);
1289
1290   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1291     $old_connect_via = $DBI::connect_via;
1292     $DBI::connect_via = 'connect';
1293   }
1294
1295   try {
1296     if(ref $info[0] eq 'CODE') {
1297        $dbh = $info[0]->();
1298     }
1299     else {
1300        $dbh = DBI->connect(@info);
1301     }
1302
1303     if (!$dbh) {
1304       die $DBI::errstr;
1305     }
1306
1307     unless ($self->unsafe) {
1308
1309       $self->throw_exception(
1310         'Refusing clobbering of {HandleError} installed on externally supplied '
1311        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1312       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1313
1314       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1315       # request, or an external handle. Complain and set anyway
1316       unless ($dbh->{RaiseError}) {
1317         carp( ref $info[0] eq 'CODE'
1318
1319           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1320            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1321            .'attribute has been supplied'
1322
1323           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1324            .'unsafe => 1. Toggling RaiseError back to true'
1325         );
1326
1327         $dbh->{RaiseError} = 1;
1328       }
1329
1330       # this odd anonymous coderef dereference is in fact really
1331       # necessary to avoid the unwanted effect described in perl5
1332       # RT#75792
1333       sub {
1334         my $weak_self = $_[0];
1335         weaken $weak_self;
1336
1337         # the coderef is blessed so we can distinguish it from externally
1338         # supplied handles (which must be preserved)
1339         $_[1]->{HandleError} = bless sub {
1340           if ($weak_self) {
1341             $weak_self->throw_exception("DBI Exception: $_[0]");
1342           }
1343           else {
1344             # the handler may be invoked by something totally out of
1345             # the scope of DBIC
1346             croak ("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1347           }
1348         }, '__DBIC__DBH__ERROR__HANDLER__';
1349       }->($self, $dbh);
1350     }
1351   }
1352   catch {
1353     $self->throw_exception("DBI Connection failed: $_")
1354   }
1355   finally {
1356     $DBI::connect_via = $old_connect_via if $old_connect_via;
1357   };
1358
1359   $self->_dbh_autocommit($dbh->{AutoCommit});
1360   $dbh;
1361 }
1362
1363 sub svp_begin {
1364   my ($self, $name) = @_;
1365
1366   $name = $self->_svp_generate_name
1367     unless defined $name;
1368
1369   $self->throw_exception ("You can't use savepoints outside a transaction")
1370     if $self->{transaction_depth} == 0;
1371
1372   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1373     unless $self->can('_svp_begin');
1374
1375   push @{ $self->{savepoints} }, $name;
1376
1377   $self->debugobj->svp_begin($name) if $self->debug;
1378
1379   return $self->_svp_begin($name);
1380 }
1381
1382 sub svp_release {
1383   my ($self, $name) = @_;
1384
1385   $self->throw_exception ("You can't use savepoints outside a transaction")
1386     if $self->{transaction_depth} == 0;
1387
1388   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1389     unless $self->can('_svp_release');
1390
1391   if (defined $name) {
1392     $self->throw_exception ("Savepoint '$name' does not exist")
1393       unless grep { $_ eq $name } @{ $self->{savepoints} };
1394
1395     # Dig through the stack until we find the one we are releasing.  This keeps
1396     # the stack up to date.
1397     my $svp;
1398
1399     do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1400   } else {
1401     $name = pop @{ $self->{savepoints} };
1402   }
1403
1404   $self->debugobj->svp_release($name) if $self->debug;
1405
1406   return $self->_svp_release($name);
1407 }
1408
1409 sub svp_rollback {
1410   my ($self, $name) = @_;
1411
1412   $self->throw_exception ("You can't use savepoints outside a transaction")
1413     if $self->{transaction_depth} == 0;
1414
1415   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1416     unless $self->can('_svp_rollback');
1417
1418   if (defined $name) {
1419       # If they passed us a name, verify that it exists in the stack
1420       unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1421           $self->throw_exception("Savepoint '$name' does not exist!");
1422       }
1423
1424       # Dig through the stack until we find the one we are releasing.  This keeps
1425       # the stack up to date.
1426       while(my $s = pop(@{ $self->{savepoints} })) {
1427           last if($s eq $name);
1428       }
1429       # Add the savepoint back to the stack, as a rollback doesn't remove the
1430       # named savepoint, only everything after it.
1431       push(@{ $self->{savepoints} }, $name);
1432   } else {
1433       # We'll assume they want to rollback to the last savepoint
1434       $name = $self->{savepoints}->[-1];
1435   }
1436
1437   $self->debugobj->svp_rollback($name) if $self->debug;
1438
1439   return $self->_svp_rollback($name);
1440 }
1441
1442 sub _svp_generate_name {
1443   my ($self) = @_;
1444   return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
1445 }
1446
1447 sub txn_begin {
1448   my $self = shift;
1449
1450   # this means we have not yet connected and do not know the AC status
1451   # (e.g. coderef $dbh)
1452   if (! defined $self->_dbh_autocommit) {
1453     $self->ensure_connected;
1454   }
1455   # otherwise re-connect on pid changes, so
1456   # that the txn_depth is adjusted properly
1457   # the lightweight _get_dbh is good enoug here
1458   # (only superficial handle check, no pings)
1459   else {
1460     $self->_get_dbh;
1461   }
1462
1463   if($self->transaction_depth == 0) {
1464     $self->debugobj->txn_begin()
1465       if $self->debug;
1466     $self->_dbh_begin_work;
1467   }
1468   elsif ($self->auto_savepoint) {
1469     $self->svp_begin;
1470   }
1471   $self->{transaction_depth}++;
1472 }
1473
1474 sub _dbh_begin_work {
1475   my $self = shift;
1476
1477   # if the user is utilizing txn_do - good for him, otherwise we need to
1478   # ensure that the $dbh is healthy on BEGIN.
1479   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1480   # will be replaced by a failure of begin_work itself (which will be
1481   # then retried on reconnect)
1482   if ($self->{_in_dbh_do}) {
1483     $self->_dbh->begin_work;
1484   } else {
1485     $self->dbh_do(sub { $_[1]->begin_work });
1486   }
1487 }
1488
1489 sub txn_commit {
1490   my $self = shift;
1491   if (! $self->_dbh) {
1492     $self->throw_exception('cannot COMMIT on a disconnected handle');
1493   }
1494   elsif ($self->{transaction_depth} == 1) {
1495     $self->debugobj->txn_commit()
1496       if ($self->debug);
1497     $self->_dbh_commit;
1498     $self->{transaction_depth} = 0
1499       if $self->_dbh_autocommit;
1500   }
1501   elsif($self->{transaction_depth} > 1) {
1502     $self->{transaction_depth}--;
1503     $self->svp_release
1504       if $self->auto_savepoint;
1505   }
1506   elsif (! $self->_dbh->FETCH('AutoCommit') ) {
1507
1508     carp "Storage transaction_depth $self->{transaction_depth} does not match "
1509         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1510
1511     $self->debugobj->txn_commit()
1512       if ($self->debug);
1513     $self->_dbh_commit;
1514     $self->{transaction_depth} = 0
1515       if $self->_dbh_autocommit;
1516   }
1517   else {
1518     $self->throw_exception( 'Refusing to commit without a started transaction' );
1519   }
1520 }
1521
1522 sub _dbh_commit {
1523   my $self = shift;
1524   my $dbh  = $self->_dbh
1525     or $self->throw_exception('cannot COMMIT on a disconnected handle');
1526   $dbh->commit;
1527 }
1528
1529 sub txn_rollback {
1530   my $self = shift;
1531   my $dbh = $self->_dbh;
1532   try {
1533     if ($self->{transaction_depth} == 1) {
1534       $self->debugobj->txn_rollback()
1535         if ($self->debug);
1536       $self->{transaction_depth} = 0
1537         if $self->_dbh_autocommit;
1538       $self->_dbh_rollback;
1539     }
1540     elsif($self->{transaction_depth} > 1) {
1541       $self->{transaction_depth}--;
1542       if ($self->auto_savepoint) {
1543         $self->svp_rollback;
1544         $self->svp_release;
1545       }
1546     }
1547     else {
1548       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
1549     }
1550   }
1551   catch {
1552     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1553
1554     if ($_ !~ /$exception_class/) {
1555       # ensure that a failed rollback resets the transaction depth
1556       $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1557     }
1558
1559     $self->throw_exception($_)
1560   };
1561 }
1562
1563 sub _dbh_rollback {
1564   my $self = shift;
1565   my $dbh  = $self->_dbh
1566     or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
1567   $dbh->rollback;
1568 }
1569
1570 # This used to be the top-half of _execute.  It was split out to make it
1571 #  easier to override in NoBindVars without duping the rest.  It takes up
1572 #  all of _execute's args, and emits $sql, @bind.
1573 sub _prep_for_execute {
1574   my ($self, $op, $extra_bind, $ident, $args) = @_;
1575
1576   if( blessed $ident && $ident->isa("DBIx::Class::ResultSource") ) {
1577     $ident = $ident->from();
1578   }
1579
1580   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
1581
1582   unshift(@bind,
1583     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1584       if $extra_bind;
1585   return ($sql, \@bind);
1586 }
1587
1588
1589 sub _fix_bind_params {
1590     my ($self, @bind) = @_;
1591
1592     ### Turn @bind from something like this:
1593     ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1594     ### to this:
1595     ###   ( "'1'", "'1'", "'3'" )
1596     return
1597         map {
1598             if ( defined( $_ && $_->[1] ) ) {
1599                 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1600             }
1601             else { q{NULL}; }
1602         } @bind;
1603 }
1604
1605 sub _query_start {
1606     my ( $self, $sql, @bind ) = @_;
1607
1608     if ( $self->debug ) {
1609         @bind = $self->_fix_bind_params(@bind);
1610
1611         $self->debugobj->query_start( $sql, @bind );
1612     }
1613 }
1614
1615 sub _query_end {
1616     my ( $self, $sql, @bind ) = @_;
1617
1618     if ( $self->debug ) {
1619         @bind = $self->_fix_bind_params(@bind);
1620         $self->debugobj->query_end( $sql, @bind );
1621     }
1622 }
1623
1624 sub _dbh_execute {
1625   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
1626
1627   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
1628
1629   $self->_query_start( $sql, @$bind );
1630
1631   my $sth = $self->sth($sql,$op);
1632
1633   my $placeholder_index = 1;
1634
1635   foreach my $bound (@$bind) {
1636     my $attributes = {};
1637     my($column_name, @data) = @$bound;
1638
1639     if ($bind_attributes) {
1640       $attributes = $bind_attributes->{$column_name}
1641       if defined $bind_attributes->{$column_name};
1642     }
1643
1644     foreach my $data (@data) {
1645       my $ref = ref $data;
1646
1647       if ($ref and overload::Method($data, '""') ) {
1648         $data = "$data";
1649       }
1650       elsif ($ref eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1651         $sth->bind_param_inout(
1652           $placeholder_index++,
1653           $data,
1654           $self->_max_column_bytesize($ident, $column_name),
1655           $attributes
1656         );
1657         next;
1658       }
1659
1660       $sth->bind_param($placeholder_index++, $data, $attributes);
1661     }
1662   }
1663
1664   # Can this fail without throwing an exception anyways???
1665   my $rv = $sth->execute();
1666   $self->throw_exception(
1667     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1668   ) if !$rv;
1669
1670   $self->_query_end( $sql, @$bind );
1671
1672   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1673 }
1674
1675 sub _execute {
1676     my $self = shift;
1677     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
1678 }
1679
1680 sub _prefetch_autovalues {
1681   my ($self, $source, $to_insert) = @_;
1682
1683   my $colinfo = $source->columns_info;
1684
1685   my %values;
1686   for my $col (keys %$colinfo) {
1687     if (
1688       $colinfo->{$col}{auto_nextval}
1689         and
1690       (
1691         ! exists $to_insert->{$col}
1692           or
1693         ref $to_insert->{$col} eq 'SCALAR'
1694       )
1695     ) {
1696       $values{$col} = $self->_sequence_fetch(
1697         'NEXTVAL',
1698         ( $colinfo->{$col}{sequence} ||=
1699             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1700         ),
1701       );
1702     }
1703   }
1704
1705   \%values;
1706 }
1707
1708 sub insert {
1709   my ($self, $source, $to_insert) = @_;
1710
1711   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1712
1713   # fuse the values
1714   $to_insert = { %$to_insert, %$prefetched_values };
1715
1716   # list of primary keys we try to fetch from the database
1717   # both not-exsists and scalarrefs are considered
1718   my %fetch_pks;
1719   for ($source->primary_columns) {
1720     $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
1721       if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
1722   }
1723
1724   my ($sqla_opts, @ir_container);
1725   if ($self->_use_insert_returning) {
1726
1727     # retain order as declared in the resultsource
1728     for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
1729       push @{$sqla_opts->{returning}}, $_;
1730       $sqla_opts->{returning_container} = \@ir_container
1731         if $self->_use_insert_returning_bound;
1732     }
1733   }
1734
1735   my $bind_attributes = $self->source_bind_attributes($source);
1736
1737   my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $sqla_opts);
1738
1739   my %returned_cols;
1740
1741   if (my $retlist = $sqla_opts->{returning}) {
1742     @ir_container = try {
1743       local $SIG{__WARN__} = sub {};
1744       my @r = $sth->fetchrow_array;
1745       $sth->finish;
1746       @r;
1747     } unless @ir_container;
1748
1749     @returned_cols{@$retlist} = @ir_container if @ir_container;
1750   }
1751
1752   return { %$prefetched_values, %returned_cols };
1753 }
1754
1755
1756 ## Currently it is assumed that all values passed will be "normal", i.e. not
1757 ## scalar refs, or at least, all the same type as the first set, the statement is
1758 ## only prepped once.
1759 sub insert_bulk {
1760   my ($self, $source, $cols, $data) = @_;
1761
1762   my %colvalues;
1763   @colvalues{@$cols} = (0..$#$cols);
1764
1765   for my $i (0..$#$cols) {
1766     my $first_val = $data->[0][$i];
1767     next unless ref $first_val eq 'SCALAR';
1768
1769     $colvalues{ $cols->[$i] } = $first_val;
1770   }
1771
1772   # check for bad data and stringify stringifiable objects
1773   my $bad_slice = sub {
1774     my ($msg, $col_idx, $slice_idx) = @_;
1775     $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1776       $msg,
1777       $cols->[$col_idx],
1778       do {
1779         require Data::Dumper::Concise;
1780         local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
1781         Data::Dumper::Concise::Dumper ({
1782           map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
1783         }),
1784       }
1785     );
1786   };
1787
1788   for my $datum_idx (0..$#$data) {
1789     my $datum = $data->[$datum_idx];
1790
1791     for my $col_idx (0..$#$cols) {
1792       my $val            = $datum->[$col_idx];
1793       my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
1794       my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
1795
1796       if ($is_literal_sql) {
1797         if (not ref $val) {
1798           $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
1799         }
1800         elsif ((my $reftype = ref $val) ne 'SCALAR') {
1801           $bad_slice->("$reftype reference found where literal SQL expected",
1802             $col_idx, $datum_idx);
1803         }
1804         elsif ($$val ne $$sqla_bind){
1805           $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
1806             $col_idx, $datum_idx);
1807         }
1808       }
1809       elsif (my $reftype = ref $val) {
1810         require overload;
1811         if (overload::Method($val, '""')) {
1812           $datum->[$col_idx] = "".$val;
1813         }
1814         else {
1815           $bad_slice->("$reftype reference found where bind expected",
1816             $col_idx, $datum_idx);
1817         }
1818       }
1819     }
1820   }
1821
1822   my ($sql, $bind) = $self->_prep_for_execute (
1823     'insert', undef, $source, [\%colvalues]
1824   );
1825
1826   if (! @$bind) {
1827     # if the bindlist is empty - make sure all "values" are in fact
1828     # literal scalarrefs. If not the case this means the storage ate
1829     # them away (e.g. the NoBindVars component) and interpolated them
1830     # directly into the SQL. This obviosly can't be good for multi-inserts
1831
1832     $self->throw_exception('Cannot insert_bulk without support for placeholders')
1833       if first { ref $_ ne 'SCALAR' } values %colvalues;
1834   }
1835
1836   # neither _execute_array, nor _execute_inserts_with_no_binds are
1837   # atomic (even if _execute _array is a single call). Thus a safety
1838   # scope guard
1839   my $guard = $self->txn_scope_guard;
1840
1841   $self->_query_start( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1842   my $sth = $self->sth($sql);
1843   my $rv = do {
1844     if (@$bind) {
1845       #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1846       $self->_execute_array( $source, $sth, $bind, $cols, $data );
1847     }
1848     else {
1849       # bind_param_array doesn't work if there are no binds
1850       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1851     }
1852   };
1853
1854   $self->_query_end( $sql, @$bind ? [ dummy => '__BULK_INSERT__' ] : () );
1855
1856   $guard->commit;
1857
1858   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1859 }
1860
1861 sub _execute_array {
1862   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
1863
1864   ## This must be an arrayref, else nothing works!
1865   my $tuple_status = [];
1866
1867   ## Get the bind_attributes, if any exist
1868   my $bind_attributes = $self->source_bind_attributes($source);
1869
1870   ## Bind the values and execute
1871   my $placeholder_index = 1;
1872
1873   foreach my $bound (@$bind) {
1874
1875     my $attributes = {};
1876     my ($column_name, $data_index) = @$bound;
1877
1878     if( $bind_attributes ) {
1879       $attributes = $bind_attributes->{$column_name}
1880       if defined $bind_attributes->{$column_name};
1881     }
1882
1883     my @data = map { $_->[$data_index] } @$data;
1884
1885     $sth->bind_param_array(
1886       $placeholder_index,
1887       [@data],
1888       (%$attributes ?  $attributes : ()),
1889     );
1890     $placeholder_index++;
1891   }
1892
1893   my ($rv, $err);
1894   try {
1895     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1896   }
1897   catch {
1898     $err = shift;
1899   };
1900
1901   # Not all DBDs are create equal. Some throw on error, some return
1902   # an undef $rv, and some set $sth->err - try whatever we can
1903   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1904     ! defined $err
1905       and
1906     ( !defined $rv or $sth->err )
1907   );
1908
1909   # Statement must finish even if there was an exception.
1910   try {
1911     $sth->finish
1912   }
1913   catch {
1914     $err = shift unless defined $err
1915   };
1916
1917   if (defined $err) {
1918     my $i = 0;
1919     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1920
1921     $self->throw_exception("Unexpected populate error: $err")
1922       if ($i > $#$tuple_status);
1923
1924     require Data::Dumper::Concise;
1925     $self->throw_exception(sprintf "%s for populate slice:\n%s",
1926       ($tuple_status->[$i][1] || $err),
1927       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1928     );
1929   }
1930
1931   return $rv;
1932 }
1933
1934 sub _dbh_execute_array {
1935     my ($self, $sth, $tuple_status, @extra) = @_;
1936
1937     return $sth->execute_array({ArrayTupleStatus => $tuple_status});
1938 }
1939
1940 sub _dbh_execute_inserts_with_no_binds {
1941   my ($self, $sth, $count) = @_;
1942
1943   my $err;
1944   try {
1945     my $dbh = $self->_get_dbh;
1946     local $dbh->{RaiseError} = 1;
1947     local $dbh->{PrintError} = 0;
1948
1949     $sth->execute foreach 1..$count;
1950   }
1951   catch {
1952     $err = shift;
1953   };
1954
1955   # Make sure statement is finished even if there was an exception.
1956   try {
1957     $sth->finish
1958   }
1959   catch {
1960     $err = shift unless defined $err;
1961   };
1962
1963   $self->throw_exception($err) if defined $err;
1964
1965   return $count;
1966 }
1967
1968 sub update {
1969   my ($self, $source, @args) = @_;
1970
1971   my $bind_attrs = $self->source_bind_attributes($source);
1972
1973   return $self->_execute('update' => [], $source, $bind_attrs, @args);
1974 }
1975
1976
1977 sub delete {
1978   my ($self, $source, @args) = @_;
1979
1980   my $bind_attrs = $self->source_bind_attributes($source);
1981
1982   return $self->_execute('delete' => [], $source, $bind_attrs, @args);
1983 }
1984
1985 # We were sent here because the $rs contains a complex search
1986 # which will require a subquery to select the correct rows
1987 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1988 #
1989 # Generating a single PK column subquery is trivial and supported
1990 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1991 # Look at _multipk_update_delete()
1992 sub _subq_update_delete {
1993   my $self = shift;
1994   my ($rs, $op, $values) = @_;
1995
1996   my $rsrc = $rs->result_source;
1997
1998   # quick check if we got a sane rs on our hands
1999   my @pcols = $rsrc->_pri_cols;
2000
2001   my $sel = $rs->_resolved_attrs->{select};
2002   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
2003
2004   if (
2005       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
2006         ne
2007       join ("\x00", sort @$sel )
2008   ) {
2009     $self->throw_exception (
2010       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2011     );
2012   }
2013
2014   if (@pcols == 1) {
2015     return $self->$op (
2016       $rsrc,
2017       $op eq 'update' ? $values : (),
2018       { $pcols[0] => { -in => $rs->as_query } },
2019     );
2020   }
2021
2022   else {
2023     return $self->_multipk_update_delete (@_);
2024   }
2025 }
2026
2027 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2028 # resultset update/delete involving subqueries. So by default resort
2029 # to simple (and inefficient) delete_all style per-row opearations,
2030 # while allowing specific storages to override this with a faster
2031 # implementation.
2032 #
2033 sub _multipk_update_delete {
2034   return shift->_per_row_update_delete (@_);
2035 }
2036
2037 # This is the default loop used to delete/update rows for multi PK
2038 # resultsets, and used by mysql exclusively (because it can't do anything
2039 # else).
2040 #
2041 # We do not use $row->$op style queries, because resultset update/delete
2042 # is not expected to cascade (this is what delete_all/update_all is for).
2043 #
2044 # There should be no race conditions as the entire operation is rolled
2045 # in a transaction.
2046 #
2047 sub _per_row_update_delete {
2048   my $self = shift;
2049   my ($rs, $op, $values) = @_;
2050
2051   my $rsrc = $rs->result_source;
2052   my @pcols = $rsrc->_pri_cols;
2053
2054   my $guard = $self->txn_scope_guard;
2055
2056   # emulate the return value of $sth->execute for non-selects
2057   my $row_cnt = '0E0';
2058
2059   my $subrs_cur = $rs->cursor;
2060   my @all_pk = $subrs_cur->all;
2061   for my $pks ( @all_pk) {
2062
2063     my $cond;
2064     for my $i (0.. $#pcols) {
2065       $cond->{$pcols[$i]} = $pks->[$i];
2066     }
2067
2068     $self->$op (
2069       $rsrc,
2070       $op eq 'update' ? $values : (),
2071       $cond,
2072     );
2073
2074     $row_cnt++;
2075   }
2076
2077   $guard->commit;
2078
2079   return $row_cnt;
2080 }
2081
2082 sub _select {
2083   my $self = shift;
2084   $self->_execute($self->_select_args(@_));
2085 }
2086
2087 sub _select_args_to_query {
2088   my $self = shift;
2089
2090   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
2091   #  = $self->_select_args($ident, $select, $cond, $attrs);
2092   my ($op, $bind, $ident, $bind_attrs, @args) =
2093     $self->_select_args(@_);
2094
2095   # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2096   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
2097   $prepared_bind ||= [];
2098
2099   return wantarray
2100     ? ($sql, $prepared_bind, $bind_attrs)
2101     : \[ "($sql)", @$prepared_bind ]
2102   ;
2103 }
2104
2105 sub _select_args {
2106   my ($self, $ident, $select, $where, $attrs) = @_;
2107
2108   my $sql_maker = $self->sql_maker;
2109   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2110
2111   $attrs = {
2112     %$attrs,
2113     select => $select,
2114     from => $ident,
2115     where => $where,
2116     $rs_alias && $alias2source->{$rs_alias}
2117       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2118       : ()
2119     ,
2120   };
2121
2122   # calculate bind_attrs before possible $ident mangling
2123   my $bind_attrs = {};
2124   for my $alias (keys %$alias2source) {
2125     my $bindtypes = $self->source_bind_attributes ($alias2source->{$alias}) || {};
2126     for my $col (keys %$bindtypes) {
2127
2128       my $fqcn = join ('.', $alias, $col);
2129       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
2130
2131       # Unqialified column names are nice, but at the same time can be
2132       # rather ambiguous. What we do here is basically go along with
2133       # the loop, adding an unqualified column slot to $bind_attrs,
2134       # alongside the fully qualified name. As soon as we encounter
2135       # another column by that name (which would imply another table)
2136       # we unset the unqualified slot and never add any info to it
2137       # to avoid erroneous type binding. If this happens the users
2138       # only choice will be to fully qualify his column name
2139
2140       if (exists $bind_attrs->{$col}) {
2141         $bind_attrs->{$col} = {};
2142       }
2143       else {
2144         $bind_attrs->{$col} = $bind_attrs->{$fqcn};
2145       }
2146     }
2147   }
2148
2149   # Sanity check the attributes (SQLMaker does it too, but
2150   # in case of a software_limit we'll never reach there)
2151   if (defined $attrs->{offset}) {
2152     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2153       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2154   }
2155   $attrs->{offset} ||= 0;
2156
2157   if (defined $attrs->{rows}) {
2158     $self->throw_exception("The rows attribute must be a positive integer if present")
2159       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2160   }
2161   elsif ($attrs->{offset}) {
2162     # MySQL actually recommends this approach.  I cringe.
2163     $attrs->{rows} = $sql_maker->__max_int;
2164   }
2165
2166   my @limit;
2167
2168   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2169   # storage, unless software limit was requested
2170   if (
2171     #limited has_many
2172     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2173        ||
2174     # grouped prefetch (to satisfy group_by == select)
2175     ( $attrs->{group_by}
2176         &&
2177       @{$attrs->{group_by}}
2178         &&
2179       $attrs->{_prefetch_selector_range}
2180     )
2181   ) {
2182     ($ident, $select, $where, $attrs)
2183       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2184   }
2185   elsif (! $attrs->{software_limit} ) {
2186     push @limit, $attrs->{rows}, $attrs->{offset};
2187   }
2188
2189   # try to simplify the joinmap further (prune unreferenced type-single joins)
2190   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2191
2192 ###
2193   # This would be the point to deflate anything found in $where
2194   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2195   # expect a row object. And all we have is a resultsource (it is trivial
2196   # to extract deflator coderefs via $alias2source above).
2197   #
2198   # I don't see a way forward other than changing the way deflators are
2199   # invoked, and that's just bad...
2200 ###
2201
2202   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
2203 }
2204
2205 # Returns a counting SELECT for a simple count
2206 # query. Abstracted so that a storage could override
2207 # this to { count => 'firstcol' } or whatever makes
2208 # sense as a performance optimization
2209 sub _count_select {
2210   #my ($self, $source, $rs_attrs) = @_;
2211   return { count => '*' };
2212 }
2213
2214
2215 sub source_bind_attributes {
2216   my ($self, $source) = @_;
2217
2218   my $bind_attributes;
2219
2220   my $colinfo = $source->columns_info;
2221
2222   for my $col (keys %$colinfo) {
2223     if (my $dt = $colinfo->{$col}{data_type} ) {
2224       $bind_attributes->{$col} = $self->bind_attribute_by_data_type($dt)
2225     }
2226   }
2227
2228   return $bind_attributes;
2229 }
2230
2231 =head2 select
2232
2233 =over 4
2234
2235 =item Arguments: $ident, $select, $condition, $attrs
2236
2237 =back
2238
2239 Handle a SQL select statement.
2240
2241 =cut
2242
2243 sub select {
2244   my $self = shift;
2245   my ($ident, $select, $condition, $attrs) = @_;
2246   return $self->cursor_class->new($self, \@_, $attrs);
2247 }
2248
2249 sub select_single {
2250   my $self = shift;
2251   my ($rv, $sth, @bind) = $self->_select(@_);
2252   my @row = $sth->fetchrow_array;
2253   my @nextrow = $sth->fetchrow_array if @row;
2254   if(@row && @nextrow) {
2255     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2256   }
2257   # Need to call finish() to work round broken DBDs
2258   $sth->finish();
2259   return @row;
2260 }
2261
2262 =head2 sql_limit_dialect
2263
2264 This is an accessor for the default SQL limit dialect used by a particular
2265 storage driver. Can be overriden by supplying an explicit L</limit_dialect>
2266 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2267 see L<DBIx::Class::SQLMaker::LimitDialects>.
2268
2269 =head2 sth
2270
2271 =over 4
2272
2273 =item Arguments: $sql
2274
2275 =back
2276
2277 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2278
2279 =cut
2280
2281 sub _dbh_sth {
2282   my ($self, $dbh, $sql) = @_;
2283
2284   # 3 is the if_active parameter which avoids active sth re-use
2285   my $sth = $self->disable_sth_caching
2286     ? $dbh->prepare($sql)
2287     : $dbh->prepare_cached($sql, {}, 3);
2288
2289   # XXX You would think RaiseError would make this impossible,
2290   #  but apparently that's not true :(
2291   $self->throw_exception(
2292     $dbh->errstr
2293       ||
2294     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2295             .'an exception and/or setting $dbh->errstr',
2296       length ($sql) > 20
2297         ? substr($sql, 0, 20) . '...'
2298         : $sql
2299       ,
2300       'DBD::' . $dbh->{Driver}{Name},
2301     )
2302   ) if !$sth;
2303
2304   $sth;
2305 }
2306
2307 sub sth {
2308   my ($self, $sql) = @_;
2309   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2310 }
2311
2312 sub _dbh_columns_info_for {
2313   my ($self, $dbh, $table) = @_;
2314
2315   if ($dbh->can('column_info')) {
2316     my %result;
2317     my $caught;
2318     try {
2319       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2320       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2321       $sth->execute();
2322       while ( my $info = $sth->fetchrow_hashref() ){
2323         my %column_info;
2324         $column_info{data_type}   = $info->{TYPE_NAME};
2325         $column_info{size}      = $info->{COLUMN_SIZE};
2326         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2327         $column_info{default_value} = $info->{COLUMN_DEF};
2328         my $col_name = $info->{COLUMN_NAME};
2329         $col_name =~ s/^\"(.*)\"$/$1/;
2330
2331         $result{$col_name} = \%column_info;
2332       }
2333     } catch {
2334       $caught = 1;
2335     };
2336     return \%result if !$caught && scalar keys %result;
2337   }
2338
2339   my %result;
2340   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2341   $sth->execute;
2342   my @columns = @{$sth->{NAME_lc}};
2343   for my $i ( 0 .. $#columns ){
2344     my %column_info;
2345     $column_info{data_type} = $sth->{TYPE}->[$i];
2346     $column_info{size} = $sth->{PRECISION}->[$i];
2347     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2348
2349     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2350       $column_info{data_type} = $1;
2351       $column_info{size}    = $2;
2352     }
2353
2354     $result{$columns[$i]} = \%column_info;
2355   }
2356   $sth->finish;
2357
2358   foreach my $col (keys %result) {
2359     my $colinfo = $result{$col};
2360     my $type_num = $colinfo->{data_type};
2361     my $type_name;
2362     if(defined $type_num && $dbh->can('type_info')) {
2363       my $type_info = $dbh->type_info($type_num);
2364       $type_name = $type_info->{TYPE_NAME} if $type_info;
2365       $colinfo->{data_type} = $type_name if $type_name;
2366     }
2367   }
2368
2369   return \%result;
2370 }
2371
2372 sub columns_info_for {
2373   my ($self, $table) = @_;
2374   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2375 }
2376
2377 =head2 last_insert_id
2378
2379 Return the row id of the last insert.
2380
2381 =cut
2382
2383 sub _dbh_last_insert_id {
2384     my ($self, $dbh, $source, $col) = @_;
2385
2386     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2387
2388     return $id if defined $id;
2389
2390     my $class = ref $self;
2391     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2392 }
2393
2394 sub last_insert_id {
2395   my $self = shift;
2396   $self->_dbh_last_insert_id ($self->_dbh, @_);
2397 }
2398
2399 =head2 _native_data_type
2400
2401 =over 4
2402
2403 =item Arguments: $type_name
2404
2405 =back
2406
2407 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2408 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2409 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2410
2411 The default implementation returns C<undef>, implement in your Storage driver if
2412 you need this functionality.
2413
2414 Should map types from other databases to the native RDBMS type, for example
2415 C<VARCHAR2> to C<VARCHAR>.
2416
2417 Types with modifiers should map to the underlying data type. For example,
2418 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2419
2420 Composite types should map to the container type, for example
2421 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2422
2423 =cut
2424
2425 sub _native_data_type {
2426   #my ($self, $data_type) = @_;
2427   return undef
2428 }
2429
2430 # Check if placeholders are supported at all
2431 sub _determine_supports_placeholders {
2432   my $self = shift;
2433   my $dbh  = $self->_get_dbh;
2434
2435   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2436   # but it is inaccurate more often than not
2437   return try {
2438     local $dbh->{PrintError} = 0;
2439     local $dbh->{RaiseError} = 1;
2440     $dbh->do('select ?', {}, 1);
2441     1;
2442   }
2443   catch {
2444     0;
2445   };
2446 }
2447
2448 # Check if placeholders bound to non-string types throw exceptions
2449 #
2450 sub _determine_supports_typeless_placeholders {
2451   my $self = shift;
2452   my $dbh  = $self->_get_dbh;
2453
2454   return try {
2455     local $dbh->{PrintError} = 0;
2456     local $dbh->{RaiseError} = 1;
2457     # this specifically tests a bind that is NOT a string
2458     $dbh->do('select 1 where 1 = ?', {}, 1);
2459     1;
2460   }
2461   catch {
2462     0;
2463   };
2464 }
2465
2466 =head2 sqlt_type
2467
2468 Returns the database driver name.
2469
2470 =cut
2471
2472 sub sqlt_type {
2473   shift->_get_dbh->{Driver}->{Name};
2474 }
2475
2476 =head2 bind_attribute_by_data_type
2477
2478 Given a datatype from column info, returns a database specific bind
2479 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2480 let the database planner just handle it.
2481
2482 Generally only needed for special case column types, like bytea in postgres.
2483
2484 =cut
2485
2486 sub bind_attribute_by_data_type {
2487     return;
2488 }
2489
2490 =head2 is_datatype_numeric
2491
2492 Given a datatype from column_info, returns a boolean value indicating if
2493 the current RDBMS considers it a numeric value. This controls how
2494 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2495 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2496 be performed instead of the usual C<eq>.
2497
2498 =cut
2499
2500 sub is_datatype_numeric {
2501   my ($self, $dt) = @_;
2502
2503   return 0 unless $dt;
2504
2505   return $dt =~ /^ (?:
2506     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2507   ) $/ix;
2508 }
2509
2510
2511 =head2 create_ddl_dir
2512
2513 =over 4
2514
2515 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2516
2517 =back
2518
2519 Creates a SQL file based on the Schema, for each of the specified
2520 database engines in C<\@databases> in the given directory.
2521 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2522
2523 Given a previous version number, this will also create a file containing
2524 the ALTER TABLE statements to transform the previous schema into the
2525 current one. Note that these statements may contain C<DROP TABLE> or
2526 C<DROP COLUMN> statements that can potentially destroy data.
2527
2528 The file names are created using the C<ddl_filename> method below, please
2529 override this method in your schema if you would like a different file
2530 name format. For the ALTER file, the same format is used, replacing
2531 $version in the name with "$preversion-$version".
2532
2533 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2534 The most common value for this would be C<< { add_drop_table => 1 } >>
2535 to have the SQL produced include a C<DROP TABLE> statement for each table
2536 created. For quoting purposes supply C<quote_table_names> and
2537 C<quote_field_names>.
2538
2539 If no arguments are passed, then the following default values are assumed:
2540
2541 =over 4
2542
2543 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2544
2545 =item version    - $schema->schema_version
2546
2547 =item directory  - './'
2548
2549 =item preversion - <none>
2550
2551 =back
2552
2553 By default, C<\%sqlt_args> will have
2554
2555  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2556
2557 merged with the hash passed in. To disable any of those features, pass in a
2558 hashref like the following
2559
2560  { ignore_constraint_names => 0, # ... other options }
2561
2562
2563 WARNING: You are strongly advised to check all SQL files created, before applying
2564 them.
2565
2566 =cut
2567
2568 sub create_ddl_dir {
2569   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2570
2571   unless ($dir) {
2572     carp "No directory given, using ./\n";
2573     $dir = './';
2574   } else {
2575       -d $dir
2576         or
2577       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2578         or
2579       $self->throw_exception(
2580         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2581       );
2582   }
2583
2584   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2585
2586   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2587   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2588
2589   my $schema_version = $schema->schema_version || '1.x';
2590   $version ||= $schema_version;
2591
2592   $sqltargs = {
2593     add_drop_table => 1,
2594     ignore_constraint_names => 1,
2595     ignore_index_names => 1,
2596     %{$sqltargs || {}}
2597   };
2598
2599   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2600     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2601   }
2602
2603   my $sqlt = SQL::Translator->new( $sqltargs );
2604
2605   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2606   my $sqlt_schema = $sqlt->translate({ data => $schema })
2607     or $self->throw_exception ($sqlt->error);
2608
2609   foreach my $db (@$databases) {
2610     $sqlt->reset();
2611     $sqlt->{schema} = $sqlt_schema;
2612     $sqlt->producer($db);
2613
2614     my $file;
2615     my $filename = $schema->ddl_filename($db, $version, $dir);
2616     if (-e $filename && ($version eq $schema_version )) {
2617       # if we are dumping the current version, overwrite the DDL
2618       carp "Overwriting existing DDL file - $filename";
2619       unlink($filename);
2620     }
2621
2622     my $output = $sqlt->translate;
2623     if(!$output) {
2624       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2625       next;
2626     }
2627     if(!open($file, ">$filename")) {
2628       $self->throw_exception("Can't open $filename for writing ($!)");
2629       next;
2630     }
2631     print $file $output;
2632     close($file);
2633
2634     next unless ($preversion);
2635
2636     require SQL::Translator::Diff;
2637
2638     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2639     if(!-e $prefilename) {
2640       carp("No previous schema file found ($prefilename)");
2641       next;
2642     }
2643
2644     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2645     if(-e $difffile) {
2646       carp("Overwriting existing diff file - $difffile");
2647       unlink($difffile);
2648     }
2649
2650     my $source_schema;
2651     {
2652       my $t = SQL::Translator->new($sqltargs);
2653       $t->debug( 0 );
2654       $t->trace( 0 );
2655
2656       $t->parser( $db )
2657         or $self->throw_exception ($t->error);
2658
2659       my $out = $t->translate( $prefilename )
2660         or $self->throw_exception ($t->error);
2661
2662       $source_schema = $t->schema;
2663
2664       $source_schema->name( $prefilename )
2665         unless ( $source_schema->name );
2666     }
2667
2668     # The "new" style of producers have sane normalization and can support
2669     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2670     # And we have to diff parsed SQL against parsed SQL.
2671     my $dest_schema = $sqlt_schema;
2672
2673     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2674       my $t = SQL::Translator->new($sqltargs);
2675       $t->debug( 0 );
2676       $t->trace( 0 );
2677
2678       $t->parser( $db )
2679         or $self->throw_exception ($t->error);
2680
2681       my $out = $t->translate( $filename )
2682         or $self->throw_exception ($t->error);
2683
2684       $dest_schema = $t->schema;
2685
2686       $dest_schema->name( $filename )
2687         unless $dest_schema->name;
2688     }
2689
2690     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2691                                                   $dest_schema,   $db,
2692                                                   $sqltargs
2693                                                  );
2694     if(!open $file, ">$difffile") {
2695       $self->throw_exception("Can't write to $difffile ($!)");
2696       next;
2697     }
2698     print $file $diff;
2699     close($file);
2700   }
2701 }
2702
2703 =head2 deployment_statements
2704
2705 =over 4
2706
2707 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2708
2709 =back
2710
2711 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2712
2713 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2714 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2715
2716 C<$directory> is used to return statements from files in a previously created
2717 L</create_ddl_dir> directory and is optional. The filenames are constructed
2718 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2719
2720 If no C<$directory> is specified then the statements are constructed on the
2721 fly using L<SQL::Translator> and C<$version> is ignored.
2722
2723 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2724
2725 =cut
2726
2727 sub deployment_statements {
2728   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2729   $type ||= $self->sqlt_type;
2730   $version ||= $schema->schema_version || '1.x';
2731   $dir ||= './';
2732   my $filename = $schema->ddl_filename($type, $version, $dir);
2733   if(-f $filename)
2734   {
2735       # FIXME replace this block when a proper sane sql parser is available
2736       my $file;
2737       open($file, "<$filename")
2738         or $self->throw_exception("Can't open $filename ($!)");
2739       my @rows = <$file>;
2740       close($file);
2741       return join('', @rows);
2742   }
2743
2744   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2745     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2746   }
2747
2748   # sources needs to be a parser arg, but for simplicty allow at top level
2749   # coming in
2750   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2751       if exists $sqltargs->{sources};
2752
2753   my $tr = SQL::Translator->new(
2754     producer => "SQL::Translator::Producer::${type}",
2755     %$sqltargs,
2756     parser => 'SQL::Translator::Parser::DBIx::Class',
2757     data => $schema,
2758   );
2759
2760   my @ret;
2761   if (wantarray) {
2762     @ret = $tr->translate;
2763   }
2764   else {
2765     $ret[0] = $tr->translate;
2766   }
2767
2768   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2769     unless (@ret && defined $ret[0]);
2770
2771   return wantarray ? @ret : $ret[0];
2772 }
2773
2774 # FIXME deploy() currently does not accurately report sql errors
2775 # Will always return true while errors are warned
2776 sub deploy {
2777   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2778   my $deploy = sub {
2779     my $line = shift;
2780     return if(!$line);
2781     return if($line =~ /^--/);
2782     # next if($line =~ /^DROP/m);
2783     return if($line =~ /^BEGIN TRANSACTION/m);
2784     return if($line =~ /^COMMIT/m);
2785     return if $line =~ /^\s+$/; # skip whitespace only
2786     $self->_query_start($line);
2787     try {
2788       # do a dbh_do cycle here, as we need some error checking in
2789       # place (even though we will ignore errors)
2790       $self->dbh_do (sub { $_[1]->do($line) });
2791     } catch {
2792       carp qq{$_ (running "${line}")};
2793     };
2794     $self->_query_end($line);
2795   };
2796   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2797   if (@statements > 1) {
2798     foreach my $statement (@statements) {
2799       $deploy->( $statement );
2800     }
2801   }
2802   elsif (@statements == 1) {
2803     # split on single line comments and end of statements
2804     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2805       $deploy->( $line );
2806     }
2807   }
2808 }
2809
2810 =head2 datetime_parser
2811
2812 Returns the datetime parser class
2813
2814 =cut
2815
2816 sub datetime_parser {
2817   my $self = shift;
2818   return $self->{datetime_parser} ||= do {
2819     $self->build_datetime_parser(@_);
2820   };
2821 }
2822
2823 =head2 datetime_parser_type
2824
2825 Defines (returns) the datetime parser class - currently hardwired to
2826 L<DateTime::Format::MySQL>
2827
2828 =cut
2829
2830 sub datetime_parser_type { "DateTime::Format::MySQL"; }
2831
2832 =head2 build_datetime_parser
2833
2834 See L</datetime_parser>
2835
2836 =cut
2837
2838 sub build_datetime_parser {
2839   my $self = shift;
2840   my $type = $self->datetime_parser_type(@_);
2841   $self->ensure_class_loaded ($type);
2842   return $type;
2843 }
2844
2845
2846 =head2 is_replicating
2847
2848 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2849 replicate from a master database.  Default is undef, which is the result
2850 returned by databases that don't support replication.
2851
2852 =cut
2853
2854 sub is_replicating {
2855     return;
2856
2857 }
2858
2859 =head2 lag_behind_master
2860
2861 Returns a number that represents a certain amount of lag behind a master db
2862 when a given storage is replicating.  The number is database dependent, but
2863 starts at zero and increases with the amount of lag. Default in undef
2864
2865 =cut
2866
2867 sub lag_behind_master {
2868     return;
2869 }
2870
2871 =head2 relname_to_table_alias
2872
2873 =over 4
2874
2875 =item Arguments: $relname, $join_count
2876
2877 =back
2878
2879 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2880 queries.
2881
2882 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2883 way these aliases are named.
2884
2885 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2886 otherwise C<"$relname">.
2887
2888 =cut
2889
2890 sub relname_to_table_alias {
2891   my ($self, $relname, $join_count) = @_;
2892
2893   my $alias = ($join_count && $join_count > 1 ?
2894     join('_', $relname, $join_count) : $relname);
2895
2896   return $alias;
2897 }
2898
2899 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2900 # version and it may be necessary to amend or override it for a specific storage
2901 # if such binds are necessary.
2902 sub _max_column_bytesize {
2903   my ($self, $source, $col) = @_;
2904
2905   my $inf = $source->column_info($col);
2906   return $inf->{_max_bytesize} ||= do {
2907
2908     my $max_size;
2909
2910     if (my $data_type = $inf->{data_type}) {
2911       $data_type = lc($data_type);
2912
2913       # String/sized-binary types
2914       if ($data_type =~ /^(?:l?(?:var)?char(?:acter)?(?:\s*varying)?
2915                              |(?:var)?binary(?:\s*varying)?|raw)\b/x
2916       ) {
2917         $max_size = $inf->{size};
2918       }
2919       # Other charset/unicode types, assume scale of 4
2920       elsif ($data_type =~ /^(?:national\s*character(?:\s*varying)?|nchar
2921                               |univarchar
2922                               |nvarchar)\b/x
2923       ) {
2924         $max_size = $inf->{size} * 4 if $inf->{size};
2925       }
2926       # Blob types
2927       elsif ($self->_is_lob_type($data_type)) {
2928         # default to longreadlen
2929       }
2930       else {
2931         $max_size = 100;  # for all other (numeric?) datatypes
2932       }
2933     }
2934
2935     $max_size ||= $self->_get_dbh->{LongReadLen} || 8000;
2936   };
2937 }
2938
2939 # Determine if a data_type is some type of BLOB
2940 # FIXME: these regexes are expensive, result of these checks should be cached in
2941 # the column_info .
2942 sub _is_lob_type {
2943   my ($self, $data_type) = @_;
2944   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2945     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2946                                   |varchar|character\s*varying|nvarchar
2947                                   |national\s*character\s*varying))?\z/xi);
2948 }
2949
2950 sub _is_binary_lob_type {
2951   my ($self, $data_type) = @_;
2952   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2953     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2954 }
2955
2956 sub _is_text_lob_type {
2957   my ($self, $data_type) = @_;
2958   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2959     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2960                         |national\s*character\s*varying))\z/xi);
2961 }
2962
2963 1;
2964
2965 =head1 USAGE NOTES
2966
2967 =head2 DBIx::Class and AutoCommit
2968
2969 DBIx::Class can do some wonderful magic with handling exceptions,
2970 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2971 (the default) combined with C<txn_do> for transaction support.
2972
2973 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2974 in an assumed transaction between commits, and you're telling us you'd
2975 like to manage that manually.  A lot of the magic protections offered by
2976 this module will go away.  We can't protect you from exceptions due to database
2977 disconnects because we don't know anything about how to restart your
2978 transactions.  You're on your own for handling all sorts of exceptional
2979 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2980 be with raw DBI.
2981
2982
2983 =head1 AUTHORS
2984
2985 Matt S. Trout <mst@shadowcatsystems.co.uk>
2986
2987 Andy Grundman <andy@hybridized.org>
2988
2989 =head1 LICENSE
2990
2991 You may distribute this code under the same terms as Perl itself.
2992
2993 =cut