Revision history for DBIx::Class
+ * New Features / Changes
+ - Massively optimize codepath around ->cursor(), over 10x speedup
+ on some workloads
+
* Fixes
+ - Fix open cursors silently resetting when inherited across a fork
+ or a thread
- Fix duplicated selected columns when calling 'count' when a same
aggregate function is used more than once in a 'having' clause
(RT#83305)
$new->_sql_maker_opts({});
$new->_dbh_details({});
$new->{_in_do_block} = 0;
- $new->{_dbh_gen} = 0;
# read below to see what this does
$new->_arm_global_destructor;
# soon as possible (DBIC will reconnect only on demand from within
# the thread)
my @instances = grep { defined $_ } values %seek_and_destroy;
+ %seek_and_destroy = ();
+
for (@instances) {
- $_->{_dbh_gen}++; # so that existing cursors will drop as well
$_->_dbh(undef);
$_->transaction_depth(0);
$_->savepoints([]);
- }
- # properly renumber all existing refs
- %seek_and_destroy = ();
- $_->_arm_global_destructor for @instances;
+ # properly renumber existing refs
+ $_->_arm_global_destructor
+ }
}
}
my $pid = $self->_conn_pid;
if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) {
$dbh->{InactiveDestroy} = 1;
- $self->{_dbh_gen}++;
$self->_dbh(undef);
$self->transaction_depth(0);
$self->savepoints([]);
%{ $self->_dbh->{CachedKids} } = ();
$self->_dbh->disconnect;
$self->_dbh(undef);
- $self->{_dbh_gen}++;
}
}
=cut
-sub _dbh_next {
- my ($storage, $dbh, $self) = @_;
+sub next {
+ my $self = shift;
- my $next = $self->next::can;
+ my @row = $self->next::method(@_);
- my @row = $next->(@_);
-
- my $col_infos = $storage->_resolve_column_info($self->args->[0]);
-
- my $select = $self->args->[1];
-
- _normalize_guids($select, $col_infos, \@row, $storage);
+ _normalize_guids(
+ $self->args->[1],
+ $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]),
+ \@row,
+ $self->storage
+ );
return @row;
}
-sub _dbh_all {
- my ($storage, $dbh, $self) = @_;
-
- my $next = $self->next::can;
-
- my @rows = $next->(@_);
-
- my $col_infos = $storage->_resolve_column_info($self->args->[0]);
+sub all {
+ my $self = shift;
- my $select = $self->args->[1];
+ my @rows = $self->next::method(@_);
- _normalize_guids($select, $col_infos, $_, $storage) for @rows;
+ _normalize_guids(
+ $self->args->[1],
+ $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]),
+ $_,
+ $self->storage
+ ) for @rows;
return @rows;
}
=cut
-sub _dbh_next {
- my ($storage, $dbh, $self) = @_;
+sub next {
+ my $self = shift;
- my $next = $self->next::can;
+ my @row = $self->next::method(@_);
- my @row = $next->(@_);
+ $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]);
- my $col_infos = $storage->_resolve_column_info($self->args->[0]);
+ _normalize_guids(
+ $self->args->[1],
+ $self->{_colinfos},
+ \@row,
+ $self->storage
+ );
- my $select = $self->args->[1];
-
- _normalize_guids($select, $col_infos, \@row, $storage);
- _strip_trailing_binary_nulls($select, $col_infos, \@row, $storage);
+ _strip_trailing_binary_nulls(
+ $self->args->[1],
+ $self->{_colinfos},
+ \@row,
+ $self->storage
+ );
return @row;
}
-sub _dbh_all {
- my ($storage, $dbh, $self) = @_;
-
- my $next = $self->next::can;
-
- my @rows = $next->(@_);
+sub all {
+ my $self = shift;
- my $col_infos = $storage->_resolve_column_info($self->args->[0]);
+ my @rows = $self->next::method(@_);
- my $select = $self->args->[1];
+ $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]);
for (@rows) {
- _normalize_guids($select, $col_infos, $_, $storage);
- _strip_trailing_binary_nulls($select, $col_infos, $_, $storage);
+ _normalize_guids(
+ $self->args->[1],
+ $self->{_colinfos},
+ $_,
+ $self->storage
+ );
+
+ _strip_trailing_binary_nulls(
+ $self->args->[1],
+ $self->{_colinfos},
+ $_,
+ $self->storage
+ );
}
return @rows;
use base qw/DBIx::Class::Cursor/;
use Try::Tiny;
+use Scalar::Util qw/refaddr weaken/;
use namespace::clean;
__PACKAGE__->mk_group_accessors('simple' =>
- qw/sth storage args attrs/
+ qw/storage args attrs/
);
=head1 NAME
=cut
-sub new {
- my ($class, $storage, $args, $attrs) = @_;
- $class = ref $class if ref $class;
+{
+ my %cursor_registry;
- my $new = {
- storage => $storage,
- args => $args,
- attrs => $attrs,
- _dbh_gen => $storage->{_dbh_gen},
- _pos => 0,
- _done => 0,
- };
+ sub new {
+ my ($class, $storage, $args, $attrs) = @_;
- return bless ($new, $class);
+ my $self = bless {
+ storage => $storage,
+ args => $args,
+ attrs => $attrs,
+ }, ref $class || $class;
+
+ weaken( $cursor_registry{ refaddr($self) } = $self )
+ if DBIx::Class::_ENV_::HAS_ITHREADS;
+
+ return $self;
+ }
+
+ sub CLONE {
+ for (keys %cursor_registry) {
+ # once marked we no longer care about them, hence no
+ # need to keep in the registry, left alone renumber the
+ # keys (all addresses are now different)
+ my $self = delete $cursor_registry{$_}
+ or next;
+
+ $self->{_intra_thread} = 1;
+ }
+ }
}
=head2 next
=cut
-sub _dbh_next {
- my ($storage, $dbh, $self) = @_;
+sub next {
+ my $self = shift;
+
+ return if $self->{_done};
+
+ my $sth;
- $self->_check_dbh_gen;
if (
$self->{attrs}{software_limit}
&& $self->{attrs}{rows}
- && $self->{_pos} >= $self->{attrs}{rows}
+ && ($self->{_pos}||0) >= $self->{attrs}{rows}
) {
- $self->sth->finish if $self->sth->{Active};
- $self->sth(undef);
+ if ($sth = $self->sth) {
+ # explicit finish will issue warnings, unlike the DESTROY below
+ $sth->finish if $sth->FETCH('Active');
+ }
$self->{_done} = 1;
+ return;
}
- return if $self->{_done};
- unless ($self->sth) {
- $self->sth(($storage->_select(@{$self->{args}}))[1]);
- if ($self->{attrs}{software_limit}) {
- if (my $offset = $self->{attrs}{offset}) {
- $self->sth->fetch for 1 .. $offset;
- }
+ unless ($sth = $self->sth) {
+ (undef, $sth) = $self->storage->_select( @{$self->{args}} );
+
+ if ( $self->{attrs}{software_limit} and $self->{attrs}{offset} ) {
+ $sth->fetch for 1 .. $self->{attrs}{offset};
}
+
+ $self->sth($sth);
}
- my @row = $self->sth->fetchrow_array;
- if (@row) {
+
+ my $row = $sth->fetchrow_arrayref;
+ if ($row) {
$self->{_pos}++;
} else {
- $self->sth(undef);
$self->{_done} = 1;
}
- return @row;
-}
-sub next {
- my ($self) = @_;
- $self->{storage}->dbh_do($self->can('_dbh_next'), $self);
+ return @{$row||[]};
}
+
=head2 all
=over 4
=cut
-sub _dbh_all {
- my ($storage, $dbh, $self) = @_;
+sub all {
+ my $self = shift;
+
+ # delegate to DBIC::Cursor which will delegate back to next()
+ if ($self->{attrs}{software_limit}
+ && ($self->{attrs}{offset} || $self->{attrs}{rows})) {
+ return $self->next::method(@_);
+ }
+
+ my $sth;
+
+ if ($sth = $self->sth) {
+ # explicit finish will issue warnings, unlike the DESTROY below
+ $sth->finish if ( ! $self->{_done} and $sth->FETCH('Active') );
+ $self->sth(undef);
+ }
+
+ (undef, $sth) = $self->storage->_select( @{$self->{args}} );
- $self->_check_dbh_gen;
- $self->sth->finish if $self->sth && $self->sth->{Active};
- $self->sth(undef);
- my ($rv, $sth) = $storage->_select(@{$self->{args}});
return @{$sth->fetchall_arrayref};
}
-sub all {
- my ($self) = @_;
- if ($self->{attrs}{software_limit}
- && ($self->{attrs}{offset} || $self->{attrs}{rows})) {
- return $self->next::method;
+sub sth {
+ my $self = shift;
+
+ if (@_) {
+ delete @{$self}{qw/_pos _done _pid _intra_thread/};
+
+ $self->{sth} = $_[0];
+ $self->{_pid} = $$ if ! DBIx::Class::_ENV_::BROKEN_FORK and $_[0];
+ }
+ elsif ($self->{sth} and ! $self->{_done}) {
+
+ my $invalidate_handle_reason;
+
+ if (DBIx::Class::_ENV_::HAS_ITHREADS and $self->{_intra_thread} ) {
+ $invalidate_handle_reason = 'Multi-thread';
+ }
+ elsif (!DBIx::Class::_ENV_::BROKEN_FORK and $self->{_pid} != $$ ) {
+ $invalidate_handle_reason = 'Multi-process';
+ }
+
+ if ($invalidate_handle_reason) {
+ $self->storage->throw_exception("$invalidate_handle_reason access attempted while cursor in progress (position $self->{_pos})")
+ if $self->{_pos};
+
+ # reinvokes the reset logic above
+ $self->sth(undef);
+ }
}
- $self->{storage}->dbh_do($self->can('_dbh_all'), $self);
+ return $self->{sth};
}
=head2 reset
=cut
sub reset {
- my ($self) = @_;
-
- # No need to care about failures here
- try { $self->sth->finish }
- if $self->sth && $self->sth->{Active};
- $self->_soft_reset;
- return undef;
+ $_[0]->__finish_sth if $_[0]->{sth};
+ $_[0]->sth(undef);
}
-sub _soft_reset {
- my ($self) = @_;
- $self->sth(undef);
- $self->{_done} = 0;
- $self->{_pos} = 0;
+sub DESTROY {
+ $_[0]->__finish_sth if $_[0]->{sth};
}
-sub _check_dbh_gen {
- my ($self) = @_;
+sub __finish_sth {
+ # It is (sadly) extremely important to finish() handles we are about
+ # to lose (due to reset() or a DESTROY() ). $rs->reset is the closest
+ # thing the user has to getting to the underlying finish() API and some
+ # DBDs mandate this (e.g. DBD::InterBase will segfault, DBD::Sybase
+ # won't start a transaction sanely, etc)
+ # We also can't use the accessor here, as it will trigger a fork/thread
+ # check, and resetting a cursor in a child is perfectly valid
- if($self->{_dbh_gen} != $self->{storage}->{_dbh_gen}) {
- $self->{_dbh_gen} = $self->{storage}->{_dbh_gen};
- $self->_soft_reset;
- }
-}
+ my $self = shift;
-sub DESTROY {
- # None of the reasons this would die matter if we're in DESTROY anyways
- if (my $sth = $_[0]->sth) {
- local $SIG{__WARN__} = sub {};
- try { $sth->finish } if $sth->FETCH('Active');
- }
+ # No need to care about failures here
+ try { local $SIG{__WARN__} = sub {}; $self->{sth}->finish } if (
+ $self->{sth} and ! try { ! $self->{sth}->FETCH('Active') }
+ );
}
1;
=cut
-sub _dbh_next {
- my ($storage, $dbh, $self) = @_;
-
- my $next = $self->next::can;
-
- my @row = $next->(@_);
-
- my $col_info = $storage->_resolve_column_info($self->args->[0]);
-
- my $select = $self->args->[1];
+my $unpack_guids = sub {
+ my ($select, $col_infos, $data, $storage) = @_;
for my $select_idx (0..$#$select) {
- my $selected = $select->[$select_idx];
-
- next if ref $selected;
+ next unless (
+ defined $data->[$select_idx]
+ and
+ length($data->[$select_idx]) == 16
+ );
- my $data_type = $col_info->{$selected}{data_type};
+ my $selected = $select->[$select_idx];
- if ($storage->_is_guid_type($data_type)) {
- my $returned = $row[$select_idx];
+ my $data_type = $col_infos->{$select->[$select_idx]}{data_type}
+ or next;
- if (length $returned == 16) {
- $row[$select_idx] = $storage->_uuid_to_str($returned);
- }
- }
+ $data->[$select_idx] = $storage->_uuid_to_str($data->[$select_idx])
+ if $storage->_is_guid_type($data_type);
}
+};
- return @row;
-}
-
-sub _dbh_all {
- my ($storage, $dbh, $self) = @_;
-
- my $next = $self->next::can;
- my @rows = $next->(@_);
+sub next {
+ my $self = shift;
- my $col_info = $storage->_resolve_column_info($self->args->[0]);
+ my @row = $self->next::method(@_);
- my $select = $self->args->[1];
+ $unpack_guids->(
+ $self->args->[1],
+ $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]),
+ \@row,
+ $self->storage
+ );
- for my $row (@rows) {
- for my $select_idx (0..$#$select) {
- my $selected = $select->[$select_idx];
+ return @row;
+}
- next if ref $selected;
+sub all {
+ my $self = shift;
- my $data_type = $col_info->{$selected}{data_type};
+ my @rows = $self->next::method(@_);
- if ($storage->_is_guid_type($data_type)) {
- my $returned = $row->[$select_idx];
+ $unpack_guids->(
+ $self->args->[1],
+ $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]),
+ $_,
+ $self->storage
+ ) for @rows;
- if (length $returned == 16) {
- $row->[$select_idx] = $storage->_uuid_to_str($returned);
- }
- }
- }
- }
return @rows;
}
use strict;
use warnings;
use Test::More;
+use Test::Exception;
use lib qw(t/lib);
use DBICTest;
$schema->resultset('CD')->create({ title => 'vacation in antarctica part 2', artist => 456, year => 1901 });
$parent_rs = $schema->resultset('CD')->search({ year => 1901 });
- $parent_rs->next;
+ is ($parent_rs->count, 2);
};
ok(!$@) or diag "Creation eval failed: $@";
+# basic tests
{
- my $pid = fork;
- if(!defined $pid) {
- die "fork failed: $!";
+ ok ($schema->storage->connected(), 'Parent is connected');
+ is ($parent_rs->next->id, 1, 'Cursor advanced');
+
+ my ($parent_in, $child_out);
+ pipe( $parent_in, $child_out ) or die "Pipe open failed: $!";
+
+ my $pid = fork;
+ if(!defined $pid) {
+ die "fork failed: $!";
+ }
+
+ if (!$pid) {
+ close $parent_in;
+
+ #simulate a subtest to not confuse the parent TAP emission
+ my $tb = Test::More->builder;
+ $tb->reset;
+ for (qw/output failure_output todo_output/) {
+ close $tb->$_;
+ open ($tb->$_, '>&', $child_out);
}
- if (!$pid) {
- exit $schema->storage->connected ? 1 : 0;
+ ok(!$schema->storage->connected, "storage->connected() false in child");
+ for (1,2) {
+ throws_ok { $parent_rs->next } qr/\QMulti-process access attempted while cursor in progress (position 1)/;
}
- if (waitpid($pid, 0) == $pid) {
- my $ex = $? >> 8;
- ok($ex == 0, "storage->connected() returns false in child");
- exit $ex if $ex; # skip remaining tests
- }
+ $parent_rs->reset;
+ is($parent_rs->next->id, 1, 'Resetting cursor reprepares it within child environment');
+
+ done_testing;
+ exit 0;
+ }
+
+ close $child_out;
+ while (my $ln = <$parent_in>) {
+ print " $ln";
+ }
+ waitpid( $pid, 0 );
+ ok(!$?, 'Child subtests passed');
+
+ is ($parent_rs->next->id, 2, 'Cursor still intact in parent');
+ is ($parent_rs->next, undef, 'Cursor exhausted');
}
+$parent_rs->reset;
my @pids;
while(@pids < $num_children) {
$schema->resultset('CD')->create({ title => 'vacation in antarctica part 2', artist => 456, year => 1901 });
$parent_rs = $schema->resultset('CD')->search({ year => 1901 });
- $parent_rs->next;
+ is ($parent_rs->count, 2);
}, 'populate successfull');
+# basic tests
+{
+ ok ($schema->storage->connected(), 'Parent is connected');
+ is ($parent_rs->next->id, 1, 'Cursor advanced');
+ my $ct_num = Test::More->builder->current_test;
+
+ my $newthread = async {
+ my $out = '';
+
+ #simulate a subtest to not confuse the parent TAP emission
+ my $tb = Test::More->builder;
+ $tb->reset;
+ for (qw/output failure_output todo_output/) {
+ close $tb->$_;
+ open ($tb->$_, '>', \$out);
+ }
+
+ ok(!$schema->storage->connected, "storage->connected() false in child");
+ for (1,2) {
+ throws_ok { $parent_rs->next } qr/\QMulti-thread access attempted while cursor in progress (position 1)/;
+ }
+
+ $parent_rs->reset;
+ is($parent_rs->next->id, 1, 'Resetting cursor reprepares it within child environment');
+
+ done_testing;
+
+ close $tb->$_ for (qw/output failure_output todo_output/);
+ sleep(1); # tasty crashes without this
+
+ $out;
+ };
+ die "Thread creation failed: $! $@" if !defined $newthread;
+
+ my $out = $newthread->join;
+ $out =~ s/^/ /gm;
+ print $out;
+
+ # workaround for older Test::More confusing the plan under threads
+ Test::More->builder->current_test($ct_num);
+
+ is ($parent_rs->next->id, 2, 'Cursor still intact in parent');
+ is ($parent_rs->next, undef, 'Cursor exhausted');
+}
+
+$parent_rs->reset;
my @children;
while(@children < $num_children) {
}
ok(1, "Made it to the end");
+undef $parent_rs;
$schema->storage->dbh->do("DROP TABLE cd");
$schema->resultset('CD')->create({ title => 'vacation in antarctica part 2', artist => 456, year => 1901 });
$parent_rs = $schema->resultset('CD')->search({ year => 1901 });
- $parent_rs->next;
+ is ($parent_rs->count, 2);
};
ok(!$@) or diag "Creation eval failed: $@";
is ($collapsed_or_rs->count, 4, 'Collapsed search count with OR ok');
# make sure sure distinct on a grouped rs is warned about
-my $cd_rs = $schema->resultset ('CD')
- ->search ({}, { distinct => 1, group_by => 'title' });
-warnings_exist (sub {
- $cd_rs->next;
-}, qr/Useless use of distinct/, 'UUoD warning');
+{
+ my $cd_rs = $schema->resultset ('CD')
+ ->search ({}, { distinct => 1, group_by => 'title' });
+ warnings_exist (sub {
+ $cd_rs->next;
+ }, qr/Useless use of distinct/, 'UUoD warning');
+}
{
my $tcount = $schema->resultset('Track')->search(
$or_rs->reset;
$rel_rs->reset;
+# at this point there should be no active statements
+# (finish() was called everywhere, either explicitly via
+# reset() or on DESTROY)
+for (keys %{$schema->storage->dbh->{CachedKids}}) {
+ fail("Unreachable cached statement still active: $_")
+ if $schema->storage->dbh->{CachedKids}{$_}->FETCH('Active');
+}
+
my $tag = $schema->resultset('Tag')->search(
[ { 'me.tag' => 'Blue' } ], { cols=>[qw/tagid/] } )->next;
} 'inferring generator from trigger source works';
}
+ # at this point there should be no active statements
+ # (finish() was called everywhere, either explicitly via
+ # reset() or on DESTROY)
+ for (keys %{$schema->storage->dbh->{CachedKids}}) {
+ fail("Unreachable cached statement still active: $_")
+ if $schema->storage->dbh->{CachedKids}{$_}->FETCH('Active');
+ }
+
# test blobs (stolen from 73oracle.t)
eval { $dbh->do('DROP TABLE "bindtype_test"') };
$dbh->do(q[