use "dbms capabilities" and storage accessors
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg.pm
CommitLineData
843f8ecd 1package DBIx::Class::Storage::DBI::Pg;
2
3use strict;
4use warnings;
5
be64931c 6use base qw/DBIx::Class::Storage::DBI/;
843f8ecd 7
4d4dc518 8use Scope::Guard ();
6298a324 9use Context::Preserve 'preserve_context';
ee53ca0f 10use DBIx::Class::Carp;
9aec3ec6 11use Try::Tiny;
6298a324 12use namespace::clean;
8c194608 13use DBIx::Class::Storage::DBI::Pg::Sth;
843f8ecd 14
# Class-level driver settings: limit dialect, identifier quote character,
# datetime parser class, and the _use_multicolumn_in flag.
__PACKAGE__->sql_limit_dialect('LimitOffset');
__PACKAGE__->sql_quote_char('"');
__PACKAGE__->datetime_parser_type('DateTime::Format::Pg');
__PACKAGE__->_use_multicolumn_in(1);

# 'simple' group accessors read and written by the cursor-related methods
# of this class.
__PACKAGE__->mk_group_accessors(
  simple => qw(
    _pg_cursor_number
    cursor_page_size
  ),
);
590a6c43 24
# Capability probe for INSERT ... RETURNING.
#
# Returns 1 when the server's normalized_dbms_version is at least 8.002,
# and 0 otherwise.
sub _determine_supports_insert_returning {
  my $self = shift;

  my $server_version = $self->_server_info->{normalized_dbms_version};

  return 1 if $server_version >= 8.002;
  return 0;
}
31
# Capability probe for server-side cursors: unconditionally reports 1.
sub _determine_supports_server_cursors {
  return 1;
}
33
# Temporary global off switch: always returns 0.
sub _use_server_cursors {
  return 0;
}
35
# Run $sub with all constraints deferred.
#
# A transaction scope guard is taken first, then constraints are switched
# to DEFERRED. The Scope::Guard created below switches them back to
# IMMEDIATE once it is destroyed. $sub is invoked with no arguments in the
# caller's context (via preserve_context), and the transaction guard is
# committed after it returns.
#
# Returns whatever $sub returns.
sub with_deferred_fk_checks {
  my ($self, $sub) = @_;

  my $txn_guard = $self->txn_scope_guard;

  $self->_do_query('SET CONSTRAINTS ALL DEFERRED');

  # Declared after the transaction guard on purpose; do not reorder.
  my $restore_immediate = Scope::Guard->new(
    sub { $self->_do_query('SET CONSTRAINTS ALL IMMEDIATE') }
  );

  return preserve_context(
    sub { $sub->() },
    after => sub { $txn_guard->commit },
  );
}
49
# Fetch the last inserted value for each of @cols.
# Only used when INSERT ... RETURNING is disabled.
#
# For every column the sequence name is taken from the column info's
# 'sequence' entry; when that is empty it is looked up through
# _dbh_get_autoinc_seq and stored back into the column info hash.
#
# Returns the list of values, in the order of @cols.
# Throws when no sequence can be determined for a column.
sub last_insert_id {
  my ($self, $source, @cols) = @_;

  my $col_info = $source->columns_info(\@cols);

  my @ids;
  for my $col (@cols) {
    my $seq = $col_info->{$col}{sequence}
      ||= $self->dbh_do('_dbh_get_autoinc_seq', $source, $col);

    unless ($seq) {
      $self->throw_exception( sprintf(
        'could not determine sequence for column %s.%s, please consider adding a schema-qualified sequence to its column info',
        $source->name,
        $col,
      ));
    }

    push @ids, $self->_dbh->last_insert_id(
      undef, undef, undef, undef, { sequence => $seq },
    );
  }

  return @ids;
}
71
# Call a sequence function and return its value.
#
# $function is the SQL function name to invoke and $sequence is the
# sequence name, either a plain string or a reference to one. Both are
# interpolated straight into the statement text.
#
# Returns the first column of the first row fetched.
# Throws 'No sequence to fetch' when $sequence is false.
sub _sequence_fetch {
  my ($self, $function, $sequence) = @_;

  $sequence
    or $self->throw_exception('No sequence to fetch');

  my $seq_name = ref $sequence eq 'SCALAR' ? $$sequence : $sequence;
  my $sql      = sprintf "select %s('%s')", $function, $seq_name;

  my ($val) = $self->_get_dbh->selectrow_array($sql);

  return $val;
}
9a0b7b26 83
# Work out which sequence feeds an auto-increment column.
#
# Arguments:
#   $dbh    - database handle, passed through to _dbh_get_column_default
#   $source - result source; ->name supplies the table name (optionally
#             schema-qualified), as a string or a scalar reference
#   $col    - column name
#
# Returns the sequence name captured from a nextval('...') column default.
# Throws (via throw_exception) when the column has no default, or when the
# default cannot be parsed as a nextval() call.
sub _dbh_get_autoinc_seq {
  my ($self, $dbh, $source, $col) = @_;

  my $table = $source->name;

  # deref table name if it needs it
  $table = $$table
    if ref $table eq 'SCALAR';

  # parse out schema name if present
  my $schema;
  if ( $table =~ /^(.+)\.(.+)$/ ) {
    ( $schema, $table ) = ( $1, $2 );
  }

  # get the column default using a Postgres-specific pg_catalog query
  my $seq_expr = $self->_dbh_get_column_default( $dbh, $schema, $table, $col );

  # Capture the sequence name explicitly instead of relying on $1 staying
  # intact until the end of the sub.
  my ($seq_name) = defined $seq_expr
    ? ( $seq_expr =~ /^nextval\(+'([^']+)'::(?:text|regclass)\)/i )
    : ();

  # if no default value is set on the column, or if we can't parse the
  # default value as a sequence, throw.
  unless ( defined $seq_name ) {
    # The schema prefix gets its trailing dot here and only here, so the
    # message reads "schema.table.col" rather than "schema..table.col".
    my $schema_prefix = ( defined $schema && length $schema )
      ? "$schema."
      : '';

    $self->throw_exception( sprintf (
      'no sequence found for %s%s.%s, check the RDBMS table definition or explicitly set the '.
      "'sequence' for this column in %s",
        $schema_prefix,
        $table,
        $col,
        $source->source_name,
    ));
  }

  return $seq_name;
}
119
# Custom method for fetching a column's default expression, since
# column_info has a bug with older versions of DBD::Pg.
#
# Arguments: $dbh, $schema (may be undef or empty), $table, $col.
#
# Builds and runs a pg_catalog query that finds the Pg expression for the
# default value of $col in $table.
#
# If a schema name is given, the table is looked up in exactly that
# namespace. Otherwise the table is found the standard Postgres way, via
# the search path: pg_catalog.pg_table_is_visible() is true for the first
# table of that name found in the search path.
#
# Original author's note (rbuels): this query is believed to always find
# the correct column according to standard Postgres semantics.
#
# Returns the first column of the first row fetched.
sub _dbh_get_column_default {
  my ($self, $dbh, $schema, $table, $col) = @_;

  my @namespace_cond = ( defined $schema && length $schema )
    ? ( 'n.nspname' => $schema )
    : ( -bool => \'pg_catalog.pg_table_is_visible(c.oid)' );

  my $sql_maker = $self->sql_maker;

  # Temporarily force bindtype 'normal' - presumably so that @bind holds
  # plain values that can go straight to selectrow_array (TODO confirm).
  local $sql_maker->{bindtype} = 'normal';

  my ($where, @bind) = $sql_maker->where({
    'a.attnum'  => { '>', 0 },
    'c.relname' => $table,
    'a.attname' => $col,
    -not_bool   => 'a.attisdropped',
    @namespace_cond,
  });

  my $sql = <<EOS;

SELECT
  (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
   FROM pg_catalog.pg_attrdef d
   WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
FROM pg_catalog.pg_class c
     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
$where

EOS

  my ($default_expr) = $dbh->selectrow_array($sql, undef, @bind);

  return $default_expr;
}
169
170
# Type name reported for this storage: the constant string 'PostgreSQL'.
sub sqlt_type { 'PostgreSQL' }
174
# Bind attributes for a column data type.
#
# Returns { pg_type => DBD::Pg::PG_BYTEA() } for types that
# _is_binary_lob_type recognises, and undef for everything else.
#
# The first time a binary type is seen, the installed DBD::Pg version is
# checked once and a warning is carped when it is older than 2.17.2 (server
# version >= 9.0) or 2.9.2 (older servers).
sub bind_attribute_by_data_type {
  my ($self, $data_type) = @_;

  return undef
    unless $self->_is_binary_lob_type($data_type);

  # This is a hot-ish codepath, so a flag is used to keep the number of
  # function/method calls down; repeated ->VERSION calls are also reported
  # to leak memory. The flag lives in the DBD namespace so that
  # Class::Unload will work (unlikely, but still).
  unless ($DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__) {
    my ($wanted_dbd, $complaint)
      = $self->_server_info->{normalized_dbms_version} >= 9.0
        ? ( '2.17.2',
            __PACKAGE__.': BYTEA columns are known to not work on Pg >= 9.0 with DBD::Pg < 2.17.2' )
        : ( '2.9.2',
            __PACKAGE__.': DBD::Pg 2.9.2 or greater is strongly recommended for BYTEA column support' );

    carp ($complaint)
      unless try { DBD::Pg->VERSION($wanted_dbd); 1 };

    $DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__ = 1;
  }

  return { pg_type => DBD::Pg::PG_BYTEA() };
}
204
# Create the savepoint $name by calling pg_savepoint on the handle.
sub _exec_svp_begin {
  my $self = shift;
  my $name = shift;

  return $self->_dbh->pg_savepoint($name);
}
210
# Release the savepoint $name by calling pg_release on the handle.
sub _exec_svp_release {
  my $self = shift;
  my $name = shift;

  return $self->_dbh->pg_release($name);
}
216
# Roll back to the savepoint $name by calling pg_rollback_to on the handle.
sub _exec_svp_rollback {
  my $self = shift;
  my $name = shift;

  return $self->_dbh->pg_rollback_to($name);
}
222
# Produce deployment statements, defaulting the producer's target version.
#
# When the caller did not supply producer_args->{postgres_version} and the
# server reports a true normalized_dbms_version, that version is filled in
# before delegating to the next deployment_statements in the method chain.
# A missing $sqltargs is replaced by a fresh hashref; a supplied one is
# modified in place.
sub deployment_statements {
  my ($self, $schema, $type, $version, $dir, $sqltargs, @rest) = @_;

  $sqltargs ||= {};

  unless ( exists $sqltargs->{producer_args}{postgres_version} ) {
    my $dver = $self->_server_info->{normalized_dbms_version};

    $sqltargs->{producer_args}{postgres_version} = $dver
      if $dver;
  }

  return $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
}
239
# Set the cursor counter to 1, then let the parent class populate the
# handle. Cursors are per-connection, so the numbering is reset here.
sub _populate_dbh {
  my $self = shift;

  $self->_pg_cursor_number(1);

  return $self->SUPER::_populate_dbh();
}
247
# Hand out the current cursor number and advance the counter by one.
# An unset (or otherwise false) counter is treated as 0.
sub _get_next_pg_cursor_number {
  my $self = shift;

  my $current = $self->_pg_cursor_number || 0;
  $self->_pg_cursor_number($current + 1);

  return $current;
}
256
# Decide whether cursors are used for a query.
#
# A defined 'server_cursors' entry in $attrs wins and is returned as-is.
# Otherwise the answer comes from get_use_dbms_capability('server_cursors').
sub _should_use_pg_cursors {
  my ($self, $attrs) = @_;

  my $override = exists $attrs->{server_cursors}
    ? $attrs->{server_cursors}
    : undef;

  return defined $override
    ? $override
    : $self->get_use_dbms_capability('server_cursors');
}
268
# Work out how many rows to fetch per cursor page.
#
# Precedence: a defined 'cursor_page_size' in $attrs, then a defined
# cursor_page_size on the storage object, then 1000.
sub _get_pg_cursor_page_size {
  my ($self, $attrs) = @_;

  my $requested = exists $attrs->{cursor_page_size}
    ? $attrs->{cursor_page_size}
    : undef;
  return $requested if defined $requested;

  my $configured = $self->cursor_page_size;
  return $configured if defined $configured;

  return 1000;
}
283
# Stash the cursor settings for the duration of one select.
#
# This is the last sub in the call chain that receives $attrs (the fourth
# argument after $self), so the decisions derived from it are parked on the
# object with 'local'. They are visible to anything called from here and
# are restored when this sub returns.
sub _select {
  my $self = shift;

  my $attrs = $_[3];

  local $self->{_use_pg_cursors}      = $self->_should_use_pg_cursors($attrs);
  local $self->{_pg_cursor_page_size} = $self->_get_pg_cursor_page_size($attrs);

  return $self->next::method(@_);
}
294
# Build a statement handle for $sql.
#
# When the _use_pg_cursors flag is set on the object and the statement
# starts with SELECT (case-insensitive), a
# DBIx::Class::Storage::DBI::Pg::Sth is constructed with the stored page
# size. Everything else goes to the next _dbh_sth in the method chain.
sub _dbh_sth {
  my ($self, $dbh, $sql) = @_;

  # short-circuit
  return $self->next::method($dbh, $sql)
    unless $self->{_use_pg_cursors} && $sql =~ /^SELECT\b/i;

  return DBIx::Class::Storage::DBI::Pg::Sth->new(
    $self, $dbh, $sql, $self->{_pg_cursor_page_size},
  );
}
306
843f8ecd 3071;
308
fd159e2a 309__END__
310
75d07914 311=head1 NAME
843f8ecd 312
09b689c9 313DBIx::Class::Storage::DBI::Pg - PostgreSQL-specific storage
843f8ecd 314
315=head1 SYNOPSIS
316
09b689c9 317Automatic primary key support:
318
d88ecca6 319 # In your result (table) classes
320 use base 'DBIx::Class::Core';
843f8ecd 321 __PACKAGE__->set_primary_key('id');
843f8ecd 322
Using PostgreSQL cursors on fetches:

  my $schema = MySchemaClass->connection(
    $dsn, $user, $pass,
  );
  $schema->storage->set_use_dbms_capability('server_cursors');
  $schema->storage->cursor_page_size(1000);

  # override at ResultSet level
  my $rs = $schema->resultset('Something')
                  ->search({}, { server_cursors => 0});
09b689c9 334
843f8ecd 335=head1 DESCRIPTION
336
337This class implements autoincrements for PostgreSQL.
338
09b689c9 339It also implements fetching data via PostgreSQL cursors, as explained
340in the documentation for L<DBD::Pg>.
341
342=head1 CURSORS FETCHING SUPPORT
343
344By default, PostgreSQL cursors are not used. You can turn them on (or
345off again) either via the connection attributes, or via the ResultSet
346attributes (the latter take precedence).
347
348Fetching data using PostgreSQL cursors uses less memory, but is
349slightly slower. You can tune the memory / speed trade-off using the
2d4928e3 350C<cursor_page_size> attribute, which defines how many rows to
09b689c9 351fetch at a time (defaults to 1000).
352
7c0176a1 353=head1 POSTGRESQL SCHEMA SUPPORT
354
4f609014 355This driver supports multiple PostgreSQL schemas, with one caveat: for
6ff1d58c 356performance reasons, data about the search path, sequence names, and
357so forth is queried as needed and CACHED for subsequent uses.
7c0176a1 358
4f609014 359For this reason, once your schema is instantiated, you should not
360change the PostgreSQL schema search path for that schema's database
361connection. If you do, Bad Things may happen.
362
363You should do any necessary manipulation of the search path BEFORE
364instantiating your schema object, or as part of the on_connect_do
365option to connect(), for example:
7c0176a1 366
367 my $schema = My::Schema->connect
368 ( $dsn,$user,$pass,
369 { on_connect_do =>
370 [ 'SET search_path TO myschema, foo, public' ],
371 },
372 );
373
7ff926e6 374=head1 AUTHORS
7c0176a1 375
7ff926e6 376See L<DBIx::Class/CONTRIBUTORS>
843f8ecd 377
378=head1 LICENSE
379
380You may distribute this code under the same terms as Perl itself.
381
382=cut