Minor cleanups after rebase
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg / Sth.pm
1 package DBIx::Class::Storage::DBI::Pg::Sth;
2 use strict;
3 use warnings;
4 use base 'Class::Accessor::Grouped';
5
6 __PACKAGE__->mk_group_accessors('simple' =>
7                                     'storage',
8                                     'cursor_id', 'cursor_sql',
9                                     'cursor_created',
10                                     'cursor_sth',
11                                     'fetch_sql', 'fetch_sth',
12                             );
13
14 =head1 NAME
15
16 DBIx::Class::Storage::DBI::Pg::Sth
17
18 =head1 DESCRIPTION
19
20 A statement wrapper to use PostgreSQL cursors on DBIx::Class C<SELECT>s
21
22 =head1 How this whole thing works
23
24 This class encapsulates I<two> DBI statements:
25
26 =over 4
27
28 =item *
29
30 one is used to declare the cursor in postgres (C<cursor_sth>)
31
32 =item *
33
34 the other is used to fetch records from the cursor (C<fetch_sth>)
35
36 =back
37
38 C<cursor_sth> is prepared as needed (in L</bind_param> or
39 L</execute>); it's executed in L</execute>. We need the bind
40 parameters to run it, and we don't want to prepare it if it won't be
41 used.
42
43 C<fetch_sth> is prepared and executed whenever we need to
44 fetch more records from the cursor. The algorithm, taken from the
45 documentation of L<DBD::Pg>, is:
46
47   declare_the_cursor($name,@bind_params);
48   while (1) {
49     my $fetch_sth = prepare_and_execute_fetch_from($name);
50     last if $fetch_sth->rows == 0; # cursor reached the end of the result set
51
52     while (my $row = $fetch_sth->fetchrow_hashref) {
53        use_the($row);
54     }
55   }
56   close_the_cursor($name);
57
58 We implement the algorithm twice, in L</fetchrow_array> and in
59 L</fetchall_arrayref> (other statement methods are not used by
60 DBIx::Class, so we don't care about them).
61
62 C<cursor_sth> is kept in an attribute of this class because we may
63 prepare/bind it in L</bind_param> and execute it in
64 L</execute>. C<cursor_created> is used to create the cursor on demand
65 (if our "fetch" methods are called before L</execute>) and to avoid
66 doing it twice.
67
68 The name of the cursor created by this class is determined by the
69 calling Storage object. Cursors are per-connection, but so are
70 statements, which means that we don't have to care about
71 re-connections here. The Storage will sort it out.
72
73 =cut
74
75 sub new {
76     my ($class, $storage, $dbh, $sql, $page_size) = @_;
77
78     # sanity, DBIx::Class::Storage::DBI::Pg should never instantiate
79     # this class for non-selects
80     if ($sql =~ /^SELECT\b/i) {
81         my $self=bless {},$class;
82         $self->storage($storage);
83
84         my $csr_id=$self->_cursor_name_from_number(
85             $storage->_get_next_pg_cursor_number()
86         );
87         my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD';
88         # the SQL to create the cursor
89         $self->cursor_sql("DECLARE $csr_id CURSOR $hold FOR $sql");
90         # our id, used when fetching
91         $self->cursor_id($csr_id);
92         # we prepare this as late as possible
93         $self->cursor_sth(undef);
94         # we haven't created the cursor, yet
95         $self->cursor_created(0);
96         # the SQL to fetch records from the cursor
97         $self->fetch_sql("FETCH $page_size FROM $csr_id");
98
99         return $self;
100     }
101     else {
102         die "Can only be used for SELECTs";
103     }
104 }
105
106 sub _cursor_name_from_number {
107     return 'dbic_pg_cursor_'.$_[1];
108 }
109
110 sub _prepare_cursor_sth {
111     my ($self)=@_;
112
113     return if $self->cursor_sth;
114
115     $self->cursor_sth($self->storage->_sth($self->cursor_sql));
116 }
117
118 sub _cleanup_sth {
119     my ($self)=@_;
120
121     if ($self->fetch_sth) {
122         $self->fetch_sth->finish();
123         $self->fetch_sth(undef);
124     }
125     if ($self->cursor_sth) {
126         $self->cursor_sth->finish();
127         $self->cursor_sth(undef);
128         $self->storage->dbh->do('CLOSE '.$self->cursor_id);
129     }
130 }
131
132 sub DESTROY {
133     my ($self) = @_;
134
135     local $@; # be nice to callers, don't clobber their exceptions
136     eval { $self->_cleanup_sth };
137
138     return;
139 }
140
141 sub bind_param {
142     my ($self,@bind_args)=@_;
143
144     $self->_prepare_cursor_sth;
145
146     return $self->cursor_sth->bind_param(@bind_args);
147 }
148
149 sub execute {
150     my ($self,@bind_values)=@_;
151
152     $self->_prepare_cursor_sth;
153
154     my $ret=$self->cursor_sth->execute(@bind_values);
155     $self->cursor_created(1) if $ret;
156     return $ret;
157 }
158
159 # bind_param_array & execute_array not used for SELECT statements, so
160 # we'll ignore them
161
162 sub errstr {
163     my ($self)=@_;
164
165     return $self->cursor_sth->errstr;
166 }
167
168 sub finish {
169     my ($self)=@_;
170
171     $self->fetch_sth->finish if $self->fetch_sth;
172     return $self->cursor_sth->finish if $self->cursor_sth;
173     return 1;
174 }
175
176 sub _check_cursor_end {
177     my ($self) = @_;
178
179     if ($self->fetch_sth->rows == 0) {
180         $self->_cleanup_sth;
181         return 1;
182     }
183     return;
184 }
185
186 sub _run_fetch_sth {
187     my ($self)=@_;
188
189     if (!$self->cursor_created) {
190         $self->execute();
191     }
192
193     $self->fetch_sth->finish if $self->fetch_sth;
194     $self->fetch_sth($self->storage->_sth($self->fetch_sql));
195     $self->fetch_sth->execute;
196 }
197
198 sub fetchrow_array {
199     my ($self) = @_;
200
201     # start fetching if we haven't already
202     $self->_run_fetch_sth unless $self->fetch_sth;
203     # no rows? the the cursor is at the end of the resultset, nothing
204     # else to do
205     return if $self->_check_cursor_end;
206
207     # got a row
208     my @row = $self->fetch_sth->fetchrow_array;
209     if (!@row) {
210         # hmm. no row came back, we are at the end of the page
211         $self->_run_fetch_sth;
212         # we are also at the end of the resultset? if so, return
213         return if $self->_check_cursor_end;
214
215         # get the row from the new page
216         @row = $self->fetch_sth->fetchrow_array;
217     }
218     return @row;
219 }
220
221 sub fetchall_arrayref {
222     my ($self,$slice,$max_rows) = @_;
223
224     my $ret=[];
225
226     # start fetching if we haven't already
227     $self->_run_fetch_sth unless $self->fetch_sth;
228     # no rows? the the cursor is at the end of the resultset, nothing
229     # else to do
230     return if $self->_check_cursor_end;
231
232     while (1) {
233         # get the whole page from the cursor
234         my $batch=$self->fetch_sth->fetchall_arrayref($slice,$max_rows);
235
236         push @$ret,@$batch;
237
238         # take care to never return more than $max_rows
239         if (defined($max_rows) && $max_rows >=0) {
240             $max_rows -= @$batch;
241             last if $max_rows <=0;
242         }
243
244         # if the page was empty, the cursor reached the end of the
245         # resultset, get out of here
246         last if @$batch ==0;
247
248         # fetch a new page
249         $self->_run_fetch_sth;
250         # get out if this new page is empty
251         last if $self->_check_cursor_end;
252     }
253
254     return $ret;
255 }
256
257 1;