use "dbms capabilities" and storage accessors
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg.pm
1 package DBIx::Class::Storage::DBI::Pg;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI/;
7
8 use Scope::Guard ();
9 use Context::Preserve 'preserve_context';
10 use DBIx::Class::Carp;
11 use Try::Tiny;
12 use namespace::clean;
13 use DBIx::Class::Storage::DBI::Pg::Sth;
14
15 __PACKAGE__->sql_limit_dialect ('LimitOffset');
16 __PACKAGE__->sql_quote_char ('"');
17 __PACKAGE__->datetime_parser_type ('DateTime::Format::Pg');
18 __PACKAGE__->_use_multicolumn_in (1);
19
20 __PACKAGE__->mk_group_accessors('simple' =>
21                                     '_pg_cursor_number',
22                                     'cursor_page_size',
23                             );
24
25 sub _determine_supports_insert_returning {
26   return shift->_server_info->{normalized_dbms_version} >= 8.002
27     ? 1
28     : 0
29   ;
30 }
31
32 sub _determine_supports_server_cursors { 1 }
33
34 sub _use_server_cursors { 0 } # temporary global off switch
35
36 sub with_deferred_fk_checks {
37   my ($self, $sub) = @_;
38
39   my $txn_scope_guard = $self->txn_scope_guard;
40
41   $self->_do_query('SET CONSTRAINTS ALL DEFERRED');
42
43   my $sg = Scope::Guard->new(sub {
44     $self->_do_query('SET CONSTRAINTS ALL IMMEDIATE');
45   });
46
47   return preserve_context { $sub->() } after => sub { $txn_scope_guard->commit };
48 }
49
50 # only used when INSERT ... RETURNING is disabled
51 sub last_insert_id {
52   my ($self,$source,@cols) = @_;
53
54   my @values;
55
56   my $col_info = $source->columns_info(\@cols);
57
58   for my $col (@cols) {
59     my $seq = ( $col_info->{$col}{sequence} ||= $self->dbh_do('_dbh_get_autoinc_seq', $source, $col) )
60       or $self->throw_exception( sprintf(
61         'could not determine sequence for column %s.%s, please consider adding a schema-qualified sequence to its column info',
62           $source->name,
63           $col,
64       ));
65
66     push @values, $self->_dbh->last_insert_id(undef, undef, undef, undef, {sequence => $seq});
67   }
68
69   return @values;
70 }
71
72 sub _sequence_fetch {
73   my ($self, $function, $sequence) = @_;
74
75   $self->throw_exception('No sequence to fetch') unless $sequence;
76
77   my ($val) = $self->_get_dbh->selectrow_array(
78     sprintf ("select %s('%s')", $function, (ref $sequence eq 'SCALAR') ? $$sequence : $sequence)
79   );
80
81   return $val;
82 }
83
84 sub _dbh_get_autoinc_seq {
85   my ($self, $dbh, $source, $col) = @_;
86
87   my $schema;
88   my $table = $source->name;
89
90   # deref table name if it needs it
91   $table = $$table
92       if ref $table eq 'SCALAR';
93
94   # parse out schema name if present
95   if( $table =~ /^(.+)\.(.+)$/ ) {
96     ( $schema, $table ) = ( $1, $2 );
97   }
98
99   # get the column default using a Postgres-specific pg_catalog query
100   my $seq_expr = $self->_dbh_get_column_default( $dbh, $schema, $table, $col );
101
102   # if no default value is set on the column, or if we can't parse the
103   # default value as a sequence, throw.
104   unless ( defined $seq_expr and $seq_expr =~ /^nextval\(+'([^']+)'::(?:text|regclass)\)/i ) {
105     $seq_expr = '' unless defined $seq_expr;
106     $schema = "$schema." if defined $schema && length $schema;
107     $self->throw_exception( sprintf (
108       'no sequence found for %s%s.%s, check the RDBMS table definition or explicitly set the '.
109       "'sequence' for this column in %s",
110         $schema ? "$schema." : '',
111         $table,
112         $col,
113         $source->source_name,
114     ));
115   }
116
117   return $1;
118 }
119
120 # custom method for fetching column default, since column_info has a
121 # bug with older versions of DBD::Pg
122 sub _dbh_get_column_default {
123   my ( $self, $dbh, $schema, $table, $col ) = @_;
124
125   # Build and execute a query into the pg_catalog to find the Pg
126   # expression for the default value for this column in this table.
127   # If the table name is schema-qualified, query using that specific
128   # schema name.
129
130   # Otherwise, find the table in the standard Postgres way, using the
131   # search path.  This is done with the pg_catalog.pg_table_is_visible
132   # function, which returns true if a given table is 'visible',
133   # meaning the first table of that name to be found in the search
134   # path.
135
136   # I *think* we can be assured that this query will always find the
137   # correct column according to standard Postgres semantics.
138   #
139   # -- rbuels
140
141   my $sqlmaker = $self->sql_maker;
142   local $sqlmaker->{bindtype} = 'normal';
143
144   my ($where, @bind) = $sqlmaker->where ({
145     'a.attnum' => {'>', 0},
146     'c.relname' => $table,
147     'a.attname' => $col,
148     -not_bool => 'a.attisdropped',
149     (defined $schema && length $schema)
150       ? ( 'n.nspname' => $schema )
151       : ( -bool => \'pg_catalog.pg_table_is_visible(c.oid)' )
152   });
153
154   my ($seq_expr) = $dbh->selectrow_array(<<EOS,undef,@bind);
155
156 SELECT
157   (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
158    FROM pg_catalog.pg_attrdef d
159    WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
160 FROM pg_catalog.pg_class c
161      LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
162      JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
163 $where
164
165 EOS
166
167   return $seq_expr;
168 }
169
170
171 sub sqlt_type {
172   return 'PostgreSQL';
173 }
174
175 sub bind_attribute_by_data_type {
176   my ($self,$data_type) = @_;
177
178   if ($self->_is_binary_lob_type($data_type)) {
179     # this is a hot-ish codepath, use an escape flag to minimize
180     # amount of function/method calls
181     # additionally version.pm is cock, and memleaks on multiple
182     # ->VERSION calls
183     # the flag is stored in the DBD namespace, so that Class::Unload
184     # will work (unlikely, but still)
185     unless ($DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__) {
186       if ($self->_server_info->{normalized_dbms_version} >= 9.0) {
187         try { DBD::Pg->VERSION('2.17.2'); 1 } or carp (
188           __PACKAGE__.': BYTEA columns are known to not work on Pg >= 9.0 with DBD::Pg < 2.17.2'
189         );
190       }
191       elsif (not try { DBD::Pg->VERSION('2.9.2'); 1 } ) { carp (
192         __PACKAGE__.': DBD::Pg 2.9.2 or greater is strongly recommended for BYTEA column support'
193       )}
194
195       $DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__ = 1;
196     }
197
198     return { pg_type => DBD::Pg::PG_BYTEA() };
199   }
200   else {
201     return undef;
202   }
203 }
204
205 sub _exec_svp_begin {
206     my ($self, $name) = @_;
207
208     $self->_dbh->pg_savepoint($name);
209 }
210
211 sub _exec_svp_release {
212     my ($self, $name) = @_;
213
214     $self->_dbh->pg_release($name);
215 }
216
217 sub _exec_svp_rollback {
218     my ($self, $name) = @_;
219
220     $self->_dbh->pg_rollback_to($name);
221 }
222
223 sub deployment_statements {
224   my $self = shift;;
225   my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
226
227   $sqltargs ||= {};
228
229   if (
230     ! exists $sqltargs->{producer_args}{postgres_version}
231       and
232     my $dver = $self->_server_info->{normalized_dbms_version}
233   ) {
234     $sqltargs->{producer_args}{postgres_version} = $dver;
235   }
236
237   $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
238 }
239
240 sub _populate_dbh {
241     my ($self) = @_;
242
243     # cursors are per-connection, so reset the numbering
244     $self->_pg_cursor_number(1);
245     return $self->SUPER::_populate_dbh();
246 }
247
248 sub _get_next_pg_cursor_number {
249     my ($self) = @_;
250
251     my $ret=$self->_pg_cursor_number||0;
252     $self->_pg_cursor_number($ret+1);
253
254     return $ret;
255 }
256
257 sub _should_use_pg_cursors {
258     my ($self,$attrs) = @_;
259
260     if (   exists $attrs->{server_cursors}
261         && defined $attrs->{server_cursors}
262     ) {
263         return $attrs->{server_cursors};
264     }
265
266     return $self->get_use_dbms_capability('server_cursors');
267 }
268
269 sub _get_pg_cursor_page_size {
270     my ($self,$attrs) = @_;
271
272     if (   exists $attrs->{cursor_page_size}
273         && defined $attrs->{cursor_page_size}
274     ) {
275         return $attrs->{cursor_page_size};
276     }
277
278     if (defined $self->cursor_page_size) {
279         return $self->cursor_page_size;
280     }
281     return 1000;
282 }
283
284 sub _select {
285     my $self = shift;
286     my ($ident, $select, $where, $attrs) = @_;
287
288     # ugly ugly ugly, but this is the last sub in the call chain that receives $attrs
289     local $self->{_use_pg_cursors}=$self->_should_use_pg_cursors($attrs);
290     local $self->{_pg_cursor_page_size}=$self->_get_pg_cursor_page_size($attrs);
291
292     return $self->next::method(@_);
293 }
294
295 sub _dbh_sth {
296     my ($self, $dbh, $sql) = @_;
297
298     if ($self->{_use_pg_cursors} && $sql =~ /^SELECT\b/i) {
299         return DBIx::Class::Storage::DBI::Pg::Sth
300             ->new($self,$dbh,$sql,$self->{_pg_cursor_page_size});
301     }
302     else { # short-circuit
303         return $self->next::method($dbh,$sql);
304     }
305 }
306
307 1;
308
309 __END__
310
311 =head1 NAME
312
313 DBIx::Class::Storage::DBI::Pg - PostgreSQL-specific storage
314
315 =head1 SYNOPSIS
316
317 Automatic primary key support:
318
319   # In your result (table) classes
320   use base 'DBIx::Class::Core';
321   __PACKAGE__->set_primary_key('id');
322
323 Using PostgreSQL cursors on fetches:
324
325   my $schema = MySchemaClass->connection(
326                    $dsn, $user, $pass,
327                );
328   $schema->storage->set_use_dbms_capability('sever_cursors');
329   $schema->storage->cursor_page_size(1000);
330
331   # override at ResultSet level
332   my $rs = $schema->resultset('Something')
333                   ->search({}, { server_cursors => 0});
334
335 =head1 DESCRIPTION
336
337 This class implements autoincrements for PostgreSQL.
338
339 It also implements fetching data via PostgreSQL cursors, as explained
340 in the documentation for L<DBD::Pg>.
341
342 =head1 CURSORS FETCHING SUPPORT
343
344 By default, PostgreSQL cursors are not used. You can turn them on (or
345 off again) either via the connection attributes, or via the ResultSet
346 attributes (the latter take precedence).
347
348 Fetching data using PostgreSQL cursors uses less memory, but is
349 slightly slower. You can tune the memory / speed trade-off using the
350 C<cursor_page_size> attribute, which defines how many rows to
351 fetch at a time (defaults to 1000).
352
353 =head1 POSTGRESQL SCHEMA SUPPORT
354
355 This driver supports multiple PostgreSQL schemas, with one caveat: for
356 performance reasons, data about the search path, sequence names, and
357 so forth is queried as needed and CACHED for subsequent uses.
358
359 For this reason, once your schema is instantiated, you should not
360 change the PostgreSQL schema search path for that schema's database
361 connection. If you do, Bad Things may happen.
362
363 You should do any necessary manipulation of the search path BEFORE
364 instantiating your schema object, or as part of the on_connect_do
365 option to connect(), for example:
366
367    my $schema = My::Schema->connect
368                   ( $dsn,$user,$pass,
369                     { on_connect_do =>
370                         [ 'SET search_path TO myschema, foo, public' ],
371                     },
372                   );
373
374 =head1 AUTHORS
375
376 See L<DBIx::Class/CONTRIBUTORS>
377
378 =head1 LICENSE
379
380 You may distribute this code under the same terms as Perl itself.
381
382 =cut