From: Gianni Ceccarelli Date: Sun, 23 May 2010 17:08:31 +0000 (+0000) Subject: cleanup, moved Pg::Sth to separate file X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=8c194608c60b8f5644f27a87e91acb7977f813f4;p=dbsrgits%2FDBIx-Class.git cleanup, moved Pg::Sth to separate file all tests pass! --- diff --git a/lib/DBIx/Class/Storage/DBI/Pg.pm b/lib/DBIx/Class/Storage/DBI/Pg.pm index a0cee8b..5854644 100644 --- a/lib/DBIx/Class/Storage/DBI/Pg.pm +++ b/lib/DBIx/Class/Storage/DBI/Pg.pm @@ -10,6 +10,7 @@ use Context::Preserve 'preserve_context'; use DBIx::Class::Carp; use Try::Tiny; use namespace::clean; +use DBIx::Class::Storage::DBI::Pg::Sth; __PACKAGE__->sql_limit_dialect ('LimitOffset'); __PACKAGE__->sql_quote_char ('"'); @@ -233,172 +234,30 @@ sub deployment_statements { sub _populate_dbh { my ($self) = @_; - $self->_pg_cursor_number(0); - $self->SUPER::_populate_dbh(); + $self->_pg_cursor_number(1); + return $self->SUPER::_populate_dbh(); } sub _get_next_pg_cursor_number { my ($self) = @_; - my $ret=$self->_pg_cursor_number; + my $ret=$self->_pg_cursor_number||0; $self->_pg_cursor_number($ret+1); + return $ret; } sub _dbh_sth { my ($self, $dbh, $sql) = @_; - DBIx::Class::Storage::DBI::Pg::Sth->new($self,$dbh,$sql); -} - -package DBIx::Class::Storage::DBI::Pg::Sth;{ -use strict; -use warnings; - -__PACKAGE__->mk_group_accessors('simple' => - 'storage', 'dbh', - 'cursor_id', 'cursor_created', - 'cursor_sth', 'fetch_sth', - ); - -sub new { - my ($class, $storage, $dbh, $sql) = @_; - if ($sql =~ /^SELECT\b/i) { - my $self=bless {},$class; - $self->storage($storage); - $self->dbh($dbh); - - $csr_id=$self->_cursor_name_from_number( - $storage->_get_next_pg_cursor_number() - ); - my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD'; - $sql="DECLARE $csr_id CURSOR $hold FOR $sql"; - $self->cursor_id($csr_id); - $self->cursor_sth($storage->SUPER::_dbh_sth($dbh,$sql)); - $self->cursor_created(0); - return $self; + return DBIx::Class::Storage::DBI::Pg::Sth->new($self,$dbh,$sql); } else { # short-circuit - return $storage->SUPER::_dbh_sth($dbh,$sql); - } -} - -sub _cursor_name_from_number { - return 'dbic_pg_cursor_'.$_[1]; -} - -sub _cleanup_sth { - my ($self)=@_; - - eval { - $self->fetch_sth->finish() if $self->fetch_sth; - $self->fetch_sth(undef); - $self->cursor_sth->finish() if $self->cursor_sth; - $self->cursor_sth(undef); - $self->storage->_dbh_do('CLOSE '.$self->cursor_id); - }; -} - -sub DESTROY { - my ($self) = @_; - - $self->_cleanup_sth; - - return; -} - -sub bind_param { - my ($self,@bind_args)=@_; - - return $self->cursor_sth->bind_param(@bind_args); -} - -sub execute { - my ($self,@bind_values)=@_; - - return $self->cursor_sth->execute(@bind_values); -} - -# bind_param_array & execute_array not used for SELECT statements, so -# we'll ignore them - -sub errstr { - my ($self)=@_; - - return $self->cursor_sth->errstr; -} - -sub finish { - my ($self)=@_; - - $self->fetch_sth->finish if $self->fetch_sth; - return $self->cursor_sth->finish; -} - -sub _check_cursor_end { - my ($self) = @_; - if ($self->fetch_sth->rows == 0) { - $self->_cleanup_sth; - return 1; - } - return; -} - -sub _run_fetch_sth { - my ($self)=@_; - - if (!$self->cursor_created) { - $self->cursor_sth->execute(); - } - $self->fetch_sth->finish if $self->fetch_sth; - $self->fetch_sth($self->storage->sth("fetch 1000 from ".$self->cursor_id)); - $self->fetch_sth->execute; -} - -sub fetchrow_array { - my ($self) = @_; - - $self->_run_fetch_sth unless $self->fetch_sth; - return if $self->_check_cursor_end; - - my @row = $self->fetch_sth->fetchrow_array; - if (!@row) { - $self->_run_fetch_sth; - return if $self->_check_cursor_end; - - @row = $self->fetch_sth->fetchrow_array; + return $self->SUPER::_dbh_sth($dbh,$sql); } - return @row; } -sub fetchall_arrayref { - my ($self,$slice,$max_rows) = @_; - - my $ret=[]; - $self->_run_fetch_sth unless $self->fetch_sth; - return if $self->_check_cursor_end; - - while (1) { - my $batch=$self->fetch_sth->fetchall_arrayref($slice,$max_rows); - - if (@$batch == 0) { - $self->_run_fetch_sth; - last if $self->_check_cursor_end; - next; - } - - $max_rows -= @$batch; - last if $max_rows <=0; - - push @$ret,@$batch; - } - - return $ret; -} - -}; - 1; __END__ diff --git a/lib/DBIx/Class/Storage/DBI/Pg/Sth.pm b/lib/DBIx/Class/Storage/DBI/Pg/Sth.pm new file mode 100644 index 0000000..8145135 --- /dev/null +++ b/lib/DBIx/Class/Storage/DBI/Pg/Sth.pm @@ -0,0 +1,154 @@ +package DBIx::Class::Storage::DBI::Pg::Sth; +use strict; +use warnings; +use base 'Class::Accessor::Grouped'; + +__PACKAGE__->mk_group_accessors('simple' => + 'storage', + 'cursor_id', 'cursor_created', + 'cursor_sth', 'fetch_sth', + ); + +sub new { + my ($class, $storage, $dbh, $sql) = @_; + + if ($sql =~ /^SELECT\b/i) { + my $self=bless {},$class; + $self->storage($storage); + + my $csr_id=$self->_cursor_name_from_number( + $storage->_get_next_pg_cursor_number() + ); + my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD'; + $sql="DECLARE $csr_id CURSOR $hold FOR $sql"; + $self->cursor_id($csr_id); + $self->cursor_sth($storage->_dbh_sth($dbh,$sql)); + $self->cursor_created(0); + return $self; + } + else { + die "Can only be used for SELECTS"; + } +} + +sub _cursor_name_from_number { + return 'dbic_pg_cursor_'.$_[1]; +} + +sub _cleanup_sth { + my ($self)=@_; + + if ($self->fetch_sth) { + $self->fetch_sth->finish(); + $self->fetch_sth(undef); + } + if ($self->cursor_sth) { + $self->cursor_sth->finish(); + $self->cursor_sth(undef); + $self->storage->dbh->do('CLOSE '.$self->cursor_id); + } +} + +sub DESTROY { + my ($self) = @_; + + eval { $self->_cleanup_sth }; + + return; +} + +sub bind_param { + my ($self,@bind_args)=@_; + + return $self->cursor_sth->bind_param(@bind_args); +} + +sub execute { + my ($self,@bind_values)=@_; + + $self->cursor_created(1); + return $self->cursor_sth->execute(@bind_values); +} + +# bind_param_array & execute_array not used for SELECT statements, so +# we'll ignore them + +sub errstr { + my ($self)=@_; + + return $self->cursor_sth->errstr; +} + +sub finish { + my ($self)=@_; + + $self->fetch_sth->finish if $self->fetch_sth; + return $self->cursor_sth->finish if $self->cursor_sth; + return 0; +} + +sub _check_cursor_end { + my ($self) = @_; + + if ($self->fetch_sth->rows == 0) { + $self->_cleanup_sth; + return 1; + } + return; +} + +sub _run_fetch_sth { + my ($self)=@_; + + if (!$self->cursor_created) { + $self->cursor_sth->execute(); + } + + $self->fetch_sth->finish if $self->fetch_sth; + $self->fetch_sth($self->storage->sth("fetch 1000 from ".$self->cursor_id)); + $self->fetch_sth->execute; +} + +sub fetchrow_array { + my ($self) = @_; + + $self->_run_fetch_sth unless $self->fetch_sth; + return if $self->_check_cursor_end; + + my @row = $self->fetch_sth->fetchrow_array; + if (!@row) { + $self->_run_fetch_sth; + return if $self->_check_cursor_end; + + @row = $self->fetch_sth->fetchrow_array; + } + return @row; +} + +sub fetchall_arrayref { + my ($self,$slice,$max_rows) = @_; + + my $ret=[]; + $self->_run_fetch_sth unless $self->fetch_sth; + return if $self->_check_cursor_end; + + while (1) { + my $batch=$self->fetch_sth->fetchall_arrayref($slice,$max_rows); + + push @$ret,@$batch; + + if (defined($max_rows) && $max_rows >=0) { + $max_rows -= @$batch; + last if $max_rows <=0; + } + + last if @$batch ==0; + + $self->_run_fetch_sth; + last if $self->_check_cursor_end; + } + + return $ret; +} + +1; diff --git a/t/72pg_cursors.t b/t/72pg_cursors.t new file mode 100644 index 0000000..17e01fd --- /dev/null +++ b/t/72pg_cursors.t @@ -0,0 +1,89 @@ +#!perl +use strict; +use warnings; + +use Test::More; +use lib qw(t/lib); +use DBICTest; +use Time::HiRes qw(gettimeofday tv_interval); + +my ($dsn, $dbuser, $dbpass) = @ENV{map { "DBICTEST_PG_${_}" } qw/DSN USER PASS/}; + +plan skip_all => 'Set $ENV{DBICTEST_PG_DSN}, _USER and _PASS to run this test' + unless ($dsn && $dbuser); + +plan tests => 3; + +sub create_test_schema { + my ($schema)=@_; + $schema->storage->dbh_do( + sub { + my (undef,$dbh)=@_; + local $dbh->{Warn} = 0; + $dbh->do(q[ + CREATE TABLE artist + ( + artistid serial NOT NULL PRIMARY KEY, + name varchar(100), + rank integer, + charfield char(10) + ); + ],{ RaiseError => 0, PrintError => 0 }); + }); +} + +sub drop_test_schema { + my ($schema)=@_; + $schema->storage->dbh_do( + sub { + my (undef,$dbh)=@_; + local $dbh->{Warn} = 0; + eval { $dbh->do('DROP TABLE IF EXISTS artist') }; + eval { $dbh->do('DROP SEQUENCE public.artist_artistid_seq') }; + }); +} + +# copied from 100populate.t + +my $schema = DBICTest::Schema->connection($dsn, $dbuser, $dbpass, { AutoCommit => 1 }); +drop_test_schema($schema);create_test_schema($schema); + +END { + return unless $schema; + drop_test_schema($schema); +} + +my $start_id = 'populateXaaaaaa'; +my $rows=1e4; +my $offset = 3; + +$schema->populate('Artist', [ [ qw/artistid name/ ], map { [ ($_ + $offset) => $start_id++ ] } ( 1 .. $rows ) ] ); +is ( + $schema->resultset ('Artist')->search ({ name => { -like => 'populateX%' } })->count, + $rows, + 'populate created correct number of rows with massive AoA bulk insert', +); + +{ + my $rs=$schema->resultset('Artist')->search({}); + my $count=0; + my $t0=[gettimeofday]; + $count++ while $rs->next; + is($count,$rows,'get all the rows (loop)'); + diag('Time for all(loop): '.tv_interval($t0)); +} + +{ + my $rs=$schema->resultset('Artist')->search({}); + my $t0=[gettimeofday]; + $rs->first; + diag('Time for first: '.tv_interval($t0)); +} + +{ + my $rs=$schema->resultset('Artist')->search({}); + my $t0=[gettimeofday]; + my @rows=$rs->all; + is(scalar(@rows),$rows,'get all the rows (all)'); + diag('Time for all: '.tv_interval($t0)); +}