made commit/rollback when disconnected an exception
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / SQLAHacks.pm
1 package # Hide from PAUSE
2   DBIx::Class::SQLAHacks;
3
4 # This module is a subclass of SQL::Abstract::Limit and includes a number
5 # of DBIC-specific workarounds, not yet suitable for inclusion into the
6 # SQLA core
7
8 use base qw/SQL::Abstract::Limit/;
9 use strict;
10 use warnings;
11 use Carp::Clan qw/^DBIx::Class|^SQL::Abstract/;
12
13 BEGIN {
14   # reinstall the carp()/croak() functions imported into SQL::Abstract
15   # as Carp and Carp::Clan do not like each other much
16   no warnings qw/redefine/;
17   no strict qw/refs/;
18   for my $f (qw/carp croak/) {
19
20     my $orig = \&{"SQL::Abstract::$f"};
21     *{"SQL::Abstract::$f"} = sub {
22
23       local $Carp::CarpLevel = 1;   # even though Carp::Clan ignores this, $orig will not
24
25       if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+ .+? called \s at/x) {
26         __PACKAGE__->can($f)->(@_);
27       }
28       else {
29         $orig->(@_);
30       }
31     }
32   }
33 }
34
35
36 # Tries to determine limit dialect.
37 #
38 sub new {
39   my $self = shift->SUPER::new(@_);
40
41   # This prevents the caching of $dbh in S::A::L, I believe
42   # If limit_dialect is a ref (like a $dbh), go ahead and replace
43   #   it with what it resolves to:
44   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
45     if ref $self->{limit_dialect};
46
47   $self;
48 }
49
50
51 # Slow but ANSI standard Limit/Offset support. DB2 uses this
52 sub _RowNumberOver {
53   my ($self, $sql, $order, $rows, $offset ) = @_;
54
55   $offset += 1;
56   my $last = $rows + $offset - 1;
57   my ( $order_by ) = $self->_order_by( $order );
58
59   $sql = <<"SQL";
60 SELECT * FROM
61 (
62    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
63       $sql
64       $order_by
65    ) Q1
66 ) Q2
67 WHERE ROW_NUM BETWEEN $offset AND $last
68
69 SQL
70
71   return $sql;
72 }
73
74 # Crappy Top based Limit/Offset support. MSSQL uses this currently,
75 # but may have to switch to RowNumberOver one day
76 sub _Top {
77   my ( $self, $sql, $order, $rows, $offset ) = @_;
78
79   # mangle the input sql so it can be properly aliased in the outer queries
80   $sql =~ s/^ \s* SELECT \s+ (.+?) \s+ (?=FROM)//ix
81     or croak "Unrecognizable SELECT: $sql";
82   my $sql_select = $1;
83   my @sql_select = split (/\s*,\s*/, $sql_select);
84
85   # we can't support subqueries (in fact MSSQL can't) - croak
86   if (@sql_select != @{$self->{_dbic_rs_attrs}{select}}) {
87     croak (sprintf (
88       'SQL SELECT did not parse cleanly - retrieved %d comma separated elements, while '
89     . 'the resultset select attribure contains %d elements: %s',
90       scalar @sql_select,
91       scalar @{$self->{_dbic_rs_attrs}{select}},
92       $sql_select,
93     ));
94   }
95
96   my $name_sep = $self->name_sep || '.';
97   my $esc_name_sep = "\Q$name_sep\E";
98   my $col_re = qr/ ^ (?: (.+) $esc_name_sep )? ([^$esc_name_sep]+) $ /x;
99
100   my $rs_alias = $self->{_dbic_rs_attrs}{alias};
101   my $quoted_rs_alias = $self->_quote ($rs_alias);
102
103   # construct the new select lists, rename(alias) some columns if necessary
104   my (@outer_select, @inner_select, %seen_names, %col_aliases, %outer_col_aliases);
105
106   for (@{$self->{_dbic_rs_attrs}{select}}) {
107     next if ref $_;
108     my ($table, $orig_colname) = ( $_ =~ $col_re );
109     next unless $table;
110     $seen_names{$orig_colname}++;
111   }
112
113   for my $i (0 .. $#sql_select) {
114
115     my $colsel_arg = $self->{_dbic_rs_attrs}{select}[$i];
116     my $colsel_sql = $sql_select[$i];
117
118     # this may or may not work (in case of a scalarref or something)
119     my ($table, $orig_colname) = ( $colsel_arg =~ $col_re );
120
121     my $quoted_alias;
122     # do not attempt to understand non-scalar selects - alias numerically
123     if (ref $colsel_arg) {
124       $quoted_alias = $self->_quote ('column_' . (@inner_select + 1) );
125     }
126     # column name seen more than once - alias it
127     elsif ($orig_colname &&
128           ($seen_names{$orig_colname} && $seen_names{$orig_colname} > 1) ) {
129       $quoted_alias = $self->_quote ("${table}__${orig_colname}");
130     }
131
132     # we did rename - make a record and adjust
133     if ($quoted_alias) {
134       # alias inner
135       push @inner_select, "$colsel_sql AS $quoted_alias";
136
137       # push alias to outer
138       push @outer_select, $quoted_alias;
139
140       # Any aliasing accumulated here will be considered
141       # both for inner and outer adjustments of ORDER BY
142       $self->__record_alias (
143         \%col_aliases,
144         $quoted_alias,
145         $colsel_arg,
146         $table ? $orig_colname : undef,
147       );
148     }
149
150     # otherwise just leave things intact inside, and use the abbreviated one outside
151     # (as we do not have table names anymore)
152     else {
153       push @inner_select, $colsel_sql;
154
155       my $outer_quoted = $self->_quote ($orig_colname);  # it was not a duplicate so should just work
156       push @outer_select, $outer_quoted;
157       $self->__record_alias (
158         \%outer_col_aliases,
159         $outer_quoted,
160         $colsel_arg,
161         $table ? $orig_colname : undef,
162       );
163     }
164   }
165
166   my $outer_select = join (', ', @outer_select );
167   my $inner_select = join (', ', @inner_select );
168
169   %outer_col_aliases = (%outer_col_aliases, %col_aliases);
170
171   # deal with order
172   croak '$order supplied to SQLAHacks limit emulators must be a hash'
173     if (ref $order ne 'HASH');
174
175   $order = { %$order }; #copy
176
177   my $req_order = $order->{order_by};
178
179   # examine normalized version, collapses nesting
180   my $limit_order;
181   if (scalar $self->_order_by_chunks ($req_order)) {
182     $limit_order = $req_order;
183   }
184   else {
185     $limit_order = [ map
186       { join ('', $rs_alias, $name_sep, $_ ) }
187       ( $self->{_dbic_rs_attrs}{_source_handle}->resolve->primary_columns )
188     ];
189   }
190
191   my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
192   my $order_by_requested = $self->_order_by ($req_order);
193
194   # generate the rest
195   delete $order->{order_by};
196   my $grpby_having = $self->_order_by ($order);
197
198   # short circuit for counts - the ordering complexity is needless
199   if ($self->{_dbic_rs_attrs}{-for_count_only}) {
200     return "SELECT TOP $rows $inner_select $sql $grpby_having $order_by_outer";
201   }
202
203   # we can't really adjust the order_by columns, as introspection is lacking
204   # resort to simple substitution
205   for my $col (keys %outer_col_aliases) {
206     for ($order_by_requested, $order_by_outer) {
207       $_ =~ s/\s+$col\s+/ $outer_col_aliases{$col} /g;
208     }
209   }
210   for my $col (keys %col_aliases) {
211     $order_by_inner =~ s/\s+$col\s+/ $col_aliases{$col} /g;
212   }
213
214
215   my $inner_lim = $rows + $offset;
216
217   $sql = "SELECT TOP $inner_lim $inner_select $sql $grpby_having $order_by_inner";
218
219   if ($offset) {
220     $sql = <<"SQL";
221
222     SELECT TOP $rows $outer_select FROM
223     (
224       $sql
225     ) $quoted_rs_alias
226     $order_by_outer
227 SQL
228
229   }
230
231   if ($order_by_requested) {
232     $sql = <<"SQL";
233
234     SELECT $outer_select FROM
235       ( $sql ) $quoted_rs_alias
236     $order_by_requested
237 SQL
238
239   }
240
241   $sql =~ s/\s*\n\s*/ /g; # parsing out multiline statements is harder than a single line
242   return $sql;
243 }
244
245 # action at a distance to shorten Top code above
246 sub __record_alias {
247   my ($self, $register, $alias, $fqcol, $col) = @_;
248
249   # record qualified name
250   $register->{$fqcol} = $alias;
251   $register->{$self->_quote($fqcol)} = $alias;
252
253   return unless $col;
254
255   # record unqualified name, undef (no adjustment) if a duplicate is found
256   if (exists $register->{$col}) {
257     $register->{$col} = undef;
258   }
259   else {
260     $register->{$col} = $alias;
261   }
262
263   $register->{$self->_quote($col)} = $register->{$col};
264 }
265
266
267
268 # While we're at it, this should make LIMIT queries more efficient,
269 #  without digging into things too deeply
270 sub _find_syntax {
271   my ($self, $syntax) = @_;
272   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
273 }
274
275 my $for_syntax = {
276   update => 'FOR UPDATE',
277   shared => 'FOR SHARE',
278 };
279 # Quotes table names, handles "limit" dialects (e.g. where rownum between x and
280 # y), supports SELECT ... FOR UPDATE and SELECT ... FOR SHARE.
281 sub select {
282   my ($self, $table, $fields, $where, $order, @rest) = @_;
283
284   $self->{"${_}_bind"} = [] for (qw/having from order/);
285
286   if (not ref($table) or ref($table) eq 'SCALAR') {
287     $table = $self->_quote($table);
288   }
289
290   local $self->{rownum_hack_count} = 1
291     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
292   @rest = (-1) unless defined $rest[0];
293   croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
294     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
295   my ($sql, @where_bind) = $self->SUPER::select(
296     $table, $self->_recurse_fields($fields), $where, $order, @rest
297   );
298   if (my $for = delete $self->{_dbic_rs_attrs}{for}) {
299     $sql .= " $for_syntax->{$for}" if $for_syntax->{$for};
300   }
301
302   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
303 }
304
305 # Quotes table names, and handles default inserts
306 sub insert {
307   my $self = shift;
308   my $table = shift;
309   $table = $self->_quote($table);
310
311   # SQLA will emit INSERT INTO $table ( ) VALUES ( )
312   # which is sadly understood only by MySQL. Change default behavior here,
313   # until SQLA2 comes with proper dialect support
314   if (! $_[0] or (ref $_[0] eq 'HASH' and !keys %{$_[0]} ) ) {
315     return "INSERT INTO ${table} DEFAULT VALUES"
316   }
317
318   $self->SUPER::insert($table, @_);
319 }
320
321 # Just quotes table names.
322 sub update {
323   my $self = shift;
324   my $table = shift;
325   $table = $self->_quote($table);
326   $self->SUPER::update($table, @_);
327 }
328
329 # Just quotes table names.
330 sub delete {
331   my $self = shift;
332   my $table = shift;
333   $table = $self->_quote($table);
334   $self->SUPER::delete($table, @_);
335 }
336
337 sub _emulate_limit {
338   my $self = shift;
339   if ($_[3] == -1) {
340     return $_[1].$self->_order_by($_[2]);
341   } else {
342     return $self->SUPER::_emulate_limit(@_);
343   }
344 }
345
346 sub _recurse_fields {
347   my ($self, $fields, $params) = @_;
348   my $ref = ref $fields;
349   return $self->_quote($fields) unless $ref;
350   return $$fields if $ref eq 'SCALAR';
351
352   if ($ref eq 'ARRAY') {
353     return join(', ', map {
354       $self->_recurse_fields($_)
355         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
356           ? ' AS col'.$self->{rownum_hack_count}++
357           : '')
358       } @$fields);
359   }
360   elsif ($ref eq 'HASH') {
361     my %hash = %$fields;
362
363     my $as = delete $hash{-as};   # if supplied
364
365     my ($func, $args) = each %hash;
366     delete $hash{$func};
367
368     if (lc ($func) eq 'distinct' && ref $args eq 'ARRAY' && @$args > 1) {
369       croak (
370         'The select => { distinct => ... } syntax is not supported for multiple columns.'
371        .' Instead please use { group_by => [ qw/' . (join ' ', @$args) . '/ ] }'
372        .' or { select => [ qw/' . (join ' ', @$args) . '/ ], distinct => 1 }'
373       );
374     }
375
376     my $select = sprintf ('%s( %s )%s',
377       $self->_sqlcase($func),
378       $self->_recurse_fields($args),
379       $as
380         ? sprintf (' %s %s', $self->_sqlcase('as'), $as)
381         : ''
382     );
383
384     # there should be nothing left
385     if (keys %hash) {
386       croak "Malformed select argument - too many keys in hash: " . join (',', keys %$fields );
387     }
388
389     return $select;
390   }
391   # Is the second check absolutely necessary?
392   elsif ( $ref eq 'REF' and ref($$fields) eq 'ARRAY' ) {
393     return $self->_fold_sqlbind( $fields );
394   }
395   else {
396     croak($ref . qq{ unexpected in _recurse_fields()})
397   }
398 }
399
400 sub _order_by {
401   my ($self, $arg) = @_;
402
403   if (ref $arg eq 'HASH' and keys %$arg and not grep { $_ =~ /^-(?:desc|asc)/i } keys %$arg ) {
404
405     my $ret = '';
406
407     if (my $g = $self->_recurse_fields($arg->{group_by}, { no_rownum_hack => 1 }) ) {
408       $ret = $self->_sqlcase(' group by ') . $g;
409     }
410
411     if (defined $arg->{having}) {
412       my ($frag, @bind) = $self->_recurse_where($arg->{having});
413       push(@{$self->{having_bind}}, @bind);
414       $ret .= $self->_sqlcase(' having ').$frag;
415     }
416
417     if (defined $arg->{order_by}) {
418       my ($frag, @bind) = $self->SUPER::_order_by($arg->{order_by});
419       push(@{$self->{order_bind}}, @bind);
420       $ret .= $frag;
421     }
422
423     return $ret;
424   }
425   else {
426     my ($sql, @bind) = $self->SUPER::_order_by ($arg);
427     push(@{$self->{order_bind}}, @bind);
428     return $sql;
429   }
430 }
431
432 sub _order_directions {
433   my ($self, $order) = @_;
434
435   # strip bind values - none of the current _order_directions users support them
436   return $self->SUPER::_order_directions( [ map
437     { ref $_ ? $_->[0] : $_ }
438     $self->_order_by_chunks ($order)
439   ]);
440 }
441
442 sub _table {
443   my ($self, $from) = @_;
444   if (ref $from eq 'ARRAY') {
445     return $self->_recurse_from(@$from);
446   } elsif (ref $from eq 'HASH') {
447     return $self->_make_as($from);
448   } else {
449     return $from; # would love to quote here but _table ends up getting called
450                   # twice during an ->select without a limit clause due to
451                   # the way S::A::Limit->select works. should maybe consider
452                   # bypassing this and doing S::A::select($self, ...) in
453                   # our select method above. meantime, quoting shims have
454                   # been added to select/insert/update/delete here
455   }
456 }
457
458 sub _recurse_from {
459   my ($self, $from, @join) = @_;
460   my @sqlf;
461   push(@sqlf, $self->_make_as($from));
462   foreach my $j (@join) {
463     my ($to, $on) = @$j;
464
465
466     # check whether a join type exists
467     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
468     my $join_type;
469     if (ref($to_jt) eq 'HASH' and defined($to_jt->{-join_type})) {
470       $join_type = $to_jt->{-join_type};
471       $join_type =~ s/^\s+ | \s+$//xg;
472     }
473
474     $join_type = $self->{_default_jointype} if not defined $join_type;
475
476     my $join_clause = sprintf ('%s JOIN ',
477       $join_type ?  ' ' . uc($join_type) : ''
478     );
479     push @sqlf, $join_clause;
480
481     if (ref $to eq 'ARRAY') {
482       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
483     } else {
484       push(@sqlf, $self->_make_as($to));
485     }
486     push(@sqlf, ' ON ', $self->_join_condition($on));
487   }
488   return join('', @sqlf);
489 }
490
491 sub _fold_sqlbind {
492   my ($self, $sqlbind) = @_;
493
494   my @sqlbind = @$$sqlbind; # copy
495   my $sql = shift @sqlbind;
496   push @{$self->{from_bind}}, @sqlbind;
497
498   return $sql;
499 }
500
501 sub _make_as {
502   my ($self, $from) = @_;
503   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_
504                         : ref $_ eq 'REF'    ? $self->_fold_sqlbind($_)
505                         : $self->_quote($_))
506                        } reverse each %{$self->_skip_options($from)});
507 }
508
509 sub _skip_options {
510   my ($self, $hash) = @_;
511   my $clean_hash = {};
512   $clean_hash->{$_} = $hash->{$_}
513     for grep {!/^-/} keys %$hash;
514   return $clean_hash;
515 }
516
517 sub _join_condition {
518   my ($self, $cond) = @_;
519   if (ref $cond eq 'HASH') {
520     my %j;
521     for (keys %$cond) {
522       my $v = $cond->{$_};
523       if (ref $v) {
524         croak (ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
525             if ref($v) ne 'SCALAR';
526         $j{$_} = $v;
527       }
528       else {
529         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
530       }
531     };
532     return scalar($self->_recurse_where(\%j));
533   } elsif (ref $cond eq 'ARRAY') {
534     return join(' OR ', map { $self->_join_condition($_) } @$cond);
535   } else {
536     die "Can't handle this yet!";
537   }
538 }
539
540 sub _quote {
541   my ($self, $label) = @_;
542   return '' unless defined $label;
543   return $$label if ref($label) eq 'SCALAR';
544   return "*" if $label eq '*';
545   return $label unless $self->{quote_char};
546   if(ref $self->{quote_char} eq "ARRAY"){
547     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
548       if !defined $self->{name_sep};
549     my $sep = $self->{name_sep};
550     return join($self->{name_sep},
551         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
552        split(/\Q$sep\E/,$label));
553   }
554   return $self->SUPER::_quote($label);
555 }
556
557 sub limit_dialect {
558     my $self = shift;
559     $self->{limit_dialect} = shift if @_;
560     return $self->{limit_dialect};
561 }
562
563 # Set to an array-ref to specify separate left and right quotes for table names.
564 # A single scalar is equivalen to [ $char, $char ]
565 sub quote_char {
566     my $self = shift;
567     $self->{quote_char} = shift if @_;
568     return $self->{quote_char};
569 }
570
571 # Character separating quoted table names.
572 sub name_sep {
573     my $self = shift;
574     $self->{name_sep} = shift if @_;
575     return $self->{name_sep};
576 }
577
578 1;