0b1e57e8f5dde536289f9a91fe65a0287ba59305
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / MSSQL.pm
1 package DBIx::Class::Storage::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI::UniqueIdentifier/;
7 use mro 'c3';
8 use Try::Tiny;
9
10 use List::Util();
11
12 __PACKAGE__->mk_group_accessors(simple => qw/
13   _identity _identity_method
14 /);
15
16 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::MSSQL');
17
18 sub _set_identity_insert {
19   my ($self, $table) = @_;
20
21   my $sql = sprintf (
22     'SET IDENTITY_INSERT %s ON',
23     $self->sql_maker->_quote ($table),
24   );
25
26   my $dbh = $self->_get_dbh;
27   try { $dbh->do ($sql) }
28   catch {
29     $self->throw_exception (sprintf "Error executing '%s': %s",
30       $sql,
31       $dbh->errstr,
32     );
33   };
34 }
35
36 sub _unset_identity_insert {
37   my ($self, $table) = @_;
38
39   my $sql = sprintf (
40     'SET IDENTITY_INSERT %s OFF',
41     $self->sql_maker->_quote ($table),
42   );
43
44   my $dbh = $self->_get_dbh;
45   $dbh->do ($sql);
46 }
47
48 sub insert_bulk {
49   my $self = shift;
50   my ($source, $cols, $data) = @_;
51
52   my $is_identity_insert = (List::Util::first
53       { $source->column_info ($_)->{is_auto_increment} }
54       (@{$cols})
55   )
56      ? 1
57      : 0;
58
59   if ($is_identity_insert) {
60      $self->_set_identity_insert ($source->name);
61   }
62
63   $self->next::method(@_);
64
65   if ($is_identity_insert) {
66      $self->_unset_identity_insert ($source->name);
67   }
68 }
69
70 sub insert {
71   my $self = shift;
72   my ($source, $to_insert) = @_;
73
74   my $supplied_col_info = $self->_resolve_column_info($source, [keys %$to_insert] );
75
76   my $is_identity_insert = (List::Util::first { $_->{is_auto_increment} } (values %$supplied_col_info) )
77      ? 1
78      : 0;
79
80   if ($is_identity_insert) {
81      $self->_set_identity_insert ($source->name);
82   }
83
84   my $updated_cols = $self->next::method(@_);
85
86   if ($is_identity_insert) {
87      $self->_unset_identity_insert ($source->name);
88   }
89
90   return $updated_cols;
91 }
92
93 sub _prep_for_execute {
94   my $self = shift;
95   my ($op, $extra_bind, $ident, $args) = @_;
96
97 # cast MONEY values properly
98   if ($op eq 'insert' || $op eq 'update') {
99     my $fields = $args->[0];
100
101     for my $col (keys %$fields) {
102       # $ident is a result source object with INSERT/UPDATE ops
103       if ($ident->column_info ($col)->{data_type}
104          &&
105          $ident->column_info ($col)->{data_type} =~ /^money\z/i) {
106         my $val = $fields->{$col};
107         $fields->{$col} = \['CAST(? AS MONEY)', [ $col => $val ]];
108       }
109     }
110   }
111
112   my ($sql, $bind) = $self->next::method (@_);
113
114   if ($op eq 'insert') {
115     $sql .= ';SELECT SCOPE_IDENTITY()';
116
117   }
118
119   return ($sql, $bind);
120 }
121
122 sub _execute {
123   my $self = shift;
124   my ($op) = @_;
125
126   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
127
128   if ($op eq 'insert') {
129
130     # this should bring back the result of SELECT SCOPE_IDENTITY() we tacked
131     # on in _prep_for_execute above
132     my ($identity) = try { $sth->fetchrow_array };
133
134     # SCOPE_IDENTITY failed, but we can do something else
135     if ( (! $identity) && $self->_identity_method) {
136       ($identity) = $self->_dbh->selectrow_array(
137         'select ' . $self->_identity_method
138       );
139     }
140
141     $self->_identity($identity);
142     $sth->finish;
143   }
144
145   return wantarray ? ($rv, $sth, @bind) : $rv;
146 }
147
148 sub last_insert_id { shift->_identity }
149
150 #
151 # MSSQL is retarded wrt ordered subselects. One needs to add a TOP
152 # to *all* subqueries, but one also can't use TOP 100 PERCENT
153 # http://sqladvice.com/forums/permalink/18496/22931/ShowThread.aspx#22931
154 #
155 sub _select_args_to_query {
156   my $self = shift;
157
158   my ($sql, $prep_bind, @rest) = $self->next::method (@_);
159
160   # see if this is an ordered subquery
161   my $attrs = $_[3];
162   if (
163     $sql !~ /^ \s* SELECT \s+ TOP \s+ \d+ \s+ /xi
164       &&
165     scalar $self->_parse_order_by ($attrs->{order_by}) 
166   ) {
167     $self->throw_exception(
168       'An ordered subselect encountered - this is not safe! Please see "Ordered Subselects" in DBIx::Class::Storage::DBI::MSSQL
169     ') unless $attrs->{unsafe_subselect_ok};
170     my $max = 2 ** 32;
171     $sql =~ s/^ \s* SELECT \s/SELECT TOP $max /xi;
172   }
173
174   return wantarray
175     ? ($sql, $prep_bind, @rest)
176     : \[ "($sql)", @$prep_bind ]
177   ;
178 }
179
180
181 # savepoint syntax is the same as in Sybase ASE
182
183 sub _svp_begin {
184   my ($self, $name) = @_;
185
186   $self->_get_dbh->do("SAVE TRANSACTION $name");
187 }
188
189 # A new SAVE TRANSACTION with the same name releases the previous one.
190 sub _svp_release { 1 }
191
192 sub _svp_rollback {
193   my ($self, $name) = @_;
194
195   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
196 }
197
198 sub datetime_parser_type {
199   'DBIx::Class::Storage::DBI::MSSQL::DateTime::Format'
200
201
202 sub sqlt_type { 'SQLServer' }
203
204 sub sql_maker {
205   my $self = shift;
206
207   unless ($self->_sql_maker) {
208     unless ($self->{_sql_maker_opts}{limit_dialect}) {
209       my $have_rno = 0;
210
211       if (exists $self->_server_info->{normalized_dbms_version}) {
212         $have_rno = 1 if $self->_server_info->{normalized_dbms_version} >= 9;
213       }
214       else {
215         # User is connecting via DBD::Sybase and has no permission to run
216         # stored procedures like xp_msver, or version detection failed for some
217         # other reason.
218         # So, we use a query to check if RNO is implemented.
219         try {
220           $self->_get_dbh->selectrow_array('SELECT row_number() OVER (ORDER BY rand())');
221           $have_rno = 1;
222         };
223       }
224
225       $self->{_sql_maker_opts} = {
226         limit_dialect => ($have_rno ? 'RowNumberOver' : 'Top'),
227         %{$self->{_sql_maker_opts}||{}}
228       };
229     }
230
231     my $maker = $self->next::method (@_);
232   }
233
234   return $self->_sql_maker;
235 }
236
237 sub _ping {
238   my $self = shift;
239
240   my $dbh = $self->_dbh or return 0;
241
242   local $dbh->{RaiseError} = 1;
243   local $dbh->{PrintError} = 0;
244
245   return try {
246     $dbh->do('select 1');
247     1;
248   } catch {
249     0;
250   };
251 }
252
253 package # hide from PAUSE
254   DBIx::Class::Storage::DBI::MSSQL::DateTime::Format;
255
256 my $datetime_format      = '%Y-%m-%d %H:%M:%S.%3N'; # %F %T 
257 my $smalldatetime_format = '%Y-%m-%d %H:%M:%S';
258
259 my ($datetime_parser, $smalldatetime_parser);
260
261 sub parse_datetime {
262   shift;
263   require DateTime::Format::Strptime;
264   $datetime_parser ||= DateTime::Format::Strptime->new(
265     pattern  => $datetime_format,
266     on_error => 'croak',
267   );
268   return $datetime_parser->parse_datetime(shift);
269 }
270
271 sub format_datetime {
272   shift;
273   require DateTime::Format::Strptime;
274   $datetime_parser ||= DateTime::Format::Strptime->new(
275     pattern  => $datetime_format,
276     on_error => 'croak',
277   );
278   return $datetime_parser->format_datetime(shift);
279 }
280
281 sub parse_smalldatetime {
282   shift;
283   require DateTime::Format::Strptime;
284   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
285     pattern  => $smalldatetime_format,
286     on_error => 'croak',
287   );
288   return $smalldatetime_parser->parse_datetime(shift);
289 }
290
291 sub format_smalldatetime {
292   shift;
293   require DateTime::Format::Strptime;
294   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
295     pattern  => $smalldatetime_format,
296     on_error => 'croak',
297   );
298   return $smalldatetime_parser->format_datetime(shift);
299 }
300
301 1;
302
303 =head1 NAME
304
305 DBIx::Class::Storage::DBI::MSSQL - Base Class for Microsoft SQL Server support
306 in DBIx::Class
307
308 =head1 SYNOPSIS
309
310 This is the base class for Microsoft SQL Server support, used by
311 L<DBIx::Class::Storage::DBI::ODBC::Microsoft_SQL_Server> and
312 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
313
314 =head1 IMPLEMENTATION NOTES
315
316 =head2 IDENTITY information
317
318 Microsoft SQL Server supports three methods of retrieving the IDENTITY
319 value for inserted row: IDENT_CURRENT, @@IDENTITY, and SCOPE_IDENTITY().
320 SCOPE_IDENTITY is used here because it is the safest.  However, it must
321 be called is the same execute statement, not just the same connection.
322
323 So, this implementation appends a SELECT SCOPE_IDENTITY() statement
324 onto each INSERT to accommodate that requirement.
325
326 C<SELECT @@IDENTITY> can also be used by issuing:
327
328   $self->_identity_method('@@identity');
329
330 it will only be used if SCOPE_IDENTITY() fails.
331
332 This is more dangerous, as inserting into a table with an on insert trigger that
333 inserts into another table with an identity will give erroneous results on
334 recent versions of SQL Server.
335
336 =head2 identity insert
337
338 Be aware that we have tried to make things as simple as possible for our users.
339 For MSSQL that means that when a user tries to create a row, while supplying an
340 explicit value for an autoincrementing column, we will try to issue the
341 appropriate database call to make this possible, namely C<SET IDENTITY_INSERT
342 $table_name ON>. Unfortunately this operation in MSSQL requires the
343 C<db_ddladmin> privilege, which is normally not included in the standard
344 write-permissions.
345
346 =head2 Ordered Subselects
347
348 If you attempted the following query (among many others) in Microsoft SQL
349 Server
350
351  $rs->search ({}, {
352   prefetch => 'relation',
353   rows => 2,
354   offset => 3,
355  });
356
357 You may be surprised to receive an exception. The reason for this is a quirk
358 in the MSSQL engine itself, and sadly doesn't have a sensible workaround due
359 to the way DBIC is built. DBIC can do truly wonderful things with the aid of
360 subselects, and does so automatically when necessary. The list of situations
361 when a subselect is necessary is long and still changes often, so it can not
362 be exhaustively enumerated here. The general rule of thumb is a joined
363 L<has_many|DBIx::Class::Relationship/has_many> relationship with limit/group
364 applied to the left part of the join.
365
366 In its "pursuit of standards" Microsft SQL Server goes to great lengths to
367 forbid the use of ordered subselects. This breaks a very useful group of
368 searches like "Give me things number 4 to 6 (ordered by name), and prefetch
369 all their relations, no matter how many". While there is a hack which fools
370 the syntax checker, the optimizer may B<still elect to break the subselect>.
371 Testing has determined that while such breakage does occur (the test suite
372 contains an explicit test which demonstrates the problem), it is relative
373 rare. The benefits of ordered subselects are on the other hand too great to be
374 outright disabled for MSSQL.
375
376 Thus compromise between usability and perfection is the MSSQL-specific
377 L<resultset attribute|DBIx::Class::ResultSet/ATTRIBUTES> C<unsafe_subselect_ok>.
378 It is deliberately not possible to set this on the Storage level, as the user
379 should inspect (and preferably regression-test) the return of every such
380 ResultSet individually. The example above would work if written like:
381
382  $rs->search ({}, {
383   unsafe_subselect_ok => 1,
384   prefetch => 'relation',
385   rows => 2,
386   offset => 3,
387  });
388
389 If it is possible to rewrite the search() in a way that will avoid the need
390 for this flag - you are urged to do so. If DBIC internals insist that an
391 ordered subselect is necessary for an operation, and you believe there is a
392 different/better way to get the same result - please file a bugreport.
393
394 =head1 AUTHOR
395
396 See L<DBIx::Class/AUTHOR> and L<DBIx::Class/CONTRIBUTORS>.
397
398 =head1 LICENSE
399
400 You may distribute this code under the same terms as Perl itself.
401
402 =cut