first draft of new design
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg.pm
1 package DBIx::Class::Storage::DBI::Pg;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI/;
7
8 use Scope::Guard ();
9 use Context::Preserve 'preserve_context';
10 use DBIx::Class::Carp;
11 use Try::Tiny;
12 use namespace::clean;
13
14 __PACKAGE__->sql_limit_dialect ('LimitOffset');
15 __PACKAGE__->sql_quote_char ('"');
16 __PACKAGE__->datetime_parser_type ('DateTime::Format::Pg');
17 __PACKAGE__->_use_multicolumn_in (1);
18
19 __PACKAGE__->mk_group_accessors('simple' =>
20                                     '_pg_cursor_number');
21
22 sub _determine_supports_insert_returning {
23   return shift->_server_info->{normalized_dbms_version} >= 8.002
24     ? 1
25     : 0
26   ;
27 }
28
29 sub with_deferred_fk_checks {
30   my ($self, $sub) = @_;
31
32   my $txn_scope_guard = $self->txn_scope_guard;
33
34   $self->_do_query('SET CONSTRAINTS ALL DEFERRED');
35
36   my $sg = Scope::Guard->new(sub {
37     $self->_do_query('SET CONSTRAINTS ALL IMMEDIATE');
38   });
39
40   return preserve_context { $sub->() } after => sub { $txn_scope_guard->commit };
41 }
42
43 # only used when INSERT ... RETURNING is disabled
44 sub last_insert_id {
45   my ($self,$source,@cols) = @_;
46
47   my @values;
48
49   my $col_info = $source->columns_info(\@cols);
50
51   for my $col (@cols) {
52     my $seq = ( $col_info->{$col}{sequence} ||= $self->dbh_do('_dbh_get_autoinc_seq', $source, $col) )
53       or $self->throw_exception( sprintf(
54         'could not determine sequence for column %s.%s, please consider adding a schema-qualified sequence to its column info',
55           $source->name,
56           $col,
57       ));
58
59     push @values, $self->_dbh->last_insert_id(undef, undef, undef, undef, {sequence => $seq});
60   }
61
62   return @values;
63 }
64
65 sub _sequence_fetch {
66   my ($self, $function, $sequence) = @_;
67
68   $self->throw_exception('No sequence to fetch') unless $sequence;
69
70   my ($val) = $self->_get_dbh->selectrow_array(
71     sprintf ("select %s('%s')", $function, (ref $sequence eq 'SCALAR') ? $$sequence : $sequence)
72   );
73
74   return $val;
75 }
76
77 sub _dbh_get_autoinc_seq {
78   my ($self, $dbh, $source, $col) = @_;
79
80   my $schema;
81   my $table = $source->name;
82
83   # deref table name if it needs it
84   $table = $$table
85       if ref $table eq 'SCALAR';
86
87   # parse out schema name if present
88   if( $table =~ /^(.+)\.(.+)$/ ) {
89     ( $schema, $table ) = ( $1, $2 );
90   }
91
92   # get the column default using a Postgres-specific pg_catalog query
93   my $seq_expr = $self->_dbh_get_column_default( $dbh, $schema, $table, $col );
94
95   # if no default value is set on the column, or if we can't parse the
96   # default value as a sequence, throw.
97   unless ( defined $seq_expr and $seq_expr =~ /^nextval\(+'([^']+)'::(?:text|regclass)\)/i ) {
98     $seq_expr = '' unless defined $seq_expr;
99     $schema = "$schema." if defined $schema && length $schema;
100     $self->throw_exception( sprintf (
101       'no sequence found for %s%s.%s, check the RDBMS table definition or explicitly set the '.
102       "'sequence' for this column in %s",
103         $schema ? "$schema." : '',
104         $table,
105         $col,
106         $source->source_name,
107     ));
108   }
109
110   return $1;
111 }
112
113 # custom method for fetching column default, since column_info has a
114 # bug with older versions of DBD::Pg
115 sub _dbh_get_column_default {
116   my ( $self, $dbh, $schema, $table, $col ) = @_;
117
118   # Build and execute a query into the pg_catalog to find the Pg
119   # expression for the default value for this column in this table.
120   # If the table name is schema-qualified, query using that specific
121   # schema name.
122
123   # Otherwise, find the table in the standard Postgres way, using the
124   # search path.  This is done with the pg_catalog.pg_table_is_visible
125   # function, which returns true if a given table is 'visible',
126   # meaning the first table of that name to be found in the search
127   # path.
128
129   # I *think* we can be assured that this query will always find the
130   # correct column according to standard Postgres semantics.
131   #
132   # -- rbuels
133
134   my $sqlmaker = $self->sql_maker;
135   local $sqlmaker->{bindtype} = 'normal';
136
137   my ($where, @bind) = $sqlmaker->where ({
138     'a.attnum' => {'>', 0},
139     'c.relname' => $table,
140     'a.attname' => $col,
141     -not_bool => 'a.attisdropped',
142     (defined $schema && length $schema)
143       ? ( 'n.nspname' => $schema )
144       : ( -bool => \'pg_catalog.pg_table_is_visible(c.oid)' )
145   });
146
147   my ($seq_expr) = $dbh->selectrow_array(<<EOS,undef,@bind);
148
149 SELECT
150   (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
151    FROM pg_catalog.pg_attrdef d
152    WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
153 FROM pg_catalog.pg_class c
154      LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
155      JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
156 $where
157
158 EOS
159
160   return $seq_expr;
161 }
162
163
164 sub sqlt_type {
165   return 'PostgreSQL';
166 }
167
168 sub bind_attribute_by_data_type {
169   my ($self,$data_type) = @_;
170
171   if ($self->_is_binary_lob_type($data_type)) {
172     # this is a hot-ish codepath, use an escape flag to minimize
173     # amount of function/method calls
174     # additionally version.pm is cock, and memleaks on multiple
175     # ->VERSION calls
176     # the flag is stored in the DBD namespace, so that Class::Unload
177     # will work (unlikely, but still)
178     unless ($DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__) {
179       if ($self->_server_info->{normalized_dbms_version} >= 9.0) {
180         try { DBD::Pg->VERSION('2.17.2'); 1 } or carp (
181           __PACKAGE__.': BYTEA columns are known to not work on Pg >= 9.0 with DBD::Pg < 2.17.2'
182         );
183       }
184       elsif (not try { DBD::Pg->VERSION('2.9.2'); 1 } ) { carp (
185         __PACKAGE__.': DBD::Pg 2.9.2 or greater is strongly recommended for BYTEA column support'
186       )}
187
188       $DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__ = 1;
189     }
190
191     return { pg_type => DBD::Pg::PG_BYTEA() };
192   }
193   else {
194     return undef;
195   }
196 }
197
198 sub _exec_svp_begin {
199     my ($self, $name) = @_;
200
201     $self->_dbh->pg_savepoint($name);
202 }
203
204 sub _exec_svp_release {
205     my ($self, $name) = @_;
206
207     $self->_dbh->pg_release($name);
208 }
209
210 sub _exec_svp_rollback {
211     my ($self, $name) = @_;
212
213     $self->_dbh->pg_rollback_to($name);
214 }
215
216 sub deployment_statements {
217   my $self = shift;;
218   my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
219
220   $sqltargs ||= {};
221
222   if (
223     ! exists $sqltargs->{producer_args}{postgres_version}
224       and
225     my $dver = $self->_server_info->{normalized_dbms_version}
226   ) {
227     $sqltargs->{producer_args}{postgres_version} = $dver;
228   }
229
230   $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
231 }
232
233 sub _populate_dbh {
234     my ($self) = @_;
235
236     $self->_pg_cursor_number(0);
237     $self->SUPER::_populate_dbh();
238 }
239
240 sub _get_next_pg_cursor_number {
241     my ($self) = @_;
242
243     my $ret=$self->_pg_cursor_number;
244     $self->_pg_cursor_number($ret+1);
245     return $ret;
246 }
247
248 sub _dbh_sth {
249     my ($self, $dbh, $sql) = @_;
250
251     DBIx::Class::Storage::DBI::Pg::Sth->new($self,$dbh,$sql);
252 }
253
254 package DBIx::Class::Storage::DBI::Pg::Sth;{
255 use strict;
256 use warnings;
257
258 __PACKAGE__->mk_group_accessors('simple' =>
259                                     'storage', 'dbh',
260                                     'cursor_id', 'cursor_created',
261                                     'cursor_sth', 'fetch_sth',
262                             );
263
264 sub new {
265     my ($class, $storage, $dbh, $sql) = @_;
266
267     if ($sql =~ /^SELECT\b/i) {
268         my $self=bless {},$class;
269         $self->storage($storage);
270         $self->dbh($dbh);
271
272         $csr_id=$self->_cursor_name_from_number(
273             $storage->_get_next_pg_cursor_number()
274         );
275         my $hold= ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD';
276         $sql="DECLARE $csr_id CURSOR $hold FOR $sql";
277         $self->cursor_id($csr_id);
278         $self->cursor_sth($storage->SUPER::_dbh_sth($dbh,$sql));
279         $self->cursor_created(0);
280         return $self;
281     }
282     else { # short-circuit
283         return $storage->SUPER::_dbh_sth($dbh,$sql);
284     }
285 }
286
287 sub _cursor_name_from_number {
288     return 'dbic_pg_cursor_'.$_[1];
289 }
290
291 sub _cleanup_sth {
292     my ($self)=@_;
293
294     eval {
295         $self->fetch_sth->finish() if $self->fetch_sth;
296         $self->fetch_sth(undef);
297         $self->cursor_sth->finish() if $self->cursor_sth;
298         $self->cursor_sth(undef);
299         $self->storage->_dbh_do('CLOSE '.$self->cursor_id);
300     };
301 }
302
303 sub DESTROY {
304     my ($self) = @_;
305
306     $self->_cleanup_sth;
307
308     return;
309 }
310
311 sub bind_param {
312     my ($self,@bind_args)=@_;
313
314     return $self->cursor_sth->bind_param(@bind_args);
315 }
316
317 sub execute {
318     my ($self,@bind_values)=@_;
319
320     return $self->cursor_sth->execute(@bind_values);
321 }
322
323 # bind_param_array & execute_array not used for SELECT statements, so
324 # we'll ignore them
325
326 sub errstr {
327     my ($self)=@_;
328
329     return $self->cursor_sth->errstr;
330 }
331
332 sub finish {
333     my ($self)=@_;
334
335     $self->fetch_sth->finish if $self->fetch_sth;
336     return $self->cursor_sth->finish;
337 }
338
339 sub _check_cursor_end {
340     my ($self) = @_;
341     if ($self->fetch_sth->rows == 0) {
342         $self->_cleanup_sth;
343         return 1;
344     }
345     return;
346 }
347
348 sub _run_fetch_sth {
349     my ($self)=@_;
350
351     if (!$self->cursor_created) {
352         $self->cursor_sth->execute();
353     }
354     $self->fetch_sth->finish if $self->fetch_sth;
355     $self->fetch_sth($self->storage->sth("fetch 1000 from ".$self->cursor_id));
356     $self->fetch_sth->execute;
357 }
358
359 sub fetchrow_array {
360     my ($self) = @_;
361
362     $self->_run_fetch_sth unless $self->fetch_sth;
363     return if $self->_check_cursor_end;
364
365     my @row = $self->fetch_sth->fetchrow_array;
366     if (!@row) {
367         $self->_run_fetch_sth;
368         return if $self->_check_cursor_end;
369
370         @row = $self->fetch_sth->fetchrow_array;
371     }
372     return @row;
373 }
374
375 sub fetchall_arrayref {
376     my ($self,$slice,$max_rows) = @_;
377
378     my $ret=[];
379     $self->_run_fetch_sth unless $self->fetch_sth;
380     return if $self->_check_cursor_end;
381
382     while (1) {
383         my $batch=$self->fetch_sth->fetchall_arrayref($slice,$max_rows);
384
385         if (@$batch == 0) {
386             $self->_run_fetch_sth;
387             last if $self->_check_cursor_end;
388             next;
389         }
390
391         $max_rows -= @$batch;
392         last if $max_rows <=0;
393
394         push @$ret,@$batch;
395     }
396
397     return $ret;
398 }
399
400 };
401
402 1;
403
404 __END__
405
406 =head1 NAME
407
408 DBIx::Class::Storage::DBI::Pg - Automatic primary key class for PostgreSQL
409
410 =head1 SYNOPSIS
411
412   # In your result (table) classes
413   use base 'DBIx::Class::Core';
414   __PACKAGE__->set_primary_key('id');
415
416 =head1 DESCRIPTION
417
418 This class implements autoincrements for PostgreSQL.
419
420 =head1 POSTGRESQL SCHEMA SUPPORT
421
422 This driver supports multiple PostgreSQL schemas, with one caveat: for
423 performance reasons, data about the search path, sequence names, and
424 so forth is queried as needed and CACHED for subsequent uses.
425
426 For this reason, once your schema is instantiated, you should not
427 change the PostgreSQL schema search path for that schema's database
428 connection. If you do, Bad Things may happen.
429
430 You should do any necessary manipulation of the search path BEFORE
431 instantiating your schema object, or as part of the on_connect_do
432 option to connect(), for example:
433
434    my $schema = My::Schema->connect
435                   ( $dsn,$user,$pass,
436                     { on_connect_do =>
437                         [ 'SET search_path TO myschema, foo, public' ],
438                     },
439                   );
440
441 =head1 AUTHORS
442
443 See L<DBIx::Class/CONTRIBUTORS>
444
445 =head1 LICENSE
446
447 You may distribute this code under the same terms as Perl itself.
448
449 =cut