Another overhaul of transaction/savepoint handling
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Pg.pm
1 package DBIx::Class::Storage::DBI::Pg;
2
3 use strict;
4 use warnings;
5
6 use base qw/
7     DBIx::Class::Storage::DBI::MultiColumnIn
8 /;
9 use mro 'c3';
10
11 use Scope::Guard ();
12 use Context::Preserve 'preserve_context';
13 use DBIx::Class::Carp;
14 use namespace::clean;
15
16 __PACKAGE__->sql_limit_dialect ('LimitOffset');
17 __PACKAGE__->sql_quote_char ('"');
18 __PACKAGE__->datetime_parser_type ('DateTime::Format::Pg');
19
20 sub _determine_supports_insert_returning {
21   return shift->_server_info->{normalized_dbms_version} >= 8.002
22     ? 1
23     : 0
24   ;
25 }
26
27 sub with_deferred_fk_checks {
28   my ($self, $sub) = @_;
29
30   my $txn_scope_guard = $self->txn_scope_guard;
31
32   $self->_do_query('SET CONSTRAINTS ALL DEFERRED');
33
34   my $sg = Scope::Guard->new(sub {
35     $self->_do_query('SET CONSTRAINTS ALL IMMEDIATE');
36   });
37
38   return preserve_context { $sub->() } after => sub { $txn_scope_guard->commit };
39 }
40
41 # only used when INSERT ... RETURNING is disabled
42 sub last_insert_id {
43   my ($self,$source,@cols) = @_;
44
45   my @values;
46
47   my $col_info = $source->columns_info(\@cols);
48
49   for my $col (@cols) {
50     my $seq = ( $col_info->{$col}{sequence} ||= $self->dbh_do('_dbh_get_autoinc_seq', $source, $col) )
51       or $self->throw_exception( sprintf(
52         'could not determine sequence for column %s.%s, please consider adding a schema-qualified sequence to its column info',
53           $source->name,
54           $col,
55       ));
56
57     push @values, $self->_dbh->last_insert_id(undef, undef, undef, undef, {sequence => $seq});
58   }
59
60   return @values;
61 }
62
63 sub _sequence_fetch {
64   my ($self, $function, $sequence) = @_;
65
66   $self->throw_exception('No sequence to fetch') unless $sequence;
67
68   my ($val) = $self->_get_dbh->selectrow_array(
69     sprintf ("select %s('%s')", $function, (ref $sequence eq 'SCALAR') ? $$sequence : $sequence)
70   );
71
72   return $val;
73 }
74
75 sub _dbh_get_autoinc_seq {
76   my ($self, $dbh, $source, $col) = @_;
77
78   my $schema;
79   my $table = $source->name;
80
81   # deref table name if it needs it
82   $table = $$table
83       if ref $table eq 'SCALAR';
84
85   # parse out schema name if present
86   if( $table =~ /^(.+)\.(.+)$/ ) {
87     ( $schema, $table ) = ( $1, $2 );
88   }
89
90   # get the column default using a Postgres-specific pg_catalog query
91   my $seq_expr = $self->_dbh_get_column_default( $dbh, $schema, $table, $col );
92
93   # if no default value is set on the column, or if we can't parse the
94   # default value as a sequence, throw.
95   unless ( defined $seq_expr and $seq_expr =~ /^nextval\(+'([^']+)'::(?:text|regclass)\)/i ) {
96     $seq_expr = '' unless defined $seq_expr;
97     $schema = "$schema." if defined $schema && length $schema;
98     $self->throw_exception( sprintf (
99       'no sequence found for %s%s.%s, check the RDBMS table definition or explicitly set the '.
100       "'sequence' for this column in %s",
101         $schema ? "$schema." : '',
102         $table,
103         $col,
104         $source->source_name,
105     ));
106   }
107
108   return $1;
109 }
110
111 # custom method for fetching column default, since column_info has a
112 # bug with older versions of DBD::Pg
113 sub _dbh_get_column_default {
114   my ( $self, $dbh, $schema, $table, $col ) = @_;
115
116   # Build and execute a query into the pg_catalog to find the Pg
117   # expression for the default value for this column in this table.
118   # If the table name is schema-qualified, query using that specific
119   # schema name.
120
121   # Otherwise, find the table in the standard Postgres way, using the
122   # search path.  This is done with the pg_catalog.pg_table_is_visible
123   # function, which returns true if a given table is 'visible',
124   # meaning the first table of that name to be found in the search
125   # path.
126
127   # I *think* we can be assured that this query will always find the
128   # correct column according to standard Postgres semantics.
129   #
130   # -- rbuels
131
132   my $sqlmaker = $self->sql_maker;
133   local $sqlmaker->{bindtype} = 'normal';
134
135   my ($where, @bind) = $sqlmaker->where ({
136     'a.attnum' => {'>', 0},
137     'c.relname' => $table,
138     'a.attname' => $col,
139     -not_bool => 'a.attisdropped',
140     (defined $schema && length $schema)
141       ? ( 'n.nspname' => $schema )
142       : ( -bool => \'pg_catalog.pg_table_is_visible(c.oid)' )
143   });
144
145   my ($seq_expr) = $dbh->selectrow_array(<<EOS,undef,@bind);
146
147 SELECT
148   (SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
149    FROM pg_catalog.pg_attrdef d
150    WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)
151 FROM pg_catalog.pg_class c
152      LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
153      JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
154 $where
155
156 EOS
157
158   return $seq_expr;
159 }
160
161
162 sub sqlt_type {
163   return 'PostgreSQL';
164 }
165
166 my $type_cache;
167 sub bind_attribute_by_data_type {
168   my ($self,$data_type) = @_;
169
170   # Ask for a DBD::Pg with array support
171   # pg uses (used?) version::qv()
172   require DBD::Pg;
173   if ($DBD::Pg::VERSION < 2.009002) {
174     carp_once( __PACKAGE__.": DBD::Pg 2.9.2 or greater is strongly recommended\n" );
175   }
176
177   # cache the result of _is_binary_lob_type
178   if (!exists $type_cache->{$data_type}) {
179     $type_cache->{$data_type} = $self->_is_binary_lob_type($data_type)
180       ? +{ pg_type => DBD::Pg::PG_BYTEA() }
181       : undef
182   }
183
184   $type_cache->{$data_type};
185 }
186
187 sub _exec_svp_begin {
188     my ($self, $name) = @_;
189
190     $self->_dbh->pg_savepoint($name);
191 }
192
193 sub _exec_svp_release {
194     my ($self, $name) = @_;
195
196     $self->_dbh->pg_release($name);
197 }
198
199 sub _exec_svp_rollback {
200     my ($self, $name) = @_;
201
202     $self->_dbh->pg_rollback_to($name);
203 }
204
205 sub deployment_statements {
206   my $self = shift;;
207   my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
208
209   $sqltargs ||= {};
210
211   if (
212     ! exists $sqltargs->{producer_args}{postgres_version}
213       and
214     my $dver = $self->_server_info->{normalized_dbms_version}
215   ) {
216     $sqltargs->{producer_args}{postgres_version} = $dver;
217   }
218
219   $self->next::method($schema, $type, $version, $dir, $sqltargs, @rest);
220 }
221
222 1;
223
224 __END__
225
226 =head1 NAME
227
228 DBIx::Class::Storage::DBI::Pg - Automatic primary key class for PostgreSQL
229
230 =head1 SYNOPSIS
231
232   # In your result (table) classes
233   use base 'DBIx::Class::Core';
234   __PACKAGE__->set_primary_key('id');
235
236 =head1 DESCRIPTION
237
238 This class implements autoincrements for PostgreSQL.
239
240 =head1 POSTGRESQL SCHEMA SUPPORT
241
242 This driver supports multiple PostgreSQL schemas, with one caveat: for
243 performance reasons, data about the search path, sequence names, and
244 so forth is queried as needed and CACHED for subsequent uses.
245
246 For this reason, once your schema is instantiated, you should not
247 change the PostgreSQL schema search path for that schema's database
248 connection. If you do, Bad Things may happen.
249
250 You should do any necessary manipulation of the search path BEFORE
251 instantiating your schema object, or as part of the on_connect_do
252 option to connect(), for example:
253
254    my $schema = My::Schema->connect
255                   ( $dsn,$user,$pass,
256                     { on_connect_do =>
257                         [ 'SET search_path TO myschema, foo, public' ],
258                     },
259                   );
260
261 =head1 AUTHORS
262
263 See L<DBIx::Class/CONTRIBUTORS>
264
265 =head1 LICENSE
266
267 You may distribute this code under the same terms as Perl itself.
268
269 =cut