make insert_bulk use the sth method in the newer style of things
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
CommitLineData
8b445e33 1package DBIx::Class::Storage::DBI;
e673f011 2# -*- mode: cperl; cperl-indent-level: 2 -*-
8b445e33 3
a62cf8d4 4use base 'DBIx::Class::Storage';
5
20a2c954 6use strict;
7use warnings;
8b445e33 8use DBI;
aeaf3ce2 9use SQL::Abstract::Limit;
28927b50 10use DBIx::Class::Storage::DBI::Cursor;
4c248161 11use DBIx::Class::Storage::Statistics;
92b858c9 12use IO::File;
046ad905 13
14__PACKAGE__->mk_group_accessors(
15 'simple' =>
16 qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
17 cursor on_connect_do transaction_depth/
18);
19
bd7efd39 20BEGIN {
21
cb5f2eea 22package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
bd7efd39 23
24use base qw/SQL::Abstract::Limit/;
25
2cc3a7be 26# This prevents the caching of $dbh in S::A::L, I believe
27sub new {
28 my $self = shift->SUPER::new(@_);
29
30 # If limit_dialect is a ref (like a $dbh), go ahead and replace
31 # it with what it resolves to:
32 $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
33 if ref $self->{limit_dialect};
34
35 $self;
36}
37
260129d8 38sub _RowNumberOver {
39 my ($self, $sql, $order, $rows, $offset ) = @_;
40
41 $offset += 1;
42 my $last = $rows + $offset;
43 my ( $order_by ) = $self->_order_by( $order );
44
45 $sql = <<"";
46SELECT * FROM
47(
48 SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
49 $sql
50 $order_by
51 ) Q1
52) Q2
53WHERE ROW_NUM BETWEEN $offset AND $last
54
55 return $sql;
56}
57
58
2cc3a7be 59# While we're at it, this should make LIMIT queries more efficient,
60# without digging into things too deeply
758272ec 61use Scalar::Util 'blessed';
2cc3a7be 62sub _find_syntax {
63 my ($self, $syntax) = @_;
758272ec 64 my $dbhname = blessed($syntax) ? $syntax->{Driver}{Name} : $syntax;
65# print STDERR "Found DBH $syntax >$dbhname< ", $syntax->{Driver}->{Name}, "\n";
260129d8 66 if(ref($self) && $dbhname && $dbhname eq 'DB2') {
67 return 'RowNumberOver';
68 }
69
2cc3a7be 70 $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
71}
72
54540863 73sub select {
74 my ($self, $table, $fields, $where, $order, @rest) = @_;
6346a152 75 $table = $self->_quote($table) unless ref($table);
eac29141 76 local $self->{rownum_hack_count} = 1
77 if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
54540863 78 @rest = (-1) unless defined $rest[0];
0823196c 79 die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
80 # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
8839560b 81 local $self->{having_bind} = [];
bc0c9800 82 my ($sql, @ret) = $self->SUPER::select(
83 $table, $self->_recurse_fields($fields), $where, $order, @rest
84 );
8839560b 85 return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
54540863 86}
87
6346a152 88sub insert {
89 my $self = shift;
90 my $table = shift;
91 $table = $self->_quote($table) unless ref($table);
92 $self->SUPER::insert($table, @_);
93}
94
95sub update {
96 my $self = shift;
97 my $table = shift;
98 $table = $self->_quote($table) unless ref($table);
99 $self->SUPER::update($table, @_);
100}
101
102sub delete {
103 my $self = shift;
104 my $table = shift;
105 $table = $self->_quote($table) unless ref($table);
106 $self->SUPER::delete($table, @_);
107}
108
54540863 109sub _emulate_limit {
110 my $self = shift;
111 if ($_[3] == -1) {
112 return $_[1].$self->_order_by($_[2]);
113 } else {
114 return $self->SUPER::_emulate_limit(@_);
115 }
116}
117
118sub _recurse_fields {
119 my ($self, $fields) = @_;
120 my $ref = ref $fields;
121 return $self->_quote($fields) unless $ref;
122 return $$fields if $ref eq 'SCALAR';
123
124 if ($ref eq 'ARRAY') {
eac29141 125 return join(', ', map {
126 $self->_recurse_fields($_)
127 .(exists $self->{rownum_hack_count}
128 ? ' AS col'.$self->{rownum_hack_count}++
129 : '')
130 } @$fields);
54540863 131 } elsif ($ref eq 'HASH') {
132 foreach my $func (keys %$fields) {
133 return $self->_sqlcase($func)
134 .'( '.$self->_recurse_fields($fields->{$func}).' )';
135 }
136 }
137}
138
139sub _order_by {
140 my $self = shift;
141 my $ret = '';
8839560b 142 my @extra;
54540863 143 if (ref $_[0] eq 'HASH') {
144 if (defined $_[0]->{group_by}) {
145 $ret = $self->_sqlcase(' group by ')
146 .$self->_recurse_fields($_[0]->{group_by});
147 }
8839560b 148 if (defined $_[0]->{having}) {
149 my $frag;
150 ($frag, @extra) = $self->_recurse_where($_[0]->{having});
151 push(@{$self->{having_bind}}, @extra);
152 $ret .= $self->_sqlcase(' having ').$frag;
153 }
54540863 154 if (defined $_[0]->{order_by}) {
7ce5cbe7 155 $ret .= $self->_order_by($_[0]->{order_by});
54540863 156 }
d09c569a 157 } elsif (ref $_[0] eq 'SCALAR') {
e535069e 158 $ret = $self->_sqlcase(' order by ').${ $_[0] };
d09c569a 159 } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
160 my @order = @{+shift};
161 $ret = $self->_sqlcase(' order by ')
162 .join(', ', map {
163 my $r = $self->_order_by($_, @_);
164 $r =~ s/^ ?ORDER BY //i;
165 $r;
166 } @order);
54540863 167 } else {
168 $ret = $self->SUPER::_order_by(@_);
169 }
170 return $ret;
171}
172
f48dd03f 173sub _order_directions {
174 my ($self, $order) = @_;
175 $order = $order->{order_by} if ref $order eq 'HASH';
176 return $self->SUPER::_order_directions($order);
177}
178
2a816814 179sub _table {
bd7efd39 180 my ($self, $from) = @_;
181 if (ref $from eq 'ARRAY') {
182 return $self->_recurse_from(@$from);
183 } elsif (ref $from eq 'HASH') {
184 return $self->_make_as($from);
185 } else {
6346a152 186 return $from; # would love to quote here but _table ends up getting called
187 # twice during an ->select without a limit clause due to
188 # the way S::A::Limit->select works. should maybe consider
189 # bypassing this and doing S::A::select($self, ...) in
190 # our select method above. meantime, quoting shims have
191 # been added to select/insert/update/delete here
bd7efd39 192 }
193}
194
195sub _recurse_from {
196 my ($self, $from, @join) = @_;
197 my @sqlf;
198 push(@sqlf, $self->_make_as($from));
199 foreach my $j (@join) {
200 my ($to, $on) = @$j;
73856587 201
54540863 202 # check whether a join type exists
203 my $join_clause = '';
ca7b9fdf 204 my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
205 if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
206 $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
54540863 207 } else {
208 $join_clause = ' JOIN ';
209 }
73856587 210 push(@sqlf, $join_clause);
211
bd7efd39 212 if (ref $to eq 'ARRAY') {
213 push(@sqlf, '(', $self->_recurse_from(@$to), ')');
214 } else {
96cdbbab 215 push(@sqlf, $self->_make_as($to));
bd7efd39 216 }
217 push(@sqlf, ' ON ', $self->_join_condition($on));
218 }
219 return join('', @sqlf);
220}
221
222sub _make_as {
223 my ($self, $from) = @_;
54540863 224 return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
bc0c9800 225 reverse each %{$self->_skip_options($from)});
73856587 226}
227
228sub _skip_options {
54540863 229 my ($self, $hash) = @_;
230 my $clean_hash = {};
231 $clean_hash->{$_} = $hash->{$_}
232 for grep {!/^-/} keys %$hash;
233 return $clean_hash;
bd7efd39 234}
235
236sub _join_condition {
237 my ($self, $cond) = @_;
5efe4c79 238 if (ref $cond eq 'HASH') {
239 my %j;
bc0c9800 240 for (keys %$cond) {
241 my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
242 };
5efe4c79 243 return $self->_recurse_where(\%j);
244 } elsif (ref $cond eq 'ARRAY') {
245 return join(' OR ', map { $self->_join_condition($_) } @$cond);
246 } else {
247 die "Can't handle this yet!";
248 }
bd7efd39 249}
250
2a816814 251sub _quote {
252 my ($self, $label) = @_;
253 return '' unless defined $label;
3b24f6ea 254 return "*" if $label eq '*';
41728a6e 255 return $label unless $self->{quote_char};
3b24f6ea 256 if(ref $self->{quote_char} eq "ARRAY"){
257 return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
258 if !defined $self->{name_sep};
259 my $sep = $self->{name_sep};
260 return join($self->{name_sep},
261 map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1] }
262 split(/\Q$sep\E/,$label));
263 }
2a816814 264 return $self->SUPER::_quote($label);
265}
266
7be93b07 267sub limit_dialect {
268 my $self = shift;
269 $self->{limit_dialect} = shift if @_;
270 return $self->{limit_dialect};
271}
272
2437a1e3 273sub quote_char {
274 my $self = shift;
275 $self->{quote_char} = shift if @_;
276 return $self->{quote_char};
277}
278
279sub name_sep {
280 my $self = shift;
281 $self->{name_sep} = shift if @_;
282 return $self->{name_sep};
283}
284
bd7efd39 285} # End of BEGIN block
286
b327f988 287=head1 NAME
288
289DBIx::Class::Storage::DBI - DBI storage handler
290
291=head1 SYNOPSIS
292
293=head1 DESCRIPTION
294
046ad905 295This class represents the connection to an RDBMS via L<DBI>. See
296L<DBIx::Class::Storage> for general information. This pod only
297documents DBI-specific methods and behaviors.
b327f988 298
299=head1 METHODS
300
9b83fccd 301=cut
302
8b445e33 303sub new {
046ad905 304 my $new = shift->next::method(@_);
82cc0386 305
28927b50 306 $new->cursor("DBIx::Class::Storage::DBI::Cursor");
d79f59b9 307 $new->transaction_depth(0);
2cc3a7be 308 $new->_sql_maker_opts({});
1b994857 309 $new->{_in_dbh_do} = 0;
dbaee748 310 $new->{_dbh_gen} = 0;
82cc0386 311
046ad905 312 $new;
1c339d71 313}
314
1b45b01e 315=head2 connect_info
316
bb4f246d 317The arguments of C<connect_info> are always a single array reference.
1b45b01e 318
bb4f246d 319This is normally accessed via L<DBIx::Class::Schema/connection>, which
320encapsulates its argument list in an arrayref before calling
321C<connect_info> here.
1b45b01e 322
bb4f246d 323The arrayref can either contain the same set of arguments one would
324normally pass to L<DBI/connect>, or a lone code reference which returns
325a connected database handle.
d7c4c15c 326
2cc3a7be 327In either case, if the final argument in your connect_info happens
328to be a hashref, C<connect_info> will look there for several
329connection-specific options:
330
331=over 4
332
333=item on_connect_do
334
335This can be set to an arrayref of literal sql statements, which will
336be executed immediately after making the connection to the database
337every time we [re-]connect.
338
339=item limit_dialect
340
341Sets the limit dialect. This is useful for JDBC-bridge among others
342where the remote SQL-dialect cannot be determined by the name of the
343driver alone.
344
345=item quote_char
d7c4c15c 346
2cc3a7be 347Specifies what characters to use to quote table and column names. If
348you use this you will want to specify L<name_sep> as well.
349
350quote_char expects either a single character, in which case is it is placed
351on either side of the table/column, or an arrayref of length 2 in which case the
352table/column name is placed between the elements.
353
354For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd
355use C<quote_char =E<gt> [qw/[ ]/]>.
356
357=item name_sep
358
359This only needs to be used in conjunction with L<quote_char>, and is used to
360specify the charecter that seperates elements (schemas, tables, columns) from
361each other. In most cases this is simply a C<.>.
362
363=back
364
365These options can be mixed in with your other L<DBI> connection attributes,
366or placed in a seperate hashref after all other normal L<DBI> connection
367arguments.
368
369Every time C<connect_info> is invoked, any previous settings for
370these options will be cleared before setting the new ones, regardless of
371whether any options are specified in the new C<connect_info>.
372
f5de3933 373Important note: DBIC expects the returned database handle provided by
374a subref argument to have RaiseError set on it. If it doesn't, things
375might not work very well, YMMV. If you don't use a subref, DBIC will
376force this setting for you anyways. Setting HandleError to anything
377other than simple exception object wrapper might cause problems too.
378
2cc3a7be 379Examples:
380
381 # Simple SQLite connection
bb4f246d 382 ->connect_info([ 'dbi:SQLite:./foo.db' ]);
6789ebe3 383
2cc3a7be 384 # Connect via subref
bb4f246d 385 ->connect_info([ sub { DBI->connect(...) } ]);
6789ebe3 386
2cc3a7be 387 # A bit more complicated
bb4f246d 388 ->connect_info(
389 [
390 'dbi:Pg:dbname=foo',
391 'postgres',
392 'my_pg_password',
393 { AutoCommit => 0 },
2cc3a7be 394 { quote_char => q{"}, name_sep => q{.} },
395 ]
396 );
397
398 # Equivalent to the previous example
399 ->connect_info(
400 [
401 'dbi:Pg:dbname=foo',
402 'postgres',
403 'my_pg_password',
404 { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
bb4f246d 405 ]
406 );
6789ebe3 407
2cc3a7be 408 # Subref + DBIC-specific connection options
bb4f246d 409 ->connect_info(
410 [
411 sub { DBI->connect(...) },
2cc3a7be 412 {
413 quote_char => q{`},
414 name_sep => q{@},
415 on_connect_do => ['SET search_path TO myschema,otherschema,public'],
416 },
bb4f246d 417 ]
418 );
6789ebe3 419
004d31fb 420=cut
421
046ad905 422sub connect_info {
423 my ($self, $info_arg) = @_;
4c248161 424
046ad905 425 return $self->_connect_info if !$info_arg;
4c248161 426
046ad905 427 # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
428 # the new set of options
429 $self->_sql_maker(undef);
430 $self->_sql_maker_opts({});
486ad69b 431
046ad905 432 my $info = [ @$info_arg ]; # copy because we can alter it
433 my $last_info = $info->[-1];
434 if(ref $last_info eq 'HASH') {
435 if(my $on_connect_do = delete $last_info->{on_connect_do}) {
436 $self->on_connect_do($on_connect_do);
437 }
438 for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
439 if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
440 $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
441 }
442 }
486ad69b 443
046ad905 444 # Get rid of any trailing empty hashref
445 pop(@$info) if !keys %$last_info;
446 }
d7c4c15c 447
046ad905 448 $self->_connect_info($info);
449}
004d31fb 450
046ad905 451=head2 on_connect_do
4c248161 452
046ad905 453This method is deprecated in favor of setting via L</connect_info>.
486ad69b 454
f11383c2 455=head2 dbh_do
456
046ad905 457Arguments: $subref, @extra_coderef_args?
458
d4f16b21 459Execute the given subref using the new exception-based connection management.
046ad905 460
d4f16b21 461The first two arguments will be the storage object that C<dbh_do> was called
462on and a database handle to use. Any additional arguments will be passed
463verbatim to the called subref as arguments 2 and onwards.
464
465Using this (instead of $self->_dbh or $self->dbh) ensures correct
466exception handling and reconnection (or failover in future subclasses).
467
468Your subref should have no side-effects outside of the database, as
469there is the potential for your subref to be partially double-executed
470if the database connection was stale/dysfunctional.
046ad905 471
56769f7c 472Example:
f11383c2 473
56769f7c 474 my @stuff = $schema->storage->dbh_do(
475 sub {
d4f16b21 476 my ($storage, $dbh, @cols) = @_;
477 my $cols = join(q{, }, @cols);
478 $dbh->selectrow_array("SELECT $cols FROM foo");
046ad905 479 },
480 @column_list
56769f7c 481 );
f11383c2 482
483=cut
484
485sub dbh_do {
046ad905 486 my $self = shift;
aa27edf7 487 my $coderef = shift;
488
aa27edf7 489 ref $coderef eq 'CODE' or $self->throw_exception
490 ('$coderef must be a CODE reference');
f11383c2 491
1b994857 492 return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do};
493 local $self->{_in_dbh_do} = 1;
494
f11383c2 495 my @result;
496 my $want_array = wantarray;
497
498 eval {
56769f7c 499 $self->_verify_pid if $self->_dbh;
f11383c2 500 $self->_populate_dbh if !$self->_dbh;
f11383c2 501 if($want_array) {
d4f16b21 502 @result = $coderef->($self, $self->_dbh, @_);
f11383c2 503 }
56769f7c 504 elsif(defined $want_array) {
d4f16b21 505 $result[0] = $coderef->($self, $self->_dbh, @_);
f11383c2 506 }
56769f7c 507 else {
d4f16b21 508 $coderef->($self, $self->_dbh, @_);
56769f7c 509 }
f11383c2 510 };
56769f7c 511
aa27edf7 512 my $exception = $@;
513 if(!$exception) { return $want_array ? @result : $result[0] }
514
515 $self->throw_exception($exception) if $self->connected;
516
517 # We were not connected - reconnect and retry, but let any
518 # exception fall right through this time
519 $self->_populate_dbh;
d4f16b21 520 $coderef->($self, $self->_dbh, @_);
aa27edf7 521}
522
523# This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
524# It also informs dbh_do to bypass itself while under the direction of txn_do,
1b994857 525# via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
aa27edf7 526sub txn_do {
527 my $self = shift;
528 my $coderef = shift;
529
530 ref $coderef eq 'CODE' or $self->throw_exception
531 ('$coderef must be a CODE reference');
532
1b994857 533 local $self->{_in_dbh_do} = 1;
f11383c2 534
aa27edf7 535 my @result;
536 my $want_array = wantarray;
537
d4f16b21 538 my $tried = 0;
539 while(1) {
540 eval {
541 $self->_verify_pid if $self->_dbh;
542 $self->_populate_dbh if !$self->_dbh;
aa27edf7 543
d4f16b21 544 $self->txn_begin;
545 if($want_array) {
546 @result = $coderef->(@_);
547 }
548 elsif(defined $want_array) {
549 $result[0] = $coderef->(@_);
550 }
551 else {
552 $coderef->(@_);
553 }
554 $self->txn_commit;
555 };
aa27edf7 556
d4f16b21 557 my $exception = $@;
558 if(!$exception) { return $want_array ? @result : $result[0] }
559
560 if($tried++ > 0 || $self->connected) {
561 eval { $self->txn_rollback };
562 my $rollback_exception = $@;
563 if($rollback_exception) {
564 my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
565 $self->throw_exception($exception) # propagate nested rollback
566 if $rollback_exception =~ /$exception_class/;
567
568 $self->throw_exception(
569 "Transaction aborted: ${exception}. "
570 . "Rollback failed: ${rollback_exception}"
571 );
572 }
573 $self->throw_exception($exception)
aa27edf7 574 }
56769f7c 575
d4f16b21 576 # We were not connected, and was first try - reconnect and retry
577 # via the while loop
578 $self->_populate_dbh;
579 }
f11383c2 580}
581
9b83fccd 582=head2 disconnect
583
046ad905 584Our C<disconnect> method also performs a rollback first if the
9b83fccd 585database is not in C<AutoCommit> mode.
586
587=cut
588
412db1f4 589sub disconnect {
590 my ($self) = @_;
591
92925617 592 if( $self->connected ) {
593 $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
594 $self->_dbh->disconnect;
595 $self->_dbh(undef);
dbaee748 596 $self->{_dbh_gen}++;
92925617 597 }
412db1f4 598}
599
f11383c2 600sub connected {
601 my ($self) = @_;
412db1f4 602
1346e22d 603 if(my $dbh = $self->_dbh) {
604 if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
dbaee748 605 $self->_dbh(undef);
606 $self->{_dbh_gen}++;
607 return;
1346e22d 608 }
56769f7c 609 else {
610 $self->_verify_pid;
611 }
1346e22d 612 return ($dbh->FETCH('Active') && $dbh->ping);
613 }
614
615 return 0;
412db1f4 616}
617
f11383c2 618# handle pid changes correctly
56769f7c 619# NOTE: assumes $self->_dbh is a valid $dbh
f11383c2 620sub _verify_pid {
621 my ($self) = @_;
622
56769f7c 623 return if $self->_conn_pid == $$;
f11383c2 624
f11383c2 625 $self->_dbh->{InactiveDestroy} = 1;
d3abf3fe 626 $self->_dbh(undef);
dbaee748 627 $self->{_dbh_gen}++;
f11383c2 628
629 return;
630}
631
412db1f4 632sub ensure_connected {
633 my ($self) = @_;
634
635 unless ($self->connected) {
8b445e33 636 $self->_populate_dbh;
637 }
412db1f4 638}
639
c235bbae 640=head2 dbh
641
642Returns the dbh - a data base handle of class L<DBI>.
643
644=cut
645
412db1f4 646sub dbh {
647 my ($self) = @_;
648
649 $self->ensure_connected;
8b445e33 650 return $self->_dbh;
651}
652
f1f56aad 653sub _sql_maker_args {
654 my ($self) = @_;
655
2cc3a7be 656 return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
f1f56aad 657}
658
48c69e7c 659sub sql_maker {
660 my ($self) = @_;
fdc1c3d0 661 unless ($self->_sql_maker) {
f1f56aad 662 $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
48c69e7c 663 }
664 return $self->_sql_maker;
665}
666
8b445e33 667sub _populate_dbh {
668 my ($self) = @_;
1b45b01e 669 my @info = @{$self->_connect_info || []};
8b445e33 670 $self->_dbh($self->_connect(@info));
2fd24e78 671
672 if(ref $self eq 'DBIx::Class::Storage::DBI') {
673 my $driver = $self->_dbh->{Driver}->{Name};
efe6365b 674 if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
2fd24e78 675 bless $self, "DBIx::Class::Storage::DBI::${driver}";
676 $self->_rebless() if $self->can('_rebless');
677 }
843f8ecd 678 }
2fd24e78 679
d7c4c15c 680 # if on-connect sql statements are given execute them
681 foreach my $sql_statement (@{$self->on_connect_do || []}) {
4c248161 682 $self->debugobj->query_start($sql_statement) if $self->debug();
d7c4c15c 683 $self->_dbh->do($sql_statement);
4c248161 684 $self->debugobj->query_end($sql_statement) if $self->debug();
d7c4c15c 685 }
5ef3e508 686
1346e22d 687 $self->_conn_pid($$);
688 $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
8b445e33 689}
690
691sub _connect {
692 my ($self, @info) = @_;
5ef3e508 693
9d31f7dc 694 $self->throw_exception("You failed to provide any connection info")
695 if !@info;
696
90ec6cad 697 my ($old_connect_via, $dbh);
698
5ef3e508 699 if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
90ec6cad 700 $old_connect_via = $DBI::connect_via;
5ef3e508 701 $DBI::connect_via = 'connect';
5ef3e508 702 }
703
75db246c 704 eval {
f5de3933 705 if(ref $info[0] eq 'CODE') {
706 $dbh = &{$info[0]}
707 }
708 else {
709 $dbh = DBI->connect(@info);
710 $dbh->{RaiseError} = 1;
711 $dbh->{PrintError} = 0;
16e10e2f 712 $dbh->{PrintWarn} = 0;
f5de3933 713 }
75db246c 714 };
90ec6cad 715
716 $DBI::connect_via = $old_connect_via if $old_connect_via;
717
75db246c 718 if (!$dbh || $@) {
719 $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
720 }
90ec6cad 721
e571e823 722 $dbh;
8b445e33 723}
724
d4f16b21 725sub _dbh_txn_begin {
726 my ($self, $dbh) = @_;
d32d82f9 727 if ($dbh->{AutoCommit}) {
728 $self->debugobj->txn_begin()
729 if ($self->debug);
730 $dbh->begin_work;
731 }
732}
733
8091aa91 734sub txn_begin {
d79f59b9 735 my $self = shift;
d4f16b21 736 $self->dbh_do($self->can('_dbh_txn_begin'))
d32d82f9 737 if $self->{transaction_depth}++ == 0;
738}
739
d4f16b21 740sub _dbh_txn_commit {
741 my ($self, $dbh) = @_;
d32d82f9 742 if ($self->{transaction_depth} == 0) {
743 unless ($dbh->{AutoCommit}) {
744 $self->debugobj->txn_commit()
745 if ($self->debug);
746 $dbh->commit;
747 }
748 }
749 else {
750 if (--$self->{transaction_depth} == 0) {
751 $self->debugobj->txn_commit()
752 if ($self->debug);
753 $dbh->commit;
754 }
986e4fca 755 }
8091aa91 756}
8b445e33 757
8091aa91 758sub txn_commit {
d79f59b9 759 my $self = shift;
d4f16b21 760 $self->dbh_do($self->can('_dbh_txn_commit'));
d32d82f9 761}
762
d4f16b21 763sub _dbh_txn_rollback {
764 my ($self, $dbh) = @_;
d32d82f9 765 if ($self->{transaction_depth} == 0) {
766 unless ($dbh->{AutoCommit}) {
767 $self->debugobj->txn_rollback()
768 if ($self->debug);
769 $dbh->rollback;
770 }
771 }
772 else {
773 if (--$self->{transaction_depth} == 0) {
774 $self->debugobj->txn_rollback()
775 if ($self->debug);
776 $dbh->rollback;
986e4fca 777 }
f11383c2 778 else {
d32d82f9 779 die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
986e4fca 780 }
d32d82f9 781 }
8091aa91 782}
783
8091aa91 784sub txn_rollback {
d79f59b9 785 my $self = shift;
d4f16b21 786
787 eval { $self->dbh_do($self->can('_dbh_txn_rollback')) };
a62cf8d4 788 if ($@) {
789 my $error = $@;
790 my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
791 $error =~ /$exception_class/ and $self->throw_exception($error);
792 $self->{transaction_depth} = 0; # ensure that a failed rollback
793 $self->throw_exception($error); # resets the transaction depth
8091aa91 794 }
795}
8b445e33 796
b7151206 797# This used to be the top-half of _execute. It was split out to make it
798# easier to override in NoBindVars without duping the rest. It takes up
799# all of _execute's args, and emits $sql, @bind.
800sub _prep_for_execute {
223b8fe3 801 my ($self, $op, $extra_bind, $ident, @args) = @_;
b7151206 802
223b8fe3 803 my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
944f30bf 804 unshift(@bind, @$extra_bind) if $extra_bind;
b7151206 805 @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
806
807 return ($sql, @bind);
808}
809
810sub _execute {
811 my $self = shift;
812
813 my ($sql, @bind) = $self->_prep_for_execute(@_);
814
f59ffc79 815 if ($self->debug) {
e673f011 816 my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
4c248161 817 $self->debugobj->query_start($sql, @debug_bind);
f59ffc79 818 }
b7151206 819
16e10e2f 820 my $sth = $self->sth($sql);
b7151206 821
701da8c4 822 my $rv;
75d07914 823 if ($sth) {
4c248161 824 my $time = time();
95dad7e2 825 $rv = eval { $sth->execute(@bind) };
826
827 if ($@ || !$rv) {
828 $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
829 }
75d07914 830 } else {
1c339d71 831 $self->throw_exception("'$sql' did not generate a statement.");
701da8c4 832 }
4c248161 833 if ($self->debug) {
758272ec 834 my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
4c248161 835 $self->debugobj->query_end($sql, @debug_bind);
836 }
223b8fe3 837 return (wantarray ? ($rv, $sth, @bind) : $rv);
838}
839
8b445e33 840sub insert {
841 my ($self, $ident, $to_insert) = @_;
bc0c9800 842 $self->throw_exception(
843 "Couldn't insert ".join(', ',
844 map "$_ => $to_insert->{$_}", keys %$to_insert
845 )." into ${ident}"
846 ) unless ($self->_execute('insert' => [], $ident, $to_insert));
8b445e33 847 return $to_insert;
848}
849
744076d8 850## Still not quite perfect, and EXPERIMENTAL
851## Currently it is assumed that all values passed will be "normal", i.e. not
852## scalar refs, or at least, all the same type as the first set, the statement is
853## only prepped once.
54e0bd06 854sub insert_bulk {
855 my ($self, $table, $cols, $data) = @_;
744076d8 856 my %colvalues;
857 @colvalues{@$cols} = (0..$#$cols);
858 my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
859# print STDERR "BIND".Dumper(\@bind);
54e0bd06 860
861 if ($self->debug) {
862 my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
863 $self->debugobj->query_start($sql, @debug_bind);
864 }
894328b8 865 my $sth = $self->sth($sql);
54e0bd06 866
54e0bd06 867# @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
868
869 my $rv;
744076d8 870 ## This must be an arrayref, else nothing works!
871 my $tuple_status = [];
872# use Data::Dumper;
873# print STDERR Dumper($data);
54e0bd06 874 if ($sth) {
875 my $time = time();
744076d8 876 $rv = eval { $sth->execute_array({ ArrayTupleFetch => sub { my $values = shift @$data; return if !$values; return [ @{$values}[@bind] ]},
877 ArrayTupleStatus => $tuple_status }) };
878# print STDERR Dumper($tuple_status);
879# print STDERR "RV: $rv\n";
880 if ($@ || !defined $rv) {
881 my $errors = '';
882 foreach my $tuple (@$tuple_status)
883 {
884 $errors .= "\n" . $tuple->[1] if(ref $tuple);
885 }
886 $self->throw_exception("Error executing '$sql': ".($@ || $errors));
54e0bd06 887 }
888 } else {
889 $self->throw_exception("'$sql' did not generate a statement.");
890 }
891 if ($self->debug) {
892 my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
893 $self->debugobj->query_end($sql, @debug_bind);
894 }
895 return (wantarray ? ($rv, $sth, @bind) : $rv);
896}
897
8b445e33 898sub update {
223b8fe3 899 return shift->_execute('update' => [], @_);
8b445e33 900}
901
902sub delete {
223b8fe3 903 return shift->_execute('delete' => [], @_);
8b445e33 904}
905
de705b51 906sub _select {
8b445e33 907 my ($self, $ident, $select, $condition, $attrs) = @_;
223b8fe3 908 my $order = $attrs->{order_by};
909 if (ref $condition eq 'SCALAR') {
910 $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
911 }
8839560b 912 if (exists $attrs->{group_by} || $attrs->{having}) {
bc0c9800 913 $order = {
914 group_by => $attrs->{group_by},
915 having => $attrs->{having},
916 ($order ? (order_by => $order) : ())
917 };
54540863 918 }
5c91499f 919 my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
9229f20a 920 if ($attrs->{software_limit} ||
921 $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
922 $attrs->{software_limit} = 1;
5c91499f 923 } else {
0823196c 924 $self->throw_exception("rows attribute must be positive if present")
925 if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
5c91499f 926 push @args, $attrs->{rows}, $attrs->{offset};
927 }
de705b51 928 return $self->_execute(@args);
929}
930
9b83fccd 931=head2 select
932
d3b0e369 933=over 4
934
935=item Arguments: $ident, $select, $condition, $attrs
936
937=back
938
9b83fccd 939Handle a SQL select statement.
940
941=cut
942
de705b51 943sub select {
944 my $self = shift;
945 my ($ident, $select, $condition, $attrs) = @_;
cb5f2eea 946 return $self->cursor->new($self, \@_, $attrs);
8b445e33 947}
948
1a14aa3f 949sub select_single {
de705b51 950 my $self = shift;
951 my ($rv, $sth, @bind) = $self->_select(@_);
6157db4f 952 my @row = $sth->fetchrow_array;
a3eaff0e 953 # Need to call finish() to work round broken DBDs
6157db4f 954 $sth->finish();
955 return @row;
1a14aa3f 956}
957
9b83fccd 958=head2 sth
959
d3b0e369 960=over 4
961
962=item Arguments: $sql
963
964=back
965
9b83fccd 966Returns a L<DBI> sth (statement handle) for the supplied SQL.
967
968=cut
969
d4f16b21 970sub _dbh_sth {
971 my ($self, $dbh, $sql) = @_;
d32d82f9 972 # 3 is the if_active parameter which avoids active sth re-use
16e10e2f 973 $dbh->prepare_cached($sql, {}, 3) or
974 $self->throw_exception(
975 'no sth generated via sql (' . ($@ || $dbh->errstr) . "): $sql"
976 );
d32d82f9 977}
978
8b445e33 979sub sth {
cb5f2eea 980 my ($self, $sql) = @_;
d4f16b21 981 $self->dbh_do($self->can('_dbh_sth'), $sql);
8b445e33 982}
983
d4f16b21 984sub _dbh_columns_info_for {
985 my ($self, $dbh, $table) = @_;
a32e8402 986
d32d82f9 987 if ($dbh->can('column_info')) {
a953d8d9 988 my %result;
d32d82f9 989 eval {
990 my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
991 my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
992 $sth->execute();
993 while ( my $info = $sth->fetchrow_hashref() ){
994 my %column_info;
995 $column_info{data_type} = $info->{TYPE_NAME};
996 $column_info{size} = $info->{COLUMN_SIZE};
997 $column_info{is_nullable} = $info->{NULLABLE} ? 1 : 0;
998 $column_info{default_value} = $info->{COLUMN_DEF};
999 my $col_name = $info->{COLUMN_NAME};
1000 $col_name =~ s/^\"(.*)\"$/$1/;
1001
1002 $result{$col_name} = \%column_info;
0d67fe74 1003 }
d32d82f9 1004 };
093fc7a6 1005 return \%result if !$@ && scalar keys %result;
d32d82f9 1006 }
0d67fe74 1007
d32d82f9 1008 my %result;
1009 my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
1010 $sth->execute;
1011 my @columns = @{$sth->{NAME_lc}};
1012 for my $i ( 0 .. $#columns ){
1013 my %column_info;
1014 my $type_num = $sth->{TYPE}->[$i];
1015 my $type_name;
1016 if(defined $type_num && $dbh->can('type_info')) {
1017 my $type_info = $dbh->type_info($type_num);
1018 $type_name = $type_info->{TYPE_NAME} if $type_info;
1019 }
1020 $column_info{data_type} = $type_name ? $type_name : $type_num;
1021 $column_info{size} = $sth->{PRECISION}->[$i];
1022 $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
0d67fe74 1023
d32d82f9 1024 if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1025 $column_info{data_type} = $1;
1026 $column_info{size} = $2;
0d67fe74 1027 }
1028
d32d82f9 1029 $result{$columns[$i]} = \%column_info;
1030 }
1031
1032 return \%result;
1033}
1034
1035sub columns_info_for {
1036 my ($self, $table) = @_;
d4f16b21 1037 $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
a953d8d9 1038}
1039
9b83fccd 1040=head2 last_insert_id
1041
1042Return the row id of the last insert.
1043
1044=cut
1045
d4f16b21 1046sub _dbh_last_insert_id {
1047 my ($self, $dbh, $source, $col) = @_;
1048 # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1049 $dbh->func('last_insert_rowid');
1050}
1051
843f8ecd 1052sub last_insert_id {
d4f16b21 1053 my $self = shift;
1054 $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
843f8ecd 1055}
1056
9b83fccd 1057=head2 sqlt_type
1058
1059Returns the database driver name.
1060
1061=cut
1062
d4f16b21 1063sub sqlt_type { shift->dbh->{Driver}->{Name} }
1c339d71 1064
9b83fccd 1065=head2 create_ddl_dir (EXPERIMENTAL)
1066
1067=over 4
1068
1069=item Arguments: $schema \@databases, $version, $directory, $sqlt_args
1070
1071=back
1072
d3b0e369 1073Creates a SQL file based on the Schema, for each of the specified
9b83fccd 1074database types, in the given directory.
1075
1076Note that this feature is currently EXPERIMENTAL and may not work correctly
1077across all databases, or fully handle complex relationships.
1078
1079=cut
1080
e673f011 1081sub create_ddl_dir
1082{
1083 my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
1084
1085 if(!$dir || !-d $dir)
1086 {
1087 warn "No directory given, using ./\n";
1088 $dir = "./";
1089 }
1090 $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1091 $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1092 $version ||= $schema->VERSION || '1.x';
9e7b9292 1093 $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
e673f011 1094
1c339d71 1095 eval "use SQL::Translator";
1096 $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
e673f011 1097
9e7b9292 1098 my $sqlt = SQL::Translator->new($sqltargs);
e673f011 1099 foreach my $db (@$databases)
1100 {
1101 $sqlt->reset();
1102 $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1103# $sqlt->parser_args({'DBIx::Class' => $schema);
1104 $sqlt->data($schema);
1105 $sqlt->producer($db);
1106
1107 my $file;
1108 my $filename = $schema->ddl_filename($db, $dir, $version);
1109 if(-e $filename)
1110 {
1111 $self->throw_exception("$filename already exists, skipping $db");
1112 next;
1113 }
1114 open($file, ">$filename")
1115 or $self->throw_exception("Can't open $filename for writing ($!)");
1116 my $output = $sqlt->translate;
1117#use Data::Dumper;
1118# print join(":", keys %{$schema->source_registrations});
1119# print Dumper($sqlt->schema);
1120 if(!$output)
1121 {
1122 $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
1123 next;
1124 }
1125 print $file $output;
1126 close($file);
1127 }
1128
1129}
1130
9b83fccd 1131=head2 deployment_statements
1132
d3b0e369 1133=over 4
1134
1135=item Arguments: $schema, $type, $version, $directory, $sqlt_args
1136
1137=back
1138
1139Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1140The database driver name is given by C<$type>, though the value from
1141L</sqlt_type> is used if it is not specified.
1142
1143C<$directory> is used to return statements from files in a previously created
1144L</create_ddl_dir> directory and is optional. The filenames are constructed
1145from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1146
1147If no C<$directory> is specified then the statements are constructed on the
1148fly using L<SQL::Translator> and C<$version> is ignored.
1149
1150See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
9b83fccd 1151
1152=cut
1153
e673f011 1154sub deployment_statements {
1155 my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
915919c5 1156 # Need to be connected to get the correct sqlt_type
c377d939 1157 $self->ensure_connected() unless $type;
e673f011 1158 $type ||= $self->sqlt_type;
1159 $version ||= $schema->VERSION || '1.x';
1160 $dir ||= './';
0382d607 1161 eval "use SQL::Translator";
1162 if(!$@)
1163 {
1164 eval "use SQL::Translator::Parser::DBIx::Class;";
1165 $self->throw_exception($@) if $@;
1166 eval "use SQL::Translator::Producer::${type};";
1167 $self->throw_exception($@) if $@;
1168 my $tr = SQL::Translator->new(%$sqltargs);
1169 SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1170 return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1171 }
e673f011 1172
1173 my $filename = $schema->ddl_filename($type, $dir, $version);
1174 if(!-f $filename)
1175 {
0382d607 1176# $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1177 $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1178 return;
e673f011 1179 }
1180 my $file;
1181 open($file, "<$filename")
1182 or $self->throw_exception("Can't open $filename ($!)");
1183 my @rows = <$file>;
1184 close($file);
1185
1186 return join('', @rows);
1187
1c339d71 1188}
843f8ecd 1189
1c339d71 1190sub deploy {
260129d8 1191 my ($self, $schema, $type, $sqltargs, $dir) = @_;
1192 foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
e4fe9ba3 1193 for ( split(";\n", $statement)) {
e673f011 1194 next if($_ =~ /^--/);
1195 next if(!$_);
1196# next if($_ =~ /^DROP/m);
1197 next if($_ =~ /^BEGIN TRANSACTION/m);
1198 next if($_ =~ /^COMMIT/m);
b489f68a 1199 next if $_ =~ /^\s+$/; # skip whitespace only
bdea30e3 1200 $self->debugobj->query_start($_) if $self->debug;
f11383c2 1201 $self->dbh->do($_) or warn "SQL was:\n $_"; # XXX exceptions?
4c248161 1202 $self->debugobj->query_end($_) if $self->debug;
e4fe9ba3 1203 }
75d07914 1204 }
1c339d71 1205}
843f8ecd 1206
9b83fccd 1207=head2 datetime_parser
1208
1209Returns the datetime parser class
1210
1211=cut
1212
f86fcf0d 1213sub datetime_parser {
1214 my $self = shift;
1215 return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1216}
1217
9b83fccd 1218=head2 datetime_parser_type
1219
1220Defines (returns) the datetime parser class - currently hardwired to
1221L<DateTime::Format::MySQL>
1222
1223=cut
1224
f86fcf0d 1225sub datetime_parser_type { "DateTime::Format::MySQL"; }
1226
9b83fccd 1227=head2 build_datetime_parser
1228
1229See L</datetime_parser>
1230
1231=cut
1232
f86fcf0d 1233sub build_datetime_parser {
1234 my $self = shift;
1235 my $type = $self->datetime_parser_type(@_);
1236 eval "use ${type}";
1237 $self->throw_exception("Couldn't load ${type}: $@") if $@;
1238 return $type;
1239}
1240
c756145c 1241sub DESTROY {
1242 my $self = shift;
f5de3933 1243 return if !$self->_dbh;
c756145c 1244 $self->_verify_pid;
1245 $self->_dbh(undef);
1246}
92925617 1247
8b445e33 12481;
1249
9b83fccd 1250=head1 SQL METHODS
1251
1252The module defines a set of methods within the DBIC::SQL::Abstract
1253namespace. These build on L<SQL::Abstract::Limit> to provide the
1254SQL query functions.
1255
1256The following methods are extended:-
1257
1258=over 4
1259
1260=item delete
1261
1262=item insert
1263
1264=item select
1265
1266=item update
1267
1268=item limit_dialect
1269
2cc3a7be 1270See L</connect_info> for details.
1271For setting, this method is deprecated in favor of L</connect_info>.
bb4f246d 1272
9b83fccd 1273=item quote_char
1274
2cc3a7be 1275See L</connect_info> for details.
1276For setting, this method is deprecated in favor of L</connect_info>.
bb4f246d 1277
9b83fccd 1278=item name_sep
1279
2cc3a7be 1280See L</connect_info> for details.
1281For setting, this method is deprecated in favor of L</connect_info>.
bb4f246d 1282
9b83fccd 1283=back
1284
8b445e33 1285=head1 AUTHORS
1286
daec44b8 1287Matt S. Trout <mst@shadowcatsystems.co.uk>
8b445e33 1288
9f19b1d6 1289Andy Grundman <andy@hybridized.org>
1290
8b445e33 1291=head1 LICENSE
1292
1293You may distribute this code under the same terms as Perl itself.
1294
1295=cut