Restore ability to handle underdefined root (t/prefetch/incomplete.t)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSource / RowParser.pm
index c74ab9e..141037b 100644 (file)
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 
 use Try::Tiny;
-use List::Util 'first';
+use List::Util qw(first max);
 use B 'perlstring';
 
 use namespace::clean;
@@ -87,13 +87,12 @@ sub _resolve_collapse {
       $rel_cols->{$1}{$2} = 1;
     }
     else {
-      $my_cols->{$_} = {};  # important for ||= below
+      $my_cols->{$_} = {};  # important for ||='s below
     }
   }
 
   my $relinfo;
-  # run through relationships, collect metadata, inject non-left fk-bridges from
-  # *INNER-JOINED* children (if any)
+  # run through relationships, collect metadata
   for my $rel (keys %$rel_cols) {
     my $rel_src = __get_related_source($self, $rel, $rel_cols->{$rel});
 
@@ -103,6 +102,7 @@ sub _resolve_collapse {
     $relinfo->{$rel}{is_inner} = ( $inf->{attrs}{join_type} || '' ) !~ /^left/i;
     $relinfo->{$rel}{rsrc} = $rel_src;
 
+    # FIME - need to use _resolve_cond here instead
     my $cond = $inf->{cond};
 
     if (
@@ -110,25 +110,28 @@ sub _resolve_collapse {
         and
       keys %$cond
         and
-      ! first { $_ !~ /^foreign\./ } (keys %$cond)
+      ! defined first { $_ !~ /^foreign\./ } (keys %$cond)
         and
-      ! first { $_ !~ /^self\./ } (values %$cond)
+      ! defined first { $_ !~ /^self\./ } (values %$cond)
     ) {
       for my $f (keys %$cond) {
         my $s = $cond->{$f};
         $_ =~ s/^ (?: foreign | self ) \.//x for ($f, $s);
         $relinfo->{$rel}{fk_map}{$s} = $f;
-
-        # need to know source from *our* pov, hence $rel.
-        $my_cols->{$s} ||= { via_fk => "$rel.$f" } if (
-          defined $rel_cols->{$rel}{$f} # in fact selected
-            and
-          $relinfo->{$rel}{is_inner}
-        );
       }
     }
   }
 
+  # inject non-left fk-bridges from *INNER-JOINED* children (if any)
+  for my $rel (grep { $relinfo->{$_}{is_inner} } keys %$relinfo) {
+    my $ri = $relinfo->{$rel};
+    for (keys %{$ri->{fk_map}} ) {
+      # need to know source from *our* pov, hence $rel.col
+      $my_cols->{$_} ||= { via_fk => "$rel.$ri->{fk_map}{$_}" }
+        if defined $rel_cols->{$rel}{$ri->{fk_map}{$_}} # in fact selected
+    }
+  }
+
   # if the parent is already defined, assume all of its related FKs are selected
   # (even if they in fact are NOT in the select list). Keep a record of what we
   # assumed, and if any such phantom-column becomes part of our own collapser,
@@ -136,14 +139,11 @@ sub _resolve_collapse {
   # the parent (whatever it may be)
   my $assumed_from_parent;
   unless ($args->{_parent_info}{underdefined}) {
-    $assumed_from_parent->{columns} = { map
-      # only add to the list if we do not already select said columns
-      { ! exists $my_cols->{$_} ? ( $_ => 1 ) : () }
-      values %{$args->{_parent_info}{rel_condition} || {}}
-    };
-
-    $my_cols->{$_} = { via_collapse => $args->{_parent_info}{collapse_on_idcols} }
-      for keys %{$assumed_from_parent->{columns}};
+    for my $col ( values %{$args->{_parent_info}{rel_condition} || {}} ) {
+      next if exists $my_cols->{$col};
+      $my_cols->{$col} = { via_collapse => $args->{_parent_info}{collapse_on_idcols} };
+      $assumed_from_parent->{columns}{$col}++;
+    }
   }
 
   # get colinfo for everything
@@ -217,6 +217,82 @@ sub _resolve_collapse {
     }
   }
 
+  # Stil don't know how to collapse, and we are the root node. Last ditch
+  # effort in case we are *NOT* premultiplied.
+  # Run through *each multi* all the way down, left or not, and all
+  # *left* singles (a single may become a multi underneath) . When everything
+  # gets back see if all the rels link to us definitively. If this is the
+  # case we are good - either one of them will define us, or if all are NULLs
+  # we know we are "unique" due to the "non-premultiplied" check
+  if (
+    ! $collapse_map->{-idcols_current_node}
+      and
+    ! $args->{premultiplied}
+      and
+    $common_args->{_node_idx} == 1
+  ) {
+    my (@collapse_sets, $uncollapsible_chain);
+
+    for my $rel (keys %$relinfo) {
+
+      # we already looked at these higher up
+      next if ($relinfo->{$rel}{is_single} && $relinfo->{$rel}{is_inner});
+
+      if (my $clps = $relinfo->{$rel}{rsrc}->_resolve_collapse ({
+        as => $rel_cols->{$rel},
+        _rel_chain => [ @{$args->{_rel_chain}}, $rel ],
+        _parent_info => { underdefined => 1 },
+      }, $common_args) ) {
+
+        # for singles use the idcols wholesale (either there or not)
+        if ($relinfo->{$rel}{is_single}) {
+          push @collapse_sets, $clps->{-idcols_current_node};
+        }
+        elsif (! $relinfo->{$rel}{fk_map}) {
+          $uncollapsible_chain = 1;
+          last;
+        }
+        else {
+          my $defined_cols_parent_side;
+
+          for my $fq_col ( grep { /^$rel\.[^\.]+$/ } keys %{$args->{as}} ) {
+            my ($col) = $fq_col =~ /([^\.]+)$/;
+
+            $defined_cols_parent_side->{$_} = $args->{as}{$fq_col} for grep
+              { $relinfo->{$rel}{fk_map}{$_} eq $col }
+              keys %{$relinfo->{$rel}{fk_map}}
+            ;
+          }
+
+          if (my $set = $self->_identifying_column_set([ keys %$defined_cols_parent_side ]) ) {
+            push @collapse_sets, [ sort map { $defined_cols_parent_side->{$_} } @$set ];
+          }
+          else {
+            $uncollapsible_chain = 1;
+            last;
+          }
+        }
+      }
+      else {
+        $uncollapsible_chain = 1;
+        last;
+      }
+    }
+
+    unless ($uncollapsible_chain) {
+      # if we got here - we are good to go, but the construction is tricky
+      # since our children will want to include our collapse criteria - we
+      # don't give them anything (safe, since they are all collapsible on their own)
+      # in addition we record the individual collapse posibilities
+      # of all left children node collapsers, and merge them in the rowparser
+      # coderef later
+      $collapse_map->{-idcols_current_node} = [];
+      $collapse_map->{-root_node_idcol_variants} = [ sort {
+        (scalar @$a) <=> (scalar @$b) or max(@$a) <=> max(@$b)
+      } @collapse_sets ];
+    }
+  }
+
   # stop descending into children if we were called by a parent for first-pass
   # and don't despair if nothing was found (there may be other parallel branches
   # to dive into)
@@ -258,7 +334,7 @@ sub _resolve_collapse {
 
         # if this is a 1:1 our own collapser can be used as a collapse-map
         # (regardless of left or not)
-        collapser_reusable => $relinfo->{$rel}{is_single},
+        collapser_reusable => @{$collapse_map->{-idcols_current_node}} && $relinfo->{$rel}{is_single},
       },
     }, $common_args );
 
