From: Peter Rabbitson Date: Wed, 19 Sep 2012 07:26:44 +0000 (+0200) Subject: Massively optimize ->cursor->next while fixing some bugs along the way X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=a2f228547;p=dbsrgits%2FDBIx-Class-Historic.git Massively optimize ->cursor->next while fixing some bugs along the way Cursor access was previously wrapped in a dbh_do() block with the intent to restart the connection and the cursor upon a handle error. However this arrangement overlooks our inability to restart the cursor from the same spot (or for that matter guarantee the same dataset). Instead we remove the dbh_do wrap entirely, and just let the exception propagate to whatever esle might be ready to catch it (and potentially properly retry the select). As a side effect (because anonsubs are sloooooooow) the end result yields astonishing speed gains - one of the test cases mentioned in [1] went (on my test rig) from 9.61058 to 0.64960 due to this commit alone. A particular detail - the cursor private methods _dbh_next/_dbh_all are no more as they are unused indirection now. All storage drivers (well their respective cursors) have been adjusted, but not yet tested due to lack of a proper environment. [1] http://lists.scsys.co.uk/pipermail/dbix-class/2012-July/010595.html --- diff --git a/Changes b/Changes index 6eacbb1..53fce16 100644 --- a/Changes +++ b/Changes @@ -1,6 +1,12 @@ Revision history for DBIx::Class + * New Features / Changes + - Massively optimize codepath around ->cursor(), over 10x speedup + on some workloads + * Fixes + - Fix open cursors silently resetting when inherited across a fork + or a thread - Fix duplicated selected columns when calling 'count' when a same aggregate function is used more than once in a 'having' clause (RT#83305) diff --git a/lib/DBIx/Class/Storage/DBI.pm b/lib/DBIx/Class/Storage/DBI.pm index fb99190..daef684 100644 --- a/lib/DBIx/Class/Storage/DBI.pm +++ b/lib/DBIx/Class/Storage/DBI.pm @@ -176,7 +176,6 @@ sub new { $new->_sql_maker_opts({}); $new->_dbh_details({}); $new->{_in_do_block} = 0; - $new->{_dbh_gen} = 0; # read below to see what this does $new->_arm_global_destructor; @@ -216,17 +215,17 @@ sub new { # soon as possible (DBIC will reconnect only on demand from within # the thread) my @instances = grep { defined $_ } values %seek_and_destroy; + %seek_and_destroy = (); + for (@instances) { - $_->{_dbh_gen}++; # so that existing cursors will drop as well $_->_dbh(undef); $_->transaction_depth(0); $_->savepoints([]); - } - # properly renumber all existing refs - %seek_and_destroy = (); - $_->_arm_global_destructor for @instances; + # properly renumber existing refs + $_->_arm_global_destructor + } } } @@ -252,7 +251,6 @@ sub _verify_pid { my $pid = $self->_conn_pid; if( defined $pid and $pid != $$ and my $dbh = $self->_dbh ) { $dbh->{InactiveDestroy} = 1; - $self->{_dbh_gen}++; $self->_dbh(undef); $self->transaction_depth(0); $self->savepoints([]); @@ -835,7 +833,6 @@ sub disconnect { %{ $self->_dbh->{CachedKids} } = (); $self->_dbh->disconnect; $self->_dbh(undef); - $self->{_dbh_gen}++; } } diff --git a/lib/DBIx/Class/Storage/DBI/ADO/MS_Jet/Cursor.pm b/lib/DBIx/Class/Storage/DBI/ADO/MS_Jet/Cursor.pm index 71916c2..5c50ca3 100644 --- a/lib/DBIx/Class/Storage/DBI/ADO/MS_Jet/Cursor.pm +++ b/lib/DBIx/Class/Storage/DBI/ADO/MS_Jet/Cursor.pm @@ -34,34 +34,32 @@ for the inner cursor class. =cut -sub _dbh_next { - my ($storage, $dbh, $self) = @_; +sub next { + my $self = shift; - my $next = $self->next::can; + my @row = $self->next::method(@_); - my @row = $next->(@_); - - my $col_infos = $storage->_resolve_column_info($self->args->[0]); - - my $select = $self->args->[1]; - - _normalize_guids($select, $col_infos, \@row, $storage); + _normalize_guids( + $self->args->[1], + $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]), + \@row, + $self->storage + ); return @row; } -sub _dbh_all { - my ($storage, $dbh, $self) = @_; - - my $next = $self->next::can; - - my @rows = $next->(@_); - - my $col_infos = $storage->_resolve_column_info($self->args->[0]); +sub all { + my $self = shift; - my $select = $self->args->[1]; + my @rows = $self->next::method(@_); - _normalize_guids($select, $col_infos, $_, $storage) for @rows; + _normalize_guids( + $self->args->[1], + $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]), + $_, + $self->storage + ) for @rows; return @rows; } diff --git a/lib/DBIx/Class/Storage/DBI/ADO/Microsoft_SQL_Server/Cursor.pm b/lib/DBIx/Class/Storage/DBI/ADO/Microsoft_SQL_Server/Cursor.pm index 9c02e9a..1ada243 100644 --- a/lib/DBIx/Class/Storage/DBI/ADO/Microsoft_SQL_Server/Cursor.pm +++ b/lib/DBIx/Class/Storage/DBI/ADO/Microsoft_SQL_Server/Cursor.pm @@ -37,37 +37,51 @@ for the inner cursor class. =cut -sub _dbh_next { - my ($storage, $dbh, $self) = @_; +sub next { + my $self = shift; - my $next = $self->next::can; + my @row = $self->next::method(@_); - my @row = $next->(@_); + $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]); - my $col_infos = $storage->_resolve_column_info($self->args->[0]); + _normalize_guids( + $self->args->[1], + $self->{_colinfos}, + \@row, + $self->storage + ); - my $select = $self->args->[1]; - - _normalize_guids($select, $col_infos, \@row, $storage); - _strip_trailing_binary_nulls($select, $col_infos, \@row, $storage); + _strip_trailing_binary_nulls( + $self->args->[1], + $self->{_colinfos}, + \@row, + $self->storage + ); return @row; } -sub _dbh_all { - my ($storage, $dbh, $self) = @_; - - my $next = $self->next::can; - - my @rows = $next->(@_); +sub all { + my $self = shift; - my $col_infos = $storage->_resolve_column_info($self->args->[0]); + my @rows = $self->next::method(@_); - my $select = $self->args->[1]; + $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]); for (@rows) { - _normalize_guids($select, $col_infos, $_, $storage); - _strip_trailing_binary_nulls($select, $col_infos, $_, $storage); + _normalize_guids( + $self->args->[1], + $self->{_colinfos}, + $_, + $self->storage + ); + + _strip_trailing_binary_nulls( + $self->args->[1], + $self->{_colinfos}, + $_, + $self->storage + ); } return @rows; diff --git a/lib/DBIx/Class/Storage/DBI/Cursor.pm b/lib/DBIx/Class/Storage/DBI/Cursor.pm index a71036e..6483ca7 100644 --- a/lib/DBIx/Class/Storage/DBI/Cursor.pm +++ b/lib/DBIx/Class/Storage/DBI/Cursor.pm @@ -6,10 +6,11 @@ use warnings; use base qw/DBIx::Class::Cursor/; use Try::Tiny; +use Scalar::Util qw/refaddr weaken/; use namespace::clean; __PACKAGE__->mk_group_accessors('simple' => - qw/sth storage args attrs/ + qw/storage args attrs/ ); =head1 NAME @@ -46,20 +47,35 @@ Returns a new L object. =cut -sub new { - my ($class, $storage, $args, $attrs) = @_; - $class = ref $class if ref $class; +{ + my %cursor_registry; - my $new = { - storage => $storage, - args => $args, - attrs => $attrs, - _dbh_gen => $storage->{_dbh_gen}, - _pos => 0, - _done => 0, - }; + sub new { + my ($class, $storage, $args, $attrs) = @_; - return bless ($new, $class); + my $self = bless { + storage => $storage, + args => $args, + attrs => $attrs, + }, ref $class || $class; + + weaken( $cursor_registry{ refaddr($self) } = $self ) + if DBIx::Class::_ENV_::HAS_ITHREADS; + + return $self; + } + + sub CLONE { + for (keys %cursor_registry) { + # once marked we no longer care about them, hence no + # need to keep in the registry, left alone renumber the + # keys (all addresses are now different) + my $self = delete $cursor_registry{$_} + or next; + + $self->{_intra_thread} = 1; + } + } } =head2 next @@ -77,45 +93,48 @@ values (the result of L method). =cut -sub _dbh_next { - my ($storage, $dbh, $self) = @_; +sub next { + my $self = shift; + + return if $self->{_done}; + + my $sth; - $self->_check_dbh_gen; if ( $self->{attrs}{software_limit} && $self->{attrs}{rows} - && $self->{_pos} >= $self->{attrs}{rows} + && ($self->{_pos}||0) >= $self->{attrs}{rows} ) { - $self->sth->finish if $self->sth->{Active}; - $self->sth(undef); + if ($sth = $self->sth) { + # explicit finish will issue warnings, unlike the DESTROY below + $sth->finish if $sth->FETCH('Active'); + } $self->{_done} = 1; + return; } - return if $self->{_done}; - unless ($self->sth) { - $self->sth(($storage->_select(@{$self->{args}}))[1]); - if ($self->{attrs}{software_limit}) { - if (my $offset = $self->{attrs}{offset}) { - $self->sth->fetch for 1 .. $offset; - } + unless ($sth = $self->sth) { + (undef, $sth) = $self->storage->_select( @{$self->{args}} ); + + if ( $self->{attrs}{software_limit} and $self->{attrs}{offset} ) { + $sth->fetch for 1 .. $self->{attrs}{offset}; } + + $self->sth($sth); } - my @row = $self->sth->fetchrow_array; - if (@row) { + + my $row = $sth->fetchrow_arrayref; + if ($row) { $self->{_pos}++; } else { - $self->sth(undef); $self->{_done} = 1; } - return @row; -} -sub next { - my ($self) = @_; - $self->{storage}->dbh_do($self->can('_dbh_next'), $self); + return @{$row||[]}; } + =head2 all =over 4 @@ -131,24 +150,58 @@ L. =cut -sub _dbh_all { - my ($storage, $dbh, $self) = @_; +sub all { + my $self = shift; + + # delegate to DBIC::Cursor which will delegate back to next() + if ($self->{attrs}{software_limit} + && ($self->{attrs}{offset} || $self->{attrs}{rows})) { + return $self->next::method(@_); + } + + my $sth; + + if ($sth = $self->sth) { + # explicit finish will issue warnings, unlike the DESTROY below + $sth->finish if ( ! $self->{_done} and $sth->FETCH('Active') ); + $self->sth(undef); + } + + (undef, $sth) = $self->storage->_select( @{$self->{args}} ); - $self->_check_dbh_gen; - $self->sth->finish if $self->sth && $self->sth->{Active}; - $self->sth(undef); - my ($rv, $sth) = $storage->_select(@{$self->{args}}); return @{$sth->fetchall_arrayref}; } -sub all { - my ($self) = @_; - if ($self->{attrs}{software_limit} - && ($self->{attrs}{offset} || $self->{attrs}{rows})) { - return $self->next::method; +sub sth { + my $self = shift; + + if (@_) { + delete @{$self}{qw/_pos _done _pid _intra_thread/}; + + $self->{sth} = $_[0]; + $self->{_pid} = $$ if ! DBIx::Class::_ENV_::BROKEN_FORK and $_[0]; + } + elsif ($self->{sth} and ! $self->{_done}) { + + my $invalidate_handle_reason; + + if (DBIx::Class::_ENV_::HAS_ITHREADS and $self->{_intra_thread} ) { + $invalidate_handle_reason = 'Multi-thread'; + } + elsif (!DBIx::Class::_ENV_::BROKEN_FORK and $self->{_pid} != $$ ) { + $invalidate_handle_reason = 'Multi-process'; + } + + if ($invalidate_handle_reason) { + $self->storage->throw_exception("$invalidate_handle_reason access attempted while cursor in progress (position $self->{_pos})") + if $self->{_pos}; + + # reinvokes the reset logic above + $self->sth(undef); + } } - $self->{storage}->dbh_do($self->can('_dbh_all'), $self); + return $self->{sth}; } =head2 reset @@ -158,38 +211,30 @@ Resets the cursor to the beginning of the L. =cut sub reset { - my ($self) = @_; - - # No need to care about failures here - try { $self->sth->finish } - if $self->sth && $self->sth->{Active}; - $self->_soft_reset; - return undef; + $_[0]->__finish_sth if $_[0]->{sth}; + $_[0]->sth(undef); } -sub _soft_reset { - my ($self) = @_; - $self->sth(undef); - $self->{_done} = 0; - $self->{_pos} = 0; +sub DESTROY { + $_[0]->__finish_sth if $_[0]->{sth}; } -sub _check_dbh_gen { - my ($self) = @_; +sub __finish_sth { + # It is (sadly) extremely important to finish() handles we are about + # to lose (due to reset() or a DESTROY() ). $rs->reset is the closest + # thing the user has to getting to the underlying finish() API and some + # DBDs mandate this (e.g. DBD::InterBase will segfault, DBD::Sybase + # won't start a transaction sanely, etc) + # We also can't use the accessor here, as it will trigger a fork/thread + # check, and resetting a cursor in a child is perfectly valid - if($self->{_dbh_gen} != $self->{storage}->{_dbh_gen}) { - $self->{_dbh_gen} = $self->{storage}->{_dbh_gen}; - $self->_soft_reset; - } -} + my $self = shift; -sub DESTROY { - # None of the reasons this would die matter if we're in DESTROY anyways - if (my $sth = $_[0]->sth) { - local $SIG{__WARN__} = sub {}; - try { $sth->finish } if $sth->FETCH('Active'); - } + # No need to care about failures here + try { local $SIG{__WARN__} = sub {}; $self->{sth}->finish } if ( + $self->{sth} and ! try { ! $self->{sth}->FETCH('Active') } + ); } 1; diff --git a/lib/DBIx/Class/Storage/DBI/SQLAnywhere/Cursor.pm b/lib/DBIx/Class/Storage/DBI/SQLAnywhere/Cursor.pm index 8c9f533..189562e 100644 --- a/lib/DBIx/Class/Storage/DBI/SQLAnywhere/Cursor.pm +++ b/lib/DBIx/Class/Storage/DBI/SQLAnywhere/Cursor.pm @@ -33,64 +33,54 @@ for the inner cursor class. =cut -sub _dbh_next { - my ($storage, $dbh, $self) = @_; - - my $next = $self->next::can; - - my @row = $next->(@_); - - my $col_info = $storage->_resolve_column_info($self->args->[0]); - - my $select = $self->args->[1]; +my $unpack_guids = sub { + my ($select, $col_infos, $data, $storage) = @_; for my $select_idx (0..$#$select) { - my $selected = $select->[$select_idx]; - - next if ref $selected; + next unless ( + defined $data->[$select_idx] + and + length($data->[$select_idx]) == 16 + ); - my $data_type = $col_info->{$selected}{data_type}; + my $selected = $select->[$select_idx]; - if ($storage->_is_guid_type($data_type)) { - my $returned = $row[$select_idx]; + my $data_type = $col_infos->{$select->[$select_idx]}{data_type} + or next; - if (length $returned == 16) { - $row[$select_idx] = $storage->_uuid_to_str($returned); - } - } + $data->[$select_idx] = $storage->_uuid_to_str($data->[$select_idx]) + if $storage->_is_guid_type($data_type); } +}; - return @row; -} - -sub _dbh_all { - my ($storage, $dbh, $self) = @_; - - my $next = $self->next::can; - my @rows = $next->(@_); +sub next { + my $self = shift; - my $col_info = $storage->_resolve_column_info($self->args->[0]); + my @row = $self->next::method(@_); - my $select = $self->args->[1]; + $unpack_guids->( + $self->args->[1], + $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]), + \@row, + $self->storage + ); - for my $row (@rows) { - for my $select_idx (0..$#$select) { - my $selected = $select->[$select_idx]; + return @row; +} - next if ref $selected; +sub all { + my $self = shift; - my $data_type = $col_info->{$selected}{data_type}; + my @rows = $self->next::method(@_); - if ($storage->_is_guid_type($data_type)) { - my $returned = $row->[$select_idx]; + $unpack_guids->( + $self->args->[1], + $self->{_colinfos} ||= $self->storage->_resolve_column_info($self->args->[0]), + $_, + $self->storage + ) for @rows; - if (length $returned == 16) { - $row->[$select_idx] = $storage->_uuid_to_str($returned); - } - } - } - } return @rows; } diff --git a/t/50fork.t b/t/50fork.t index 3ddcaf3..af61dca 100644 --- a/t/50fork.t +++ b/t/50fork.t @@ -1,6 +1,7 @@ use strict; use warnings; use Test::More; +use Test::Exception; use lib qw(t/lib); use DBICTest; @@ -40,27 +41,58 @@ eval { $schema->resultset('CD')->create({ title => 'vacation in antarctica part 2', artist => 456, year => 1901 }); $parent_rs = $schema->resultset('CD')->search({ year => 1901 }); - $parent_rs->next; + is ($parent_rs->count, 2); }; ok(!$@) or diag "Creation eval failed: $@"; +# basic tests { - my $pid = fork; - if(!defined $pid) { - die "fork failed: $!"; + ok ($schema->storage->connected(), 'Parent is connected'); + is ($parent_rs->next->id, 1, 'Cursor advanced'); + + my ($parent_in, $child_out); + pipe( $parent_in, $child_out ) or die "Pipe open failed: $!"; + + my $pid = fork; + if(!defined $pid) { + die "fork failed: $!"; + } + + if (!$pid) { + close $parent_in; + + #simulate a subtest to not confuse the parent TAP emission + my $tb = Test::More->builder; + $tb->reset; + for (qw/output failure_output todo_output/) { + close $tb->$_; + open ($tb->$_, '>&', $child_out); } - if (!$pid) { - exit $schema->storage->connected ? 1 : 0; + ok(!$schema->storage->connected, "storage->connected() false in child"); + for (1,2) { + throws_ok { $parent_rs->next } qr/\QMulti-process access attempted while cursor in progress (position 1)/; } - if (waitpid($pid, 0) == $pid) { - my $ex = $? >> 8; - ok($ex == 0, "storage->connected() returns false in child"); - exit $ex if $ex; # skip remaining tests - } + $parent_rs->reset; + is($parent_rs->next->id, 1, 'Resetting cursor reprepares it within child environment'); + + done_testing; + exit 0; + } + + close $child_out; + while (my $ln = <$parent_in>) { + print " $ln"; + } + waitpid( $pid, 0 ); + ok(!$?, 'Child subtests passed'); + + is ($parent_rs->next->id, 2, 'Cursor still intact in parent'); + is ($parent_rs->next, undef, 'Cursor exhausted'); } +$parent_rs->reset; my @pids; while(@pids < $num_children) { diff --git a/t/51threads.t b/t/51threads.t index be383e5..6dc0d11 100644 --- a/t/51threads.t +++ b/t/51threads.t @@ -53,9 +53,55 @@ lives_ok (sub { $schema->resultset('CD')->create({ title => 'vacation in antarctica part 2', artist => 456, year => 1901 }); $parent_rs = $schema->resultset('CD')->search({ year => 1901 }); - $parent_rs->next; + is ($parent_rs->count, 2); }, 'populate successfull'); +# basic tests +{ + ok ($schema->storage->connected(), 'Parent is connected'); + is ($parent_rs->next->id, 1, 'Cursor advanced'); + my $ct_num = Test::More->builder->current_test; + + my $newthread = async { + my $out = ''; + + #simulate a subtest to not confuse the parent TAP emission + my $tb = Test::More->builder; + $tb->reset; + for (qw/output failure_output todo_output/) { + close $tb->$_; + open ($tb->$_, '>', \$out); + } + + ok(!$schema->storage->connected, "storage->connected() false in child"); + for (1,2) { + throws_ok { $parent_rs->next } qr/\QMulti-thread access attempted while cursor in progress (position 1)/; + } + + $parent_rs->reset; + is($parent_rs->next->id, 1, 'Resetting cursor reprepares it within child environment'); + + done_testing; + + close $tb->$_ for (qw/output failure_output todo_output/); + sleep(1); # tasty crashes without this + + $out; + }; + die "Thread creation failed: $! $@" if !defined $newthread; + + my $out = $newthread->join; + $out =~ s/^/ /gm; + print $out; + + # workaround for older Test::More confusing the plan under threads + Test::More->builder->current_test($ct_num); + + is ($parent_rs->next->id, 2, 'Cursor still intact in parent'); + is ($parent_rs->next, undef, 'Cursor exhausted'); +} + +$parent_rs->reset; my @children; while(@children < $num_children) { @@ -89,6 +135,7 @@ while(@children) { } ok(1, "Made it to the end"); +undef $parent_rs; $schema->storage->dbh->do("DROP TABLE cd"); diff --git a/t/51threadtxn.t b/t/51threadtxn.t index e6cc3ac..4ab96fb 100644 --- a/t/51threadtxn.t +++ b/t/51threadtxn.t @@ -54,7 +54,7 @@ eval { $schema->resultset('CD')->create({ title => 'vacation in antarctica part 2', artist => 456, year => 1901 }); $parent_rs = $schema->resultset('CD')->search({ year => 1901 }); - $parent_rs->next; + is ($parent_rs->count, 2); }; ok(!$@) or diag "Creation eval failed: $@"; diff --git a/t/60core.t b/t/60core.t index f21355c..ffb7d13 100644 --- a/t/60core.t +++ b/t/60core.t @@ -253,11 +253,13 @@ is ($collapsed_or_rs->all, 4, 'Collapsed joined search with OR returned correct is ($collapsed_or_rs->count, 4, 'Collapsed search count with OR ok'); # make sure sure distinct on a grouped rs is warned about -my $cd_rs = $schema->resultset ('CD') - ->search ({}, { distinct => 1, group_by => 'title' }); -warnings_exist (sub { - $cd_rs->next; -}, qr/Useless use of distinct/, 'UUoD warning'); +{ + my $cd_rs = $schema->resultset ('CD') + ->search ({}, { distinct => 1, group_by => 'title' }); + warnings_exist (sub { + $cd_rs->next; + }, qr/Useless use of distinct/, 'UUoD warning'); +} { my $tcount = $schema->resultset('Track')->search( @@ -298,6 +300,14 @@ is($or_rs->next->cdid, $rel_rs->next->cdid, 'Related object ok'); $or_rs->reset; $rel_rs->reset; +# at this point there should be no active statements +# (finish() was called everywhere, either explicitly via +# reset() or on DESTROY) +for (keys %{$schema->storage->dbh->{CachedKids}}) { + fail("Unreachable cached statement still active: $_") + if $schema->storage->dbh->{CachedKids}{$_}->FETCH('Active'); +} + my $tag = $schema->resultset('Tag')->search( [ { 'me.tag' => 'Blue' } ], { cols=>[qw/tagid/] } )->next; diff --git a/t/750firebird.t b/t/750firebird.t index aef3fcf..d092379 100644 --- a/t/750firebird.t +++ b/t/750firebird.t @@ -255,6 +255,14 @@ EOF } 'inferring generator from trigger source works'; } + # at this point there should be no active statements + # (finish() was called everywhere, either explicitly via + # reset() or on DESTROY) + for (keys %{$schema->storage->dbh->{CachedKids}}) { + fail("Unreachable cached statement still active: $_") + if $schema->storage->dbh->{CachedKids}{$_}->FETCH('Active'); + } + # test blobs (stolen from 73oracle.t) eval { $dbh->do('DROP TABLE "bindtype_test"') }; $dbh->do(q[