first draft of new design
Gianni Ceccarelli [Sun, 23 May 2010 17:08:28 +0000 (17:08 +0000)]
lib/DBIx/Class/Storage/DBI/Pg.pm

index d38f84c..a0cee8b 100644 (file)
@@ -16,6 +16,9 @@ __PACKAGE__->sql_quote_char ('"');
 __PACKAGE__->datetime_parser_type ('DateTime::Format::Pg');
 __PACKAGE__->_use_multicolumn_in (1);
 
+__PACKAGE__->mk_group_accessors('simple' =>
+                                    '_pg_cursor_number');
+
 sub _determine_supports_insert_returning {
   return shift->_server_info->{normalized_dbms_version} >= 8.002
     ? 1
@@ -227,6 +230,175 @@ sub deployment_statements {
   $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
 }
 
+sub _populate_dbh {
+    my ($self) = @_;
+
+    $self->_pg_cursor_number(0);
+    $self->SUPER::_populate_dbh();
+}
+
+sub _get_next_pg_cursor_number {
+    my ($self) = @_;
+
+    my $ret=$self->_pg_cursor_number;
+    $self->_pg_cursor_number($ret+1);
+    return $ret;
+}
+
+sub _dbh_sth {
+    my ($self, $dbh, $sql) = @_;
+
+    DBIx::Class::Storage::DBI::Pg::Sth->new($self,$dbh,$sql);
+}
+
+package DBIx::Class::Storage::DBI::Pg::Sth;{
+use strict;
+use warnings;
+
+__PACKAGE__->mk_group_accessors('simple' =>
+                                    'storage', 'dbh',
+                                    'cursor_id', 'cursor_created',
+                                    'cursor_sth', 'fetch_sth',
+                            );
+
+sub new {
+    my ($class, $storage, $dbh, $sql) = @_;
+
+    if ($sql =~ /^SELECT\b/i) {
+        my $self=bless {},$class;
+        $self->storage($storage);
+        $self->dbh($dbh);
+
+        $csr_id=$self->_cursor_name_from_number(
+            $storage->_get_next_pg_cursor_number()
+        );
+        my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD';
+        $sql="DECLARE $csr_id CURSOR $hold FOR $sql";
+        $self->cursor_id($csr_id);
+        $self->cursor_sth($storage->SUPER::_dbh_sth($dbh,$sql));
+        $self->cursor_created(0);
+        return $self;
+    }
+    else { # short-circuit
+        return $storage->SUPER::_dbh_sth($dbh,$sql);
+    }
+}
+
+sub _cursor_name_from_number {
+    return 'dbic_pg_cursor_'.$_[1];
+}
+
+sub _cleanup_sth {
+    my ($self)=@_;
+
+    eval {
+        $self->fetch_sth->finish() if $self->fetch_sth;
+        $self->fetch_sth(undef);
+        $self->cursor_sth->finish() if $self->cursor_sth;
+        $self->cursor_sth(undef);
+        $self->storage->_dbh_do('CLOSE '.$self->cursor_id);
+    };
+}
+
+sub DESTROY {
+    my ($self) = @_;
+
+    $self->_cleanup_sth;
+
+    return;
+}
+
+sub bind_param {
+    my ($self,@bind_args)=@_;
+
+    return $self->cursor_sth->bind_param(@bind_args);
+}
+
+sub execute {
+    my ($self,@bind_values)=@_;
+
+    return $self->cursor_sth->execute(@bind_values);
+}
+
+# bind_param_array & execute_array not used for SELECT statements, so
+# we'll ignore them
+
+sub errstr {
+    my ($self)=@_;
+
+    return $self->cursor_sth->errstr;
+}
+
+sub finish {
+    my ($self)=@_;
+
+    $self->fetch_sth->finish if $self->fetch_sth;
+    return $self->cursor_sth->finish;
+}
+
+sub _check_cursor_end {
+    my ($self) = @_;
+    if ($self->fetch_sth->rows == 0) {
+        $self->_cleanup_sth;
+        return 1;
+    }
+    return;
+}
+
+sub _run_fetch_sth {
+    my ($self)=@_;
+
+    if (!$self->cursor_created) {
+        $self->cursor_sth->execute();
+    }
+    $self->fetch_sth->finish if $self->fetch_sth;
+    $self->fetch_sth($self->storage->sth("fetch 1000 from ".$self->cursor_id));
+    $self->fetch_sth->execute;
+}
+
+sub fetchrow_array {
+    my ($self) = @_;
+
+    $self->_run_fetch_sth unless $self->fetch_sth;
+    return if $self->_check_cursor_end;
+
+    my @row = $self->fetch_sth->fetchrow_array;
+    if (!@row) {
+        $self->_run_fetch_sth;
+        return if $self->_check_cursor_end;
+
+        @row = $self->fetch_sth->fetchrow_array;
+    }
+    return @row;
+}
+
+sub fetchall_arrayref {
+    my ($self,$slice,$max_rows) = @_;
+
+    my $ret=[];
+    $self->_run_fetch_sth unless $self->fetch_sth;
+    return if $self->_check_cursor_end;
+
+    while (1) {
+        my $batch=$self->fetch_sth->fetchall_arrayref($slice,$max_rows);
+
+        if (@$batch == 0) {
+            $self->_run_fetch_sth;
+            last if $self->_check_cursor_end;
+            next;
+        }
+
+        $max_rows -= @$batch;
+        last if $max_rows <=0;
+
+        push @$ret,@$batch;
+    }
+
+    return $ret;
+}
+
+};
+
 1;
 
 __END__