Cleanup Oracle's 00a28188 / add support for update/delete with blobs in WHERE
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use strict;
5 use warnings;
6
7 use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
8 use mro 'c3';
9
10 use DBIx::Class::Carp;
11 use DBIx::Class::Exception;
12 use Scalar::Util qw/refaddr weaken reftype blessed/;
13 use List::Util qw/first/;
14 use Sub::Name 'subname';
15 use Try::Tiny;
16 use overload ();
17 use namespace::clean;
18
19 # default cursor class, overridable in connect_info attributes
20 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
21
22 __PACKAGE__->mk_group_accessors('inherited' => qw/
23   sql_limit_dialect sql_quote_char sql_name_sep
24 /);
25
26 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
27
28 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
29 __PACKAGE__->datetime_parser_type('DateTime::Format::MySQL'); # historic default
30
31 __PACKAGE__->sql_name_sep('.');
32
33 __PACKAGE__->mk_group_accessors('simple' => qw/
34   _connect_info _dbi_connect_info _dbic_connect_attributes _driver_determined
35   _dbh _dbh_details _conn_pid _sql_maker _sql_maker_opts _dbh_autocommit
36 /);
37
38 # the values for these accessors are picked out (and deleted) from
39 # the attribute hashref passed to connect_info
40 my @storage_options = qw/
41   on_connect_call on_disconnect_call on_connect_do on_disconnect_do
42   disable_sth_caching unsafe auto_savepoint
43 /;
44 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
45
46
47 # capability definitions, using a 2-tiered accessor system
48 # The rationale is:
49 #
50 # A driver/user may define _use_X, which blindly without any checks says:
51 # "(do not) use this capability", (use_dbms_capability is an "inherited"
52 # type accessor)
53 #
54 # If _use_X is undef, _supports_X is then queried. This is a "simple" style
55 # accessor, which in turn calls _determine_supports_X, and stores the return
56 # in a special slot on the storage object, which is wiped every time a $dbh
57 # reconnection takes place (it is not guaranteed that upon reconnection we
58 # will get the same rdbms version). _determine_supports_X does not need to
59 # exist on a driver, as we ->can for it before calling.
60
61 my @capabilities = (qw/
62   insert_returning
63   insert_returning_bound
64   placeholders
65   typeless_placeholders
66   join_optimizer
67 /);
68 __PACKAGE__->mk_group_accessors( dbms_capability => map { "_supports_$_" } @capabilities );
69 __PACKAGE__->mk_group_accessors( use_dbms_capability => map { "_use_$_" } (@capabilities ) );
70
71 # on by default, not strictly a capability (pending rewrite)
72 __PACKAGE__->_use_join_optimizer (1);
73 sub _determine_supports_join_optimizer { 1 };
74
75 # Each of these methods need _determine_driver called before itself
76 # in order to function reliably. This is a purely DRY optimization
77 #
78 # get_(use)_dbms_capability need to be called on the correct Storage
79 # class, as _use_X may be hardcoded class-wide, and _supports_X calls
80 # _determine_supports_X which obv. needs a correct driver as well
81 my @rdbms_specific_methods = qw/
82   deployment_statements
83   sqlt_type
84   sql_maker
85   build_datetime_parser
86   datetime_parser_type
87
88   txn_begin
89   insert
90   insert_bulk
91   update
92   delete
93   select
94   select_single
95
96   get_use_dbms_capability
97   get_dbms_capability
98
99   _server_info
100   _get_server_version
101 /;
102
103 for my $meth (@rdbms_specific_methods) {
104
105   my $orig = __PACKAGE__->can ($meth)
106     or die "$meth is not a ::Storage::DBI method!";
107
108   no strict qw/refs/;
109   no warnings qw/redefine/;
110   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
111     if (
112       # only fire when invoked on an instance, a valid class-based invocation
113       # would e.g. be setting a default for an inherited accessor
114       ref $_[0]
115         and
116       ! $_[0]->_driver_determined
117         and
118       ! $_[0]->{_in_determine_driver}
119     ) {
120       $_[0]->_determine_driver;
121
122       # This for some reason crashes and burns on perl 5.8.1
123       # IFF the method ends up throwing an exception
124       #goto $_[0]->can ($meth);
125
126       my $cref = $_[0]->can ($meth);
127       goto $cref;
128     }
129
130     goto $orig;
131   };
132 }
133
134 =head1 NAME
135
136 DBIx::Class::Storage::DBI - DBI storage handler
137
138 =head1 SYNOPSIS
139
140   my $schema = MySchema->connect('dbi:SQLite:my.db');
141
142   $schema->storage->debug(1);
143
144   my @stuff = $schema->storage->dbh_do(
145     sub {
146       my ($storage, $dbh, @args) = @_;
147       $dbh->do("DROP TABLE authors");
148     },
149     @column_list
150   );
151
152   $schema->resultset('Book')->search({
153      written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
154   });
155
156 =head1 DESCRIPTION
157
158 This class represents the connection to an RDBMS via L<DBI>.  See
159 L<DBIx::Class::Storage> for general information.  This pod only
160 documents DBI-specific methods and behaviors.
161
162 =head1 METHODS
163
164 =cut
165
166 sub new {
167   my $new = shift->next::method(@_);
168
169   $new->_sql_maker_opts({});
170   $new->_dbh_details({});
171   $new->{_in_do_block} = 0;
172   $new->{_dbh_gen} = 0;
173
174   # read below to see what this does
175   $new->_arm_global_destructor;
176
177   $new;
178 }
179
180 # This is hack to work around perl shooting stuff in random
181 # order on exit(). If we do not walk the remaining storage
182 # objects in an END block, there is a *small but real* chance
183 # of a fork()ed child to kill the parent's shared DBI handle,
184 # *before perl reaches the DESTROY in this package*
185 # Yes, it is ugly and effective.
186 # Additionally this registry is used by the CLONE method to
187 # make sure no handles are shared between threads
188 {
189   my %seek_and_destroy;
190
191   sub _arm_global_destructor {
192     my $self = shift;
193     my $key = refaddr ($self);
194     $seek_and_destroy{$key} = $self;
195     weaken ($seek_and_destroy{$key});
196   }
197
198   END {
199     local $?; # just in case the DBI destructor changes it somehow
200
201     # destroy just the object if not native to this process/thread
202     $_->_verify_pid for (grep
203       { defined $_ }
204       values %seek_and_destroy
205     );
206   }
207
208   sub CLONE {
209     # As per DBI's recommendation, DBIC disconnects all handles as
210     # soon as possible (DBIC will reconnect only on demand from within
211     # the thread)
212     for (values %seek_and_destroy) {
213       next unless $_;
214       $_->{_dbh_gen}++;  # so that existing cursors will drop as well
215       $_->_dbh(undef);
216
217       $_->transaction_depth(0);
218       $_->savepoints([]);
219     }
220   }
221 }
222
223 sub DESTROY {
224   my $self = shift;
225
226   # some databases spew warnings on implicit disconnect
227   local $SIG{__WARN__} = sub {};
228   $self->_dbh(undef);
229
230   # this op is necessary, since the very last perl runtime statement
231   # triggers a global destruction shootout, and the $SIG localization
232   # may very well be destroyed before perl actually gets to do the
233   # $dbh undef
234   1;
235 }
236
237 # handle pid changes correctly - do not destroy parent's connection
238 sub _verify_pid {
239   my $self = shift;
240
241   my $pid = $self->_conn_pid;
242   if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
243     $dbh->{InactiveDestroy} = 1;
244     $self->{_dbh_gen}++;
245     $self->_dbh(undef);
246     $self->transaction_depth(0);
247     $self->savepoints([]);
248   }
249
250   return;
251 }
252
253 =head2 connect_info
254
255 This method is normally called by L<DBIx::Class::Schema/connection>, which
256 encapsulates its argument list in an arrayref before passing them here.
257
258 The argument list may contain:
259
260 =over
261
262 =item *
263
264 The same 4-element argument set one would normally pass to
265 L<DBI/connect>, optionally followed by
266 L<extra attributes|/DBIx::Class specific connection attributes>
267 recognized by DBIx::Class:
268
269   $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
270
271 =item *
272
273 A single code reference which returns a connected
274 L<DBI database handle|DBI/connect> optionally followed by
275 L<extra attributes|/DBIx::Class specific connection attributes> recognized
276 by DBIx::Class:
277
278   $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
279
280 =item *
281
282 A single hashref with all the attributes and the dsn/user/password
283 mixed together:
284
285   $connect_info_args = [{
286     dsn => $dsn,
287     user => $user,
288     password => $pass,
289     %dbi_attributes,
290     %extra_attributes,
291   }];
292
293   $connect_info_args = [{
294     dbh_maker => sub { DBI->connect (...) },
295     %dbi_attributes,
296     %extra_attributes,
297   }];
298
299 This is particularly useful for L<Catalyst> based applications, allowing the
300 following config (L<Config::General> style):
301
302   <Model::DB>
303     schema_class   App::DB
304     <connect_info>
305       dsn          dbi:mysql:database=test
306       user         testuser
307       password     TestPass
308       AutoCommit   1
309     </connect_info>
310   </Model::DB>
311
312 The C<dsn>/C<user>/C<password> combination can be substituted by the
313 C<dbh_maker> key whose value is a coderef that returns a connected
314 L<DBI database handle|DBI/connect>
315
316 =back
317
318 Please note that the L<DBI> docs recommend that you always explicitly
319 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
320 recommends that it be set to I<1>, and that you perform transactions
321 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
322 to I<1> if you do not do explicitly set it to zero.  This is the default
323 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
324
325 =head3 DBIx::Class specific connection attributes
326
327 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
328 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
329 the following connection options. These options can be mixed in with your other
330 L<DBI> connection attributes, or placed in a separate hashref
331 (C<\%extra_attributes>) as shown above.
332
333 Every time C<connect_info> is invoked, any previous settings for
334 these options will be cleared before setting the new ones, regardless of
335 whether any options are specified in the new C<connect_info>.
336
337
338 =over
339
340 =item on_connect_do
341
342 Specifies things to do immediately after connecting or re-connecting to
343 the database.  Its value may contain:
344
345 =over
346
347 =item a scalar
348
349 This contains one SQL statement to execute.
350
351 =item an array reference
352
353 This contains SQL statements to execute in order.  Each element contains
354 a string or a code reference that returns a string.
355
356 =item a code reference
357
358 This contains some code to execute.  Unlike code references within an
359 array reference, its return value is ignored.
360
361 =back
362
363 =item on_disconnect_do
364
365 Takes arguments in the same form as L</on_connect_do> and executes them
366 immediately before disconnecting from the database.
367
368 Note, this only runs if you explicitly call L</disconnect> on the
369 storage object.
370
371 =item on_connect_call
372
373 A more generalized form of L</on_connect_do> that calls the specified
374 C<connect_call_METHOD> methods in your storage driver.
375
376   on_connect_do => 'select 1'
377
378 is equivalent to:
379
380   on_connect_call => [ [ do_sql => 'select 1' ] ]
381
382 Its values may contain:
383
384 =over
385
386 =item a scalar
387
388 Will call the C<connect_call_METHOD> method.
389
390 =item a code reference
391
392 Will execute C<< $code->($storage) >>
393
394 =item an array reference
395
396 Each value can be a method name or code reference.
397
398 =item an array of arrays
399
400 For each array, the first item is taken to be the C<connect_call_> method name
401 or code reference, and the rest are parameters to it.
402
403 =back
404
405 Some predefined storage methods you may use:
406
407 =over
408
409 =item do_sql
410
411 Executes a SQL string or a code reference that returns a SQL string. This is
412 what L</on_connect_do> and L</on_disconnect_do> use.
413
414 It can take:
415
416 =over
417
418 =item a scalar
419
420 Will execute the scalar as SQL.
421
422 =item an arrayref
423
424 Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
425 attributes hashref and bind values.
426
427 =item a code reference
428
429 Will execute C<< $code->($storage) >> and execute the return array refs as
430 above.
431
432 =back
433
434 =item datetime_setup
435
436 Execute any statements necessary to initialize the database session to return
437 and accept datetime/timestamp values used with
438 L<DBIx::Class::InflateColumn::DateTime>.
439
440 Only necessary for some databases, see your specific storage driver for
441 implementation details.
442
443 =back
444
445 =item on_disconnect_call
446
447 Takes arguments in the same form as L</on_connect_call> and executes them
448 immediately before disconnecting from the database.
449
450 Calls the C<disconnect_call_METHOD> methods as opposed to the
451 C<connect_call_METHOD> methods called by L</on_connect_call>.
452
453 Note, this only runs if you explicitly call L</disconnect> on the
454 storage object.
455
456 =item disable_sth_caching
457
458 If set to a true value, this option will disable the caching of
459 statement handles via L<DBI/prepare_cached>.
460
461 =item limit_dialect
462
463 Sets a specific SQL::Abstract::Limit-style limit dialect, overriding the
464 default L</sql_limit_dialect> setting of the storage (if any). For a list
465 of available limit dialects see L<DBIx::Class::SQLMaker::LimitDialects>.
466
467 =item quote_names
468
469 When true automatically sets L</quote_char> and L</name_sep> to the characters
470 appropriate for your particular RDBMS. This option is preferred over specifying
471 L</quote_char> directly.
472
473 =item quote_char
474
475 Specifies what characters to use to quote table and column names.
476
477 C<quote_char> expects either a single character, in which case is it
478 is placed on either side of the table/column name, or an arrayref of length
479 2 in which case the table/column name is placed between the elements.
480
481 For example under MySQL you should use C<< quote_char => '`' >>, and for
482 SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
483
484 =item name_sep
485
486 This parameter is only useful in conjunction with C<quote_char>, and is used to
487 specify the character that separates elements (schemas, tables, columns) from
488 each other. If unspecified it defaults to the most commonly used C<.>.
489
490 =item unsafe
491
492 This Storage driver normally installs its own C<HandleError>, sets
493 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
494 all database handles, including those supplied by a coderef.  It does this
495 so that it can have consistent and useful error behavior.
496
497 If you set this option to a true value, Storage will not do its usual
498 modifications to the database handle's attributes, and instead relies on
499 the settings in your connect_info DBI options (or the values you set in
500 your connection coderef, in the case that you are connecting via coderef).
501
502 Note that your custom settings can cause Storage to malfunction,
503 especially if you set a C<HandleError> handler that suppresses exceptions
504 and/or disable C<RaiseError>.
505
506 =item auto_savepoint
507
508 If this option is true, L<DBIx::Class> will use savepoints when nesting
509 transactions, making it possible to recover from failure in the inner
510 transaction without having to abort all outer transactions.
511
512 =item cursor_class
513
514 Use this argument to supply a cursor class other than the default
515 L<DBIx::Class::Storage::DBI::Cursor>.
516
517 =back
518
519 Some real-life examples of arguments to L</connect_info> and
520 L<DBIx::Class::Schema/connect>
521
522   # Simple SQLite connection
523   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
524
525   # Connect via subref
526   ->connect_info([ sub { DBI->connect(...) } ]);
527
528   # Connect via subref in hashref
529   ->connect_info([{
530     dbh_maker => sub { DBI->connect(...) },
531     on_connect_do => 'alter session ...',
532   }]);
533
534   # A bit more complicated
535   ->connect_info(
536     [
537       'dbi:Pg:dbname=foo',
538       'postgres',
539       'my_pg_password',
540       { AutoCommit => 1 },
541       { quote_char => q{"} },
542     ]
543   );
544
545   # Equivalent to the previous example
546   ->connect_info(
547     [
548       'dbi:Pg:dbname=foo',
549       'postgres',
550       'my_pg_password',
551       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
552     ]
553   );
554
555   # Same, but with hashref as argument
556   # See parse_connect_info for explanation
557   ->connect_info(
558     [{
559       dsn         => 'dbi:Pg:dbname=foo',
560       user        => 'postgres',
561       password    => 'my_pg_password',
562       AutoCommit  => 1,
563       quote_char  => q{"},
564       name_sep    => q{.},
565     }]
566   );
567
568   # Subref + DBIx::Class-specific connection options
569   ->connect_info(
570     [
571       sub { DBI->connect(...) },
572       {
573           quote_char => q{`},
574           name_sep => q{@},
575           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
576           disable_sth_caching => 1,
577       },
578     ]
579   );
580
581
582
583 =cut
584
585 sub connect_info {
586   my ($self, $info) = @_;
587
588   return $self->_connect_info if !$info;
589
590   $self->_connect_info($info); # copy for _connect_info
591
592   $info = $self->_normalize_connect_info($info)
593     if ref $info eq 'ARRAY';
594
595   for my $storage_opt (keys %{ $info->{storage_options} }) {
596     my $value = $info->{storage_options}{$storage_opt};
597
598     $self->$storage_opt($value);
599   }
600
601   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
602   #  the new set of options
603   $self->_sql_maker(undef);
604   $self->_sql_maker_opts({});
605
606   for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
607     my $value = $info->{sql_maker_options}{$sql_maker_opt};
608
609     $self->_sql_maker_opts->{$sql_maker_opt} = $value;
610   }
611
612   my %attrs = (
613     %{ $self->_default_dbi_connect_attributes || {} },
614     %{ $info->{attributes} || {} },
615   );
616
617   my @args = @{ $info->{arguments} };
618
619   if (keys %attrs and ref $args[0] ne 'CODE') {
620     carp
621         'You provided explicit AutoCommit => 0 in your connection_info. '
622       . 'This is almost universally a bad idea (see the footnotes of '
623       . 'DBIx::Class::Storage::DBI for more info). If you still want to '
624       . 'do this you can set $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK} to disable '
625       . 'this warning.'
626       if ! $attrs{AutoCommit} and ! $ENV{DBIC_UNSAFE_AUTOCOMMIT_OK};
627
628     push @args, \%attrs if keys %attrs;
629   }
630   $self->_dbi_connect_info(\@args);
631
632   # FIXME - dirty:
633   # save attributes them in a separate accessor so they are always
634   # introspectable, even in case of a CODE $dbhmaker
635   $self->_dbic_connect_attributes (\%attrs);
636
637   return $self->_connect_info;
638 }
639
640 sub _normalize_connect_info {
641   my ($self, $info_arg) = @_;
642   my %info;
643
644   my @args = @$info_arg;  # take a shallow copy for further mutilation
645
646   # combine/pre-parse arguments depending on invocation style
647
648   my %attrs;
649   if (ref $args[0] eq 'CODE') {     # coderef with optional \%extra_attributes
650     %attrs = %{ $args[1] || {} };
651     @args = $args[0];
652   }
653   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
654     %attrs = %{$args[0]};
655     @args = ();
656     if (my $code = delete $attrs{dbh_maker}) {
657       @args = $code;
658
659       my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
660       if (@ignored) {
661         carp sprintf (
662             'Attribute(s) %s in connect_info were ignored, as they can not be applied '
663           . "to the result of 'dbh_maker'",
664
665           join (', ', map { "'$_'" } (@ignored) ),
666         );
667       }
668     }
669     else {
670       @args = delete @attrs{qw/dsn user password/};
671     }
672   }
673   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
674     %attrs = (
675       % { $args[3] || {} },
676       % { $args[4] || {} },
677     );
678     @args = @args[0,1,2];
679   }
680
681   $info{arguments} = \@args;
682
683   my @storage_opts = grep exists $attrs{$_},
684     @storage_options, 'cursor_class';
685
686   @{ $info{storage_options} }{@storage_opts} =
687     delete @attrs{@storage_opts} if @storage_opts;
688
689   my @sql_maker_opts = grep exists $attrs{$_},
690     qw/limit_dialect quote_char name_sep quote_names/;
691
692   @{ $info{sql_maker_options} }{@sql_maker_opts} =
693     delete @attrs{@sql_maker_opts} if @sql_maker_opts;
694
695   $info{attributes} = \%attrs if %attrs;
696
697   return \%info;
698 }
699
700 sub _default_dbi_connect_attributes () {
701   +{
702     AutoCommit => 1,
703     PrintError => 0,
704     RaiseError => 1,
705     ShowErrorStatement => 1,
706   };
707 }
708
709 =head2 on_connect_do
710
711 This method is deprecated in favour of setting via L</connect_info>.
712
713 =cut
714
715 =head2 on_disconnect_do
716
717 This method is deprecated in favour of setting via L</connect_info>.
718
719 =cut
720
721 sub _parse_connect_do {
722   my ($self, $type) = @_;
723
724   my $val = $self->$type;
725   return () if not defined $val;
726
727   my @res;
728
729   if (not ref($val)) {
730     push @res, [ 'do_sql', $val ];
731   } elsif (ref($val) eq 'CODE') {
732     push @res, $val;
733   } elsif (ref($val) eq 'ARRAY') {
734     push @res, map { [ 'do_sql', $_ ] } @$val;
735   } else {
736     $self->throw_exception("Invalid type for $type: ".ref($val));
737   }
738
739   return \@res;
740 }
741
742 =head2 dbh_do
743
744 Arguments: ($subref | $method_name), @extra_coderef_args?
745
746 Execute the given $subref or $method_name using the new exception-based
747 connection management.
748
749 The first two arguments will be the storage object that C<dbh_do> was called
750 on and a database handle to use.  Any additional arguments will be passed
751 verbatim to the called subref as arguments 2 and onwards.
752
753 Using this (instead of $self->_dbh or $self->dbh) ensures correct
754 exception handling and reconnection (or failover in future subclasses).
755
756 Your subref should have no side-effects outside of the database, as
757 there is the potential for your subref to be partially double-executed
758 if the database connection was stale/dysfunctional.
759
760 Example:
761
762   my @stuff = $schema->storage->dbh_do(
763     sub {
764       my ($storage, $dbh, @cols) = @_;
765       my $cols = join(q{, }, @cols);
766       $dbh->selectrow_array("SELECT $cols FROM foo");
767     },
768     @column_list
769   );
770
771 =cut
772
773 sub dbh_do {
774   my $self = shift;
775   my $code = shift;
776
777   my $dbh = $self->_get_dbh;
778
779   return $self->$code($dbh, @_)
780     if ( $self->{_in_do_block} || $self->{transaction_depth} );
781
782   local $self->{_in_do_block} = 1;
783
784   # take a ref instead of a copy, to preserve coderef @_ aliasing semantics
785   my $args = \@_;
786
787   try {
788     $self->$code ($dbh, @$args);
789   } catch {
790     $self->throw_exception($_) if $self->connected;
791
792     # We were not connected - reconnect and retry, but let any
793     #  exception fall right through this time
794     carp "Retrying dbh_do($code) after catching disconnected exception: $_"
795       if $ENV{DBIC_STORAGE_RETRY_DEBUG};
796
797     $self->_populate_dbh;
798     $self->$code($self->_dbh, @$args);
799   };
800 }
801
802 sub txn_do {
803   # connects or reconnects on pid change, necessary to grab correct txn_depth
804   $_[0]->_get_dbh;
805   local $_[0]->{_in_do_block} = 1;
806   shift->next::method(@_);
807 }
808
809 =head2 disconnect
810
811 Our C<disconnect> method also performs a rollback first if the
812 database is not in C<AutoCommit> mode.
813
814 =cut
815
816 sub disconnect {
817   my ($self) = @_;
818
819   if( $self->_dbh ) {
820     my @actions;
821
822     push @actions, ( $self->on_disconnect_call || () );
823     push @actions, $self->_parse_connect_do ('on_disconnect_do');
824
825     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
826
827     # stops the "implicit rollback on disconnect" warning
828     $self->_exec_txn_rollback unless $self->_dbh_autocommit;
829
830     %{ $self->_dbh->{CachedKids} } = ();
831     $self->_dbh->disconnect;
832     $self->_dbh(undef);
833     $self->{_dbh_gen}++;
834   }
835 }
836
837 =head2 with_deferred_fk_checks
838
839 =over 4
840
841 =item Arguments: C<$coderef>
842
843 =item Return Value: The return value of $coderef
844
845 =back
846
847 Storage specific method to run the code ref with FK checks deferred or
848 in MySQL's case disabled entirely.
849
850 =cut
851
852 # Storage subclasses should override this
853 sub with_deferred_fk_checks {
854   my ($self, $sub) = @_;
855   $sub->();
856 }
857
858 =head2 connected
859
860 =over
861
862 =item Arguments: none
863
864 =item Return Value: 1|0
865
866 =back
867
868 Verifies that the current database handle is active and ready to execute
869 an SQL statement (e.g. the connection did not get stale, server is still
870 answering, etc.) This method is used internally by L</dbh>.
871
872 =cut
873
874 sub connected {
875   my $self = shift;
876   return 0 unless $self->_seems_connected;
877
878   #be on the safe side
879   local $self->_dbh->{RaiseError} = 1;
880
881   return $self->_ping;
882 }
883
884 sub _seems_connected {
885   my $self = shift;
886
887   $self->_verify_pid;
888
889   my $dbh = $self->_dbh
890     or return 0;
891
892   return $dbh->FETCH('Active');
893 }
894
895 sub _ping {
896   my $self = shift;
897
898   my $dbh = $self->_dbh or return 0;
899
900   return $dbh->ping;
901 }
902
903 sub ensure_connected {
904   my ($self) = @_;
905
906   unless ($self->connected) {
907     $self->_populate_dbh;
908   }
909 }
910
911 =head2 dbh
912
913 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
914 is guaranteed to be healthy by implicitly calling L</connected>, and if
915 necessary performing a reconnection before returning. Keep in mind that this
916 is very B<expensive> on some database engines. Consider using L</dbh_do>
917 instead.
918
919 =cut
920
921 sub dbh {
922   my ($self) = @_;
923
924   if (not $self->_dbh) {
925     $self->_populate_dbh;
926   } else {
927     $self->ensure_connected;
928   }
929   return $self->_dbh;
930 }
931
932 # this is the internal "get dbh or connect (don't check)" method
933 sub _get_dbh {
934   my $self = shift;
935   $self->_verify_pid;
936   $self->_populate_dbh unless $self->_dbh;
937   return $self->_dbh;
938 }
939
940 sub sql_maker {
941   my ($self) = @_;
942   unless ($self->_sql_maker) {
943     my $sql_maker_class = $self->sql_maker_class;
944
945     my %opts = %{$self->_sql_maker_opts||{}};
946     my $dialect =
947       $opts{limit_dialect}
948         ||
949       $self->sql_limit_dialect
950         ||
951       do {
952         my $s_class = (ref $self) || $self;
953         carp (
954           "Your storage class ($s_class) does not set sql_limit_dialect and you "
955         . 'have not supplied an explicit limit_dialect in your connection_info. '
956         . 'DBIC will attempt to use the GenericSubQ dialect, which works on most '
957         . 'databases but can be (and often is) painfully slow. '
958         . "Please file an RT ticket against '$s_class' ."
959         );
960
961         'GenericSubQ';
962       }
963     ;
964
965     my ($quote_char, $name_sep);
966
967     if ($opts{quote_names}) {
968       $quote_char = (delete $opts{quote_char}) || $self->sql_quote_char || do {
969         my $s_class = (ref $self) || $self;
970         carp (
971           "You requested 'quote_names' but your storage class ($s_class) does "
972         . 'not explicitly define a default sql_quote_char and you have not '
973         . 'supplied a quote_char as part of your connection_info. DBIC will '
974         .q{default to the ANSI SQL standard quote '"', which works most of }
975         . "the time. Please file an RT ticket against '$s_class'."
976         );
977
978         '"'; # RV
979       };
980
981       $name_sep = (delete $opts{name_sep}) || $self->sql_name_sep;
982     }
983
984     $self->_sql_maker($sql_maker_class->new(
985       bindtype=>'columns',
986       array_datatypes => 1,
987       limit_dialect => $dialect,
988       ($quote_char ? (quote_char => $quote_char) : ()),
989       name_sep => ($name_sep || '.'),
990       %opts,
991     ));
992   }
993   return $self->_sql_maker;
994 }
995
996 # nothing to do by default
997 sub _rebless {}
998 sub _init {}
999
1000 sub _populate_dbh {
1001   my ($self) = @_;
1002
1003   my @info = @{$self->_dbi_connect_info || []};
1004   $self->_dbh(undef); # in case ->connected failed we might get sent here
1005   $self->_dbh_details({}); # reset everything we know
1006
1007   $self->_dbh($self->_connect(@info));
1008
1009   $self->_conn_pid($$) if $^O ne 'MSWin32'; # on win32 these are in fact threads
1010
1011   $self->_determine_driver;
1012
1013   # Always set the transaction depth on connect, since
1014   #  there is no transaction in progress by definition
1015   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
1016
1017   $self->_run_connection_actions unless $self->{_in_determine_driver};
1018 }
1019
1020 sub _run_connection_actions {
1021   my $self = shift;
1022   my @actions;
1023
1024   push @actions, ( $self->on_connect_call || () );
1025   push @actions, $self->_parse_connect_do ('on_connect_do');
1026
1027   $self->_do_connection_actions(connect_call_ => $_) for @actions;
1028 }
1029
1030
1031
1032 sub set_use_dbms_capability {
1033   $_[0]->set_inherited ($_[1], $_[2]);
1034 }
1035
1036 sub get_use_dbms_capability {
1037   my ($self, $capname) = @_;
1038
1039   my $use = $self->get_inherited ($capname);
1040   return defined $use
1041     ? $use
1042     : do { $capname =~ s/^_use_/_supports_/; $self->get_dbms_capability ($capname) }
1043   ;
1044 }
1045
1046 sub set_dbms_capability {
1047   $_[0]->_dbh_details->{capability}{$_[1]} = $_[2];
1048 }
1049
1050 sub get_dbms_capability {
1051   my ($self, $capname) = @_;
1052
1053   my $cap = $self->_dbh_details->{capability}{$capname};
1054
1055   unless (defined $cap) {
1056     if (my $meth = $self->can ("_determine$capname")) {
1057       $cap = $self->$meth ? 1 : 0;
1058     }
1059     else {
1060       $cap = 0;
1061     }
1062
1063     $self->set_dbms_capability ($capname, $cap);
1064   }
1065
1066   return $cap;
1067 }
1068
1069 sub _server_info {
1070   my $self = shift;
1071
1072   my $info;
1073   unless ($info = $self->_dbh_details->{info}) {
1074
1075     $info = {};
1076
1077     my $server_version = try { $self->_get_server_version };
1078
1079     if (defined $server_version) {
1080       $info->{dbms_version} = $server_version;
1081
1082       my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
1083       my @verparts = split (/\./, $numeric_version);
1084       if (
1085         @verparts
1086           &&
1087         $verparts[0] <= 999
1088       ) {
1089         # consider only up to 3 version parts, iff not more than 3 digits
1090         my @use_parts;
1091         while (@verparts && @use_parts < 3) {
1092           my $p = shift @verparts;
1093           last if $p > 999;
1094           push @use_parts, $p;
1095         }
1096         push @use_parts, 0 while @use_parts < 3;
1097
1098         $info->{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
1099       }
1100     }
1101
1102     $self->_dbh_details->{info} = $info;
1103   }
1104
1105   return $info;
1106 }
1107
1108 sub _get_server_version {
1109   shift->_dbh_get_info(18);
1110 }
1111
1112 sub _dbh_get_info {
1113   my ($self, $info) = @_;
1114
1115   return try { $self->_get_dbh->get_info($info) } || undef;
1116 }
1117
1118 sub _determine_driver {
1119   my ($self) = @_;
1120
1121   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
1122     my $started_connected = 0;
1123     local $self->{_in_determine_driver} = 1;
1124
1125     if (ref($self) eq __PACKAGE__) {
1126       my $driver;
1127       if ($self->_dbh) { # we are connected
1128         $driver = $self->_dbh->{Driver}{Name};
1129         $started_connected = 1;
1130       } else {
1131         # if connect_info is a CODEREF, we have no choice but to connect
1132         if (ref $self->_dbi_connect_info->[0] &&
1133             reftype $self->_dbi_connect_info->[0] eq 'CODE') {
1134           $self->_populate_dbh;
1135           $driver = $self->_dbh->{Driver}{Name};
1136         }
1137         else {
1138           # try to use dsn to not require being connected, the driver may still
1139           # force a connection in _rebless to determine version
1140           # (dsn may not be supplied at all if all we do is make a mock-schema)
1141           my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
1142           ($driver) = $dsn =~ /dbi:([^:]+):/i;
1143           $driver ||= $ENV{DBI_DRIVER};
1144         }
1145       }
1146
1147       if ($driver) {
1148         my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
1149         if ($self->load_optional_class($storage_class)) {
1150           mro::set_mro($storage_class, 'c3');
1151           bless $self, $storage_class;
1152           $self->_rebless();
1153         }
1154       }
1155     }
1156
1157     $self->_driver_determined(1);
1158
1159     Class::C3->reinitialize() if DBIx::Class::_ENV_::OLD_MRO;
1160
1161     $self->_init; # run driver-specific initializations
1162
1163     $self->_run_connection_actions
1164         if !$started_connected && defined $self->_dbh;
1165   }
1166 }
1167
1168 sub _do_connection_actions {
1169   my $self          = shift;
1170   my $method_prefix = shift;
1171   my $call          = shift;
1172
1173   if (not ref($call)) {
1174     my $method = $method_prefix . $call;
1175     $self->$method(@_);
1176   } elsif (ref($call) eq 'CODE') {
1177     $self->$call(@_);
1178   } elsif (ref($call) eq 'ARRAY') {
1179     if (ref($call->[0]) ne 'ARRAY') {
1180       $self->_do_connection_actions($method_prefix, $_) for @$call;
1181     } else {
1182       $self->_do_connection_actions($method_prefix, @$_) for @$call;
1183     }
1184   } else {
1185     $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
1186   }
1187
1188   return $self;
1189 }
1190
1191 sub connect_call_do_sql {
1192   my $self = shift;
1193   $self->_do_query(@_);
1194 }
1195
1196 sub disconnect_call_do_sql {
1197   my $self = shift;
1198   $self->_do_query(@_);
1199 }
1200
1201 # override in db-specific backend when necessary
1202 sub connect_call_datetime_setup { 1 }
1203
1204 sub _do_query {
1205   my ($self, $action) = @_;
1206
1207   if (ref $action eq 'CODE') {
1208     $action = $action->($self);
1209     $self->_do_query($_) foreach @$action;
1210   }
1211   else {
1212     # Most debuggers expect ($sql, @bind), so we need to exclude
1213     # the attribute hash which is the second argument to $dbh->do
1214     # furthermore the bind values are usually to be presented
1215     # as named arrayref pairs, so wrap those here too
1216     my @do_args = (ref $action eq 'ARRAY') ? (@$action) : ($action);
1217     my $sql = shift @do_args;
1218     my $attrs = shift @do_args;
1219     my @bind = map { [ undef, $_ ] } @do_args;
1220
1221     $self->_query_start($sql, \@bind);
1222     $self->_get_dbh->do($sql, $attrs, @do_args);
1223     $self->_query_end($sql, \@bind);
1224   }
1225
1226   return $self;
1227 }
1228
1229 sub _connect {
1230   my ($self, @info) = @_;
1231
1232   $self->throw_exception("You failed to provide any connection info")
1233     if !@info;
1234
1235   my ($old_connect_via, $dbh);
1236
1237   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
1238     $old_connect_via = $DBI::connect_via;
1239     $DBI::connect_via = 'connect';
1240   }
1241
1242   try {
1243     if(ref $info[0] eq 'CODE') {
1244       $dbh = $info[0]->();
1245     }
1246     else {
1247       require DBI;
1248       $dbh = DBI->connect(@info);
1249     }
1250
1251     if (!$dbh) {
1252       die $DBI::errstr;
1253     }
1254
1255     unless ($self->unsafe) {
1256
1257       $self->throw_exception(
1258         'Refusing clobbering of {HandleError} installed on externally supplied '
1259        ."DBI handle $dbh. Either remove the handler or use the 'unsafe' attribute."
1260       ) if $dbh->{HandleError} and ref $dbh->{HandleError} ne '__DBIC__DBH__ERROR__HANDLER__';
1261
1262       # Default via _default_dbi_connect_attributes is 1, hence it was an explicit
1263       # request, or an external handle. Complain and set anyway
1264       unless ($dbh->{RaiseError}) {
1265         carp( ref $info[0] eq 'CODE'
1266
1267           ? "The 'RaiseError' of the externally supplied DBI handle is set to false. "
1268            ."DBIx::Class will toggle it back to true, unless the 'unsafe' connect "
1269            .'attribute has been supplied'
1270
1271           : 'RaiseError => 0 supplied in your connection_info, without an explicit '
1272            .'unsafe => 1. Toggling RaiseError back to true'
1273         );
1274
1275         $dbh->{RaiseError} = 1;
1276       }
1277
1278       # this odd anonymous coderef dereference is in fact really
1279       # necessary to avoid the unwanted effect described in perl5
1280       # RT#75792
1281       sub {
1282         my $weak_self = $_[0];
1283         weaken $weak_self;
1284
1285         # the coderef is blessed so we can distinguish it from externally
1286         # supplied handles (which must be preserved)
1287         $_[1]->{HandleError} = bless sub {
1288           if ($weak_self) {
1289             $weak_self->throw_exception("DBI Exception: $_[0]");
1290           }
1291           else {
1292             # the handler may be invoked by something totally out of
1293             # the scope of DBIC
1294             DBIx::Class::Exception->throw("DBI Exception (unhandled by DBIC, ::Schema GCed): $_[0]");
1295           }
1296         }, '__DBIC__DBH__ERROR__HANDLER__';
1297       }->($self, $dbh);
1298     }
1299   }
1300   catch {
1301     $self->throw_exception("DBI Connection failed: $_")
1302   }
1303   finally {
1304     $DBI::connect_via = $old_connect_via if $old_connect_via;
1305   };
1306
1307   $self->_dbh_autocommit($dbh->{AutoCommit});
1308   $dbh;
1309 }
1310
1311 sub txn_begin {
1312   my $self = shift;
1313
1314   # this means we have not yet connected and do not know the AC status
1315   # (e.g. coderef $dbh), need a full-fledged connection check
1316   if (! defined $self->_dbh_autocommit) {
1317     $self->ensure_connected;
1318   }
1319   # Otherwise simply connect or re-connect on pid changes
1320   else {
1321     $self->_get_dbh;
1322   }
1323
1324   $self->next::method(@_);
1325 }
1326
1327 sub _exec_txn_begin {
1328   my $self = shift;
1329
1330   # if the user is utilizing txn_do - good for him, otherwise we need to
1331   # ensure that the $dbh is healthy on BEGIN.
1332   # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
1333   # will be replaced by a failure of begin_work itself (which will be
1334   # then retried on reconnect)
1335   if ($self->{_in_do_block}) {
1336     $self->_dbh->begin_work;
1337   } else {
1338     $self->dbh_do(sub { $_[1]->begin_work });
1339   }
1340 }
1341
1342 sub txn_commit {
1343   my $self = shift;
1344
1345   $self->_verify_pid if $self->_dbh;
1346   $self->throw_exception("Unable to txn_commit() on a disconnected storage")
1347     unless $self->_dbh;
1348
1349   # esoteric case for folks using external $dbh handles
1350   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1351     carp "Storage transaction_depth 0 does not match "
1352         ."false AutoCommit of $self->{_dbh}, attempting COMMIT anyway";
1353     $self->transaction_depth(1);
1354   }
1355
1356   $self->next::method(@_);
1357
1358   # if AutoCommit is disabled txn_depth never goes to 0
1359   # as a new txn is started immediately on commit
1360   $self->transaction_depth(1) if (
1361     !$self->transaction_depth
1362       and 
1363     defined $self->_dbh_autocommit
1364       and
1365     ! $self->_dbh_autocommit
1366   );
1367 }
1368
1369 sub _exec_txn_commit {
1370   shift->_dbh->commit;
1371 }
1372
1373 sub txn_rollback {
1374   my $self = shift;
1375
1376   $self->_verify_pid if $self->_dbh;
1377   $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
1378     unless $self->_dbh;
1379
1380   # esoteric case for folks using external $dbh handles
1381   if (! $self->transaction_depth and ! $self->_dbh->FETCH('AutoCommit') ) {
1382     carp "Storage transaction_depth 0 does not match "
1383         ."false AutoCommit of $self->{_dbh}, attempting ROLLBACK anyway";
1384     $self->transaction_depth(1);
1385   }
1386
1387   $self->next::method(@_);
1388
1389   # if AutoCommit is disabled txn_depth never goes to 0
1390   # as a new txn is started immediately on commit
1391   $self->transaction_depth(1) if (
1392     !$self->transaction_depth
1393       and 
1394     defined $self->_dbh_autocommit
1395       and
1396     ! $self->_dbh_autocommit
1397   );
1398 }
1399
1400 sub _exec_txn_rollback {
1401   shift->_dbh->rollback;
1402 }
1403
1404 # generate some identical methods
1405 for my $meth (qw/svp_begin svp_release svp_rollback/) {
1406   no strict qw/refs/;
1407   *{__PACKAGE__ ."::$meth"} = subname $meth => sub {
1408     my $self = shift;
1409     $self->_verify_pid if $self->_dbh;
1410     $self->throw_exception("Unable to $meth() on a disconnected storage")
1411       unless $self->_dbh;
1412     $self->next::method(@_);
1413   };
1414 }
1415
1416 # This used to be the top-half of _execute.  It was split out to make it
1417 #  easier to override in NoBindVars without duping the rest.  It takes up
1418 #  all of _execute's args, and emits $sql, @bind.
1419 sub _prep_for_execute {
1420   #my ($self, $op, $ident, $args) = @_;
1421   return shift->_gen_sql_bind(@_)
1422 }
1423
1424 sub _gen_sql_bind {
1425   my ($self, $op, $ident, $args) = @_;
1426
1427   my ($sql, @bind) = $self->sql_maker->$op(
1428     blessed($ident) ? $ident->from : $ident,
1429     @$args,
1430   );
1431
1432   my (@final_bind, $colinfos);
1433   my $resolve_bindinfo = sub {
1434     $colinfos ||= $self->_resolve_column_info($ident);
1435     if (my $col = $_[1]->{dbic_colname}) {
1436       $_[1]->{sqlt_datatype} ||= $colinfos->{$col}{data_type}
1437         if $colinfos->{$col}{data_type};
1438       $_[1]->{sqlt_size} ||= $colinfos->{$col}{size}
1439         if $colinfos->{$col}{size};
1440     }
1441     $_[1];
1442   };
1443
1444   for my $e (@{$args->[2]{bind}||[]}, @bind) {
1445     push @final_bind, [ do {
1446       if (ref $e ne 'ARRAY') {
1447         ({}, $e)
1448       }
1449       elsif (! defined $e->[0]) {
1450         ({}, $e->[1])
1451       }
1452       elsif (ref $e->[0] eq 'HASH') {
1453         (
1454           (first { $e->[0]{$_} } qw/dbd_attrs sqlt_datatype/) ? $e->[0] : $self->$resolve_bindinfo($e->[0]),
1455           $e->[1]
1456         )
1457       }
1458       elsif (ref $e->[0] eq 'SCALAR') {
1459         ( { sqlt_datatype => ${$e->[0]} }, $e->[1] )
1460       }
1461       else {
1462         ( $self->$resolve_bindinfo({ dbic_colname => $e->[0] }), $e->[1] )
1463       }
1464     }];
1465   }
1466
1467   ($sql, \@final_bind);
1468 }
1469
1470 sub _format_for_trace {
1471   #my ($self, $bind) = @_;
1472
1473   ### Turn @bind from something like this:
1474   ###   ( [ "artist", 1 ], [ \%attrs, 3 ] )
1475   ### to this:
1476   ###   ( "'1'", "'3'" )
1477
1478   map {
1479     defined( $_ && $_->[1] )
1480       ? qq{'$_->[1]'}
1481       : q{NULL}
1482   } @{$_[1] || []};
1483 }
1484
1485 sub _query_start {
1486   my ( $self, $sql, $bind ) = @_;
1487
1488   $self->debugobj->query_start( $sql, $self->_format_for_trace($bind) )
1489     if $self->debug;
1490 }
1491
1492 sub _query_end {
1493   my ( $self, $sql, $bind ) = @_;
1494
1495   $self->debugobj->query_end( $sql, $self->_format_for_trace($bind) )
1496     if $self->debug;
1497 }
1498
1499 my $sba_compat;
1500 sub _dbi_attrs_for_bind {
1501   my ($self, $ident, $bind) = @_;
1502
1503   if (! defined $sba_compat) {
1504     $self->_determine_driver;
1505     $sba_compat = $self->can('source_bind_attributes') == \&source_bind_attributes
1506       ? 0
1507       : 1
1508     ;
1509   }
1510
1511   my $sba_attrs;
1512   if ($sba_compat) {
1513     my $class = ref $self;
1514     carp_unique (
1515       "The source_bind_attributes() override in $class relies on a deprecated codepath. "
1516      .'You are strongly advised to switch your code to override bind_attribute_by_datatype() '
1517      .'instead. This legacy compat shim will also disappear some time before DBIC 0.09'
1518     );
1519
1520     my $sba_attrs = $self->source_bind_attributes
1521   }
1522
1523   my @attrs;
1524
1525   for (map { $_->[0] } @$bind) {
1526     push @attrs, do {
1527       if (exists $_->{dbd_attrs}) {
1528         $_->{dbd_attrs}
1529       }
1530       elsif($_->{sqlt_datatype}) {
1531         $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
1532       }
1533       elsif ($sba_attrs and $_->{dbic_colname}) {
1534         $sba_attrs->{$_->{dbic_colname}} || undef;
1535       }
1536       else {
1537         undef;  # always push something at this position
1538       }
1539     }
1540   }
1541
1542   return \@attrs;
1543 }
1544
1545 sub _execute {
1546   my ($self, $op, $ident, @args) = @_;
1547
1548   my ($sql, $bind) = $self->_prep_for_execute($op, $ident, \@args);
1549
1550   shift->dbh_do(    # retry over disconnects
1551     '_dbh_execute',
1552     $sql,
1553     $bind,
1554     $self->_dbi_attrs_for_bind($ident, $bind)
1555   );
1556 }
1557
1558 sub _dbh_execute {
1559   my ($self, undef, $sql, $bind, $bind_attrs) = @_;
1560
1561   $self->_query_start( $sql, $bind );
1562   my $sth = $self->_sth($sql);
1563
1564   for my $i (0 .. $#$bind) {
1565     if (ref $bind->[$i][1] eq 'SCALAR') {  # any scalarrefs are assumed to be bind_inouts
1566       $sth->bind_param_inout(
1567         $i + 1, # bind params counts are 1-based
1568         $bind->[$i][1],
1569         $bind->[$i][0]{dbd_size} || $self->_max_column_bytesize($bind->[$i][0]), # size
1570         $bind_attrs->[$i],
1571       );
1572     }
1573     else {
1574       $sth->bind_param(
1575         $i + 1,
1576         (ref $bind->[$i][1] and overload::Method($bind->[$i][1], '""'))
1577           ? "$bind->[$i][1]"
1578           : $bind->[$i][1]
1579         ,
1580         $bind_attrs->[$i],
1581       );
1582     }
1583   }
1584
1585   # Can this fail without throwing an exception anyways???
1586   my $rv = $sth->execute();
1587   $self->throw_exception(
1588     $sth->errstr || $sth->err || 'Unknown error: execute() returned false, but error flags were not set...'
1589   ) if !$rv;
1590
1591   $self->_query_end( $sql, $bind );
1592
1593   return (wantarray ? ($rv, $sth, @$bind) : $rv);
1594 }
1595
1596 sub _prefetch_autovalues {
1597   my ($self, $source, $to_insert) = @_;
1598
1599   my $colinfo = $source->columns_info;
1600
1601   my %values;
1602   for my $col (keys %$colinfo) {
1603     if (
1604       $colinfo->{$col}{auto_nextval}
1605         and
1606       (
1607         ! exists $to_insert->{$col}
1608           or
1609         ref $to_insert->{$col} eq 'SCALAR'
1610           or
1611         (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1612       )
1613     ) {
1614       $values{$col} = $self->_sequence_fetch(
1615         'NEXTVAL',
1616         ( $colinfo->{$col}{sequence} ||=
1617             $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
1618         ),
1619       );
1620     }
1621   }
1622
1623   \%values;
1624 }
1625
1626 sub insert {
1627   my ($self, $source, $to_insert) = @_;
1628
1629   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
1630
1631   # fuse the values, but keep a separate list of prefetched_values so that
1632   # they can be fused once again with the final return
1633   $to_insert = { %$to_insert, %$prefetched_values };
1634
1635   my $col_infos = $source->columns_info;
1636   my %pcols = map { $_ => 1 } $source->primary_columns;
1637   my %retrieve_cols;
1638   for my $col ($source->columns) {
1639     # nothing to retrieve when explicit values are supplied
1640     next if (defined $to_insert->{$col} and ! (
1641       ref $to_insert->{$col} eq 'SCALAR'
1642         or
1643       (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
1644     ));
1645
1646     # the 'scalar keys' is a trick to preserve the ->columns declaration order
1647     $retrieve_cols{$col} = scalar keys %retrieve_cols if (
1648       $pcols{$col}
1649         or
1650       $col_infos->{$col}{retrieve_on_insert}
1651     );
1652   };
1653
1654   my ($sqla_opts, @ir_container);
1655   if (%retrieve_cols and $self->_use_insert_returning) {
1656     $sqla_opts->{returning_container} = \@ir_container
1657       if $self->_use_insert_returning_bound;
1658
1659     $sqla_opts->{returning} = [
1660       sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
1661     ];
1662   }
1663
1664   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
1665
1666   my %returned_cols = %$to_insert;
1667   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
1668     @ir_container = try {
1669       local $SIG{__WARN__} = sub {};
1670       my @r = $sth->fetchrow_array;
1671       $sth->finish;
1672       @r;
1673     } unless @ir_container;
1674
1675     @returned_cols{@$retlist} = @ir_container if @ir_container;
1676   }
1677   else {
1678     # pull in PK if needed and then everything else
1679     if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
1680
1681       $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
1682         unless $self->can('last_insert_id');
1683
1684       my @pri_values = $self->last_insert_id($source, @missing_pri);
1685
1686       $self->throw_exception( "Can't get last insert id" )
1687         unless (@pri_values == @missing_pri);
1688
1689       @returned_cols{@missing_pri} = @pri_values;
1690       delete $retrieve_cols{$_} for @missing_pri;
1691     }
1692
1693     # if there is more left to pull
1694     if (%retrieve_cols) {
1695       $self->throw_exception(
1696         'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
1697       ) unless %pcols;
1698
1699       my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
1700
1701       my $cur = DBIx::Class::ResultSet->new($source, {
1702         where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
1703         select => \@left_to_fetch,
1704       })->cursor;
1705
1706       @returned_cols{@left_to_fetch} = $cur->next;
1707
1708       $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
1709         if scalar $cur->next;
1710     }
1711   }
1712
1713   return { %$prefetched_values, %returned_cols };
1714 }
1715
1716 sub insert_bulk {
1717   my ($self, $source, $cols, $data) = @_;
1718
1719   # FIXME - perhaps this is not even needed? does DBI stringify?
1720   #
1721   # forcibly stringify whatever is stringifiable
1722   for my $r (0 .. $#$data) {
1723     for my $c (0 .. $#{$data->[$r]}) {
1724       $data->[$r][$c] = "$data->[$r][$c]"
1725         if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
1726     }
1727   }
1728
1729   # check the data for consistency
1730   # report a sensible error on bad data
1731   #
1732   # also create a list of dynamic binds (ones that will be changing
1733   # for each row)
1734   my $dyn_bind_idx;
1735   for my $col_idx (0..$#$cols) {
1736
1737     # the first "row" is used as a point of reference
1738     my $reference_val = $data->[0][$col_idx];
1739     my $is_literal = ref $reference_val eq 'SCALAR';
1740     my $is_literal_bind = ( !$is_literal and (
1741       ref $reference_val eq 'REF'
1742         and
1743       ref $$reference_val eq 'ARRAY'
1744     ) );
1745
1746     $dyn_bind_idx->{$col_idx} = 1
1747       if (!$is_literal and !$is_literal_bind);
1748
1749     # use a closure for convenience (less to pass)
1750     my $bad_slice = sub {
1751       my ($msg, $slice_idx) = @_;
1752       $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
1753         $msg,
1754         $cols->[$col_idx],
1755         do {
1756           require Data::Dumper::Concise;
1757           local $Data::Dumper::Maxdepth = 2;
1758           Data::Dumper::Concise::Dumper ({
1759             map { $cols->[$_] =>
1760               $data->[$slice_idx][$_]
1761             } (0 .. $#$cols)
1762           }),
1763         }
1764       );
1765     };
1766
1767     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
1768       my $val = $data->[$row_idx][$col_idx];
1769
1770       if ($is_literal) {
1771         if (ref $val ne 'SCALAR') {
1772           $bad_slice->(
1773             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
1774             $row_idx
1775           );
1776         }
1777         elsif ($$val ne $$reference_val) {
1778           $bad_slice->(
1779             "Inconsistent literal SQL value (expecting \\'$$reference_val')",
1780             $row_idx
1781           );
1782         }
1783       }
1784       elsif ($is_literal_bind) {
1785         if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
1786           $bad_slice->(
1787             "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
1788             $row_idx
1789           );
1790         }
1791         elsif (${$val}->[0] ne ${$reference_val}->[0]) {
1792           $bad_slice->(
1793             "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
1794             $row_idx
1795           );
1796         }
1797       }
1798       elsif (ref $val) {
1799         if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
1800           $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
1801         }
1802         else {
1803           $bad_slice->("$val reference found where bind expected", $row_idx);
1804         }
1805       }
1806     }
1807   }
1808
1809   # Get the sql with bind values interpolated where necessary. For dynamic
1810   # binds convert the values of the first row into a literal+bind combo, with
1811   # extra positional info in the bind attr hashref. This will allow us to match
1812   # the order properly, and is so contrived because a user-supplied literal
1813   # bind (or something else specific to a resultsource and/or storage driver)
1814   # can inject extra binds along the way, so one can't rely on "shift
1815   # positions" ordering at all. Also we can't just hand SQLA a set of some
1816   # known "values" (e.g. hashrefs that can be later matched up by address),
1817   # because we want to supply a real value on which perhaps e.g. datatype
1818   # checks will be performed
1819   my ($sql, $proto_bind) = $self->_prep_for_execute (
1820     'insert',
1821     $source,
1822     [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
1823       ? \[ '?', [
1824           { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
1825             =>
1826           $data->[0][$_]
1827         ] ]
1828       : $data->[0][$_]
1829     } (0..$#$cols) } ],
1830   );
1831
1832   if (! @$proto_bind and keys %$dyn_bind_idx) {
1833     # if the bindlist is empty and we had some dynamic binds, this means the
1834     # storage ate them away (e.g. the NoBindVars component) and interpolated
1835     # them directly into the SQL. This obviosly can't be good for multi-inserts
1836     $self->throw_exception('Cannot insert_bulk without support for placeholders');
1837   }
1838
1839   # neither _execute_array, nor _execute_inserts_with_no_binds are
1840   # atomic (even if _execute _array is a single call). Thus a safety
1841   # scope guard
1842   my $guard = $self->txn_scope_guard;
1843
1844   $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
1845   my $sth = $self->_sth($sql);
1846   my $rv = do {
1847     if (@$proto_bind) {
1848       # proto bind contains the information on which pieces of $data to pull
1849       # $cols is passed in only for prettier error-reporting
1850       $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
1851     }
1852     else {
1853       # bind_param_array doesn't work if there are no binds
1854       $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
1855     }
1856   };
1857
1858   $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
1859
1860   $guard->commit;
1861
1862   return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
1863 }
1864
1865 sub _execute_array {
1866   my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
1867
1868   ## This must be an arrayref, else nothing works!
1869   my $tuple_status = [];
1870
1871   my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
1872
1873   # Bind the values by column slices
1874   for my $i (0 .. $#$proto_bind) {
1875     my $data_slice_idx = (
1876       ref $proto_bind->[$i][0] eq 'HASH'
1877         and
1878       exists $proto_bind->[$i][0]{_bind_data_slice_idx}
1879     ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
1880
1881     $sth->bind_param_array(
1882       $i+1, # DBI bind indexes are 1-based
1883       defined $data_slice_idx
1884         # either get a "column" of dynamic values, or just repeat the same
1885         # bind over and over
1886         ? [ map { $_->[$data_slice_idx] } @$data ]
1887         : [ ($proto_bind->[$i][1]) x @$data ]
1888       ,
1889       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
1890     );
1891   }
1892
1893   my ($rv, $err);
1894   try {
1895     $rv = $self->_dbh_execute_array($sth, $tuple_status, @extra);
1896   }
1897   catch {
1898     $err = shift;
1899   };
1900
1901   # Not all DBDs are create equal. Some throw on error, some return
1902   # an undef $rv, and some set $sth->err - try whatever we can
1903   $err = ($sth->errstr || 'UNKNOWN ERROR ($sth->errstr is unset)') if (
1904     ! defined $err
1905       and
1906     ( !defined $rv or $sth->err )
1907   );
1908
1909   # Statement must finish even if there was an exception.
1910   try {
1911     $sth->finish
1912   }
1913   catch {
1914     $err = shift unless defined $err
1915   };
1916
1917   if (defined $err) {
1918     my $i = 0;
1919     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
1920
1921     $self->throw_exception("Unexpected populate error: $err")
1922       if ($i > $#$tuple_status);
1923
1924     require Data::Dumper::Concise;
1925     $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
1926       ($tuple_status->[$i][1] || $err),
1927       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
1928     );
1929   }
1930
1931   return $rv;
1932 }
1933
1934 sub _dbh_execute_array {
1935   #my ($self, $sth, $tuple_status, @extra) = @_;
1936   return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
1937 }
1938
1939 sub _dbh_execute_inserts_with_no_binds {
1940   my ($self, $sth, $count) = @_;
1941
1942   my $err;
1943   try {
1944     my $dbh = $self->_get_dbh;
1945     local $dbh->{RaiseError} = 1;
1946     local $dbh->{PrintError} = 0;
1947
1948     $sth->execute foreach 1..$count;
1949   }
1950   catch {
1951     $err = shift;
1952   };
1953
1954   # Make sure statement is finished even if there was an exception.
1955   try {
1956     $sth->finish
1957   }
1958   catch {
1959     $err = shift unless defined $err;
1960   };
1961
1962   $self->throw_exception($err) if defined $err;
1963
1964   return $count;
1965 }
1966
1967 sub update {
1968   #my ($self, $source, @args) = @_;
1969   shift->_execute('update', @_);
1970 }
1971
1972
1973 sub delete {
1974   #my ($self, $source, @args) = @_;
1975   shift->_execute('delete', @_);
1976 }
1977
1978 # We were sent here because the $rs contains a complex search
1979 # which will require a subquery to select the correct rows
1980 # (i.e. joined or limited resultsets, or non-introspectable conditions)
1981 #
1982 # Generating a single PK column subquery is trivial and supported
1983 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
1984 # Look at _multipk_update_delete()
1985 sub _subq_update_delete {
1986   my $self = shift;
1987   my ($rs, $op, $values) = @_;
1988
1989   my $rsrc = $rs->result_source;
1990
1991   # quick check if we got a sane rs on our hands
1992   my @pcols = $rsrc->_pri_cols;
1993
1994   my $sel = $rs->_resolved_attrs->{select};
1995   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
1996
1997   if (
1998       join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
1999         ne
2000       join ("\x00", sort @$sel )
2001   ) {
2002     $self->throw_exception (
2003       '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
2004     );
2005   }
2006
2007   if (@pcols == 1) {
2008     return $self->$op (
2009       $rsrc,
2010       $op eq 'update' ? $values : (),
2011       { $pcols[0] => { -in => $rs->as_query } },
2012     );
2013   }
2014
2015   else {
2016     return $self->_multipk_update_delete (@_);
2017   }
2018 }
2019
2020 # ANSI SQL does not provide a reliable way to perform a multicol-PK
2021 # resultset update/delete involving subqueries. So by default resort
2022 # to simple (and inefficient) delete_all style per-row opearations,
2023 # while allowing specific storages to override this with a faster
2024 # implementation.
2025 #
2026 sub _multipk_update_delete {
2027   return shift->_per_row_update_delete (@_);
2028 }
2029
2030 # This is the default loop used to delete/update rows for multi PK
2031 # resultsets, and used by mysql exclusively (because it can't do anything
2032 # else).
2033 #
2034 # We do not use $row->$op style queries, because resultset update/delete
2035 # is not expected to cascade (this is what delete_all/update_all is for).
2036 #
2037 # There should be no race conditions as the entire operation is rolled
2038 # in a transaction.
2039 #
2040 sub _per_row_update_delete {
2041   my $self = shift;
2042   my ($rs, $op, $values) = @_;
2043
2044   my $rsrc = $rs->result_source;
2045   my @pcols = $rsrc->_pri_cols;
2046
2047   my $guard = $self->txn_scope_guard;
2048
2049   # emulate the return value of $sth->execute for non-selects
2050   my $row_cnt = '0E0';
2051
2052   my $subrs_cur = $rs->cursor;
2053   my @all_pk = $subrs_cur->all;
2054   for my $pks ( @all_pk) {
2055
2056     my $cond;
2057     for my $i (0.. $#pcols) {
2058       $cond->{$pcols[$i]} = $pks->[$i];
2059     }
2060
2061     $self->$op (
2062       $rsrc,
2063       $op eq 'update' ? $values : (),
2064       $cond,
2065     );
2066
2067     $row_cnt++;
2068   }
2069
2070   $guard->commit;
2071
2072   return $row_cnt;
2073 }
2074
2075 sub _select {
2076   my $self = shift;
2077   $self->_execute($self->_select_args(@_));
2078 }
2079
2080 sub _select_args_to_query {
2081   my $self = shift;
2082
2083   # my ($op, $ident, $select, $cond, $rs_attrs, $rows, $offset)
2084   #  = $self->_select_args($ident, $select, $cond, $attrs);
2085   my ($op, $ident, @args) =
2086     $self->_select_args(@_);
2087
2088   # my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
2089   my ($sql, $prepared_bind) = $self->_gen_sql_bind($op, $ident, \@args);
2090   $prepared_bind ||= [];
2091
2092   return wantarray
2093     ? ($sql, $prepared_bind)
2094     : \[ "($sql)", @$prepared_bind ]
2095   ;
2096 }
2097
2098 sub _select_args {
2099   my ($self, $ident, $select, $where, $attrs) = @_;
2100
2101   my $sql_maker = $self->sql_maker;
2102   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
2103
2104   $attrs = {
2105     %$attrs,
2106     select => $select,
2107     from => $ident,
2108     where => $where,
2109     $rs_alias && $alias2source->{$rs_alias}
2110       ? ( _rsroot_rsrc => $alias2source->{$rs_alias} )
2111       : ()
2112     ,
2113   };
2114
2115   # Sanity check the attributes (SQLMaker does it too, but
2116   # in case of a software_limit we'll never reach there)
2117   if (defined $attrs->{offset}) {
2118     $self->throw_exception('A supplied offset attribute must be a non-negative integer')
2119       if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
2120   }
2121
2122   if (defined $attrs->{rows}) {
2123     $self->throw_exception("The rows attribute must be a positive integer if present")
2124       if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
2125   }
2126   elsif ($attrs->{offset}) {
2127     # MySQL actually recommends this approach.  I cringe.
2128     $attrs->{rows} = $sql_maker->__max_int;
2129   }
2130
2131   my @limit;
2132
2133   # see if we need to tear the prefetch apart otherwise delegate the limiting to the
2134   # storage, unless software limit was requested
2135   if (
2136     #limited has_many
2137     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
2138        ||
2139     # grouped prefetch (to satisfy group_by == select)
2140     ( $attrs->{group_by}
2141         &&
2142       @{$attrs->{group_by}}
2143         &&
2144       $attrs->{_prefetch_selector_range}
2145     )
2146   ) {
2147     ($ident, $select, $where, $attrs)
2148       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
2149   }
2150   elsif (! $attrs->{software_limit} ) {
2151     push @limit, (
2152       $attrs->{rows} || (),
2153       $attrs->{offset} || (),
2154     );
2155   }
2156
2157   # try to simplify the joinmap further (prune unreferenced type-single joins)
2158   $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
2159
2160 ###
2161   # This would be the point to deflate anything found in $where
2162   # (and leave $attrs->{bind} intact). Problem is - inflators historically
2163   # expect a row object. And all we have is a resultsource (it is trivial
2164   # to extract deflator coderefs via $alias2source above).
2165   #
2166   # I don't see a way forward other than changing the way deflators are
2167   # invoked, and that's just bad...
2168 ###
2169
2170   return ('select', $ident, $select, $where, $attrs, @limit);
2171 }
2172
2173 # Returns a counting SELECT for a simple count
2174 # query. Abstracted so that a storage could override
2175 # this to { count => 'firstcol' } or whatever makes
2176 # sense as a performance optimization
2177 sub _count_select {
2178   #my ($self, $source, $rs_attrs) = @_;
2179   return { count => '*' };
2180 }
2181
2182 sub source_bind_attributes {
2183   shift->throw_exception(
2184     'source_bind_attributes() was never meant to be a callable public method - '
2185    .'please contact the DBIC dev-team and describe your use case so that a reasonable '
2186    .'solution can be provided'
2187    ."\nhttp://search.cpan.org/dist/DBIx-Class/lib/DBIx/Class.pm#GETTING_HELP/SUPPORT"
2188   );
2189 }
2190
2191 =head2 select
2192
2193 =over 4
2194
2195 =item Arguments: $ident, $select, $condition, $attrs
2196
2197 =back
2198
2199 Handle a SQL select statement.
2200
2201 =cut
2202
2203 sub select {
2204   my $self = shift;
2205   my ($ident, $select, $condition, $attrs) = @_;
2206   return $self->cursor_class->new($self, \@_, $attrs);
2207 }
2208
2209 sub select_single {
2210   my $self = shift;
2211   my ($rv, $sth, @bind) = $self->_select(@_);
2212   my @row = $sth->fetchrow_array;
2213   my @nextrow = $sth->fetchrow_array if @row;
2214   if(@row && @nextrow) {
2215     carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
2216   }
2217   # Need to call finish() to work round broken DBDs
2218   $sth->finish();
2219   return @row;
2220 }
2221
2222 =head2 sql_limit_dialect
2223
2224 This is an accessor for the default SQL limit dialect used by a particular
2225 storage driver. Can be overridden by supplying an explicit L</limit_dialect>
2226 to L<DBIx::Class::Schema/connect>. For a list of available limit dialects
2227 see L<DBIx::Class::SQLMaker::LimitDialects>.
2228
2229 =head2 sth
2230
2231 =over 4
2232
2233 =item Arguments: $sql
2234
2235 =back
2236
2237 Returns a L<DBI> sth (statement handle) for the supplied SQL.
2238
2239 =cut
2240
2241 sub _dbh_sth {
2242   my ($self, $dbh, $sql) = @_;
2243
2244   # 3 is the if_active parameter which avoids active sth re-use
2245   my $sth = $self->disable_sth_caching
2246     ? $dbh->prepare($sql)
2247     : $dbh->prepare_cached($sql, {}, 3);
2248
2249   # XXX You would think RaiseError would make this impossible,
2250   #  but apparently that's not true :(
2251   $self->throw_exception(
2252     $dbh->errstr
2253       ||
2254     sprintf( "\$dbh->prepare() of '%s' through %s failed *silently* without "
2255             .'an exception and/or setting $dbh->errstr',
2256       length ($sql) > 20
2257         ? substr($sql, 0, 20) . '...'
2258         : $sql
2259       ,
2260       'DBD::' . $dbh->{Driver}{Name},
2261     )
2262   ) if !$sth;
2263
2264   $sth;
2265 }
2266
2267 sub sth {
2268   carp_unique 'sth was mistakenly marked/documented as public, stop calling it (will be removed before DBIC v0.09)';
2269   shift->_sth(@_);
2270 }
2271
2272 sub _sth {
2273   my ($self, $sql) = @_;
2274   $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
2275 }
2276
2277 sub _dbh_columns_info_for {
2278   my ($self, $dbh, $table) = @_;
2279
2280   if ($dbh->can('column_info')) {
2281     my %result;
2282     my $caught;
2283     try {
2284       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
2285       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
2286       $sth->execute();
2287       while ( my $info = $sth->fetchrow_hashref() ){
2288         my %column_info;
2289         $column_info{data_type}   = $info->{TYPE_NAME};
2290         $column_info{size}      = $info->{COLUMN_SIZE};
2291         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
2292         $column_info{default_value} = $info->{COLUMN_DEF};
2293         my $col_name = $info->{COLUMN_NAME};
2294         $col_name =~ s/^\"(.*)\"$/$1/;
2295
2296         $result{$col_name} = \%column_info;
2297       }
2298     } catch {
2299       $caught = 1;
2300     };
2301     return \%result if !$caught && scalar keys %result;
2302   }
2303
2304   my %result;
2305   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
2306   $sth->execute;
2307   my @columns = @{$sth->{NAME_lc}};
2308   for my $i ( 0 .. $#columns ){
2309     my %column_info;
2310     $column_info{data_type} = $sth->{TYPE}->[$i];
2311     $column_info{size} = $sth->{PRECISION}->[$i];
2312     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
2313
2314     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
2315       $column_info{data_type} = $1;
2316       $column_info{size}    = $2;
2317     }
2318
2319     $result{$columns[$i]} = \%column_info;
2320   }
2321   $sth->finish;
2322
2323   foreach my $col (keys %result) {
2324     my $colinfo = $result{$col};
2325     my $type_num = $colinfo->{data_type};
2326     my $type_name;
2327     if(defined $type_num && $dbh->can('type_info')) {
2328       my $type_info = $dbh->type_info($type_num);
2329       $type_name = $type_info->{TYPE_NAME} if $type_info;
2330       $colinfo->{data_type} = $type_name if $type_name;
2331     }
2332   }
2333
2334   return \%result;
2335 }
2336
2337 sub columns_info_for {
2338   my ($self, $table) = @_;
2339   $self->_dbh_columns_info_for ($self->_get_dbh, $table);
2340 }
2341
2342 =head2 last_insert_id
2343
2344 Return the row id of the last insert.
2345
2346 =cut
2347
2348 sub _dbh_last_insert_id {
2349     my ($self, $dbh, $source, $col) = @_;
2350
2351     my $id = try { $dbh->last_insert_id (undef, undef, $source->name, $col) };
2352
2353     return $id if defined $id;
2354
2355     my $class = ref $self;
2356     $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
2357 }
2358
2359 sub last_insert_id {
2360   my $self = shift;
2361   $self->_dbh_last_insert_id ($self->_dbh, @_);
2362 }
2363
2364 =head2 _native_data_type
2365
2366 =over 4
2367
2368 =item Arguments: $type_name
2369
2370 =back
2371
2372 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
2373 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
2374 L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
2375
2376 The default implementation returns C<undef>, implement in your Storage driver if
2377 you need this functionality.
2378
2379 Should map types from other databases to the native RDBMS type, for example
2380 C<VARCHAR2> to C<VARCHAR>.
2381
2382 Types with modifiers should map to the underlying data type. For example,
2383 C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
2384
2385 Composite types should map to the container type, for example
2386 C<ENUM(foo,bar,baz)> becomes C<ENUM>.
2387
2388 =cut
2389
2390 sub _native_data_type {
2391   #my ($self, $data_type) = @_;
2392   return undef
2393 }
2394
2395 # Check if placeholders are supported at all
2396 sub _determine_supports_placeholders {
2397   my $self = shift;
2398   my $dbh  = $self->_get_dbh;
2399
2400   # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
2401   # but it is inaccurate more often than not
2402   return try {
2403     local $dbh->{PrintError} = 0;
2404     local $dbh->{RaiseError} = 1;
2405     $dbh->do('select ?', {}, 1);
2406     1;
2407   }
2408   catch {
2409     0;
2410   };
2411 }
2412
2413 # Check if placeholders bound to non-string types throw exceptions
2414 #
2415 sub _determine_supports_typeless_placeholders {
2416   my $self = shift;
2417   my $dbh  = $self->_get_dbh;
2418
2419   return try {
2420     local $dbh->{PrintError} = 0;
2421     local $dbh->{RaiseError} = 1;
2422     # this specifically tests a bind that is NOT a string
2423     $dbh->do('select 1 where 1 = ?', {}, 1);
2424     1;
2425   }
2426   catch {
2427     0;
2428   };
2429 }
2430
2431 =head2 sqlt_type
2432
2433 Returns the database driver name.
2434
2435 =cut
2436
2437 sub sqlt_type {
2438   shift->_get_dbh->{Driver}->{Name};
2439 }
2440
2441 =head2 bind_attribute_by_data_type
2442
2443 Given a datatype from column info, returns a database specific bind
2444 attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
2445 let the database planner just handle it.
2446
2447 Generally only needed for special case column types, like bytea in postgres.
2448
2449 =cut
2450
2451 sub bind_attribute_by_data_type {
2452     return;
2453 }
2454
2455 =head2 is_datatype_numeric
2456
2457 Given a datatype from column_info, returns a boolean value indicating if
2458 the current RDBMS considers it a numeric value. This controls how
2459 L<DBIx::Class::Row/set_column> decides whether to mark the column as
2460 dirty - when the datatype is deemed numeric a C<< != >> comparison will
2461 be performed instead of the usual C<eq>.
2462
2463 =cut
2464
2465 sub is_datatype_numeric {
2466   #my ($self, $dt) = @_;
2467
2468   return 0 unless $_[1];
2469
2470   $_[1] =~ /^ (?:
2471     numeric | int(?:eger)? | (?:tiny|small|medium|big)int | dec(?:imal)? | real | float | double (?: \s+ precision)? | (?:big)?serial
2472   ) $/ix;
2473 }
2474
2475
2476 =head2 create_ddl_dir
2477
2478 =over 4
2479
2480 =item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
2481
2482 =back
2483
2484 Creates a SQL file based on the Schema, for each of the specified
2485 database engines in C<\@databases> in the given directory.
2486 (note: specify L<SQL::Translator> names, not L<DBI> driver names).
2487
2488 Given a previous version number, this will also create a file containing
2489 the ALTER TABLE statements to transform the previous schema into the
2490 current one. Note that these statements may contain C<DROP TABLE> or
2491 C<DROP COLUMN> statements that can potentially destroy data.
2492
2493 The file names are created using the C<ddl_filename> method below, please
2494 override this method in your schema if you would like a different file
2495 name format. For the ALTER file, the same format is used, replacing
2496 $version in the name with "$preversion-$version".
2497
2498 See L<SQL::Translator/METHODS> for a list of values for C<\%sqlt_args>.
2499 The most common value for this would be C<< { add_drop_table => 1 } >>
2500 to have the SQL produced include a C<DROP TABLE> statement for each table
2501 created. For quoting purposes supply C<quote_table_names> and
2502 C<quote_field_names>.
2503
2504 If no arguments are passed, then the following default values are assumed:
2505
2506 =over 4
2507
2508 =item databases  - ['MySQL', 'SQLite', 'PostgreSQL']
2509
2510 =item version    - $schema->schema_version
2511
2512 =item directory  - './'
2513
2514 =item preversion - <none>
2515
2516 =back
2517
2518 By default, C<\%sqlt_args> will have
2519
2520  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
2521
2522 merged with the hash passed in. To disable any of those features, pass in a
2523 hashref like the following
2524
2525  { ignore_constraint_names => 0, # ... other options }
2526
2527
2528 WARNING: You are strongly advised to check all SQL files created, before applying
2529 them.
2530
2531 =cut
2532
2533 sub create_ddl_dir {
2534   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
2535
2536   unless ($dir) {
2537     carp "No directory given, using ./\n";
2538     $dir = './';
2539   } else {
2540       -d $dir
2541         or
2542       (require File::Path and File::Path::make_path ("$dir"))  # make_path does not like objects (i.e. Path::Class::Dir)
2543         or
2544       $self->throw_exception(
2545         "Failed to create '$dir': " . ($! || $@ || 'error unknown')
2546       );
2547   }
2548
2549   $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
2550
2551   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
2552   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
2553
2554   my $schema_version = $schema->schema_version || '1.x';
2555   $version ||= $schema_version;
2556
2557   $sqltargs = {
2558     add_drop_table => 1,
2559     ignore_constraint_names => 1,
2560     ignore_index_names => 1,
2561     %{$sqltargs || {}}
2562   };
2563
2564   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
2565     $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2566   }
2567
2568   my $sqlt = SQL::Translator->new( $sqltargs );
2569
2570   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
2571   my $sqlt_schema = $sqlt->translate({ data => $schema })
2572     or $self->throw_exception ($sqlt->error);
2573
2574   foreach my $db (@$databases) {
2575     $sqlt->reset();
2576     $sqlt->{schema} = $sqlt_schema;
2577     $sqlt->producer($db);
2578
2579     my $file;
2580     my $filename = $schema->ddl_filename($db, $version, $dir);
2581     if (-e $filename && ($version eq $schema_version )) {
2582       # if we are dumping the current version, overwrite the DDL
2583       carp "Overwriting existing DDL file - $filename";
2584       unlink($filename);
2585     }
2586
2587     my $output = $sqlt->translate;
2588     if(!$output) {
2589       carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
2590       next;
2591     }
2592     if(!open($file, ">$filename")) {
2593       $self->throw_exception("Can't open $filename for writing ($!)");
2594       next;
2595     }
2596     print $file $output;
2597     close($file);
2598
2599     next unless ($preversion);
2600
2601     require SQL::Translator::Diff;
2602
2603     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
2604     if(!-e $prefilename) {
2605       carp("No previous schema file found ($prefilename)");
2606       next;
2607     }
2608
2609     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
2610     if(-e $difffile) {
2611       carp("Overwriting existing diff file - $difffile");
2612       unlink($difffile);
2613     }
2614
2615     my $source_schema;
2616     {
2617       my $t = SQL::Translator->new($sqltargs);
2618       $t->debug( 0 );
2619       $t->trace( 0 );
2620
2621       $t->parser( $db )
2622         or $self->throw_exception ($t->error);
2623
2624       my $out = $t->translate( $prefilename )
2625         or $self->throw_exception ($t->error);
2626
2627       $source_schema = $t->schema;
2628
2629       $source_schema->name( $prefilename )
2630         unless ( $source_schema->name );
2631     }
2632
2633     # The "new" style of producers have sane normalization and can support
2634     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
2635     # And we have to diff parsed SQL against parsed SQL.
2636     my $dest_schema = $sqlt_schema;
2637
2638     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
2639       my $t = SQL::Translator->new($sqltargs);
2640       $t->debug( 0 );
2641       $t->trace( 0 );
2642
2643       $t->parser( $db )
2644         or $self->throw_exception ($t->error);
2645
2646       my $out = $t->translate( $filename )
2647         or $self->throw_exception ($t->error);
2648
2649       $dest_schema = $t->schema;
2650
2651       $dest_schema->name( $filename )
2652         unless $dest_schema->name;
2653     }
2654
2655     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
2656                                                   $dest_schema,   $db,
2657                                                   $sqltargs
2658                                                  );
2659     if(!open $file, ">$difffile") {
2660       $self->throw_exception("Can't write to $difffile ($!)");
2661       next;
2662     }
2663     print $file $diff;
2664     close($file);
2665   }
2666 }
2667
2668 =head2 deployment_statements
2669
2670 =over 4
2671
2672 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
2673
2674 =back
2675
2676 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
2677
2678 The L<SQL::Translator> (not L<DBI>) database driver name can be explicitly
2679 provided in C<$type>, otherwise the result of L</sqlt_type> is used as default.
2680
2681 C<$directory> is used to return statements from files in a previously created
2682 L</create_ddl_dir> directory and is optional. The filenames are constructed
2683 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
2684
2685 If no C<$directory> is specified then the statements are constructed on the
2686 fly using L<SQL::Translator> and C<$version> is ignored.
2687
2688 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
2689
2690 =cut
2691
2692 sub deployment_statements {
2693   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
2694   $type ||= $self->sqlt_type;
2695   $version ||= $schema->schema_version || '1.x';
2696   $dir ||= './';
2697   my $filename = $schema->ddl_filename($type, $version, $dir);
2698   if(-f $filename)
2699   {
2700       # FIXME replace this block when a proper sane sql parser is available
2701       my $file;
2702       open($file, "<$filename")
2703         or $self->throw_exception("Can't open $filename ($!)");
2704       my @rows = <$file>;
2705       close($file);
2706       return join('', @rows);
2707   }
2708
2709   unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
2710     $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
2711   }
2712
2713   # sources needs to be a parser arg, but for simplicty allow at top level
2714   # coming in
2715   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
2716       if exists $sqltargs->{sources};
2717
2718   my $tr = SQL::Translator->new(
2719     producer => "SQL::Translator::Producer::${type}",
2720     %$sqltargs,
2721     parser => 'SQL::Translator::Parser::DBIx::Class',
2722     data => $schema,
2723   );
2724
2725   my @ret;
2726   if (wantarray) {
2727     @ret = $tr->translate;
2728   }
2729   else {
2730     $ret[0] = $tr->translate;
2731   }
2732
2733   $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
2734     unless (@ret && defined $ret[0]);
2735
2736   return wantarray ? @ret : $ret[0];
2737 }
2738
2739 # FIXME deploy() currently does not accurately report sql errors
2740 # Will always return true while errors are warned
2741 sub deploy {
2742   my ($self, $schema, $type, $sqltargs, $dir) = @_;
2743   my $deploy = sub {
2744     my $line = shift;
2745     return if(!$line);
2746     return if($line =~ /^--/);
2747     # next if($line =~ /^DROP/m);
2748     return if($line =~ /^BEGIN TRANSACTION/m);
2749     return if($line =~ /^COMMIT/m);
2750     return if $line =~ /^\s+$/; # skip whitespace only
2751     $self->_query_start($line);
2752     try {
2753       # do a dbh_do cycle here, as we need some error checking in
2754       # place (even though we will ignore errors)
2755       $self->dbh_do (sub { $_[1]->do($line) });
2756     } catch {
2757       carp qq{$_ (running "${line}")};
2758     };
2759     $self->_query_end($line);
2760   };
2761   my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
2762   if (@statements > 1) {
2763     foreach my $statement (@statements) {
2764       $deploy->( $statement );
2765     }
2766   }
2767   elsif (@statements == 1) {
2768     # split on single line comments and end of statements
2769     foreach my $line ( split(/\s*--.*\n|;\n/, $statements[0])) {
2770       $deploy->( $line );
2771     }
2772   }
2773 }
2774
2775 =head2 datetime_parser
2776
2777 Returns the datetime parser class
2778
2779 =cut
2780
2781 sub datetime_parser {
2782   my $self = shift;
2783   return $self->{datetime_parser} ||= do {
2784     $self->build_datetime_parser(@_);
2785   };
2786 }
2787
2788 =head2 datetime_parser_type
2789
2790 Defines the datetime parser class - currently defaults to L<DateTime::Format::MySQL>
2791
2792 =head2 build_datetime_parser
2793
2794 See L</datetime_parser>
2795
2796 =cut
2797
2798 sub build_datetime_parser {
2799   my $self = shift;
2800   my $type = $self->datetime_parser_type(@_);
2801   return $type;
2802 }
2803
2804
2805 =head2 is_replicating
2806
2807 A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
2808 replicate from a master database.  Default is undef, which is the result
2809 returned by databases that don't support replication.
2810
2811 =cut
2812
2813 sub is_replicating {
2814     return;
2815
2816 }
2817
2818 =head2 lag_behind_master
2819
2820 Returns a number that represents a certain amount of lag behind a master db
2821 when a given storage is replicating.  The number is database dependent, but
2822 starts at zero and increases with the amount of lag. Default in undef
2823
2824 =cut
2825
2826 sub lag_behind_master {
2827     return;
2828 }
2829
2830 =head2 relname_to_table_alias
2831
2832 =over 4
2833
2834 =item Arguments: $relname, $join_count
2835
2836 =back
2837
2838 L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
2839 queries.
2840
2841 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
2842 way these aliases are named.
2843
2844 The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
2845 otherwise C<"$relname">.
2846
2847 =cut
2848
2849 sub relname_to_table_alias {
2850   my ($self, $relname, $join_count) = @_;
2851
2852   my $alias = ($join_count && $join_count > 1 ?
2853     join('_', $relname, $join_count) : $relname);
2854
2855   return $alias;
2856 }
2857
2858 # The size in bytes to use for DBI's ->bind_param_inout, this is the generic
2859 # version and it may be necessary to amend or override it for a specific storage
2860 # if such binds are necessary.
2861 sub _max_column_bytesize {
2862   my ($self, $attr) = @_;
2863
2864   my $max_size;
2865
2866   if ($attr->{sqlt_datatype}) {
2867     my $data_type = lc($attr->{sqlt_datatype});
2868
2869     if ($attr->{sqlt_size}) {
2870
2871       # String/sized-binary types
2872       if ($data_type =~ /^(?:
2873           l? (?:var)? char(?:acter)? (?:\s*varying)?
2874             |
2875           (?:var)? binary (?:\s*varying)? 
2876             |
2877           raw
2878         )\b/x
2879       ) {
2880         $max_size = $attr->{sqlt_size};
2881       }
2882       # Other charset/unicode types, assume scale of 4
2883       elsif ($data_type =~ /^(?:
2884           national \s* character (?:\s*varying)?
2885             |
2886           nchar
2887             |
2888           univarchar
2889             |
2890           nvarchar
2891         )\b/x
2892       ) {
2893         $max_size = $attr->{sqlt_size} * 4;
2894       }
2895     }
2896
2897     if (!$max_size and !$self->_is_lob_type($data_type)) {
2898       $max_size = 100 # for all other (numeric?) datatypes
2899     }
2900   }
2901
2902   $max_size || $self->_dbic_connect_attributes->{LongReadLen} || $self->_get_dbh->{LongReadLen} || 8000;
2903 }
2904
2905 # Determine if a data_type is some type of BLOB
2906 sub _is_lob_type {
2907   my ($self, $data_type) = @_;
2908   $data_type && ($data_type =~ /lob|bfile|text|image|bytea|memo/i
2909     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary
2910                                   |varchar|character\s*varying|nvarchar
2911                                   |national\s*character\s*varying))?\z/xi);
2912 }
2913
2914 sub _is_binary_lob_type {
2915   my ($self, $data_type) = @_;
2916   $data_type && ($data_type =~ /blob|bfile|image|bytea/i
2917     || $data_type =~ /^long(?:\s+(?:raw|bit\s*varying|varbit|binary))?\z/xi);
2918 }
2919
2920 sub _is_text_lob_type {
2921   my ($self, $data_type) = @_;
2922   $data_type && ($data_type =~ /^(?:clob|memo)\z/i
2923     || $data_type =~ /^long(?:\s+(?:varchar|character\s*varying|nvarchar
2924                         |national\s*character\s*varying))\z/xi);
2925 }
2926
2927 1;
2928
2929 =head1 USAGE NOTES
2930
2931 =head2 DBIx::Class and AutoCommit
2932
2933 DBIx::Class can do some wonderful magic with handling exceptions,
2934 disconnections, and transactions when you use C<< AutoCommit => 1 >>
2935 (the default) combined with L<txn_do|DBIx::Class::Storage/txn_do> for
2936 transaction support.
2937
2938 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
2939 in an assumed transaction between commits, and you're telling us you'd
2940 like to manage that manually.  A lot of the magic protections offered by
2941 this module will go away.  We can't protect you from exceptions due to database
2942 disconnects because we don't know anything about how to restart your
2943 transactions.  You're on your own for handling all sorts of exceptional
2944 cases if you choose the C<< AutoCommit => 0 >> path, just as you would
2945 be with raw DBI.
2946
2947
2948 =head1 AUTHORS
2949
2950 Matt S. Trout <mst@shadowcatsystems.co.uk>
2951
2952 Andy Grundman <andy@hybridized.org>
2953
2954 =head1 LICENSE
2955
2956 You may distribute this code under the same terms as Perl itself.
2957
2958 =cut