dae4094b7add3b6a64f8fef011d10321c31f878d
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg.pm
1 package DBIx::Class::Storage::DBI::Pg;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI/;
7
8 use Scope::Guard ();
9 use Context::Preserve 'preserve_context';
10 use DBIx::Class::Carp;
11 use Try::Tiny;
12 use namespace::clean;
13 use DBIx::Class::Storage::DBI::Pg::Sth;
14
15 __PACKAGE__->sql_limit_dialect ('LimitOffset');
16 __PACKAGE__->sql_quote_char ('"');
17 __PACKAGE__->datetime_parser_type ('DateTime::Format::Pg');
18 __PACKAGE__->_use_multicolumn_in (1);
19
20 __PACKAGE__->mk_group_accessors('simple' =>
21                                     '_pg_cursor_number');
22
23 our $DEFAULT_USE_PG_CURSORS=0;
24 our $DEFAULT_PG_CURSORS_PAGE_SIZE=1000;
25
26 sub _determine_supports_insert_returning {
27   return shift->_server_info->{normalized_dbms_version} >= 8.002
28     ? 1
29     : 0
30   ;
31 }
32
33 sub with_deferred_fk_checks {
34   my ($self, $sub) = @_;
35
36   my $txn_scope_guard = $self->txn_scope_guard;
37
38   $self->_do_query('SET CONSTRAINTS ALL DEFERRED');
39
40   my $sg = Scope::Guard->new(sub {
41     $self->_do_query('SET CONSTRAINTS ALL IMMEDIATE');
42   });
43
44   return preserve_context { $sub->() } after => sub { $txn_scope_guard->commit };
45 }
46
47 # only used when INSERT ... RETURNING is disabled
48 sub last_insert_id {
49   my ($self,$source,@cols) = @_;
50
51   my @values;
52
53   my $col_info = $source->columns_info(\@cols);
54
55   for my $col (@cols) {
56     my $seq = ( $col_info->{$col}{sequence} ||= $self->dbh_do('_dbh_get_autoinc_seq', $source, $col) )
57       or $self->throw_exception( sprintf(
58         'could not determine sequence for column %s.%s, please consider adding a schema-qualified sequence to its column info',
59           $source->name,
60           $col,
61       ));
62
63     push @values, $self->_dbh->last_insert_id(undef, undef, undef, undef, {sequence => $seq});
64   }
65
66   return @values;
67 }
68
69 sub _sequence_fetch {
70   my ($self, $function, $sequence) = @_;
71
72   $self->throw_exception('No sequence to fetch') unless $sequence;
73
74   my ($val) = $self->_get_dbh->selectrow_array(
75     sprintf ("select %s('%s')", $function, (ref $sequence eq 'SCALAR') ? $$sequence : $sequence)
76   );
77
78   return $val;
79 }
80
81 sub _dbh_get_autoinc_seq {
82   my ($self, $dbh, $source, $col) = @_;
83
84   my $schema;
85   my $table = $source->name;
86
87   # deref table name if it needs it
88   $table = $$table
89       if ref $table eq 'SCALAR';
90
91   # parse out schema name if present
92   if( $table =~ /^(.+)\.(.+)$/ ) {
93     ( $schema, $table ) = ( $1, $2 );
94   }
95
96   # get the column default using a Postgres-specific pg_catalog query
97   my $seq_expr = $self->_dbh_get_column_default( $dbh, $schema, $table, $col );
98
99   # if no default value is set on the column, or if we can't parse the
100   # default value as a sequence, throw.
101   unless ( defined $seq_expr and $seq_expr =~ /^nextval\(+'([^']+)'::(?:text|regclass)\)/i ) {
102     $seq_expr = '' unless defined $seq_expr;
103     $schema = "$schema." if defined $schema && length $schema;
104     $self->throw_exception( sprintf (
105       'no sequence found for %s%s.%s, check the RDBMS table definition or explicitly set the '.
106       "'sequence' for this column in %s",
107         $schema ? "$schema." : '',
108         $table,
109         $col,
110         $source->source_name,
111     ));
112   }
113
114   return $1;
115 }
116
117 # custom method for fetching column default, since column_info has a
118 # bug with older versions of DBD::Pg
119 sub _dbh_get_column_default {
120   my ( $self, $dbh, $schema, $table, $col ) = @_;
121
122   # Build and execute a query into the pg_catalog to find the Pg
123   # expression for the default value for this column in this table.
124   # If the table name is schema-qualified, query using that specific
125   # schema name.
126
127   # Otherwise, find the table in the standard Postgres way, using the
128   # search path.  This is done with the pg_catalog.pg_table_is_visible
129   # function, which returns true if a given table is 'visible',
130   # meaning the first table of that name to be found in the search
131   # path.
132
133   # I *think* we can be assured that this query will always find the
134   # correct column according to standard Postgres semantics.
135   #
136   # -- rbuels
137
138   my $sqlmaker = $self->sql_maker;
139   local $sqlmaker->{bindtype} = 'normal';
140
141   my ($where, @bind) = $sqlmaker->where ({
142     'a.attnum' => {'>', 0},
143     'c.relname' => $table,
144     'a.attname' => $col,
145     -not_bool => 'a.attisdropped',
146     (defined $schema && length $schema)
147       ? ( 'n.nspname' => $schema )
148       : ( -bool => \'pg_catalog.pg_table_is_visible(c.oid)' )
149   });
150
151   my ($seq_expr) = $dbh->selectrow_array(<<EOS,undef,@bind);
152
153 SELECT
154   (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
155    FROM pg_catalog.pg_attrdef d
156    WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
157 FROM pg_catalog.pg_class c
158      LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
159      JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
160 $where
161
162 EOS
163
164   return $seq_expr;
165 }
166
167
168 sub sqlt_type {
169   return 'PostgreSQL';
170 }
171
172 sub bind_attribute_by_data_type {
173   my ($self,$data_type) = @_;
174
175   if ($self->_is_binary_lob_type($data_type)) {
176     # this is a hot-ish codepath, use an escape flag to minimize
177     # amount of function/method calls
178     # additionally version.pm is cock, and memleaks on multiple
179     # ->VERSION calls
180     # the flag is stored in the DBD namespace, so that Class::Unload
181     # will work (unlikely, but still)
182     unless ($DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__) {
183       if ($self->_server_info->{normalized_dbms_version} >= 9.0) {
184         try { DBD::Pg->VERSION('2.17.2'); 1 } or carp (
185           __PACKAGE__.': BYTEA columns are known to not work on Pg >= 9.0 with DBD::Pg < 2.17.2'
186         );
187       }
188       elsif (not try { DBD::Pg->VERSION('2.9.2'); 1 } ) { carp (
189         __PACKAGE__.': DBD::Pg 2.9.2 or greater is strongly recommended for BYTEA column support'
190       )}
191
192       $DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__ = 1;
193     }
194
195     return { pg_type => DBD::Pg::PG_BYTEA() };
196   }
197   else {
198     return undef;
199   }
200 }
201
202 sub _exec_svp_begin {
203     my ($self, $name) = @_;
204
205     $self->_dbh->pg_savepoint($name);
206 }
207
208 sub _exec_svp_release {
209     my ($self, $name) = @_;
210
211     $self->_dbh->pg_release($name);
212 }
213
214 sub _exec_svp_rollback {
215     my ($self, $name) = @_;
216
217     $self->_dbh->pg_rollback_to($name);
218 }
219
220 sub deployment_statements {
221   my $self = shift;;
222   my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
223
224   $sqltargs ||= {};
225
226   if (
227     ! exists $sqltargs->{producer_args}{postgres_version}
228       and
229     my $dver = $self->_server_info->{normalized_dbms_version}
230   ) {
231     $sqltargs->{producer_args}{postgres_version} = $dver;
232   }
233
234   $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
235 }
236
237 sub _populate_dbh {
238     my ($self) = @_;
239
240     $self->_pg_cursor_number(1);
241     return $self->SUPER::_populate_dbh();
242 }
243
244 sub _get_next_pg_cursor_number {
245     my ($self) = @_;
246
247     my $ret=$self->_pg_cursor_number||0;
248     $self->_pg_cursor_number($ret+1);
249
250     return $ret;
251 }
252
253 sub __get_tweak_value {
254     my ($self,$attrs,$slot,$default,$extra_test)=@_;
255
256     $extra_test||=sub{1};
257
258     if (   exists $attrs->{$slot}
259         && defined $attrs->{$slot}
260         && $extra_test->($attrs->{$slot})
261     ) {
262         return $attrs->{$slot};
263     }
264     my @info=@{$self->_dbi_connect_info};
265     if (   @info
266         && ref($info[-1]) eq 'HASH'
267         && exists $info[-1]->{$slot}
268         && defined $info[-1]->{$slot}
269         && $extra_test->($info[-1]->{$slot})
270     ) {
271         return $info[-1]->{$slot};
272     }
273     return $default;
274 }
275
276 sub _should_use_pg_cursors {
277     my ($self,$attrs) = @_;
278
279     return $self->__get_tweak_value($attrs,'use_pg_cursors',$DEFAULT_USE_PG_CURSORS);
280 }
281
282 sub _get_pg_cursor_page_size {
283     my ($self,$attrs) = @_;
284
285     return $self->__get_tweak_value($attrs,'pg_cursors_page_size',$DEFAULT_PG_CURSORS_PAGE_SIZE,
286                                     sub { $_[0] =~ /^\d+$/ });
287 }
288
289 sub _select {
290     my $self = shift;
291     my ($ident, $select, $where, $attrs) = @_;
292
293     local $self->{_use_pg_cursors}=$self->_should_use_pg_cursors($attrs);
294     local $self->{_pg_cursor_page_size}=$self->_get_pg_cursor_page_size($attrs);
295
296     return $self->next::method(@_);
297 }
298
299 sub _dbh_sth {
300     my ($self, $dbh, $sql) = @_;
301
302     if ($self->{_use_pg_cursors} && $sql =~ /^SELECT\b/i) {
303         return DBIx::Class::Storage::DBI::Pg::Sth
304             ->new($self,$dbh,$sql,$self->{_pg_cursor_page_size});
305     }
306     else { # short-circuit
307         return $self->next::method($dbh,$sql);
308     }
309 }
310
311 1;
312
313 __END__
314
315 =head1 NAME
316
317 DBIx::Class::Storage::DBI::Pg - Automatic primary key class for PostgreSQL
318
319 =head1 SYNOPSIS
320
321   # In your result (table) classes
322   use base 'DBIx::Class::Core';
323   __PACKAGE__->set_primary_key('id');
324
325 =head1 DESCRIPTION
326
327 This class implements autoincrements for PostgreSQL.
328
329 =head1 POSTGRESQL SCHEMA SUPPORT
330
331 This driver supports multiple PostgreSQL schemas, with one caveat: for
332 performance reasons, data about the search path, sequence names, and
333 so forth is queried as needed and CACHED for subsequent uses.
334
335 For this reason, once your schema is instantiated, you should not
336 change the PostgreSQL schema search path for that schema's database
337 connection. If you do, Bad Things may happen.
338
339 You should do any necessary manipulation of the search path BEFORE
340 instantiating your schema object, or as part of the on_connect_do
341 option to connect(), for example:
342
343    my $schema = My::Schema->connect
344                   ( $dsn,$user,$pass,
345                     { on_connect_do =>
346                         [ 'SET search_path TO myschema, foo, public' ],
347                     },
348                   );
349
350 =head1 AUTHORS
351
352 See L<DBIx::Class/CONTRIBUTORS>
353
354 =head1 LICENSE
355
356 You may distribute this code under the same terms as Perl itself.
357
358 =cut