sub _populate_dbh {
my ($self) = @_;
- # cursors are per-connection, so reset the numbering
+ # cursors are per-connection, so we can reset the numbering
+ # without fear of collisions
$self->_pg_cursor_number(1);
return $self->SUPER::_populate_dbh();
}
if (defined $self->cursor_page_size) {
return $self->cursor_page_size;
}
+
return 1000;
}
my $self = shift;
my ($ident, $select, $where, $attrs) = @_;
- # ugly ugly ugly, but this is the last sub in the call chain that receives $attrs
+ # ugly ugly ugly, but this is the last sub in the call chain that
+ # receives $attrs
local $self->{_use_pg_cursors}=$self->_should_use_pg_cursors($attrs);
local $self->{_pg_cursor_page_size}=$self->_get_pg_cursor_page_size($attrs);
sub _dbh_sth {
my ($self, $dbh, $sql) = @_;
+ # here we have to use the ugly local attributes because we no
+ # longer have access to the resultset attributes
if ($self->{_use_pg_cursors} && $sql =~ /^SELECT\b/i) {
return DBIx::Class::Storage::DBI::Pg::Sth
->new($self,$dbh,$sql,$self->{_pg_cursor_page_size});
'storage',
'cursor_id', 'cursor_sql',
'cursor_created',
- 'cursor_sth', 'fetch_sth',
- 'page_size',
+ 'cursor_sth',
+ 'fetch_sql', 'fetch_sth',
);
+=head1 NAME
+
+DBIx::Class::Storage::DBI::Pg::Sth
+
+=head1 DESCRIPTION
+
+A statement wrapper to use PostgreSQL cursors on DBIx::Class C<SELECT>s
+
+=head1 How this whole thing works
+
+This class encapsulates I<two> DBI statements:
+
+=over 4
+
+=item *
+
+one is used to declare the cursor in postgres (C<cursor_sth>)
+
+=item *
+
+the other is used to fetch records from the cursor (C<fetch_sth>)
+
+=back
+
+C<cursor_sth> is prepared as needed (in L</bind_param> or
+L</execute>); it's executed in L</execute>. We need the bind
+parameters to run it, and we don't want to prepare it if it won't be
+used.
+
+C<fetch_sth> is prepared and executed whenever we need to
+fetch more records from the cursor. The algorithm, taken from the
+documentation of L<DBD::Pg>, is:
+
+ declare_the_cursor($name,@bind_params);
+ while (1) {
+ my $fetch_sth = prepare_and_execute_fetch_from($name);
+ last if $fetch_sth->rows == 0; # cursor reached the end of the result set
+
+ while (my $row = $fetch_sth->fetchrow_hashref) {
+ use_the($row);
+ }
+ }
+ close_the_cursor($name);
+
+We implement the algorithm twice, in L</fetchrow_array> and in
+L</fetchall_arrayref> (other statement methods are not used by
+DBIx::Class, so we don't care about them).
+
+C<cursor_sth> is kept in an attribute of this class because we may
+prepare/bind it in L</bind_param> and execute it in
+L</execute>. C<cursor_created> is used to create the cursor on demand
+(if our "fetch" methods are called before L</execute>) and to avoid
+doing it twice.
+
+The name of the cursor created by this class is determined by the
+calling Storage object. Cursors are per-connection, but so are
+statements, which means that we don't have to care about
+re-connections here. The Storage will sort it out.
+
+=cut
+
sub new {
my ($class, $storage, $dbh, $sql, $page_size) = @_;
+ # sanity, DBIx::Class::Storage::DBI::Pg should never instantiate
+ # this class for non-selects
if ($sql =~ /^SELECT\b/i) {
my $self=bless {},$class;
$self->storage($storage);
$storage->_get_next_pg_cursor_number()
);
my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD';
+ # the SQL to create the cursor
$self->cursor_sql("DECLARE $csr_id CURSOR $hold FOR $sql");
+ # our id, used when fetching
$self->cursor_id($csr_id);
+ # we prepare this as late as possible
$self->cursor_sth(undef);
+ # we haven't created the cursor, yet
$self->cursor_created(0);
- $self->page_size($page_size);
+ # the SQL to fetch records from the cursor
+ $self->fetch_sql("FETCH $page_size FROM $csr_id");
+
return $self;
}
else {
sub DESTROY {
my ($self) = @_;
+ local $@; # be nice to callers, don't clobber their exceptions
eval { $self->_cleanup_sth };
return;
}
$self->fetch_sth->finish if $self->fetch_sth;
- $self->fetch_sth($self->storage->sth(
- sprintf 'fetch %d from %s',
- $self->page_size,
- $self->cursor_id
- ));
+ $self->fetch_sth($self->storage->sth($self->fetch_sql));
$self->fetch_sth->execute;
}
sub fetchrow_array {
my ($self) = @_;
+ # start fetching if we haven't already
$self->_run_fetch_sth unless $self->fetch_sth;
+ # no rows? the the cursor is at the end of the resultset, nothing
+ # else to do
return if $self->_check_cursor_end;
+ # got a row
my @row = $self->fetch_sth->fetchrow_array;
if (!@row) {
+ # hmm. no row came back, we are at the end of the page
$self->_run_fetch_sth;
+ # we are also at the end of the resultset? if so, return
return if $self->_check_cursor_end;
+ # get the row from the new page
@row = $self->fetch_sth->fetchrow_array;
}
return @row;
my ($self,$slice,$max_rows) = @_;
my $ret=[];
+
+ # start fetching if we haven't already
$self->_run_fetch_sth unless $self->fetch_sth;
+ # no rows? the the cursor is at the end of the resultset, nothing
+ # else to do
return if $self->_check_cursor_end;
while (1) {
+ # get the whole page from the cursor
my $batch=$self->fetch_sth->fetchall_arrayref($slice,$max_rows);
push @$ret,@$batch;
+ # take care to never return more than $max_rows
if (defined($max_rows) && $max_rows >=0) {
$max_rows -= @$batch;
last if $max_rows <=0;
}
+ # if the page was empty, the cursor reached the end of the
+ # resultset, get out of here
last if @$batch ==0;
+ # fetch a new page
$self->_run_fetch_sth;
+ # get out if this new page is empty
last if $self->_check_cursor_end;
}