first draft of new design
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg.pm
CommitLineData
843f8ecd 1package DBIx::Class::Storage::DBI::Pg;
2
3use strict;
4use warnings;
5
be64931c 6use base qw/DBIx::Class::Storage::DBI/;
843f8ecd 7
4d4dc518 8use Scope::Guard ();
6298a324 9use Context::Preserve 'preserve_context';
ee53ca0f 10use DBIx::Class::Carp;
9aec3ec6 11use Try::Tiny;
6298a324 12use namespace::clean;
843f8ecd 13
# Class-level storage settings: limit dialect, identifier quote character,
# datetime parser class and the _use_multicolumn_in flag.
__PACKAGE__->sql_limit_dialect('LimitOffset');
__PACKAGE__->sql_quote_char(q{"});
__PACKAGE__->datetime_parser_type('DateTime::Format::Pg');
__PACKAGE__->_use_multicolumn_in(1);

# Counter read and advanced by _get_next_pg_cursor_number.
__PACKAGE__->mk_group_accessors(
  simple => '_pg_cursor_number',
);
21
# Decide whether INSERT ... RETURNING may be used.
#
# Returns 1 when the normalized server version reported by _server_info
# is at least 8.002, and 0 otherwise.
sub _determine_supports_insert_returning {
  my $self = shift;

  my $server_version = $self->_server_info->{normalized_dbms_version};

  return 1 if $server_version >= 8.002;
  return 0;
}
28
# Run $sub with all constraints deferred, inside a transaction scope guard.
#
# The transaction guard is committed after $sub returns (calling context is
# preserved for $sub's return value); a Scope::Guard issues
# SET CONSTRAINTS ALL IMMEDIATE when this sub's scope is left.
sub with_deferred_fk_checks {
  my $self    = shift;
  my $coderef = shift;

  # declared first, exactly as before, so guard lifetimes are unchanged
  my $txn_guard = $self->txn_scope_guard;

  $self->_do_query('SET CONSTRAINTS ALL DEFERRED');

  my $immediate_guard = Scope::Guard->new(sub {
    $self->_do_query('SET CONSTRAINTS ALL IMMEDIATE');
  });

  return preserve_context { $coderef->() }
    after => sub { $txn_guard->commit };
}
42
# Fallback for when INSERT ... RETURNING is not in use.
#
# For every column in @cols, find its sequence (taken from the column info
# if present, otherwise looked up through _dbh_get_autoinc_seq and cached
# back into the column info) and ask the database handle for the last value
# inserted through that sequence. Returns the values in @cols order;
# throws if a sequence cannot be determined.
sub last_insert_id {
  my ($self, $source, @cols) = @_;

  my $info_for = $source->columns_info(\@cols);

  return map {
    my $column = $_;

    my $sequence = (
      $info_for->{$column}{sequence}
        ||= $self->dbh_do('_dbh_get_autoinc_seq', $source, $column)
    ) or $self->throw_exception( sprintf(
      'could not determine sequence for column %s.%s, please consider adding a schema-qualified sequence to its column info',
      $source->name,
      $column,
    ));

    $self->_dbh->last_insert_id(undef, undef, undef, undef, { sequence => $sequence });
  } @cols;
}
64
# Call a sequence function (passed in $function) on $sequence and return
# the single value selected.
#
# $sequence may be a plain name or a reference to a scalar holding the name.
# Throws 'No sequence to fetch' when $sequence is false.
sub _sequence_fetch {
  my ($self, $function, $sequence) = @_;

  if (not $sequence) {
    $self->throw_exception('No sequence to fetch');
  }

  my $sequence_name = ref($sequence) eq 'SCALAR' ? ${$sequence} : $sequence;
  my $sql           = sprintf "select %s('%s')", $function, $sequence_name;

  my ($fetched) = $self->_get_dbh->selectrow_array($sql);

  return $fetched;
}
9a0b7b26 76
# Work out which sequence feeds column $col of $source by parsing the
# column's default expression.
#
# Arguments: the database handle, the result source and the column name.
# Returns the sequence name found inside a
# nextval('...'::text|regclass) default.
# Throws when the column has no default or the default is not such a
# nextval() expression.
sub _dbh_get_autoinc_seq {
  my ($self, $dbh, $source, $col) = @_;

  my $schema;
  my $table = $source->name;

  # the source name may be given as a reference to a scalar
  $table = $$table
    if ref $table eq 'SCALAR';

  # split off a leading "schema." qualifier if there is one
  if ( $table =~ /^(.+)\.(.+)$/ ) {
    ( $schema, $table ) = ( $1, $2 );
  }

  # get the column default using a Postgres-specific pg_catalog query
  my $seq_expr = $self->_dbh_get_column_default( $dbh, $schema, $table, $col );

  # capture the sequence name explicitly instead of depending on $1 still
  # being intact after the error-handling block
  my ($sequence) = defined $seq_expr
    ? $seq_expr =~ /^nextval\(+'([^']+)'::(?:text|regclass)\)/i
    : ();

  unless ( defined $sequence ) {
    # The schema prefix is built exactly once here; it used to be given a
    # trailing dot twice, yielding "schema..table.col" in the message.
    my $schema_prefix = ( defined $schema && length $schema )
      ? "$schema."
      : '';

    $self->throw_exception( sprintf (
      'no sequence found for %s%s.%s, check the RDBMS table definition or explicitly set the '.
      "'sequence' for this column in %s",
      $schema_prefix,
      $table,
      $col,
      $source->source_name,
    ));
  }

  return $sequence;
}
112
# Fetch the default-value expression of a column straight from pg_catalog.
# A hand-written query is used because column_info has a bug with older
# versions of DBD::Pg.
#
# When $schema is given the lookup is restricted to that schema. Otherwise
# the table is resolved the way Postgres itself would, through the search
# path: pg_catalog.pg_table_is_visible is true for the first table of that
# name found on the search path.
#
# Returns the first column of the first row selected (the default
# expression, if any).
sub _dbh_get_column_default {
  my ( $self, $dbh, $schema, $table, $col ) = @_;

  my $sqlmaker = $self->sql_maker;
  local $sqlmaker->{bindtype} = 'normal';

  # conditions common to both lookups
  my %cond = (
    'a.attnum'  => { '>', 0 },
    'c.relname' => $table,
    'a.attname' => $col,
    -not_bool   => 'a.attisdropped',
  );

  # either pin the schema, or fall back on search-path visibility
  if ( defined $schema and length $schema ) {
    $cond{'n.nspname'} = $schema;
  }
  else {
    $cond{-bool} = \'pg_catalog.pg_table_is_visible(c.oid)';
  }

  my ($where, @bind) = $sqlmaker->where(\%cond);

  my $sql = <<EOS;

SELECT
  (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
   FROM pg_catalog.pg_attrdef d
   WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
FROM pg_catalog.pg_class c
     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
$where

EOS

  my ($default_expr) = $dbh->selectrow_array($sql, undef, @bind);

  return $default_expr;
}
162
163
# Name reported for this storage by sqlt_type: always 'PostgreSQL'.
sub sqlt_type { 'PostgreSQL' }
167
# Bind attributes for a column of type $data_type.
#
# Binary LOB types (as judged by _is_binary_lob_type) get
# { pg_type => PG_BYTEA }; every other type gets undef.
#
# The first time a binary type is seen, the installed DBD::Pg version is
# checked and a warning is carped if it is older than recommended
# (2.17.2 against servers >= 9.0, 2.9.2 otherwise).
sub bind_attribute_by_data_type {
  my ($self, $data_type) = @_;

  return undef unless $self->_is_binary_lob_type($data_type);

  # This path is fairly hot, so the version check runs once only and is
  # remembered in a flag; repeated ->VERSION calls were also reported to
  # leak memory. The flag sits in the DBD::Pg namespace so that it goes
  # away with the module should it ever be unloaded (Class::Unload).
  if (not $DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__) {
    my ($wanted_version, $complaint) =
      $self->_server_info->{normalized_dbms_version} >= 9.0
        ? (
            '2.17.2',
            __PACKAGE__.': BYTEA columns are known to not work on Pg >= 9.0 with DBD::Pg < 2.17.2',
          )
        : (
            '2.9.2',
            __PACKAGE__.': DBD::Pg 2.9.2 or greater is strongly recommended for BYTEA column support',
          );

    try { DBD::Pg->VERSION($wanted_version); 1 } or carp ($complaint);

    $DBD::Pg::__DBIC_DBD_VERSION_CHECK_DONE__ = 1;
  }

  return { pg_type => DBD::Pg::PG_BYTEA() };
}
197
# Start the savepoint $name by calling pg_savepoint on the database handle.
sub _exec_svp_begin {
  my $self = shift;
  my ($name) = @_;

  return $self->_dbh->pg_savepoint($name);
}
203
# Release the savepoint $name by calling pg_release on the database handle.
sub _exec_svp_release {
  my $self = shift;
  my ($name) = @_;

  return $self->_dbh->pg_release($name);
}
209
# Roll back to the savepoint $name by calling pg_rollback_to on the
# database handle.
sub _exec_svp_rollback {
  my $self = shift;
  my ($name) = @_;

  return $self->_dbh->pg_rollback_to($name);
}
215
# Produce deployment statements via the next deployment_statements in the
# method resolution order, after filling in
# $sqltargs->{producer_args}{postgres_version} from the connected server's
# normalized version. A postgres_version already present (even a false
# one) is left alone, and nothing is set when the server version is false.
sub deployment_statements {
  my ($self, $schema, $type, $version, $dir, $sqltargs, @rest) = @_;

  $sqltargs ||= {};

  unless ( exists $sqltargs->{producer_args}{postgres_version} ) {
    my $server_version = $self->_server_info->{normalized_dbms_version};

    $sqltargs->{producer_args}{postgres_version} = $server_version
      if $server_version;
  }

  return $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
}
232
# Reset the cursor counter to 0, then let the parent implementation
# populate the database handle. Returns whatever the parent returns.
sub _populate_dbh {
  my ($self) = @_;

  # cursor numbering starts afresh with every new handle
  $self->_pg_cursor_number(0);

  # Re-dispatch with next::method, as deployment_statements in this class
  # already does, rather than SUPER::, which looks only at this package's
  # own @ISA and can skip classes in a multiple-inheritance hierarchy.
  return $self->next::method();
}
239
# Hand out the current cursor number and advance the stored counter by one.
sub _get_next_pg_cursor_number {
  my $self = shift;

  my $current = $self->_pg_cursor_number;
  $self->_pg_cursor_number($current + 1);

  return $current;
}
247
# Build the statement handle for $sql by delegating to
# DBIx::Class::Storage::DBI::Pg::Sth->new, passing this storage, the
# database handle and the SQL text.
sub _dbh_sth {
  my ($self, $dbh, $sql) = @_;

  my $sth_class = 'DBIx::Class::Storage::DBI::Pg::Sth';

  return $sth_class->new($self, $dbh, $sql);
}
253
# Statement-handle stand-in that runs SELECT statements through a
# server-side cursor.
#
# new() wraps a SELECT in "DECLARE <cursor> CURSOR ... FOR <select>"; rows
# are then pulled in batches with "fetch 1000 from <cursor>" and handed out
# through the part of the statement-handle API implemented below
# (bind_param, execute, errstr, finish, fetchrow_array, fetchall_arrayref).
# For any other statement new() returns the statement handle prepared by
# the base storage class, not an object of this class.
package DBIx::Class::Storage::DBI::Pg::Sth;{
use strict;
use warnings;

# Plain get/set accessors over the object hash. They are generated here
# because this package has no base class that could supply
# mk_group_accessors.
#
#   storage        - the storage object that created this handle
#   dbh            - the database handle passed to new()
#   cursor_id      - name of the server-side cursor
#   cursor_created - true once the DECLARE has been executed
#   cursor_sth     - statement handle of the DECLARE; undef after cleanup
#   fetch_sth      - statement handle of the current FETCH batch, if any
for my $field (qw/
  storage dbh
  cursor_id cursor_created
  cursor_sth fetch_sth
/) {
  no strict 'refs';
  *{ __PACKAGE__ . "::$field" } = sub {
    my $self = shift;
    $self->{$field} = shift if @_;
    return $self->{$field};
  };
}

# Constructor: new($storage, $dbh, $sql).
#
# Returns a cursor-backed object for SELECT statements, and an ordinary
# statement handle from the base storage class for everything else.
sub new {
  my ($class, $storage, $dbh, $sql) = @_;

  # short-circuit: only SELECTs are run through a cursor
  return _prepare_plain($storage, $dbh, $sql)
    unless $sql =~ /^SELECT\b/i;

  my $self = bless {}, $class;
  $self->storage($storage);
  $self->dbh($dbh);

  my $csr_id = $self->_cursor_name_from_number(
    $storage->_get_next_pg_cursor_number()
  );

  # a SELECT ending in FOR UPDATE is declared without WITH HOLD
  my $hold = ($sql =~ /\bFOR\s+UPDATE\s*\z/i) ? '' : 'WITH HOLD';

  $self->cursor_id($csr_id);
  $self->cursor_sth(
    _prepare_plain($storage, $dbh, "DECLARE $csr_id CURSOR $hold FOR $sql")
  );
  $self->cursor_created(0);

  return $self;
}

# Prepare $sql with the base storage class' _dbh_sth. The method is named
# in full because SUPER:: written in this package would be resolved
# against this package's own (empty) @ISA, not against the storage's.
sub _prepare_plain {
  my ($storage, $dbh, $sql) = @_;

  return $storage->DBIx::Class::Storage::DBI::_dbh_sth($dbh, $sql);
}

# Cursor name for the given cursor number.
sub _cursor_name_from_number {
  return 'dbic_pg_cursor_'.$_[1];
}

# Best-effort CLOSE of the server-side cursor, issued on the database
# handle this object was given. Does nothing unless the cursor has been
# declared; errors are deliberately ignored.
sub _close_cursor {
  my ($self) = @_;

  return unless $self->cursor_created;
  $self->cursor_created(0);

  local $@;
  eval { $self->dbh->do('CLOSE '.$self->cursor_id) };

  return;
}

# Release everything this object holds: finish both statement handles,
# forget them, and close the cursor. Every step is best-effort so that a
# failure in one does not prevent the others (this also runs from DESTROY).
sub _cleanup_sth {
  my ($self) = @_;

  my $fetch_sth  = $self->fetch_sth;
  my $cursor_sth = $self->cursor_sth;
  $self->fetch_sth(undef);
  $self->cursor_sth(undef);

  local $@;
  eval { $fetch_sth->finish }  if $fetch_sth;
  eval { $cursor_sth->finish } if $cursor_sth;

  $self->_close_cursor;

  return;
}

sub DESTROY {
  my ($self) = @_;

  $self->_cleanup_sth;

  return;
}

# Bind a parameter on the DECLARE statement.
sub bind_param {
  my ($self,@bind_args)=@_;

  return $self->cursor_sth->bind_param(@bind_args);
}

# Execute the DECLARE with the given bind values and remember that the
# cursor now exists. Executing again first drops the previous cursor and
# any pending batch. Returns what the underlying execute returned.
sub execute {
  my ($self,@bind_values)=@_;

  if ($self->cursor_created) {
    if (my $stale_batch = $self->fetch_sth) {
      $stale_batch->finish;
      $self->fetch_sth(undef);
    }
    $self->_close_cursor;
  }

  my $rv = $self->cursor_sth->execute(@bind_values);
  $self->cursor_created(1) if $rv;

  return $rv;
}

# bind_param_array & execute_array not used for SELECT statements, so
# we'll ignore them

# Error string of the DECLARE statement; nothing once it has been released.
sub errstr {
  my ($self)=@_;

  my $cursor_sth = $self->cursor_sth
    or return;

  return $cursor_sth->errstr;
}

# Finish the current batch and the DECLARE statement. Safe to call after
# the cursor has been exhausted and cleaned up, in which case it returns 1.
sub finish {
  my ($self)=@_;

  $self->fetch_sth->finish if $self->fetch_sth;

  my $cursor_sth = $self->cursor_sth
    or return 1;

  return $cursor_sth->finish;
}

# True (after cleaning up) when the batch just fetched holds no rows,
# i.e. the cursor is exhausted.
sub _check_cursor_end {
  my ($self) = @_;

  if ($self->fetch_sth->rows == 0) {
    $self->_cleanup_sth;
    return 1;
  }

  return;
}

# Fetch the next batch of rows from the cursor into fetch_sth, declaring
# the cursor first if that has not happened yet.
sub _run_fetch_sth {
  my ($self)=@_;

  # declare the cursor exactly once; re-running the DECLARE for every
  # batch would try to create an already existing cursor
  $self->execute unless $self->cursor_created;

  $self->fetch_sth->finish if $self->fetch_sth;
  $self->fetch_sth($self->storage->sth("fetch 1000 from ".$self->cursor_id));

  return $self->fetch_sth->execute;
}

# Next row as a list; an empty list once the cursor is exhausted.
sub fetchrow_array {
  my ($self) = @_;

  # cursor already drained and released by an earlier fetch
  return unless $self->cursor_sth;

  $self->_run_fetch_sth unless $self->fetch_sth;
  return if $self->_check_cursor_end;

  my @row = $self->fetch_sth->fetchrow_array;
  unless (@row) {
    # current batch is used up, move on to the next one
    $self->_run_fetch_sth;
    return if $self->_check_cursor_end;

    @row = $self->fetch_sth->fetchrow_array;
  }

  return @row;
}

# All remaining rows, or at most $max_rows of them, as an array reference;
# $slice is passed through to the underlying statement handle. Always
# returns an array reference, empty when there is nothing left.
sub fetchall_arrayref {
  my ($self,$slice,$max_rows) = @_;

  # only a defined, non-negative $max_rows limits the result
  undef $max_rows if defined $max_rows and $max_rows < 0;

  my @rows;

  # cursor already drained and released by an earlier fetch
  return \@rows unless $self->cursor_sth;

  $self->_run_fetch_sth unless $self->fetch_sth;
  return \@rows if $self->_check_cursor_end;

  while (1) {
    my $wanted = defined $max_rows ? $max_rows - @rows : undef;
    last if defined $wanted and $wanted <= 0;

    # a drained handle may answer a limited request with undef, not []
    my $batch = $self->fetch_sth->fetchall_arrayref($slice,$wanted) || [];
    push @rows, @$batch;

    # limit reached: the rest of this batch stays for the next call
    last if defined $wanted and @$batch >= $wanted;

    # current batch is used up, move on to the next one
    $self->_run_fetch_sth;
    last if $self->_check_cursor_end;
  }

  return \@rows;
}

};
401
843f8ecd 4021;
403
fd159e2a 404__END__
405
75d07914 406=head1 NAME
843f8ecd 407
408DBIx::Class::Storage::DBI::Pg - Automatic primary key class for PostgreSQL
409
410=head1 SYNOPSIS
411
d88ecca6 412 # In your result (table) classes
413 use base 'DBIx::Class::Core';
843f8ecd 414 __PACKAGE__->set_primary_key('id');
843f8ecd 415
416=head1 DESCRIPTION
417
418This class implements autoincrements for PostgreSQL.
419
7c0176a1 420=head1 POSTGRESQL SCHEMA SUPPORT
421
4f609014 422This driver supports multiple PostgreSQL schemas, with one caveat: for
6ff1d58c 423performance reasons, data about the search path, sequence names, and
424so forth is queried as needed and CACHED for subsequent uses.
7c0176a1 425
4f609014 426For this reason, once your schema is instantiated, you should not
427change the PostgreSQL schema search path for that schema's database
428connection. If you do, Bad Things may happen.
429
430You should do any necessary manipulation of the search path BEFORE
431instantiating your schema object, or as part of the on_connect_do
432option to connect(), for example:
7c0176a1 433
434 my $schema = My::Schema->connect
435 ( $dsn,$user,$pass,
436 { on_connect_do =>
437 [ 'SET search_path TO myschema, foo, public' ],
438 },
439 );
440
7ff926e6 441=head1 AUTHORS
7c0176a1 442
7ff926e6 443See L<DBIx::Class/CONTRIBUTORS>
843f8ecd 444
445=head1 LICENSE
446
447You may distribute this code under the same terms as Perl itself.
448
449=cut