515ff9b53172c6ad558e8014f74ce087a9d9a1c4
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / MSSQL.pm
1 package DBIx::Class::Storage::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI::UniqueIdentifier/;
7 use mro 'c3';
8
9 use List::Util();
10
11 __PACKAGE__->mk_group_accessors(simple => qw/
12   _identity _identity_method
13 /);
14
15 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::MSSQL');
16
17 sub _set_identity_insert {
18   my ($self, $table) = @_;
19
20   my $sql = sprintf (
21     'SET IDENTITY_INSERT %s ON',
22     $self->sql_maker->_quote ($table),
23   );
24
25   my $dbh = $self->_get_dbh;
26   eval { $dbh->do ($sql) };
27   if ($@) {
28     $self->throw_exception (sprintf "Error executing '%s': %s",
29       $sql,
30       $dbh->errstr,
31     );
32   }
33 }
34
35 sub _unset_identity_insert {
36   my ($self, $table) = @_;
37
38   my $sql = sprintf (
39     'SET IDENTITY_INSERT %s OFF',
40     $self->sql_maker->_quote ($table),
41   );
42
43   my $dbh = $self->_get_dbh;
44   $dbh->do ($sql);
45 }
46
47 sub insert_bulk {
48   my $self = shift;
49   my ($source, $cols, $data) = @_;
50
51   my $is_identity_insert = (List::Util::first
52       { $source->column_info ($_)->{is_auto_increment} }
53       (@{$cols})
54   )
55      ? 1
56      : 0;
57
58   if ($is_identity_insert) {
59      $self->_set_identity_insert ($source->name);
60   }
61
62   $self->next::method(@_);
63
64   if ($is_identity_insert) {
65      $self->_unset_identity_insert ($source->name);
66   }
67 }
68
69 sub insert {
70   my $self = shift;
71   my ($source, $to_insert) = @_;
72
73   my $supplied_col_info = $self->_resolve_column_info($source, [keys %$to_insert] );
74
75   my $is_identity_insert = (List::Util::first { $_->{is_auto_increment} } (values %$supplied_col_info) )
76      ? 1
77      : 0;
78
79   if ($is_identity_insert) {
80      $self->_set_identity_insert ($source->name);
81   }
82
83   my $updated_cols = $self->next::method(@_);
84
85   if ($is_identity_insert) {
86      $self->_unset_identity_insert ($source->name);
87   }
88
89   return $updated_cols;
90 }
91
92 sub _prep_for_execute {
93   my $self = shift;
94   my ($op, $extra_bind, $ident, $args) = @_;
95
96 # cast MONEY values properly
97   if ($op eq 'insert' || $op eq 'update') {
98     my $fields = $args->[0];
99
100     for my $col (keys %$fields) {
101       # $ident is a result source object with INSERT/UPDATE ops
102       if ($ident->column_info ($col)->{data_type}
103          &&
104          $ident->column_info ($col)->{data_type} =~ /^money\z/i) {
105         my $val = $fields->{$col};
106         $fields->{$col} = \['CAST(? AS MONEY)', [ $col => $val ]];
107       }
108     }
109   }
110
111   my ($sql, $bind) = $self->next::method (@_);
112
113   if ($op eq 'insert') {
114     $sql .= ';SELECT SCOPE_IDENTITY()';
115
116   }
117
118   return ($sql, $bind);
119 }
120
121 sub _execute {
122   my $self = shift;
123   my ($op) = @_;
124
125   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
126
127   if ($op eq 'insert') {
128
129     # this should bring back the result of SELECT SCOPE_IDENTITY() we tacked
130     # on in _prep_for_execute above
131     my ($identity) = eval { $sth->fetchrow_array };
132
133     # SCOPE_IDENTITY failed, but we can do something else
134     if ( (! $identity) && $self->_identity_method) {
135       ($identity) = $self->_dbh->selectrow_array(
136         'select ' . $self->_identity_method
137       );
138     }
139
140     $self->_identity($identity);
141     $sth->finish;
142   }
143
144   return wantarray ? ($rv, $sth, @bind) : $rv;
145 }
146
147 sub last_insert_id { shift->_identity }
148
149 #
150 # MSSQL is retarded wrt ordered subselects. One needs to add a TOP
151 # to *all* subqueries, but one also can't use TOP 100 PERCENT
152 # http://sqladvice.com/forums/permalink/18496/22931/ShowThread.aspx#22931
153 #
154 sub _select_args_to_query {
155   my $self = shift;
156
157   my ($sql, $prep_bind, @rest) = $self->next::method (@_);
158
159   # see if this is an ordered subquery
160   my $attrs = $_[3];
161   if (
162     $sql !~ /^ \s* SELECT \s+ TOP \s+ \d+ \s+ /xi
163       &&
164     scalar $self->_parse_order_by ($attrs->{order_by}) 
165   ) {
166     $self->throw_exception(
167       'An ordered subselect encountered - this is not safe! Please see "Ordered Subselects" in DBIx::Class::Storage::DBI::MSSQL
168     ') unless $attrs->{unsafe_subselect_ok};
169     my $max = 2 ** 32;
170     $sql =~ s/^ \s* SELECT \s/SELECT TOP $max /xi;
171   }
172
173   return wantarray
174     ? ($sql, $prep_bind, @rest)
175     : \[ "($sql)", @$prep_bind ]
176   ;
177 }
178
179
180 # savepoint syntax is the same as in Sybase ASE
181
182 sub _svp_begin {
183   my ($self, $name) = @_;
184
185   $self->_get_dbh->do("SAVE TRANSACTION $name");
186 }
187
188 # A new SAVE TRANSACTION with the same name releases the previous one.
189 sub _svp_release { 1 }
190
191 sub _svp_rollback {
192   my ($self, $name) = @_;
193
194   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
195 }
196
197 sub datetime_parser_type {
198   'DBIx::Class::Storage::DBI::MSSQL::DateTime::Format'
199
200
201 sub sqlt_type { 'SQLServer' }
202
203 sub sql_maker {
204   my $self = shift;
205
206   unless ($self->_sql_maker) {
207     unless ($self->{_sql_maker_opts}{limit_dialect}) {
208       my $have_rno = 0;
209
210       if (exists $self->_server_info->{normalized_dbms_version}) {
211         $have_rno = 1 if $self->_server_info->{normalized_dbms_version} >= 9;
212       }
213       else {
214         # User is connecting via DBD::Sybase and has no permission to run
215         # stored procedures like xp_msver, or version detection failed for some
216         # other reason.
217         # So, we use a query to check if RNO is implemented.
218         $have_rno = 1 if (eval { local $@; ($self->_get_dbh
219           ->selectrow_array('SELECT row_number() OVER (ORDER BY rand())')
220           )[0] });
221       }
222
223       $self->{_sql_maker_opts} = {
224         limit_dialect => ($have_rno ? 'RowNumberOver' : 'Top'),
225         %{$self->{_sql_maker_opts}||{}}
226       };
227     }
228
229     my $maker = $self->next::method (@_);
230   }
231
232   return $self->_sql_maker;
233 }
234
235 sub _ping {
236   my $self = shift;
237
238   my $dbh = $self->_dbh or return 0;
239
240   local $dbh->{RaiseError} = 1;
241   local $dbh->{PrintError} = 0;
242
243   eval {
244     $dbh->do('select 1');
245   };
246
247   return $@ ? 0 : 1;
248 }
249
250 package # hide from PAUSE
251   DBIx::Class::Storage::DBI::MSSQL::DateTime::Format;
252
253 my $datetime_format      = '%Y-%m-%d %H:%M:%S.%3N'; # %F %T 
254 my $smalldatetime_format = '%Y-%m-%d %H:%M:%S';
255
256 my ($datetime_parser, $smalldatetime_parser);
257
258 sub parse_datetime {
259   shift;
260   require DateTime::Format::Strptime;
261   $datetime_parser ||= DateTime::Format::Strptime->new(
262     pattern  => $datetime_format,
263     on_error => 'croak',
264   );
265   return $datetime_parser->parse_datetime(shift);
266 }
267
268 sub format_datetime {
269   shift;
270   require DateTime::Format::Strptime;
271   $datetime_parser ||= DateTime::Format::Strptime->new(
272     pattern  => $datetime_format,
273     on_error => 'croak',
274   );
275   return $datetime_parser->format_datetime(shift);
276 }
277
278 sub parse_smalldatetime {
279   shift;
280   require DateTime::Format::Strptime;
281   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
282     pattern  => $smalldatetime_format,
283     on_error => 'croak',
284   );
285   return $smalldatetime_parser->parse_datetime(shift);
286 }
287
288 sub format_smalldatetime {
289   shift;
290   require DateTime::Format::Strptime;
291   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
292     pattern  => $smalldatetime_format,
293     on_error => 'croak',
294   );
295   return $smalldatetime_parser->format_datetime(shift);
296 }
297
298 1;
299
300 =head1 NAME
301
302 DBIx::Class::Storage::DBI::MSSQL - Base Class for Microsoft SQL Server support
303 in DBIx::Class
304
305 =head1 SYNOPSIS
306
307 This is the base class for Microsoft SQL Server support, used by
308 L<DBIx::Class::Storage::DBI::ODBC::Microsoft_SQL_Server> and
309 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
310
311 =head1 IMPLEMENTATION NOTES
312
313 =head2 IDENTITY information
314
315 Microsoft SQL Server supports three methods of retrieving the IDENTITY
316 value for inserted row: IDENT_CURRENT, @@IDENTITY, and SCOPE_IDENTITY().
317 SCOPE_IDENTITY is used here because it is the safest.  However, it must
318 be called is the same execute statement, not just the same connection.
319
320 So, this implementation appends a SELECT SCOPE_IDENTITY() statement
321 onto each INSERT to accommodate that requirement.
322
323 C<SELECT @@IDENTITY> can also be used by issuing:
324
325   $self->_identity_method('@@identity');
326
327 it will only be used if SCOPE_IDENTITY() fails.
328
329 This is more dangerous, as inserting into a table with an on insert trigger that
330 inserts into another table with an identity will give erroneous results on
331 recent versions of SQL Server.
332
333 =head2 identity insert
334
335 Be aware that we have tried to make things as simple as possible for our users.
336 For MSSQL that means that when a user tries to create a row, while supplying an
337 explicit value for an autoincrementing column, we will try to issue the
338 appropriate database call to make this possible, namely C<SET IDENTITY_INSERT
339 $table_name ON>. Unfortunately this operation in MSSQL requires the
340 C<db_ddladmin> privilege, which is normally not included in the standard
341 write-permissions.
342
343 =head2 Ordered Subselects
344
345 If you attempted the following query (among many others) in Microsoft SQL
346 Server
347
348  $rs->search ({}, {
349   prefetch => 'relation',
350   rows => 2,
351   offset => 3,
352  });
353
354 You may be surprised to receive an exception. The reason for this is a quirk
355 in the MSSQL engine itself, and sadly doesn't have a sensible workaround due
356 to the way DBIC is built. DBIC can do truly wonderful things with the aid of
357 subselects, and does so automatically when necessary. The list of situations
358 when a subselect is necessary is long and still changes often, so it can not
359 be exhaustively enumerated here. The general rule of thumb is a joined
360 L<has_many|DBIx::Class::Relationship/has_many> relationship with limit/group
361 applied to the left part of the join.
362
363 In its "pursuit of standards" Microsft SQL Server goes to great lengths to
364 forbid the use of ordered subselects. This breaks a very useful group of
365 searches like "Give me things number 4 to 6 (ordered by name), and prefetch
366 all their relations, no matter how many". While there is a hack which fools
367 the syntax checker, the optimizer may B<still elect to break the subselect>.
368 Testing has determined that while such breakage does occur (the test suite
369 contains an explicit test which demonstrates the problem), it is relative
370 rare. The benefits of ordered subselects are on the other hand too great to be
371 outright disabled for MSSQL.
372
373 Thus compromise between usability and perfection is the MSSQL-specific
374 L<resultset attribute|DBIx::Class::ResultSet/ATTRIBUTES> C<unsafe_subselect_ok>.
375 It is deliberately not possible to set this on the Storage level, as the user
376 should inspect (and preferably regression-test) the return of every such
377 ResultSet individually. The example above would work if written like:
378
379  $rs->search ({}, {
380   unsafe_subselect_ok => 1,
381   prefetch => 'relation',
382   rows => 2,
383   offset => 3,
384  });
385
386 If it is possible to rewrite the search() in a way that will avoid the need
387 for this flag - you are urged to do so. If DBIC internals insist that an
388 ordered subselect is necessary for an operation, and you believe there is a
389 different/better way to get the same result - please file a bugreport.
390
391 =head1 AUTHOR
392
393 See L<DBIx::Class/AUTHOR> and L<DBIx::Class/CONTRIBUTORS>.
394
395 =head1 LICENSE
396
397 You may distribute this code under the same terms as Perl itself.
398
399 =cut