use namespace::clean w/ Try::Tiny
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / MSSQL.pm
1 package DBIx::Class::Storage::DBI::MSSQL;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class::Storage::DBI::UniqueIdentifier/;
7 use mro 'c3';
8 use Try::Tiny;
9 use namespace::clean;
10
11 use List::Util();
12
13 __PACKAGE__->mk_group_accessors(simple => qw/
14   _identity _identity_method
15 /);
16
17 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::MSSQL');
18
19 sub _set_identity_insert {
20   my ($self, $table) = @_;
21
22   my $sql = sprintf (
23     'SET IDENTITY_INSERT %s ON',
24     $self->sql_maker->_quote ($table),
25   );
26
27   my $dbh = $self->_get_dbh;
28   try { $dbh->do ($sql) }
29   catch {
30     $self->throw_exception (sprintf "Error executing '%s': %s",
31       $sql,
32       $dbh->errstr,
33     );
34   };
35 }
36
37 sub _unset_identity_insert {
38   my ($self, $table) = @_;
39
40   my $sql = sprintf (
41     'SET IDENTITY_INSERT %s OFF',
42     $self->sql_maker->_quote ($table),
43   );
44
45   my $dbh = $self->_get_dbh;
46   $dbh->do ($sql);
47 }
48
49 sub insert_bulk {
50   my $self = shift;
51   my ($source, $cols, $data) = @_;
52
53   my $is_identity_insert = (List::Util::first
54       { $source->column_info ($_)->{is_auto_increment} }
55       (@{$cols})
56   )
57      ? 1
58      : 0;
59
60   if ($is_identity_insert) {
61      $self->_set_identity_insert ($source->name);
62   }
63
64   $self->next::method(@_);
65
66   if ($is_identity_insert) {
67      $self->_unset_identity_insert ($source->name);
68   }
69 }
70
71 sub insert {
72   my $self = shift;
73   my ($source, $to_insert) = @_;
74
75   my $supplied_col_info = $self->_resolve_column_info($source, [keys %$to_insert] );
76
77   my $is_identity_insert = (List::Util::first { $_->{is_auto_increment} } (values %$supplied_col_info) )
78      ? 1
79      : 0;
80
81   if ($is_identity_insert) {
82      $self->_set_identity_insert ($source->name);
83   }
84
85   my $updated_cols = $self->next::method(@_);
86
87   if ($is_identity_insert) {
88      $self->_unset_identity_insert ($source->name);
89   }
90
91   return $updated_cols;
92 }
93
94 sub _prep_for_execute {
95   my $self = shift;
96   my ($op, $extra_bind, $ident, $args) = @_;
97
98 # cast MONEY values properly
99   if ($op eq 'insert' || $op eq 'update') {
100     my $fields = $args->[0];
101
102     for my $col (keys %$fields) {
103       # $ident is a result source object with INSERT/UPDATE ops
104       if ($ident->column_info ($col)->{data_type}
105          &&
106          $ident->column_info ($col)->{data_type} =~ /^money\z/i) {
107         my $val = $fields->{$col};
108         $fields->{$col} = \['CAST(? AS MONEY)', [ $col => $val ]];
109       }
110     }
111   }
112
113   my ($sql, $bind) = $self->next::method (@_);
114
115   if ($op eq 'insert') {
116     $sql .= ';SELECT SCOPE_IDENTITY()';
117
118   }
119
120   return ($sql, $bind);
121 }
122
123 sub _execute {
124   my $self = shift;
125   my ($op) = @_;
126
127   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
128
129   if ($op eq 'insert') {
130
131     # this should bring back the result of SELECT SCOPE_IDENTITY() we tacked
132     # on in _prep_for_execute above
133     my ($identity) = try { $sth->fetchrow_array };
134
135     # SCOPE_IDENTITY failed, but we can do something else
136     if ( (! $identity) && $self->_identity_method) {
137       ($identity) = $self->_dbh->selectrow_array(
138         'select ' . $self->_identity_method
139       );
140     }
141
142     $self->_identity($identity);
143     $sth->finish;
144   }
145
146   return wantarray ? ($rv, $sth, @bind) : $rv;
147 }
148
149 sub last_insert_id { shift->_identity }
150
151 #
152 # MSSQL is retarded wrt ordered subselects. One needs to add a TOP
153 # to *all* subqueries, but one also can't use TOP 100 PERCENT
154 # http://sqladvice.com/forums/permalink/18496/22931/ShowThread.aspx#22931
155 #
156 sub _select_args_to_query {
157   my $self = shift;
158
159   my ($sql, $prep_bind, @rest) = $self->next::method (@_);
160
161   # see if this is an ordered subquery
162   my $attrs = $_[3];
163   if (
164     $sql !~ /^ \s* SELECT \s+ TOP \s+ \d+ \s+ /xi
165       &&
166     scalar $self->_parse_order_by ($attrs->{order_by})
167   ) {
168     $self->throw_exception(
169       'An ordered subselect encountered - this is not safe! Please see "Ordered Subselects" in DBIx::Class::Storage::DBI::MSSQL
170     ') unless $attrs->{unsafe_subselect_ok};
171     my $max = 2 ** 32;
172     $sql =~ s/^ \s* SELECT \s/SELECT TOP $max /xi;
173   }
174
175   return wantarray
176     ? ($sql, $prep_bind, @rest)
177     : \[ "($sql)", @$prep_bind ]
178   ;
179 }
180
181
182 # savepoint syntax is the same as in Sybase ASE
183
184 sub _svp_begin {
185   my ($self, $name) = @_;
186
187   $self->_get_dbh->do("SAVE TRANSACTION $name");
188 }
189
190 # A new SAVE TRANSACTION with the same name releases the previous one.
191 sub _svp_release { 1 }
192
193 sub _svp_rollback {
194   my ($self, $name) = @_;
195
196   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
197 }
198
199 sub datetime_parser_type {
200   'DBIx::Class::Storage::DBI::MSSQL::DateTime::Format'
201 }
202
203 sub sqlt_type { 'SQLServer' }
204
205 sub sql_maker {
206   my $self = shift;
207
208   unless ($self->_sql_maker) {
209     unless ($self->{_sql_maker_opts}{limit_dialect}) {
210       my $have_rno = 0;
211
212       if (exists $self->_server_info->{normalized_dbms_version}) {
213         $have_rno = 1 if $self->_server_info->{normalized_dbms_version} >= 9;
214       }
215       else {
216         # User is connecting via DBD::Sybase and has no permission to run
217         # stored procedures like xp_msver, or version detection failed for some
218         # other reason.
219         # So, we use a query to check if RNO is implemented.
220         try {
221           $self->_get_dbh->selectrow_array('SELECT row_number() OVER (ORDER BY rand())');
222           $have_rno = 1;
223         };
224       }
225
226       $self->{_sql_maker_opts} = {
227         limit_dialect => ($have_rno ? 'RowNumberOver' : 'Top'),
228         %{$self->{_sql_maker_opts}||{}}
229       };
230     }
231
232     my $maker = $self->next::method (@_);
233   }
234
235   return $self->_sql_maker;
236 }
237
238 sub _ping {
239   my $self = shift;
240
241   my $dbh = $self->_dbh or return 0;
242
243   local $dbh->{RaiseError} = 1;
244   local $dbh->{PrintError} = 0;
245
246   return try {
247     $dbh->do('select 1');
248     1;
249   } catch {
250     0;
251   };
252 }
253
254 package # hide from PAUSE
255   DBIx::Class::Storage::DBI::MSSQL::DateTime::Format;
256
257 my $datetime_format      = '%Y-%m-%d %H:%M:%S.%3N'; # %F %T
258 my $smalldatetime_format = '%Y-%m-%d %H:%M:%S';
259
260 my ($datetime_parser, $smalldatetime_parser);
261
262 sub parse_datetime {
263   shift;
264   require DateTime::Format::Strptime;
265   $datetime_parser ||= DateTime::Format::Strptime->new(
266     pattern  => $datetime_format,
267     on_error => 'croak',
268   );
269   return $datetime_parser->parse_datetime(shift);
270 }
271
272 sub format_datetime {
273   shift;
274   require DateTime::Format::Strptime;
275   $datetime_parser ||= DateTime::Format::Strptime->new(
276     pattern  => $datetime_format,
277     on_error => 'croak',
278   );
279   return $datetime_parser->format_datetime(shift);
280 }
281
282 sub parse_smalldatetime {
283   shift;
284   require DateTime::Format::Strptime;
285   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
286     pattern  => $smalldatetime_format,
287     on_error => 'croak',
288   );
289   return $smalldatetime_parser->parse_datetime(shift);
290 }
291
292 sub format_smalldatetime {
293   shift;
294   require DateTime::Format::Strptime;
295   $smalldatetime_parser ||= DateTime::Format::Strptime->new(
296     pattern  => $smalldatetime_format,
297     on_error => 'croak',
298   );
299   return $smalldatetime_parser->format_datetime(shift);
300 }
301
302 1;
303
304 =head1 NAME
305
306 DBIx::Class::Storage::DBI::MSSQL - Base Class for Microsoft SQL Server support
307 in DBIx::Class
308
309 =head1 SYNOPSIS
310
311 This is the base class for Microsoft SQL Server support, used by
312 L<DBIx::Class::Storage::DBI::ODBC::Microsoft_SQL_Server> and
313 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
314
315 =head1 IMPLEMENTATION NOTES
316
317 =head2 IDENTITY information
318
319 Microsoft SQL Server supports three methods of retrieving the IDENTITY
320 value for inserted row: IDENT_CURRENT, @@IDENTITY, and SCOPE_IDENTITY().
321 SCOPE_IDENTITY is used here because it is the safest.  However, it must
322 be called is the same execute statement, not just the same connection.
323
324 So, this implementation appends a SELECT SCOPE_IDENTITY() statement
325 onto each INSERT to accommodate that requirement.
326
327 C<SELECT @@IDENTITY> can also be used by issuing:
328
329   $self->_identity_method('@@identity');
330
331 it will only be used if SCOPE_IDENTITY() fails.
332
333 This is more dangerous, as inserting into a table with an on insert trigger that
334 inserts into another table with an identity will give erroneous results on
335 recent versions of SQL Server.
336
337 =head2 identity insert
338
339 Be aware that we have tried to make things as simple as possible for our users.
340 For MSSQL that means that when a user tries to create a row, while supplying an
341 explicit value for an autoincrementing column, we will try to issue the
342 appropriate database call to make this possible, namely C<SET IDENTITY_INSERT
343 $table_name ON>. Unfortunately this operation in MSSQL requires the
344 C<db_ddladmin> privilege, which is normally not included in the standard
345 write-permissions.
346
347 =head2 Ordered Subselects
348
349 If you attempted the following query (among many others) in Microsoft SQL
350 Server
351
352  $rs->search ({}, {
353   prefetch => 'relation',
354   rows => 2,
355   offset => 3,
356  });
357
358 You may be surprised to receive an exception. The reason for this is a quirk
359 in the MSSQL engine itself, and sadly doesn't have a sensible workaround due
360 to the way DBIC is built. DBIC can do truly wonderful things with the aid of
361 subselects, and does so automatically when necessary. The list of situations
362 when a subselect is necessary is long and still changes often, so it can not
363 be exhaustively enumerated here. The general rule of thumb is a joined
364 L<has_many|DBIx::Class::Relationship/has_many> relationship with limit/group
365 applied to the left part of the join.
366
367 In its "pursuit of standards" Microsft SQL Server goes to great lengths to
368 forbid the use of ordered subselects. This breaks a very useful group of
369 searches like "Give me things number 4 to 6 (ordered by name), and prefetch
370 all their relations, no matter how many". While there is a hack which fools
371 the syntax checker, the optimizer may B<still elect to break the subselect>.
372 Testing has determined that while such breakage does occur (the test suite
373 contains an explicit test which demonstrates the problem), it is relative
374 rare. The benefits of ordered subselects are on the other hand too great to be
375 outright disabled for MSSQL.
376
377 Thus compromise between usability and perfection is the MSSQL-specific
378 L<resultset attribute|DBIx::Class::ResultSet/ATTRIBUTES> C<unsafe_subselect_ok>.
379 It is deliberately not possible to set this on the Storage level, as the user
380 should inspect (and preferably regression-test) the return of every such
381 ResultSet individually. The example above would work if written like:
382
383  $rs->search ({}, {
384   unsafe_subselect_ok => 1,
385   prefetch => 'relation',
386   rows => 2,
387   offset => 3,
388  });
389
390 If it is possible to rewrite the search() in a way that will avoid the need
391 for this flag - you are urged to do so. If DBIC internals insist that an
392 ordered subselect is necessary for an operation, and you believe there is a
393 different/better way to get the same result - please file a bugreport.
394
395 =head1 AUTHOR
396
397 See L<DBIx::Class/AUTHOR> and L<DBIx::Class/CONTRIBUTORS>.
398
399 =head1 LICENSE
400
401 You may distribute this code under the same terms as Perl itself.
402
403 =cut