@@ -290,7 +366,7 @@ sub _resolve_collapse {
 # For an example of this coderef in action (and to see its guts) look at
 # t/resultset/rowparser_internals.t
 #
-# This is a huge performance win, as we call the same code for # every row
+# This is a huge performance win, as we call the same code for every row
 # returned from the db, thus avoiding repeated method lookups when traversing
 # relationships
 #
@@ -338,6 +414,7 @@ sub _mk_row_parser {
   #
   else {
     my $collapse_map = $self->_resolve_collapse ({
+      premultiplied => $args->{premultiplied},
       # FIXME
       # only consider real columns (not functions) during collapse resolution
       # this check shouldn't really be here, as fucktards are not supposed to
@@ -352,23 +429,48 @@ sub _mk_row_parser {
       }
     });
 
-    my $all_idcols_as_list = join ', ', sort map { @$_ } (
+    my @all_idcols = sort { $a <=> $b } map { @$_ } (
       $collapse_map->{-idcols_current_node},
       $collapse_map->{-idcols_extra_from_children} || (),
     );
 
-    my $top_node_id_path = join ('', map
-      { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
-      @{$collapse_map->{-idcols_current_node}}
-    );
+    my ($top_node_id_path, $top_node_id_cacher, @path_variants);
+    if (scalar @{$collapse_map->{-idcols_current_node}}) {
+      $top_node_id_path = join ('', map
+        { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
+        @{$collapse_map->{-idcols_current_node}}
+      );
+    }
+    elsif( my @variants = @{$collapse_map->{-root_node_idcol_variants}} ) {
+      my @path_parts;
+
+      for (@variants) {
+
+        push @path_variants, sprintf "(join qq(\xFF), '', %s, '')",
+          ( join ', ', map { "'\xFF__VALPOS__${_}__\xFF'" } @$_ )
+        ;
+
+        push @path_parts, sprintf "( %s && %s)",
+          ( join ' && ', map { "( defined '\xFF__VALPOS__${_}__\xFF' )" } @$_ ),
+          $path_variants[-1];
+        ;
+      }
+
+      $top_node_id_cacher = sprintf '$cur_row_ids[%d] = (%s);',
+        $all_idcols[-1] + 1,
+        "\n" . join( "\n  or\n", @path_parts, qq{"\0\$rows_pos\0"} );
+      $top_node_id_path = sprintf '{$cur_row_ids[%d]}', $all_idcols[-1] + 1;
+    }
+    else {
+      $self->throw_exception('Unexpected collapse map contents');
+    }
 
     my $rel_assemblers = __visit_infmap_collapse (
-      $inflate_index, $collapse_map
+      $inflate_index, { %$collapse_map, -custom_node_id => $top_node_id_path },
     );
 
-    $parser_src = sprintf (<<'EOS', $all_idcols_as_list, $top_node_id_path, $rel_assemblers);
+    $parser_src = sprintf (<<'EOS', join(', ', @all_idcols), $top_node_id_path, $top_node_id_cacher||'', $rel_assemblers);
 ### BEGIN LITERAL STRING EVAL
-
   my ($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0,0);
 
   # this loop is a bit arcane - the rationale is that the passed in
@@ -385,14 +487,19 @@ sub _mk_row_parser {
 
     # due to left joins some of the ids may be NULL/undef, and
     # won't play well when used as hash lookups
-    $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\xFF\xFFN\xFFU\xFFL\xFFL\xFF\xFF"
+    # we also need to differentiate NULLs on per-row/per-col basis
+    #(otherwise folding of optional 1:1s will be greatly confused
+    $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
       for (%1$s);
 
+    # maybe(!) cache the top node id calculation
+    %3$s
+
     $is_new_res = ! $collapse_idx[1]%2$s and (
       $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row) and last
     );
 
-    %3$s
+    %4$s
 
     $_[0][$result_pos++] = $collapse_idx[1]%2$s
       if $is_new_res;
@@ -479,7 +586,7 @@ sub __visit_infmap_collapse {
     }
   }
 
-  my $sequenced_node_id = join ('', map
+  my $sequenced_node_id = $collapse_map->{-custom_node_id} || join ('', map
     { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
     @{$collapse_map->{-idcols_current_node}}
   );
@@ -521,7 +628,6 @@ sub __visit_infmap_collapse {
   # DISABLEPRUNE
   #my $known_defined = { %{ $parent_info->{known_defined} || {} } };
   #$known_defined->{$_}++ for @{$collapse_map->{-idcols_current_node}};
-
   for my $rel (sort keys %$rel_cols) {
 
 #    push @src, sprintf(