* Converted some of the test cases to use SQL::Abstract::Test.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
CommitLineData
8b445e33 1package DBIx::Class::Storage::DBI;
e673f011 2# -*- mode: cperl; cperl-indent-level: 2 -*-
8b445e33 3
a62cf8d4 4use base 'DBIx::Class::Storage';
5
eda28767 6use strict;
20a2c954 7use warnings;
550adccc 8use Carp::Clan qw/^DBIx::Class/;
8b445e33 9use DBI;
aeaf3ce2 10use SQL::Abstract::Limit;
28927b50 11use DBIx::Class::Storage::DBI::Cursor;
4c248161 12use DBIx::Class::Storage::Statistics;
664612fb 13use Scalar::Util qw/blessed weaken/;
046ad905 14
541df64a 15__PACKAGE__->mk_group_accessors('simple' =>
16 qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
92fe2181 17 _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
046ad905 18);
19
92fe2181 20# the values for these accessors are picked out (and deleted) from
21# the attribute hashref passed to connect_info
22my @storage_options = qw/
23 on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
24/;
25__PACKAGE__->mk_group_accessors('simple' => @storage_options);
26
27
28# default cursor class, overridable in connect_info attributes
e4eb8ee1 29__PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
30
95ba7ee4 31__PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
32__PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
33
bd7efd39 34BEGIN {
35
ae5a51b5 36package # Hide from PAUSE
37 DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
bd7efd39 38
39use base qw/SQL::Abstract::Limit/;
40
2cc3a7be 41# This prevents the caching of $dbh in S::A::L, I believe
42sub new {
43 my $self = shift->SUPER::new(@_);
44
45 # If limit_dialect is a ref (like a $dbh), go ahead and replace
46 # it with what it resolves to:
47 $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
48 if ref $self->{limit_dialect};
49
50 $self;
51}
52
260129d8 53sub _RowNumberOver {
54 my ($self, $sql, $order, $rows, $offset ) = @_;
55
56 $offset += 1;
57 my $last = $rows + $offset;
58 my ( $order_by ) = $self->_order_by( $order );
59
60 $sql = <<"";
61SELECT * FROM
62(
63 SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
64 $sql
65 $order_by
66 ) Q1
67) Q2
68WHERE ROW_NUM BETWEEN $offset AND $last
69
70 return $sql;
71}
72
73
2cc3a7be 74# While we're at it, this should make LIMIT queries more efficient,
75# without digging into things too deeply
758272ec 76use Scalar::Util 'blessed';
2cc3a7be 77sub _find_syntax {
78 my ($self, $syntax) = @_;
758272ec 79 my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
260129d8 80 if(ref($self) && $dbhname && $dbhname eq 'DB2') {
81 return 'RowNumberOver';
82 }
83
2cc3a7be 84 $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
85}
86
54540863 87sub select {
88 my ($self, $table, $fields, $where, $order, @rest) = @_;
6346a152 89 $table = $self->_quote($table) unless ref($table);
eac29141 90 local $self->{rownum_hack_count} = 1
91 if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
54540863 92 @rest = (-1) unless defined $rest[0];
0823196c 93 die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
94 # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
8839560b 95 local $self->{having_bind} = [];
bc0c9800 96 my ($sql, @ret) = $self->SUPER::select(
97 $table, $self->_recurse_fields($fields), $where, $order, @rest
98 );
95ba7ee4 99 $sql .=
100 $self->{for} ?
101 (
102 $self->{for} eq 'update' ? ' FOR UPDATE' :
103 $self->{for} eq 'shared' ? ' FOR SHARE' :
104 ''
105 ) :
106 ''
107 ;
8839560b 108 return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
54540863 109}
110
6346a152 111sub insert {
112 my $self = shift;
113 my $table = shift;
114 $table = $self->_quote($table) unless ref($table);
115 $self->SUPER::insert($table, @_);
116}
117
118sub update {
119 my $self = shift;
120 my $table = shift;
121 $table = $self->_quote($table) unless ref($table);
122 $self->SUPER::update($table, @_);
123}
124
125sub delete {
126 my $self = shift;
127 my $table = shift;
128 $table = $self->_quote($table) unless ref($table);
129 $self->SUPER::delete($table, @_);
130}
131
54540863 132sub _emulate_limit {
133 my $self = shift;
134 if ($_[3] == -1) {
135 return $_[1].$self->_order_by($_[2]);
136 } else {
137 return $self->SUPER::_emulate_limit(@_);
138 }
139}
140
141sub _recurse_fields {
e8e971f2 142 my ($self, $fields, $params) = @_;
54540863 143 my $ref = ref $fields;
144 return $self->_quote($fields) unless $ref;
145 return $$fields if $ref eq 'SCALAR';
146
147 if ($ref eq 'ARRAY') {
1d78a406 148 return join(', ', map {
eac29141 149 $self->_recurse_fields($_)
1d78a406 150 .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
151 ? ' AS col'.$self->{rownum_hack_count}++
152 : '')
e8e971f2 153 } @$fields);
54540863 154 } elsif ($ref eq 'HASH') {
155 foreach my $func (keys %$fields) {
156 return $self->_sqlcase($func)
157 .'( '.$self->_recurse_fields($fields->{$func}).' )';
158 }
159 }
160}
161
162sub _order_by {
163 my $self = shift;
164 my $ret = '';
8839560b 165 my @extra;
54540863 166 if (ref $_[0] eq 'HASH') {
167 if (defined $_[0]->{group_by}) {
168 $ret = $self->_sqlcase(' group by ')
1d78a406 169 .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
54540863 170 }
8839560b 171 if (defined $_[0]->{having}) {
172 my $frag;
173 ($frag, @extra) = $self->_recurse_where($_[0]->{having});
174 push(@{$self->{having_bind}}, @extra);
175 $ret .= $self->_sqlcase(' having ').$frag;
176 }
54540863 177 if (defined $_[0]->{order_by}) {
7ce5cbe7 178 $ret .= $self->_order_by($_[0]->{order_by});
54540863 179 }
d09c569a 180 } elsif (ref $_[0] eq 'SCALAR') {
e535069e 181 $ret = $self->_sqlcase(' order by ').${ $_[0] };
d09c569a 182 } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
183 my @order = @{+shift};
184 $ret = $self->_sqlcase(' order by ')
185 .join(', ', map {
186 my $r = $self->_order_by($_, @_);
187 $r =~ s/^ ?ORDER BY //i;
188 $r;
189 } @order);
54540863 190 } else {
191 $ret = $self->SUPER::_order_by(@_);
192 }
193 return $ret;
194}
195
f48dd03f 196sub _order_directions {
197 my ($self, $order) = @_;
198 $order = $order->{order_by} if ref $order eq 'HASH';
199 return $self->SUPER::_order_directions($order);
200}
201
2a816814 202sub _table {
bd7efd39 203 my ($self, $from) = @_;
204 if (ref $from eq 'ARRAY') {
205 return $self->_recurse_from(@$from);
206 } elsif (ref $from eq 'HASH') {
207 return $self->_make_as($from);
208 } else {
6346a152 209 return $from; # would love to quote here but _table ends up getting called
210 # twice during an ->select without a limit clause due to
211 # the way S::A::Limit->select works. should maybe consider
212 # bypassing this and doing S::A::select($self, ...) in
213 # our select method above. meantime, quoting shims have
214 # been added to select/insert/update/delete here
bd7efd39 215 }
216}
217
218sub _recurse_from {
219 my ($self, $from, @join) = @_;
220 my @sqlf;
221 push(@sqlf, $self->_make_as($from));
222 foreach my $j (@join) {
223 my ($to, $on) = @$j;
73856587 224
54540863 225 # check whether a join type exists
226 my $join_clause = '';
ca7b9fdf 227 my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
228 if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
229 $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
54540863 230 } else {
231 $join_clause = ' JOIN ';
232 }
73856587 233 push(@sqlf, $join_clause);
234
bd7efd39 235 if (ref $to eq 'ARRAY') {
236 push(@sqlf, '(', $self->_recurse_from(@$to), ')');
237 } else {
96cdbbab 238 push(@sqlf, $self->_make_as($to));
bd7efd39 239 }
9b459129 240 push(@sqlf, ' ON (', $self->_join_condition($on), ')');
bd7efd39 241 }
242 return join('', @sqlf);
243}
244
245sub _make_as {
246 my ($self, $from) = @_;
54540863 247 return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
bc0c9800 248 reverse each %{$self->_skip_options($from)});
73856587 249}
250
251sub _skip_options {
54540863 252 my ($self, $hash) = @_;
253 my $clean_hash = {};
254 $clean_hash->{$_} = $hash->{$_}
255 for grep {!/^-/} keys %$hash;
256 return $clean_hash;
bd7efd39 257}
258
259sub _join_condition {
260 my ($self, $cond) = @_;
5efe4c79 261 if (ref $cond eq 'HASH') {
262 my %j;
bc0c9800 263 for (keys %$cond) {
635b9634 264 my $v = $cond->{$_};
265 if (ref $v) {
266 # XXX no throw_exception() in this package and croak() fails with strange results
267 Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
268 if ref($v) ne 'SCALAR';
269 $j{$_} = $v;
270 }
271 else {
272 my $x = '= '.$self->_quote($v); $j{$_} = \$x;
273 }
bc0c9800 274 };
635b9634 275 return scalar($self->_recurse_where(\%j));
5efe4c79 276 } elsif (ref $cond eq 'ARRAY') {
277 return join(' OR ', map { $self->_join_condition($_) } @$cond);
278 } else {
279 die "Can't handle this yet!";
280 }
bd7efd39 281}
282
2a816814 283sub _quote {
284 my ($self, $label) = @_;
285 return '' unless defined $label;
3b24f6ea 286 return "*" if $label eq '*';
41728a6e 287 return $label unless $self->{quote_char};
3b24f6ea 288 if(ref $self->{quote_char} eq "ARRAY"){
289 return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
290 if !defined $self->{name_sep};
291 my $sep = $self->{name_sep};
292 return join($self->{name_sep},
293 map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1] }
294 split(/\Q$sep\E/,$label));
295 }
2a816814 296 return $self->SUPER::_quote($label);
297}
298
7be93b07 299sub limit_dialect {
300 my $self = shift;
301 $self->{limit_dialect} = shift if @_;
302 return $self->{limit_dialect};
303}
304
2437a1e3 305sub quote_char {
306 my $self = shift;
307 $self->{quote_char} = shift if @_;
308 return $self->{quote_char};
309}
310
311sub name_sep {
312 my $self = shift;
313 $self->{name_sep} = shift if @_;
314 return $self->{name_sep};
315}
316
bd7efd39 317} # End of BEGIN block
318
b327f988 319=head1 NAME
320
321DBIx::Class::Storage::DBI - DBI storage handler
322
323=head1 SYNOPSIS
324
5d52945a 325 my $schema = MySchema->connect('dbi:SQLite:my.db');
326
327 $schema->storage->debug(1);
328 $schema->dbh_do("DROP TABLE authors");
329
330 $schema->resultset('Book')->search({
331 written_on => $schema->storage->datetime_parser(DateTime->now)
332 });
333
b327f988 334=head1 DESCRIPTION
335
046ad905 336This class represents the connection to an RDBMS via L<DBI>. See
337L<DBIx::Class::Storage> for general information. This pod only
338documents DBI-specific methods and behaviors.
b327f988 339
340=head1 METHODS
341
9b83fccd 342=cut
343
8b445e33 344sub new {
046ad905 345 my $new = shift->next::method(@_);
82cc0386 346
d79f59b9 347 $new->transaction_depth(0);
2cc3a7be 348 $new->_sql_maker_opts({});
ddf66ced 349 $new->{savepoints} = [];
1b994857 350 $new->{_in_dbh_do} = 0;
dbaee748 351 $new->{_dbh_gen} = 0;
82cc0386 352
046ad905 353 $new;
1c339d71 354}
355
1b45b01e 356=head2 connect_info
357
92fe2181 358This method is normally called by L<DBIx::Class::Schema/connection>, which
359encapsulates its argument list in an arrayref before passing them here.
360
361The argument list may contain:
362
363=over
364
365=item *
366
5d52945a 367The same 4-element argument set one would normally pass to
40911cb3 368L<DBI/connect>, optionally followed by
369L<extra attributes|/DBIx::Class specific connection attributes>
370recognized by DBIx::Class:
92fe2181 371
5d52945a 372 $connect_info_args = [ $dsn, $user, $password, \%dbi_attributes?, \%extra_attributes? ];
92fe2181 373
374=item *
1b45b01e 375
40911cb3 376A single code reference which returns a connected
377L<DBI database handle|DBI/connect> optionally followed by
378L<extra attributes|/DBIx::Class specific connection attributes> recognized
379by DBIx::Class:
1b45b01e 380
5d52945a 381 $connect_info_args = [ sub { DBI->connect (...) }, \%extra_attributes? ];
92fe2181 382
383=item *
384
5d52945a 385A single hashref with all the attributes and the dsn/user/password
386mixed together:
92fe2181 387
388 $connect_info_args = [{
389 dsn => $dsn,
390 user => $user,
34f1f658 391 password => $pass,
92fe2181 392 %dbi_attributes,
393 %extra_attributes,
394 }];
395
396This is particularly useful for L<Catalyst> based applications, allowing the
40911cb3 397following config (L<Config::General> style):
92fe2181 398
399 <Model::DB>
400 schema_class App::DB
401 <connect_info>
402 dsn dbi:mysql:database=test
403 user testuser
404 password TestPass
405 AutoCommit 1
406 </connect_info>
407 </Model::DB>
408
409=back
410
5d52945a 411Please note that the L<DBI> docs recommend that you always explicitly
412set C<AutoCommit> to either I<0> or I<1>. L<DBIx::Class> further
413recommends that it be set to I<1>, and that you perform transactions
40911cb3 414via our L<DBIx::Class::Schema/txn_do> method. L<DBIx::Class> will set it
415to I<1> if you do not do explicitly set it to zero. This is the default
416for most DBDs. See L</DBIx::Class and AutoCommit> for details.
92fe2181 417
418=head3 DBIx::Class specific connection attributes
419
420In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
421L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
422the following connection options. These options can be mixed in with your other
423L<DBI> connection attributes, or placed in a seperate hashref
424(C<\%extra_attributes>) as shown above.
425
426Every time C<connect_info> is invoked, any previous settings for
427these options will be cleared before setting the new ones, regardless of
428whether any options are specified in the new C<connect_info>.
d7c4c15c 429
2cc3a7be 430
5d52945a 431=over
2cc3a7be 432
433=item on_connect_do
434
6d2e7a96 435Specifies things to do immediately after connecting or re-connecting to
436the database. Its value may contain:
437
438=over
439
440=item an array reference
441
442This contains SQL statements to execute in order. Each element contains
443a string or a code reference that returns a string.
444
445=item a code reference
446
447This contains some code to execute. Unlike code references within an
448array reference, its return value is ignored.
449
450=back
579ca3f7 451
452=item on_disconnect_do
453
5d52945a 454Takes arguments in the same form as L</on_connect_do> and executes them
6d2e7a96 455immediately before disconnecting from the database.
579ca3f7 456
5d52945a 457Note, this only runs if you explicitly call L</disconnect> on the
579ca3f7 458storage object.
2cc3a7be 459
b33697ef 460=item disable_sth_caching
461
462If set to a true value, this option will disable the caching of
463statement handles via L<DBI/prepare_cached>.
464
2cc3a7be 465=item limit_dialect
466
467Sets the limit dialect. This is useful for JDBC-bridge among others
468where the remote SQL-dialect cannot be determined by the name of the
5d52945a 469driver alone. See also L<SQL::Abstract::Limit>.
2cc3a7be 470
471=item quote_char
d7c4c15c 472
2cc3a7be 473Specifies what characters to use to quote table and column names. If
5d52945a 474you use this you will want to specify L</name_sep> as well.
2cc3a7be 475
5d52945a 476C<quote_char> expects either a single character, in which case is it
477is placed on either side of the table/column name, or an arrayref of length
4782 in which case the table/column name is placed between the elements.
2cc3a7be 479
5d52945a 480For example under MySQL you should use C<< quote_char => '`' >>, and for
481SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
2cc3a7be 482
483=item name_sep
484
40911cb3 485This only needs to be used in conjunction with C<quote_char>, and is used to
2cc3a7be 486specify the charecter that seperates elements (schemas, tables, columns) from
487each other. In most cases this is simply a C<.>.
488
5d52945a 489The consequences of not supplying this value is that L<SQL::Abstract>
490will assume DBIx::Class' uses of aliases to be complete column
491names. The output will look like I<"me.name"> when it should actually
492be I<"me"."name">.
493
61646ebd 494=item unsafe
495
496This Storage driver normally installs its own C<HandleError>, sets
2ab60eb9 497C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
498all database handles, including those supplied by a coderef. It does this
499so that it can have consistent and useful error behavior.
61646ebd 500
501If you set this option to a true value, Storage will not do its usual
2ab60eb9 502modifications to the database handle's attributes, and instead relies on
503the settings in your connect_info DBI options (or the values you set in
504your connection coderef, in the case that you are connecting via coderef).
61646ebd 505
506Note that your custom settings can cause Storage to malfunction,
507especially if you set a C<HandleError> handler that suppresses exceptions
508and/or disable C<RaiseError>.
509
a3628767 510=item auto_savepoint
511
512If this option is true, L<DBIx::Class> will use savepoints when nesting
513transactions, making it possible to recover from failure in the inner
514transaction without having to abort all outer transactions.
515
34f1f658 516=item cursor_class
517
518Use this argument to supply a cursor class other than the default
519L<DBIx::Class::Storage::DBI::Cursor>.
520
2cc3a7be 521=back
522
5d52945a 523Some real-life examples of arguments to L</connect_info> and
524L<DBIx::Class::Schema/connect>
2cc3a7be 525
526 # Simple SQLite connection
bb4f246d 527 ->connect_info([ 'dbi:SQLite:./foo.db' ]);
6789ebe3 528
2cc3a7be 529 # Connect via subref
bb4f246d 530 ->connect_info([ sub { DBI->connect(...) } ]);
6789ebe3 531
2cc3a7be 532 # A bit more complicated
bb4f246d 533 ->connect_info(
534 [
535 'dbi:Pg:dbname=foo',
536 'postgres',
537 'my_pg_password',
77d76d0f 538 { AutoCommit => 1 },
2cc3a7be 539 { quote_char => q{"}, name_sep => q{.} },
540 ]
541 );
542
543 # Equivalent to the previous example
544 ->connect_info(
545 [
546 'dbi:Pg:dbname=foo',
547 'postgres',
548 'my_pg_password',
77d76d0f 549 { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
bb4f246d 550 ]
551 );
6789ebe3 552
92fe2181 553 # Same, but with hashref as argument
5d52945a 554 # See parse_connect_info for explanation
92fe2181 555 ->connect_info(
556 [{
557 dsn => 'dbi:Pg:dbname=foo',
558 user => 'postgres',
559 password => 'my_pg_password',
560 AutoCommit => 1,
561 quote_char => q{"},
562 name_sep => q{.},
563 }]
564 );
565
566 # Subref + DBIx::Class-specific connection options
bb4f246d 567 ->connect_info(
568 [
569 sub { DBI->connect(...) },
2cc3a7be 570 {
571 quote_char => q{`},
572 name_sep => q{@},
573 on_connect_do => ['SET search_path TO myschema,otherschema,public'],
b33697ef 574 disable_sth_caching => 1,
2cc3a7be 575 },
bb4f246d 576 ]
577 );
6789ebe3 578
92fe2181 579
580
004d31fb 581=cut
582
046ad905 583sub connect_info {
584 my ($self, $info_arg) = @_;
4c248161 585
046ad905 586 return $self->_connect_info if !$info_arg;
4c248161 587
92fe2181 588 my @args = @$info_arg; # take a shallow copy for further mutilation
589 $self->_connect_info([@args]); # copy for _connect_info
590
591
592 # combine/pre-parse arguments depending on invocation style
593
594 my %attrs;
595 if (ref $args[0] eq 'CODE') { # coderef with optional \%extra_attributes
596 %attrs = %{ $args[1] || {} };
597 @args = $args[0];
598 }
599 elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
600 %attrs = %{$args[0]};
601 @args = ();
602 for (qw/password user dsn/) {
603 unshift @args, delete $attrs{$_};
604 }
605 }
34f1f658 606 else { # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
92fe2181 607 %attrs = (
608 % { $args[3] || {} },
609 % { $args[4] || {} },
610 );
611 @args = @args[0,1,2];
612 }
613
046ad905 614 # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
615 # the new set of options
616 $self->_sql_maker(undef);
617 $self->_sql_maker_opts({});
8df3d107 618
92fe2181 619 if(keys %attrs) {
620 for my $storage_opt (@storage_options, 'cursor_class') { # @storage_options is declared at the top of the module
621 if(my $value = delete $attrs{$storage_opt}) {
b33697ef 622 $self->$storage_opt($value);
623 }
046ad905 624 }
625 for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
92fe2181 626 if(my $opt_val = delete $attrs{$sql_maker_opt}) {
046ad905 627 $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
628 }
629 }
046ad905 630 }
d7c4c15c 631
92fe2181 632 %attrs = () if (ref $args[0] eq 'CODE'); # _connect() never looks past $args[0] in this case
633
634 $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
fdad5fab 635 $self->_connect_info;
046ad905 636}
004d31fb 637
046ad905 638=head2 on_connect_do
4c248161 639
5d52945a 640This method is deprecated in favour of setting via L</connect_info>.
486ad69b 641
92fe2181 642
f11383c2 643=head2 dbh_do
644
3ff1602f 645Arguments: ($subref | $method_name), @extra_coderef_args?
046ad905 646
3ff1602f 647Execute the given $subref or $method_name using the new exception-based
648connection management.
046ad905 649
d4f16b21 650The first two arguments will be the storage object that C<dbh_do> was called
651on and a database handle to use. Any additional arguments will be passed
652verbatim to the called subref as arguments 2 and onwards.
653
654Using this (instead of $self->_dbh or $self->dbh) ensures correct
655exception handling and reconnection (or failover in future subclasses).
656
657Your subref should have no side-effects outside of the database, as
658there is the potential for your subref to be partially double-executed
659if the database connection was stale/dysfunctional.
046ad905 660
56769f7c 661Example:
f11383c2 662
56769f7c 663 my @stuff = $schema->storage->dbh_do(
664 sub {
d4f16b21 665 my ($storage, $dbh, @cols) = @_;
666 my $cols = join(q{, }, @cols);
667 $dbh->selectrow_array("SELECT $cols FROM foo");
046ad905 668 },
669 @column_list
56769f7c 670 );
f11383c2 671
672=cut
673
674sub dbh_do {
046ad905 675 my $self = shift;
3ff1602f 676 my $code = shift;
aa27edf7 677
6ad1059d 678 my $dbh = $self->_dbh;
679
680 return $self->$code($dbh, @_) if $self->{_in_dbh_do}
cb19f4dd 681 || $self->{transaction_depth};
682
1b994857 683 local $self->{_in_dbh_do} = 1;
684
f11383c2 685 my @result;
686 my $want_array = wantarray;
687
688 eval {
6ad1059d 689 $self->_verify_pid if $dbh;
37976db0 690 if(!$self->_dbh) {
6ad1059d 691 $self->_populate_dbh;
692 $dbh = $self->_dbh;
693 }
694
f11383c2 695 if($want_array) {
6ad1059d 696 @result = $self->$code($dbh, @_);
f11383c2 697 }
56769f7c 698 elsif(defined $want_array) {
6ad1059d 699 $result[0] = $self->$code($dbh, @_);
f11383c2 700 }
56769f7c 701 else {
6ad1059d 702 $self->$code($dbh, @_);
56769f7c 703 }
f11383c2 704 };
56769f7c 705
aa27edf7 706 my $exception = $@;
707 if(!$exception) { return $want_array ? @result : $result[0] }
708
709 $self->throw_exception($exception) if $self->connected;
710
711 # We were not connected - reconnect and retry, but let any
712 # exception fall right through this time
713 $self->_populate_dbh;
3ff1602f 714 $self->$code($self->_dbh, @_);
aa27edf7 715}
716
717# This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
718# It also informs dbh_do to bypass itself while under the direction of txn_do,
1b994857 719# via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
aa27edf7 720sub txn_do {
721 my $self = shift;
722 my $coderef = shift;
723
724 ref $coderef eq 'CODE' or $self->throw_exception
725 ('$coderef must be a CODE reference');
726
d6feb60f 727 return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
57c18b65 728
1b994857 729 local $self->{_in_dbh_do} = 1;
f11383c2 730
aa27edf7 731 my @result;
732 my $want_array = wantarray;
733
d4f16b21 734 my $tried = 0;
735 while(1) {
736 eval {
737 $self->_verify_pid if $self->_dbh;
738 $self->_populate_dbh if !$self->_dbh;
aa27edf7 739
d4f16b21 740 $self->txn_begin;
741 if($want_array) {
742 @result = $coderef->(@_);
743 }
744 elsif(defined $want_array) {
745 $result[0] = $coderef->(@_);
746 }
747 else {
748 $coderef->(@_);
749 }
750 $self->txn_commit;
751 };
aa27edf7 752
d4f16b21 753 my $exception = $@;
754 if(!$exception) { return $want_array ? @result : $result[0] }
755
756 if($tried++ > 0 || $self->connected) {
757 eval { $self->txn_rollback };
758 my $rollback_exception = $@;
759 if($rollback_exception) {
760 my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
761 $self->throw_exception($exception) # propagate nested rollback
762 if $rollback_exception =~ /$exception_class/;
763
764 $self->throw_exception(
765 "Transaction aborted: ${exception}. "
766 . "Rollback failed: ${rollback_exception}"
767 );
768 }
769 $self->throw_exception($exception)
aa27edf7 770 }
56769f7c 771
d4f16b21 772 # We were not connected, and was first try - reconnect and retry
773 # via the while loop
774 $self->_populate_dbh;
775 }
f11383c2 776}
777
9b83fccd 778=head2 disconnect
779
046ad905 780Our C<disconnect> method also performs a rollback first if the
9b83fccd 781database is not in C<AutoCommit> mode.
782
783=cut
784
412db1f4 785sub disconnect {
786 my ($self) = @_;
787
92925617 788 if( $self->connected ) {
6d2e7a96 789 my $connection_do = $self->on_disconnect_do;
790 $self->_do_connection_actions($connection_do) if ref($connection_do);
791
57c18b65 792 $self->_dbh->rollback unless $self->_dbh_autocommit;
92925617 793 $self->_dbh->disconnect;
794 $self->_dbh(undef);
dbaee748 795 $self->{_dbh_gen}++;
92925617 796 }
412db1f4 797}
798
e96a93df 799=head2 with_deferred_fk_checks
800
801=over 4
802
803=item Arguments: C<$coderef>
804
805=item Return Value: The return value of $coderef
806
807=back
808
809Storage specific method to run the code ref with FK checks deferred or
810in MySQL's case disabled entirely.
811
812=cut
813
814# Storage subclasses should override this
815sub with_deferred_fk_checks {
816 my ($self, $sub) = @_;
817
818 $sub->();
819}
820
f11383c2 821sub connected {
822 my ($self) = @_;
412db1f4 823
1346e22d 824 if(my $dbh = $self->_dbh) {
825 if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
dbaee748 826 $self->_dbh(undef);
827 $self->{_dbh_gen}++;
828 return;
1346e22d 829 }
56769f7c 830 else {
831 $self->_verify_pid;
649bfb8c 832 return 0 if !$self->_dbh;
56769f7c 833 }
1346e22d 834 return ($dbh->FETCH('Active') && $dbh->ping);
835 }
836
837 return 0;
412db1f4 838}
839
f11383c2 840# handle pid changes correctly
56769f7c 841# NOTE: assumes $self->_dbh is a valid $dbh
f11383c2 842sub _verify_pid {
843 my ($self) = @_;
844
6ae3f9b9 845 return if defined $self->_conn_pid && $self->_conn_pid == $$;
f11383c2 846
f11383c2 847 $self->_dbh->{InactiveDestroy} = 1;
d3abf3fe 848 $self->_dbh(undef);
dbaee748 849 $self->{_dbh_gen}++;
f11383c2 850
851 return;
852}
853
412db1f4 854sub ensure_connected {
855 my ($self) = @_;
856
857 unless ($self->connected) {
8b445e33 858 $self->_populate_dbh;
859 }
412db1f4 860}
861
c235bbae 862=head2 dbh
863
864Returns the dbh - a data base handle of class L<DBI>.
865
866=cut
867
412db1f4 868sub dbh {
869 my ($self) = @_;
870
871 $self->ensure_connected;
8b445e33 872 return $self->_dbh;
873}
874
f1f56aad 875sub _sql_maker_args {
876 my ($self) = @_;
877
6e399b4f 878 return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
f1f56aad 879}
880
48c69e7c 881sub sql_maker {
882 my ($self) = @_;
fdc1c3d0 883 unless ($self->_sql_maker) {
95ba7ee4 884 my $sql_maker_class = $self->sql_maker_class;
885 $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
48c69e7c 886 }
887 return $self->_sql_maker;
888}
889
3ff1602f 890sub _rebless {}
891
8b445e33 892sub _populate_dbh {
893 my ($self) = @_;
7e47ea83 894 my @info = @{$self->_dbi_connect_info || []};
8b445e33 895 $self->_dbh($self->_connect(@info));
2fd24e78 896
77d76d0f 897 # Always set the transaction depth on connect, since
898 # there is no transaction in progress by definition
57c18b65 899 $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
77d76d0f 900
2fd24e78 901 if(ref $self eq 'DBIx::Class::Storage::DBI') {
902 my $driver = $self->_dbh->{Driver}->{Name};
efe6365b 903 if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
2fd24e78 904 bless $self, "DBIx::Class::Storage::DBI::${driver}";
3ff1602f 905 $self->_rebless();
2fd24e78 906 }
843f8ecd 907 }
2fd24e78 908
6d2e7a96 909 my $connection_do = $self->on_connect_do;
910 $self->_do_connection_actions($connection_do) if ref($connection_do);
5ef3e508 911
1346e22d 912 $self->_conn_pid($$);
913 $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
8b445e33 914}
915
6d2e7a96 916sub _do_connection_actions {
917 my $self = shift;
918 my $connection_do = shift;
919
920 if (ref $connection_do eq 'ARRAY') {
921 $self->_do_query($_) foreach @$connection_do;
922 }
923 elsif (ref $connection_do eq 'CODE') {
924 $connection_do->();
925 }
926
927 return $self;
928}
929
579ca3f7 930sub _do_query {
931 my ($self, $action) = @_;
932
6d2e7a96 933 if (ref $action eq 'CODE') {
1dafdb2a 934 $action = $action->($self);
935 $self->_do_query($_) foreach @$action;
579ca3f7 936 }
937 else {
1bd1640b 938 my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
939 $self->_query_start(@to_run);
940 $self->_dbh->do(@to_run);
941 $self->_query_end(@to_run);
579ca3f7 942 }
943
944 return $self;
945}
946
8b445e33 947sub _connect {
948 my ($self, @info) = @_;
5ef3e508 949
9d31f7dc 950 $self->throw_exception("You failed to provide any connection info")
61646ebd 951 if !@info;
9d31f7dc 952
90ec6cad 953 my ($old_connect_via, $dbh);
954
5ef3e508 955 if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
61646ebd 956 $old_connect_via = $DBI::connect_via;
957 $DBI::connect_via = 'connect';
5ef3e508 958 }
959
75db246c 960 eval {
f5de3933 961 if(ref $info[0] eq 'CODE') {
962 $dbh = &{$info[0]}
963 }
964 else {
965 $dbh = DBI->connect(@info);
61646ebd 966 }
967
e7827df0 968 if($dbh && !$self->unsafe) {
664612fb 969 my $weak_self = $self;
970 weaken($weak_self);
61646ebd 971 $dbh->{HandleError} = sub {
9bf06dc0 972 if ($weak_self) {
973 $weak_self->throw_exception("DBI Exception: $_[0]");
974 }
975 else {
976 croak ("DBI Exception: $_[0]");
977 }
61646ebd 978 };
2ab60eb9 979 $dbh->{ShowErrorStatement} = 1;
61646ebd 980 $dbh->{RaiseError} = 1;
981 $dbh->{PrintError} = 0;
f5de3933 982 }
75db246c 983 };
90ec6cad 984
985 $DBI::connect_via = $old_connect_via if $old_connect_via;
986
d92a4015 987 $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
988 if !$dbh || $@;
90ec6cad 989
57c18b65 990 $self->_dbh_autocommit($dbh->{AutoCommit});
991
e571e823 992 $dbh;
8b445e33 993}
994
adb3554a 995sub svp_begin {
996 my ($self, $name) = @_;
adb3554a 997
ddf66ced 998 $name = $self->_svp_generate_name
999 unless defined $name;
1000
1001 $self->throw_exception ("You can't use savepoints outside a transaction")
1002 if $self->{transaction_depth} == 0;
1003
1004 $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1005 unless $self->can('_svp_begin');
1006
1007 push @{ $self->{savepoints} }, $name;
adb3554a 1008
adb3554a 1009 $self->debugobj->svp_begin($name) if $self->debug;
ddf66ced 1010
1011 return $self->_svp_begin($name);
adb3554a 1012}
1013
1014sub svp_release {
1015 my ($self, $name) = @_;
1016
ddf66ced 1017 $self->throw_exception ("You can't use savepoints outside a transaction")
1018 if $self->{transaction_depth} == 0;
adb3554a 1019
ddf66ced 1020 $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1021 unless $self->can('_svp_release');
1022
1023 if (defined $name) {
1024 $self->throw_exception ("Savepoint '$name' does not exist")
1025 unless grep { $_ eq $name } @{ $self->{savepoints} };
1026
1027 # Dig through the stack until we find the one we are releasing. This keeps
1028 # the stack up to date.
1029 my $svp;
adb3554a 1030
ddf66ced 1031 do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
1032 } else {
1033 $name = pop @{ $self->{savepoints} };
adb3554a 1034 }
ddf66ced 1035
adb3554a 1036 $self->debugobj->svp_release($name) if $self->debug;
ddf66ced 1037
1038 return $self->_svp_release($name);
adb3554a 1039}
1040
1041sub svp_rollback {
1042 my ($self, $name) = @_;
1043
ddf66ced 1044 $self->throw_exception ("You can't use savepoints outside a transaction")
1045 if $self->{transaction_depth} == 0;
adb3554a 1046
ddf66ced 1047 $self->throw_exception ("Your Storage implementation doesn't support savepoints")
1048 unless $self->can('_svp_rollback');
1049
1050 if (defined $name) {
1051 # If they passed us a name, verify that it exists in the stack
1052 unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
1053 $self->throw_exception("Savepoint '$name' does not exist!");
1054 }
adb3554a 1055
ddf66ced 1056 # Dig through the stack until we find the one we are releasing. This keeps
1057 # the stack up to date.
1058 while(my $s = pop(@{ $self->{savepoints} })) {
1059 last if($s eq $name);
1060 }
1061 # Add the savepoint back to the stack, as a rollback doesn't remove the
1062 # named savepoint, only everything after it.
1063 push(@{ $self->{savepoints} }, $name);
1064 } else {
1065 # We'll assume they want to rollback to the last savepoint
1066 $name = $self->{savepoints}->[-1];
adb3554a 1067 }
ddf66ced 1068
adb3554a 1069 $self->debugobj->svp_rollback($name) if $self->debug;
ddf66ced 1070
1071 return $self->_svp_rollback($name);
1072}
1073
1074sub _svp_generate_name {
1075 my ($self) = @_;
1076
1077 return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
adb3554a 1078}
d32d82f9 1079
8091aa91 1080sub txn_begin {
d79f59b9 1081 my $self = shift;
291bf95f 1082 $self->ensure_connected();
57c18b65 1083 if($self->{transaction_depth} == 0) {
77d76d0f 1084 $self->debugobj->txn_begin()
1085 if $self->debug;
1086 # this isn't ->_dbh-> because
1087 # we should reconnect on begin_work
1088 # for AutoCommit users
1089 $self->dbh->begin_work;
d6feb60f 1090 } elsif ($self->auto_savepoint) {
ddf66ced 1091 $self->svp_begin;
986e4fca 1092 }
57c18b65 1093 $self->{transaction_depth}++;
8091aa91 1094}
8b445e33 1095
8091aa91 1096sub txn_commit {
d79f59b9 1097 my $self = shift;
77d76d0f 1098 if ($self->{transaction_depth} == 1) {
1099 my $dbh = $self->_dbh;
1100 $self->debugobj->txn_commit()
1101 if ($self->debug);
1102 $dbh->commit;
1103 $self->{transaction_depth} = 0
57c18b65 1104 if $self->_dbh_autocommit;
77d76d0f 1105 }
1106 elsif($self->{transaction_depth} > 1) {
d6feb60f 1107 $self->{transaction_depth}--;
ddf66ced 1108 $self->svp_release
d6feb60f 1109 if $self->auto_savepoint;
77d76d0f 1110 }
d32d82f9 1111}
1112
77d76d0f 1113sub txn_rollback {
1114 my $self = shift;
1115 my $dbh = $self->_dbh;
77d76d0f 1116 eval {
77d76d0f 1117 if ($self->{transaction_depth} == 1) {
d32d82f9 1118 $self->debugobj->txn_rollback()
1119 if ($self->debug);
77d76d0f 1120 $self->{transaction_depth} = 0
57c18b65 1121 if $self->_dbh_autocommit;
1122 $dbh->rollback;
d32d82f9 1123 }
77d76d0f 1124 elsif($self->{transaction_depth} > 1) {
1125 $self->{transaction_depth}--;
d6feb60f 1126 if ($self->auto_savepoint) {
ddf66ced 1127 $self->svp_rollback;
1128 $self->svp_release;
d6feb60f 1129 }
986e4fca 1130 }
f11383c2 1131 else {
d32d82f9 1132 die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
986e4fca 1133 }
77d76d0f 1134 };
a62cf8d4 1135 if ($@) {
1136 my $error = $@;
1137 my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
1138 $error =~ /$exception_class/ and $self->throw_exception($error);
77d76d0f 1139 # ensure that a failed rollback resets the transaction depth
57c18b65 1140 $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
77d76d0f 1141 $self->throw_exception($error);
8091aa91 1142 }
1143}
8b445e33 1144
b7151206 1145# This used to be the top-half of _execute. It was split out to make it
1146# easier to override in NoBindVars without duping the rest. It takes up
1147# all of _execute's args, and emits $sql, @bind.
1148sub _prep_for_execute {
d944c5ae 1149 my ($self, $op, $extra_bind, $ident, $args) = @_;
b7151206 1150
d944c5ae 1151 my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
db4b5f11 1152 unshift(@bind,
1153 map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
1154 if $extra_bind;
b7151206 1155
d944c5ae 1156 return ($sql, \@bind);
b7151206 1157}
1158
e5d9ee92 1159sub _fix_bind_params {
1160 my ($self, @bind) = @_;
1161
1162 ### Turn @bind from something like this:
1163 ### ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
1164 ### to this:
1165 ### ( "'1'", "'1'", "'3'" )
1166 return
1167 map {
1168 if ( defined( $_ && $_->[1] ) ) {
1169 map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
1170 }
1171 else { q{'NULL'}; }
1172 } @bind;
1173}
1174
1175sub _query_start {
1176 my ( $self, $sql, @bind ) = @_;
1177
1178 if ( $self->debug ) {
1179 @bind = $self->_fix_bind_params(@bind);
50336325 1180
e5d9ee92 1181 $self->debugobj->query_start( $sql, @bind );
1182 }
1183}
1184
1185sub _query_end {
1186 my ( $self, $sql, @bind ) = @_;
1187
1188 if ( $self->debug ) {
1189 @bind = $self->_fix_bind_params(@bind);
1190 $self->debugobj->query_end( $sql, @bind );
1191 }
1192}
1193
baa31d2f 1194sub _dbh_execute {
1195 my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
7af8b477 1196
eda28767 1197 if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
b7ce6568 1198 $ident = $ident->from();
1199 }
d944c5ae 1200
1201 my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
d92a4015 1202
e5d9ee92 1203 $self->_query_start( $sql, @$bind );
95dad7e2 1204
61646ebd 1205 my $sth = $self->sth($sql,$op);
6e399b4f 1206
61646ebd 1207 my $placeholder_index = 1;
6e399b4f 1208
61646ebd 1209 foreach my $bound (@$bind) {
1210 my $attributes = {};
1211 my($column_name, @data) = @$bound;
6e399b4f 1212
61646ebd 1213 if ($bind_attributes) {
1214 $attributes = $bind_attributes->{$column_name}
1215 if defined $bind_attributes->{$column_name};
1216 }
6e399b4f 1217
61646ebd 1218 foreach my $data (@data) {
1219 $data = ref $data ? ''.$data : $data; # stringify args
0b5dee17 1220
61646ebd 1221 $sth->bind_param($placeholder_index, $data, $attributes);
1222 $placeholder_index++;
95dad7e2 1223 }
61646ebd 1224 }
d92a4015 1225
61646ebd 1226 # Can this fail without throwing an exception anyways???
1227 my $rv = $sth->execute();
1228 $self->throw_exception($sth->errstr) if !$rv;
d92a4015 1229
e5d9ee92 1230 $self->_query_end( $sql, @$bind );
baa31d2f 1231
d944c5ae 1232 return (wantarray ? ($rv, $sth, @$bind) : $rv);
223b8fe3 1233}
1234
baa31d2f 1235sub _execute {
1236 my $self = shift;
3ff1602f 1237 $self->dbh_do('_dbh_execute', @_)
baa31d2f 1238}
1239
8b445e33 1240sub insert {
7af8b477 1241 my ($self, $source, $to_insert) = @_;
1242
1243 my $ident = $source->from;
8b646589 1244 my $bind_attributes = $self->source_bind_attributes($source);
1245
2eebd801 1246 $self->ensure_connected;
a982c051 1247 foreach my $col ( $source->columns ) {
1248 if ( !defined $to_insert->{$col} ) {
1249 my $col_info = $source->column_info($col);
1250
1251 if ( $col_info->{auto_nextval} ) {
1252 $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
1253 }
1254 }
1255 }
1256
61646ebd 1257 $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
8e08ecc4 1258
8b445e33 1259 return $to_insert;
1260}
1261
744076d8 1262## Still not quite perfect, and EXPERIMENTAL
1263## Currently it is assumed that all values passed will be "normal", i.e. not
1264## scalar refs, or at least, all the same type as the first set, the statement is
1265## only prepped once.
54e0bd06 1266sub insert_bulk {
9fdf90df 1267 my ($self, $source, $cols, $data) = @_;
744076d8 1268 my %colvalues;
9fdf90df 1269 my $table = $source->from;
744076d8 1270 @colvalues{@$cols} = (0..$#$cols);
1271 my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
7af8b477 1272
e5d9ee92 1273 $self->_query_start( $sql, @bind );
894328b8 1274 my $sth = $self->sth($sql);
54e0bd06 1275
54e0bd06 1276# @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
1277
744076d8 1278 ## This must be an arrayref, else nothing works!
9fdf90df 1279
744076d8 1280 my $tuple_status = [];
9fdf90df 1281
1282 ##use Data::Dumper;
1283 ##print STDERR Dumper( $data, $sql, [@bind] );
eda28767 1284
61646ebd 1285 my $time = time();
8b646589 1286
61646ebd 1287 ## Get the bind_attributes, if any exist
1288 my $bind_attributes = $self->source_bind_attributes($source);
9fdf90df 1289
61646ebd 1290 ## Bind the values and execute
1291 my $placeholder_index = 1;
9fdf90df 1292
61646ebd 1293 foreach my $bound (@bind) {
9fdf90df 1294
61646ebd 1295 my $attributes = {};
1296 my ($column_name, $data_index) = @$bound;
eda28767 1297
61646ebd 1298 if( $bind_attributes ) {
1299 $attributes = $bind_attributes->{$column_name}
1300 if defined $bind_attributes->{$column_name};
1301 }
9fdf90df 1302
61646ebd 1303 my @data = map { $_->[$data_index] } @$data;
9fdf90df 1304
61646ebd 1305 $sth->bind_param_array( $placeholder_index, [@data], $attributes );
1306 $placeholder_index++;
54e0bd06 1307 }
61646ebd 1308 my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
1309 $self->throw_exception($sth->errstr) if !$rv;
1310
e5d9ee92 1311 $self->_query_end( $sql, @bind );
54e0bd06 1312 return (wantarray ? ($rv, $sth, @bind) : $rv);
1313}
1314
8b445e33 1315sub update {
7af8b477 1316 my $self = shift @_;
1317 my $source = shift @_;
8b646589 1318 my $bind_attributes = $self->source_bind_attributes($source);
8b646589 1319
b7ce6568 1320 return $self->_execute('update' => [], $source, $bind_attributes, @_);
8b445e33 1321}
1322
7af8b477 1323
8b445e33 1324sub delete {
7af8b477 1325 my $self = shift @_;
1326 my $source = shift @_;
1327
1328 my $bind_attrs = {}; ## If ever it's needed...
7af8b477 1329
b7ce6568 1330 return $self->_execute('delete' => [], $source, $bind_attrs, @_);
8b445e33 1331}
1332
de705b51 1333sub _select {
8b445e33 1334 my ($self, $ident, $select, $condition, $attrs) = @_;
223b8fe3 1335 my $order = $attrs->{order_by};
95ba7ee4 1336
223b8fe3 1337 if (ref $condition eq 'SCALAR') {
68f3b0dd 1338 my $unwrap = ${$condition};
1339 if ($unwrap =~ s/ORDER BY (.*)$//i) {
1340 $order = $1;
1341 $condition = \$unwrap;
1342 }
223b8fe3 1343 }
95ba7ee4 1344
1345 my $for = delete $attrs->{for};
1346 my $sql_maker = $self->sql_maker;
1347 local $sql_maker->{for} = $for;
1348
8839560b 1349 if (exists $attrs->{group_by} || $attrs->{having}) {
bc0c9800 1350 $order = {
1351 group_by => $attrs->{group_by},
1352 having => $attrs->{having},
1353 ($order ? (order_by => $order) : ())
1354 };
54540863 1355 }
7af8b477 1356 my $bind_attrs = {}; ## Future support
1357 my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
9229f20a 1358 if ($attrs->{software_limit} ||
1359 $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1360 $attrs->{software_limit} = 1;
5c91499f 1361 } else {
0823196c 1362 $self->throw_exception("rows attribute must be positive if present")
1363 if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
e60dc79f 1364
1365 # MySQL actually recommends this approach. I cringe.
1366 $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
5c91499f 1367 push @args, $attrs->{rows}, $attrs->{offset};
1368 }
95ba7ee4 1369
de705b51 1370 return $self->_execute(@args);
1371}
1372
8b646589 1373sub source_bind_attributes {
1374 my ($self, $source) = @_;
1375
1376 my $bind_attributes;
1377 foreach my $column ($source->columns) {
1378
1379 my $data_type = $source->column_info($column)->{data_type} || '';
1380 $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
eda28767 1381 if $data_type;
8b646589 1382 }
1383
1384 return $bind_attributes;
1385}
1386
9b83fccd 1387=head2 select
1388
d3b0e369 1389=over 4
1390
1391=item Arguments: $ident, $select, $condition, $attrs
1392
1393=back
1394
9b83fccd 1395Handle a SQL select statement.
1396
1397=cut
1398
de705b51 1399sub select {
1400 my $self = shift;
1401 my ($ident, $select, $condition, $attrs) = @_;
e4eb8ee1 1402 return $self->cursor_class->new($self, \@_, $attrs);
8b445e33 1403}
1404
1a14aa3f 1405sub select_single {
de705b51 1406 my $self = shift;
1407 my ($rv, $sth, @bind) = $self->_select(@_);
6157db4f 1408 my @row = $sth->fetchrow_array;
27252a4a 1409 my @nextrow = $sth->fetchrow_array if @row;
1410 if(@row && @nextrow) {
1a4e8d7c 1411 carp "Query returned more than one row. SQL that returns multiple rows is DEPRECATED for ->find and ->single";
1412 }
a3eaff0e 1413 # Need to call finish() to work round broken DBDs
6157db4f 1414 $sth->finish();
1415 return @row;
1a14aa3f 1416}
1417
9b83fccd 1418=head2 sth
1419
d3b0e369 1420=over 4
1421
1422=item Arguments: $sql
1423
1424=back
1425
9b83fccd 1426Returns a L<DBI> sth (statement handle) for the supplied SQL.
1427
1428=cut
1429
d4f16b21 1430sub _dbh_sth {
1431 my ($self, $dbh, $sql) = @_;
b33697ef 1432
d32d82f9 1433 # 3 is the if_active parameter which avoids active sth re-use
b33697ef 1434 my $sth = $self->disable_sth_caching
1435 ? $dbh->prepare($sql)
1436 : $dbh->prepare_cached($sql, {}, 3);
1437
d92a4015 1438 # XXX You would think RaiseError would make this impossible,
1439 # but apparently that's not true :(
61646ebd 1440 $self->throw_exception($dbh->errstr) if !$sth;
b33697ef 1441
1442 $sth;
d32d82f9 1443}
1444
8b445e33 1445sub sth {
cb5f2eea 1446 my ($self, $sql) = @_;
3ff1602f 1447 $self->dbh_do('_dbh_sth', $sql);
8b445e33 1448}
1449
d4f16b21 1450sub _dbh_columns_info_for {
1451 my ($self, $dbh, $table) = @_;
a32e8402 1452
d32d82f9 1453 if ($dbh->can('column_info')) {
a953d8d9 1454 my %result;
d32d82f9 1455 eval {
1456 my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1457 my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1458 $sth->execute();
1459 while ( my $info = $sth->fetchrow_hashref() ){
1460 my %column_info;
1461 $column_info{data_type} = $info->{TYPE_NAME};
1462 $column_info{size} = $info->{COLUMN_SIZE};
1463 $column_info{is_nullable} = $info->{NULLABLE} ? 1 : 0;
1464 $column_info{default_value} = $info->{COLUMN_DEF};
1465 my $col_name = $info->{COLUMN_NAME};
1466 $col_name =~ s/^\"(.*)\"$/$1/;
1467
1468 $result{$col_name} = \%column_info;
0d67fe74 1469 }
d32d82f9 1470 };
093fc7a6 1471 return \%result if !$@ && scalar keys %result;
d32d82f9 1472 }
0d67fe74 1473
d32d82f9 1474 my %result;
88262f96 1475 my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
d32d82f9 1476 $sth->execute;
1477 my @columns = @{$sth->{NAME_lc}};
1478 for my $i ( 0 .. $#columns ){
1479 my %column_info;
248bf0d0 1480 $column_info{data_type} = $sth->{TYPE}->[$i];
d32d82f9 1481 $column_info{size} = $sth->{PRECISION}->[$i];
1482 $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
0d67fe74 1483
d32d82f9 1484 if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1485 $column_info{data_type} = $1;
1486 $column_info{size} = $2;
0d67fe74 1487 }
1488
d32d82f9 1489 $result{$columns[$i]} = \%column_info;
1490 }
248bf0d0 1491 $sth->finish;
1492
1493 foreach my $col (keys %result) {
1494 my $colinfo = $result{$col};
1495 my $type_num = $colinfo->{data_type};
1496 my $type_name;
1497 if(defined $type_num && $dbh->can('type_info')) {
1498 my $type_info = $dbh->type_info($type_num);
1499 $type_name = $type_info->{TYPE_NAME} if $type_info;
1500 $colinfo->{data_type} = $type_name if $type_name;
1501 }
1502 }
d32d82f9 1503
1504 return \%result;
1505}
1506
1507sub columns_info_for {
1508 my ($self, $table) = @_;
3ff1602f 1509 $self->dbh_do('_dbh_columns_info_for', $table);
a953d8d9 1510}
1511
9b83fccd 1512=head2 last_insert_id
1513
1514Return the row id of the last insert.
1515
1516=cut
1517
d4f16b21 1518sub _dbh_last_insert_id {
1519 my ($self, $dbh, $source, $col) = @_;
1520 # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1521 $dbh->func('last_insert_rowid');
1522}
1523
843f8ecd 1524sub last_insert_id {
d4f16b21 1525 my $self = shift;
3ff1602f 1526 $self->dbh_do('_dbh_last_insert_id', @_);
843f8ecd 1527}
1528
9b83fccd 1529=head2 sqlt_type
1530
1531Returns the database driver name.
1532
1533=cut
1534
d4f16b21 1535sub sqlt_type { shift->dbh->{Driver}->{Name} }
1c339d71 1536
a71859b4 1537=head2 bind_attribute_by_data_type
1538
5d52945a 1539Given a datatype from column info, returns a database specific bind
40911cb3 1540attribute for C<< $dbh->bind_param($val,$attribute) >> or nothing if we will
5d52945a 1541let the database planner just handle it.
a71859b4 1542
1543Generally only needed for special case column types, like bytea in postgres.
1544
1545=cut
1546
1547sub bind_attribute_by_data_type {
1548 return;
1549}
1550
58ded37e 1551=head2 create_ddl_dir
9b83fccd 1552
1553=over 4
1554
348d7c84 1555=item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
9b83fccd 1556
1557=back
1558
d3b0e369 1559Creates a SQL file based on the Schema, for each of the specified
9b83fccd 1560database types, in the given directory.
1561
348d7c84 1562By default, C<\%sqlt_args> will have
1563
1564 { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
1565
1566merged with the hash passed in. To disable any of those features, pass in a
1567hashref like the following
1568
1569 { ignore_constraint_names => 0, # ... other options }
1570
9b83fccd 1571=cut
1572
99a74c4a 1573sub create_ddl_dir {
c9d2e0a2 1574 my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
e673f011 1575
99a74c4a 1576 if(!$dir || !-d $dir) {
e673f011 1577 warn "No directory given, using ./\n";
1578 $dir = "./";
1579 }
1580 $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1581 $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
b1f9d92e 1582
1583 my $schema_version = $schema->schema_version || '1.x';
1584 $version ||= $schema_version;
1585
d4d46d19 1586 $sqltargs = {
1587 add_drop_table => 1,
1588 ignore_constraint_names => 1,
1589 ignore_index_names => 1,
1590 %{$sqltargs || {}}
1591 };
e673f011 1592
b6d9f089 1593 $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
40dce2a5 1594 . $self->_check_sqlt_message . q{'})
1595 if !$self->_check_sqlt_version;
e673f011 1596
45f1a484 1597 my $sqlt = SQL::Translator->new( $sqltargs );
b7e303a8 1598
1599 $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1600 my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
1601
99a74c4a 1602 foreach my $db (@$databases) {
e673f011 1603 $sqlt->reset();
c9d2e0a2 1604 $sqlt = $self->configure_sqlt($sqlt, $db);
b7e303a8 1605 $sqlt->{schema} = $sqlt_schema;
e673f011 1606 $sqlt->producer($db);
1607
1608 my $file;
99a74c4a 1609 my $filename = $schema->ddl_filename($db, $version, $dir);
b1f9d92e 1610 if (-e $filename && ($version eq $schema_version )) {
99a74c4a 1611 # if we are dumping the current version, overwrite the DDL
1612 warn "Overwriting existing DDL file - $filename";
1613 unlink($filename);
1614 }
c9d2e0a2 1615
99a74c4a 1616 my $output = $sqlt->translate;
1617 if(!$output) {
1618 warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1619 next;
1620 }
1621 if(!open($file, ">$filename")) {
1622 $self->throw_exception("Can't open $filename for writing ($!)");
1623 next;
1624 }
1625 print $file $output;
1626 close($file);
1627
1628 next unless ($preversion);
c9d2e0a2 1629
99a74c4a 1630 require SQL::Translator::Diff;
2dc2cd0f 1631
99a74c4a 1632 my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
1633 if(!-e $prefilename) {
1634 warn("No previous schema file found ($prefilename)");
1635 next;
1636 }
c9d2e0a2 1637
99a74c4a 1638 my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
1639 if(-e $difffile) {
1640 warn("Overwriting existing diff file - $difffile");
1641 unlink($difffile);
1642 }
1643
1644 my $source_schema;
1645 {
1646 my $t = SQL::Translator->new($sqltargs);
1647 $t->debug( 0 );
1648 $t->trace( 0 );
1649 $t->parser( $db ) or die $t->error;
1650 $t = $self->configure_sqlt($t, $db);
1651 my $out = $t->translate( $prefilename ) or die $t->error;
1652 $source_schema = $t->schema;
1653 unless ( $source_schema->name ) {
1654 $source_schema->name( $prefilename );
2dc2cd0f 1655 }
99a74c4a 1656 }
c9d2e0a2 1657
99a74c4a 1658 # The "new" style of producers have sane normalization and can support
1659 # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
1660 # And we have to diff parsed SQL against parsed SQL.
1661 my $dest_schema = $sqlt_schema;
1662
1663 unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
1664 my $t = SQL::Translator->new($sqltargs);
1665 $t->debug( 0 );
1666 $t->trace( 0 );
1667 $t->parser( $db ) or die $t->error;
1668 $t = $self->configure_sqlt($t, $db);
1669 my $out = $t->translate( $filename ) or die $t->error;
1670 $dest_schema = $t->schema;
1671 $dest_schema->name( $filename )
1672 unless $dest_schema->name;
1673 }
1674
1675 my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1676 $dest_schema, $db,
1677 $sqltargs
1678 );
1679 if(!open $file, ">$difffile") {
1680 $self->throw_exception("Can't write to $difffile ($!)");
1681 next;
c9d2e0a2 1682 }
99a74c4a 1683 print $file $diff;
1684 close($file);
e673f011 1685 }
c9d2e0a2 1686}
e673f011 1687
c9d2e0a2 1688sub configure_sqlt() {
1689 my $self = shift;
1690 my $tr = shift;
1691 my $db = shift || $self->sqlt_type;
1692 if ($db eq 'PostgreSQL') {
1693 $tr->quote_table_names(0);
1694 $tr->quote_field_names(0);
1695 }
1696 return $tr;
e673f011 1697}
1698
9b83fccd 1699=head2 deployment_statements
1700
d3b0e369 1701=over 4
1702
1703=item Arguments: $schema, $type, $version, $directory, $sqlt_args
1704
1705=back
1706
1707Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1708The database driver name is given by C<$type>, though the value from
1709L</sqlt_type> is used if it is not specified.
1710
1711C<$directory> is used to return statements from files in a previously created
1712L</create_ddl_dir> directory and is optional. The filenames are constructed
1713from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1714
1715If no C<$directory> is specified then the statements are constructed on the
1716fly using L<SQL::Translator> and C<$version> is ignored.
1717
1718See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
9b83fccd 1719
1720=cut
1721
e673f011 1722sub deployment_statements {
1723 my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
915919c5 1724 # Need to be connected to get the correct sqlt_type
c377d939 1725 $self->ensure_connected() unless $type;
e673f011 1726 $type ||= $self->sqlt_type;
b1f9d92e 1727 $version ||= $schema->schema_version || '1.x';
e673f011 1728 $dir ||= './';
c9d2e0a2 1729 my $filename = $schema->ddl_filename($type, $dir, $version);
1730 if(-f $filename)
1731 {
1732 my $file;
1733 open($file, "<$filename")
1734 or $self->throw_exception("Can't open $filename ($!)");
1735 my @rows = <$file>;
1736 close($file);
1737 return join('', @rows);
1738 }
1739
b6d9f089 1740 $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
40dce2a5 1741 . $self->_check_sqlt_message . q{'})
1742 if !$self->_check_sqlt_version;
1743
1744 require SQL::Translator::Parser::DBIx::Class;
1745 eval qq{use SQL::Translator::Producer::${type}};
1746 $self->throw_exception($@) if $@;
1747
1748 # sources needs to be a parser arg, but for simplicty allow at top level
1749 # coming in
1750 $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1751 if exists $sqltargs->{sources};
1752
1753 my $tr = SQL::Translator->new(%$sqltargs);
1754 SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1755 return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1c339d71 1756}
843f8ecd 1757
1c339d71 1758sub deploy {
260129d8 1759 my ($self, $schema, $type, $sqltargs, $dir) = @_;
849d23b8 1760 foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1761 foreach my $line ( split(";\n", $statement)) {
1762 next if($line =~ /^--/);
1763 next if(!$line);
61bf0de5 1764# next if($line =~ /^DROP/m);
849d23b8 1765 next if($line =~ /^BEGIN TRANSACTION/m);
1766 next if($line =~ /^COMMIT/m);
1767 next if $line =~ /^\s+$/; # skip whitespace only
e5d9ee92 1768 $self->_query_start($line);
61bf0de5 1769 eval {
1770 $self->dbh->do($line); # shouldn't be using ->dbh ?
1771 };
1772 if ($@) {
1773 warn qq{$@ (running "${line}")};
1774 }
e5d9ee92 1775 $self->_query_end($line);
e4fe9ba3 1776 }
75d07914 1777 }
1c339d71 1778}
843f8ecd 1779
9b83fccd 1780=head2 datetime_parser
1781
1782Returns the datetime parser class
1783
1784=cut
1785
f86fcf0d 1786sub datetime_parser {
1787 my $self = shift;
114780ee 1788 return $self->{datetime_parser} ||= do {
1789 $self->ensure_connected;
1790 $self->build_datetime_parser(@_);
1791 };
f86fcf0d 1792}
1793
9b83fccd 1794=head2 datetime_parser_type
1795
1796Defines (returns) the datetime parser class - currently hardwired to
1797L<DateTime::Format::MySQL>
1798
1799=cut
1800
f86fcf0d 1801sub datetime_parser_type { "DateTime::Format::MySQL"; }
1802
9b83fccd 1803=head2 build_datetime_parser
1804
1805See L</datetime_parser>
1806
1807=cut
1808
f86fcf0d 1809sub build_datetime_parser {
1810 my $self = shift;
1811 my $type = $self->datetime_parser_type(@_);
1812 eval "use ${type}";
1813 $self->throw_exception("Couldn't load ${type}: $@") if $@;
1814 return $type;
1815}
1816
40dce2a5 1817{
1818 my $_check_sqlt_version; # private
1819 my $_check_sqlt_message; # private
1820 sub _check_sqlt_version {
1821 return $_check_sqlt_version if defined $_check_sqlt_version;
b6d9f089 1822 eval 'use SQL::Translator "0.09"';
b7e303a8 1823 $_check_sqlt_message = $@ || '';
1824 $_check_sqlt_version = !$@;
40dce2a5 1825 }
1826
1827 sub _check_sqlt_message {
1828 _check_sqlt_version if !defined $_check_sqlt_message;
1829 $_check_sqlt_message;
1830 }
1831}
1832
106d5f3b 1833=head2 is_replicating
1834
1835A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
1836replicate from a master database. Default is undef, which is the result
1837returned by databases that don't support replication.
1838
1839=cut
1840
1841sub is_replicating {
1842 return;
1843
1844}
1845
1846=head2 lag_behind_master
1847
1848Returns a number that represents a certain amount of lag behind a master db
1849when a given storage is replicating. The number is database dependent, but
1850starts at zero and increases with the amount of lag. Default in undef
1851
1852=cut
1853
1854sub lag_behind_master {
1855 return;
1856}
1857
c756145c 1858sub DESTROY {
1859 my $self = shift;
f5de3933 1860 return if !$self->_dbh;
c756145c 1861 $self->_verify_pid;
1862 $self->_dbh(undef);
1863}
92925617 1864
8b445e33 18651;
1866
92fe2181 1867=head1 USAGE NOTES
1868
1869=head2 DBIx::Class and AutoCommit
1870
1871DBIx::Class can do some wonderful magic with handling exceptions,
1872disconnections, and transactions when you use C<< AutoCommit => 1 >>
1873combined with C<txn_do> for transaction support.
1874
1875If you set C<< AutoCommit => 0 >> in your connect info, then you are always
1876in an assumed transaction between commits, and you're telling us you'd
1877like to manage that manually. A lot of the magic protections offered by
1878this module will go away. We can't protect you from exceptions due to database
1879disconnects because we don't know anything about how to restart your
1880transactions. You're on your own for handling all sorts of exceptional
1881cases if you choose the C<< AutoCommit => 0 >> path, just as you would
1882be with raw DBI.
1883
1884
9b83fccd 1885=head1 SQL METHODS
1886
1887The module defines a set of methods within the DBIC::SQL::Abstract
1888namespace. These build on L<SQL::Abstract::Limit> to provide the
1889SQL query functions.
1890
1891The following methods are extended:-
1892
1893=over 4
1894
1895=item delete
1896
1897=item insert
1898
1899=item select
1900
1901=item update
1902
1903=item limit_dialect
1904
2cc3a7be 1905See L</connect_info> for details.
bb4f246d 1906
9b83fccd 1907=item quote_char
1908
2cc3a7be 1909See L</connect_info> for details.
bb4f246d 1910
9b83fccd 1911=item name_sep
1912
2cc3a7be 1913See L</connect_info> for details.
bb4f246d 1914
9b83fccd 1915=back
1916
8b445e33 1917=head1 AUTHORS
1918
daec44b8 1919Matt S. Trout <mst@shadowcatsystems.co.uk>
8b445e33 1920
9f19b1d6 1921Andy Grundman <andy@hybridized.org>
1922
8b445e33 1923=head1 LICENSE
1924
1925You may distribute this code under the same terms as Perl itself.
1926
1927=cut