52180701bb7f3287fecc84fc21f628c71ade1409
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / MSSQL.pm
1 package DBIx::Class::Storage::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI::UniqueIdentifier/;
7 use mro 'c3';
8 use Try::Tiny;
9
10 use List::Util();
11
12 __PACKAGE__->mk_group_accessors(simple => qw/
13   _identity _identity_method
14 /);
15
16 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::MSSQL');
17
18 sub _set_identity_insert {
19   my ($self, $table) = @_;
20
21   my $sql = sprintf (
22     'SET IDENTITY_INSERT %s ON',
23     $self->sql_maker->_quote ($table),
24   );
25
26   my $dbh = $self->_get_dbh;
27   try { $dbh->do ($sql) }
28   catch {
29     $self->throw_exception (sprintf "Error executing '%s': %s",
30       $sql,
31       $dbh->errstr,
32     );
33   };
34 }
35
36 sub _unset_identity_insert {
37   my ($self, $table) = @_;
38
39   my $sql = sprintf (
40     'SET IDENTITY_INSERT %s OFF',
41     $self->sql_maker->_quote ($table),
42   );
43
44   my $dbh = $self->_get_dbh;
45   $dbh->do ($sql);
46 }
47
48 sub insert_bulk {
49   my $self = shift;
50   my ($source, $cols, $data) = @_;
51
52   my $is_identity_insert = (List::Util::first
53       { $source->column_info ($_)->{is_auto_increment} }
54       (@{$cols})
55   )
56      ? 1
57      : 0;
58
59   if ($is_identity_insert) {
60      $self->_set_identity_insert ($source->name);
61   }
62
63   $self->next::method(@_);
64
65   if ($is_identity_insert) {
66      $self->_unset_identity_insert ($source->name);
67   }
68 }
69
70 sub insert {
71   my $self = shift;
72   my ($source, $to_insert) = @_;
73
74   my $supplied_col_info = $self->_resolve_column_info($source, [keys %$to_insert] );
75
76   my $is_identity_insert = (List::Util::first { $_->{is_auto_increment} } (values %$supplied_col_info) )
77      ? 1
78      : 0;
79
80   if ($is_identity_insert) {
81      $self->_set_identity_insert ($source->name);
82   }
83
84   my $updated_cols = $self->next::method(@_);
85
86   if ($is_identity_insert) {
87      $self->_unset_identity_insert ($source->name);
88   }
89
90   return $updated_cols;
91 }
92
93 sub _prep_for_execute {
94   my $self = shift;
95   my ($op, $extra_bind, $ident, $args) = @_;
96
97 # cast MONEY values properly
98   if ($op eq 'insert' || $op eq 'update') {
99     my $fields = $args->[0];
100
101     for my $col (keys %$fields) {
102       # $ident is a result source object with INSERT/UPDATE ops
103       if ($ident->column_info ($col)->{data_type}
104          &&
105          $ident->column_info ($col)->{data_type} =~ /^money\z/i) {
106         my $val = $fields->{$col};
107         $fields->{$col} = \['CAST(? AS MONEY)', [ $col => $val ]];
108       }
109     }
110   }
111
112   my ($sql, $bind) = $self->next::method (@_);
113
114   if ($op eq 'insert') {
115     $sql .= ';SELECT SCOPE_IDENTITY()';
116
117   }
118
119   return ($sql, $bind);
120 }
121
122 sub _execute {
123   my $self = shift;
124   my ($op) = @_;
125
126   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
127
128   if ($op eq 'insert') {
129
130     # this should bring back the result of SELECT SCOPE_IDENTITY() we tacked
131     # on in _prep_for_execute above
132     my ($identity) = try { $sth->fetchrow_array };
133
134     # SCOPE_IDENTITY failed, but we can do something else
135     if ( (! $identity) && $self->_identity_method) {
136       ($identity) = $self->_dbh->selectrow_array(
137         'select ' . $self->_identity_method
138       );
139     }
140
141     $self->_identity($identity);
142     $sth->finish;
143   }
144
145   return wantarray ? ($rv, $sth, @bind) : $rv;
146 }
147
148 sub last_insert_id { shift->_identity }
149
150 #
151 # MSSQL is retarded wrt ordered subselects. One needs to add a TOP
152 # to *all* subqueries, but one also can't use TOP 100 PERCENT
153 # http://sqladvice.com/forums/permalink/18496/22931/ShowThread.aspx#22931
154 #
155 sub _select_args_to_query {
156   my $self = shift;
157
158   my ($sql, $prep_bind, @rest) = $self->next::method (@_);
159
160   # see if this is an ordered subquery
161   my $attrs = $_[3];
162   if (
163     $sql !~ /^ \s* SELECT \s+ TOP \s+ \d+ \s+ /xi
164       &&
165     scalar $self->_parse_order_by ($attrs->{order_by}) 
166   ) {
167     $self->throw_exception(
168       'An ordered subselect encountered - this is not safe! Please see "Ordered Subselects" in DBIx::Class::Storage::DBI::MSSQL
169     ') unless $attrs->{unsafe_subselect_ok};
170     my $max = 2 ** 32;
171     $sql =~ s/^ \s* SELECT \s/SELECT TOP $max /xi;
172   }
173
174   return wantarray
175     ? ($sql, $prep_bind, @rest)
176     : \[ "($sql)", @$prep_bind ]
177   ;
178 }
179
180
181 # savepoint syntax is the same as in Sybase ASE
182
183 sub _svp_begin {
184   my ($self, $name) = @_;
185
186   $self->_get_dbh->do("SAVE TRANSACTION $name");
187 }
188
189 # A new SAVE TRANSACTION with the same name releases the previous one.
190 sub _svp_release { 1 }
191
192 sub _svp_rollback {
193   my ($self, $name) = @_;
194
195   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
196 }
197
198 sub datetime_parser_type {
199   'DBIx::Class::Storage::DBI::MSSQL::DateTime::Format'
200
201
202 sub sqlt_type { 'SQLServer' }
203
204 sub sql_maker {
205   my $self = shift;
206
207   unless ($self->_sql_maker) {
208     unless ($self->{_sql_maker_opts}{limit_dialect}) {
209       my $have_rno = 0;
210
211       if (exists $self->_server_info->{normalized_dbms_version}) {
212         $have_rno = 1 if $self->_server_info->{normalized_dbms_version} >= 9;
213       }
214       else {
215         # User is connecting via DBD::Sybase and has no permission to run
216         # stored procedures like xp_msver, or version detection failed for some
217         # other reason.
218         # So, we use a query to check if RNO is implemented.
219         try {
220           $self->_get_dbh->selectrow_array('SELECT row_number() OVER (ORDER BY rand())');
221           $have_rno = 1;
222         };
223       }
224
225       $self->{_sql_maker_opts} = {
226         limit_dialect => ($have_rno ? 'RowNumberOver' : 'Top'),
227         %{$self->{_sql_maker_opts}||{}}
228       };
229     }
230
231     my $maker = $self->next::method (@_);
232   }
233
234   return $self->_sql_maker;
235 }
236
237 sub _ping {
238   my $self = shift;
239
240   my $dbh = $self->_dbh or return 0;
241
242   local $dbh->{RaiseError} = 1;
243   local $dbh->{PrintError} = 0;
244
245   my $rc = 1;
246   try {
247     $dbh->do('select 1');
248   } catch {
249     $rc = 0;
250   };
251
252   return $rc;
253 }
254
255 package # hide from PAUSE
256   DBIx::Class::Storage::DBI::MSSQL::DateTime::Format;
257
258 my $datetime_format      = '%Y-%m-%d %H:%M:%S.%3N'; # %F %T 
259 my $smalldatetime_format = '%Y-%m-%d %H:%M:%S';
260
261 my ($datetime_parser, $smalldatetime_parser);
262
263 sub parse_datetime {
264   shift;
265   require DateTime::Format::Strptime;
266   $datetime_parser ||= DateTime::Format::Strptime->new(
267     pattern  => $datetime_format,
268     on_error => 'croak',
269   );
270   return $datetime_parser->parse_datetime(shift);
271 }
272
273 sub format_datetime {
274   shift;
275   require DateTime::Format::Strptime;
276   $datetime_parser ||= DateTime::Format::Strptime->new(
277     pattern  => $datetime_format,
278     on_error => 'croak',
279   );
280   return $datetime_parser->format_datetime(shift);
281 }
282
283 sub parse_smalldatetime {
284   shift;
285   require DateTime::Format::Strptime;
286   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
287     pattern  => $smalldatetime_format,
288     on_error => 'croak',
289   );
290   return $smalldatetime_parser->parse_datetime(shift);
291 }
292
293 sub format_smalldatetime {
294   shift;
295   require DateTime::Format::Strptime;
296   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
297     pattern  => $smalldatetime_format,
298     on_error => 'croak',
299   );
300   return $smalldatetime_parser->format_datetime(shift);
301 }
302
303 1;
304
305 =head1 NAME
306
307 DBIx::Class::Storage::DBI::MSSQL - Base Class for Microsoft SQL Server support
308 in DBIx::Class
309
310 =head1 SYNOPSIS
311
312 This is the base class for Microsoft SQL Server support, used by
313 L<DBIx::Class::Storage::DBI::ODBC::Microsoft_SQL_Server> and
314 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
315
316 =head1 IMPLEMENTATION NOTES
317
318 =head2 IDENTITY information
319
320 Microsoft SQL Server supports three methods of retrieving the IDENTITY
321 value for inserted row: IDENT_CURRENT, @@IDENTITY, and SCOPE_IDENTITY().
322 SCOPE_IDENTITY is used here because it is the safest.  However, it must
323 be called is the same execute statement, not just the same connection.
324
325 So, this implementation appends a SELECT SCOPE_IDENTITY() statement
326 onto each INSERT to accommodate that requirement.
327
328 C<SELECT @@IDENTITY> can also be used by issuing:
329
330   $self->_identity_method('@@identity');
331
332 it will only be used if SCOPE_IDENTITY() fails.
333
334 This is more dangerous, as inserting into a table with an on insert trigger that
335 inserts into another table with an identity will give erroneous results on
336 recent versions of SQL Server.
337
338 =head2 identity insert
339
340 Be aware that we have tried to make things as simple as possible for our users.
341 For MSSQL that means that when a user tries to create a row, while supplying an
342 explicit value for an autoincrementing column, we will try to issue the
343 appropriate database call to make this possible, namely C<SET IDENTITY_INSERT
344 $table_name ON>. Unfortunately this operation in MSSQL requires the
345 C<db_ddladmin> privilege, which is normally not included in the standard
346 write-permissions.
347
348 =head2 Ordered Subselects
349
350 If you attempted the following query (among many others) in Microsoft SQL
351 Server
352
353  $rs->search ({}, {
354   prefetch => 'relation',
355   rows => 2,
356   offset => 3,
357  });
358
359 You may be surprised to receive an exception. The reason for this is a quirk
360 in the MSSQL engine itself, and sadly doesn't have a sensible workaround due
361 to the way DBIC is built. DBIC can do truly wonderful things with the aid of
362 subselects, and does so automatically when necessary. The list of situations
363 when a subselect is necessary is long and still changes often, so it can not
364 be exhaustively enumerated here. The general rule of thumb is a joined
365 L<has_many|DBIx::Class::Relationship/has_many> relationship with limit/group
366 applied to the left part of the join.
367
368 In its "pursuit of standards" Microsft SQL Server goes to great lengths to
369 forbid the use of ordered subselects. This breaks a very useful group of
370 searches like "Give me things number 4 to 6 (ordered by name), and prefetch
371 all their relations, no matter how many". While there is a hack which fools
372 the syntax checker, the optimizer may B<still elect to break the subselect>.
373 Testing has determined that while such breakage does occur (the test suite
374 contains an explicit test which demonstrates the problem), it is relative
375 rare. The benefits of ordered subselects are on the other hand too great to be
376 outright disabled for MSSQL.
377
378 Thus compromise between usability and perfection is the MSSQL-specific
379 L<resultset attribute|DBIx::Class::ResultSet/ATTRIBUTES> C<unsafe_subselect_ok>.
380 It is deliberately not possible to set this on the Storage level, as the user
381 should inspect (and preferably regression-test) the return of every such
382 ResultSet individually. The example above would work if written like:
383
384  $rs->search ({}, {
385   unsafe_subselect_ok => 1,
386   prefetch => 'relation',
387   rows => 2,
388   offset => 3,
389  });
390
391 If it is possible to rewrite the search() in a way that will avoid the need
392 for this flag - you are urged to do so. If DBIC internals insist that an
393 ordered subselect is necessary for an operation, and you believe there is a
394 different/better way to get the same result - please file a bugreport.
395
396 =head1 AUTHOR
397
398 See L<DBIx::Class/AUTHOR> and L<DBIx::Class/CONTRIBUTORS>.
399
400 =head1 LICENSE
401
402 You may distribute this code under the same terms as Perl itself.
403
404 =cut