Restore ability to handle underdefined root (t/prefetch/incomplete.t)
Peter Rabbitson [Thu, 3 Jan 2013 03:01:36 +0000 (04:01 +0100)]
In addition make things work when multiple has_many-inheriting branches
are present on the root. This squeezes in the last thing I could think
of into the row_parser maker, and allows to *properly* collapse stuff
like this:

$rs = $schema->resultset ('CD')->search ({}, {
  join => [ 'tracks', { single_track => { cd => { artist => { cds => 'tracks' } } } }  ],
  collapse => 1,
  columns => [
    { 'single_track.trackid'                    => 'single_track.trackid' },  # definitive link to root from 1:1:1:1:M:M chain
    { 'year'                                    => 'me.year' },               # non-unique
    { 'tracks.cd'                               => 'tracks.cd' },             # \ together both uniqueness for second multirel
    { 'tracks.title'                            => 'tracks.title' },          # / and definitive link back to root
    { 'single_track.cd.artist.cds.cdid'         => 'cds.cdid' },              # to give uniquiness to ...tracks.title below
    { 'single_track.cd.artist.cds.year'         => 'cds.year' },              # non-unique
    { 'single_track.cd.artist.artistid'         => 'artist.artistid' },       # uniqufies entire parental chain
    { 'single_track.cd.artist.cds.genreid'      => 'cds.genreid' },           # nullable
    { 'single_track.cd.artist.cds.tracks.title' => 'tracks_2.title' },        # unique when combined with ...cds.cdid above
  ],
  order_by => \ 'RANDOM()'
});

Which DBIC sees like this ( produced by `prove -l t/prefetch/manual.t -v` ):

 sngl_tr.trackid #        me.year #       tracks.cd #    tracks.title #        cds.cdid #        cds.year # artist.artistid #     cds.genreid #  tracks_2.title
---------------------------------------------------------------------------------------------------------------------------------------------------------------
               6 #           1978 #               3 #              e2 #               1 #            1981 #               1 #               1 #              m3
            NULL #           1977 #            NULL #            NULL #            NULL #            NULL #            NULL #            NULL #            NULL
               6 #           1978 #               3 #              e3 #               5 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e1 #               1 #            1981 #               1 #               1 #              m1
               6 #           1978 #               3 #              e2 #               3 #            1978 #               1 #               1 #              e1
            NULL #           1981 #               1 #              m1 #            NULL #            NULL #            NULL #            NULL #            NULL
               6 #           1978 #               3 #              e2 #               1 #            1981 #               1 #               1 #              m1
               6 #           1978 #               3 #              e3 #               3 #            1978 #               1 #               1 #              e3
               6 #           1978 #               3 #              e3 #               2 #            1976 #               1 #            NULL #              o2
               6 #           1978 #               3 #              e2 #               6 #            1977 #               1 #            NULL #            NULL
            NULL #           1981 #               1 #              m3 #            NULL #            NULL #            NULL #            NULL #            NULL
               6 #           1978 #               3 #              e3 #               1 #            1981 #               1 #               1 #              m1
               6 #           1978 #               3 #              e1 #               2 #            1976 #               1 #            NULL #              o2
               6 #           1978 #               3 #              e3 #               1 #            1981 #               1 #               1 #              m3
               6 #           1978 #               3 #              e2 #               3 #            1978 #               1 #               1 #              e2
               6 #           1978 #               3 #              e2 #               4 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e1 #               6 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e2 #               5 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e1 #               3 #            1978 #               1 #               1 #              e2
               6 #           1978 #               3 #              e1 #               3 #            1978 #               1 #               1 #              e1
            NULL #           1976 #               2 #              o1 #            NULL #            NULL #            NULL #            NULL #            NULL
            NULL #           1976 #               2 #              o2 #            NULL #            NULL #            NULL #            NULL #            NULL
               6 #           1978 #               3 #              e1 #               1 #            1981 #               1 #               1 #              m2
               6 #           1978 #               3 #              e1 #               5 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e2 #               2 #            1976 #               1 #            NULL #              o1
               6 #           1978 #               3 #              e2 #               1 #            1981 #               1 #               1 #              m2
               6 #           1978 #               3 #              e1 #               1 #            1981 #               1 #               1 #              m4
               6 #           1978 #               3 #              e3 #               1 #            1981 #               1 #               1 #              m2
               6 #           1978 #               3 #              e2 #               2 #            1976 #               1 #            NULL #              o2
               6 #           1978 #               3 #              e3 #               3 #            1978 #               1 #               1 #              e1
               6 #           1978 #               3 #              e3 #               1 #            1981 #               1 #               1 #              m4
               6 #           1978 #               3 #              e1 #               3 #            1978 #               1 #               1 #              e3
            NULL #           1977 #            NULL #            NULL #            NULL #            NULL #            NULL #            NULL #            NULL
               6 #           1978 #               3 #              e1 #               1 #            1981 #               1 #               1 #              m3
               6 #           1978 #               3 #              e3 #               2 #            1976 #               1 #            NULL #              o1
               6 #           1978 #               3 #              e1 #               2 #            1976 #               1 #            NULL #              o1
               6 #           1978 #               3 #              e2 #               1 #            1981 #               1 #               1 #              m4
               6 #           1978 #               3 #              e3 #               3 #            1978 #               1 #               1 #              e2
            NULL #           1977 #            NULL #            NULL #            NULL #            NULL #            NULL #            NULL #            NULL
            NULL #           1981 #               1 #              m2 #            NULL #            NULL #            NULL #            NULL #            NULL
               6 #           1978 #               3 #              e3 #               6 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e2 #               3 #            1978 #               1 #               1 #              e3
               6 #           1978 #               3 #              e3 #               4 #            1977 #               1 #            NULL #            NULL
               6 #           1978 #               3 #              e1 #               4 #            1977 #               1 #            NULL #            NULL
            NULL #           1981 #               1 #              m4 #            NULL #            NULL #            NULL #            NULL #            NULL

lib/DBIx/Class/ResultSet.pm
lib/DBIx/Class/ResultSource/RowParser.pm
t/prefetch/incomplete.t
t/prefetch/manual.t
t/resultset/rowparser_internals.t

index b5099f9..13cfa03 100644 (file)
@@ -1358,18 +1358,21 @@ sub _construct_objects {
     }
   }
   else {
-    ($self->{_row_parser} ||= eval sprintf 'sub { %s }', $rsrc->_mk_row_parser({
+    $self->{_row_parser} ||= eval sprintf 'sub { %s }', $rsrc->_mk_row_parser({
       inflate_map => $infmap,
       selection => $attrs->{select},
       collapse => $attrs->{collapse},
-    }) or die $@)->($rows, $fetch_all ? () : (
+      premultiplied => $attrs->{_main_source_premultiplied},
+    }) or die $@;
+
+    # modify $rows in-place, shrinking/extending as necessary
+    $self->{_row_parser}->($rows, $fetch_all ? () : (
       # FIXME SUBOPTIMAL - we can do better, cursor->next/all (well diff. methods) should return a ref
       sub { my @r = $cursor->next or return; \@r }, # how the collapser gets more rows
       ($self->{stashed_rows} = []),                 # where does it stuff excess
-    ));  # modify $rows in-place, shrinking/extending as necessary
+    ));
 
     $_ = $inflator->($res_class, $rsrc, @$_) for @$rows;
-
   }
 
   # CDBI compat stuff
@@ -3453,26 +3456,50 @@ sub _resolved_attrs {
 
   # run through the resulting joinstructure (starting from our current slot)
   # and unset collapse if proven unnesessary
-  if ($attrs->{collapse} && ref $attrs->{from} eq 'ARRAY') {
+  #
+  # also while we are at it find out if the current root source has
+  # been premultiplied by previous related_source chaining
+  #
+  # this allows to predict whether a root object with all other relation
+  # data set to NULL is in fact unique
+  if ($attrs->{collapse}) {
 
-    if (@{$attrs->{from}} > 1) {
+    if (ref $attrs->{from} eq 'ARRAY') {
 
-      # find where our table-spec starts and consider only things after us
-      my @fromlist = @{$attrs->{from}};
-      while (@fromlist) {
-        my $t = shift @fromlist;
-        $t = $t->[0] if ref $t eq 'ARRAY';  #me vs join from-spec mismatch
-        last if ($t->{-alias} && $t->{-alias} eq $alias);
+      if (@{$attrs->{from}} <= 1) {
+        # no joins - no collapse
+        $attrs->{collapse} = 0;
       }
+      else {
+        # find where our table-spec starts
+        my @fromlist = @{$attrs->{from}};
+        while (@fromlist) {
+          my $t = shift @fromlist;
+
+          my $is_multi;
+          # me vs join from-spec distinction - a ref means non-root
+          if (ref $t eq 'ARRAY') {
+            $t = $t->[0];
+            $is_multi ||= ! $t->{-is_single};
+          }
+          last if ($t->{-alias} && $t->{-alias} eq $alias);
+          $attrs->{_main_source_premultiplied} ||= $is_multi;
+        }
 
-      for (@fromlist) {
-        $attrs->{collapse} = ! $_->[0]{-is_single}
-          and last;
+        # no non-singles remaining, nor any premultiplication - nothing to collapse
+        if (
+          ! $attrs->{_main_source_premultiplied}
+            and
+          ! List::Util::first { ! $_->[0]{-is_single} } @fromlist
+        ) {
+          $attrs->{collapse} = 0;
+        }
       }
     }
+
     else {
-      # no joins - no collapse
-      $attrs->{collapse} = 0;
+      # if we can not analyze the from - err on the side of safety
+      $attrs->{_main_source_premultiplied} = 1;
     }
   }
 
index c74ab9e..141037b 100644 (file)
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 
 use Try::Tiny;
-use List::Util 'first';
+use List::Util qw(first max);
 use B 'perlstring';
 
 use namespace::clean;
@@ -87,13 +87,12 @@ sub _resolve_collapse {
       $rel_cols->{$1}{$2} = 1;
     }
     else {
-      $my_cols->{$_} = {};  # important for ||= below
+      $my_cols->{$_} = {};  # important for ||='s below
     }
   }
 
   my $relinfo;
-  # run through relationships, collect metadata, inject non-left fk-bridges from
-  # *INNER-JOINED* children (if any)
+  # run through relationships, collect metadata
   for my $rel (keys %$rel_cols) {
     my $rel_src = __get_related_source($self, $rel, $rel_cols->{$rel});
 
@@ -103,6 +102,7 @@ sub _resolve_collapse {
     $relinfo->{$rel}{is_inner} = ( $inf->{attrs}{join_type} || '' ) !~ /^left/i;
     $relinfo->{$rel}{rsrc} = $rel_src;
 
+    # FIME - need to use _resolve_cond here instead
     my $cond = $inf->{cond};
 
     if (
@@ -110,25 +110,28 @@ sub _resolve_collapse {
         and
       keys %$cond
         and
-      ! first { $_ !~ /^foreign\./ } (keys %$cond)
+      ! defined first { $_ !~ /^foreign\./ } (keys %$cond)
         and
-      ! first { $_ !~ /^self\./ } (values %$cond)
+      ! defined first { $_ !~ /^self\./ } (values %$cond)
     ) {
       for my $f (keys %$cond) {
         my $s = $cond->{$f};
         $_ =~ s/^ (?: foreign | self ) \.//x for ($f, $s);
         $relinfo->{$rel}{fk_map}{$s} = $f;
-
-        # need to know source from *our* pov, hence $rel.
-        $my_cols->{$s} ||= { via_fk => "$rel.$f" } if (
-          defined $rel_cols->{$rel}{$f} # in fact selected
-            and
-          $relinfo->{$rel}{is_inner}
-        );
       }
     }
   }
 
+  # inject non-left fk-bridges from *INNER-JOINED* children (if any)
+  for my $rel (grep { $relinfo->{$_}{is_inner} } keys %$relinfo) {
+    my $ri = $relinfo->{$rel};
+    for (keys %{$ri->{fk_map}} ) {
+      # need to know source from *our* pov, hence $rel.col
+      $my_cols->{$_} ||= { via_fk => "$rel.$ri->{fk_map}{$_}" }
+        if defined $rel_cols->{$rel}{$ri->{fk_map}{$_}} # in fact selected
+    }
+  }
+
   # if the parent is already defined, assume all of its related FKs are selected
   # (even if they in fact are NOT in the select list). Keep a record of what we
   # assumed, and if any such phantom-column becomes part of our own collapser,
@@ -136,14 +139,11 @@ sub _resolve_collapse {
   # the parent (whatever it may be)
   my $assumed_from_parent;
   unless ($args->{_parent_info}{underdefined}) {
-    $assumed_from_parent->{columns} = { map
-      # only add to the list if we do not already select said columns
-      { ! exists $my_cols->{$_} ? ( $_ => 1 ) : () }
-      values %{$args->{_parent_info}{rel_condition} || {}}
-    };
-
-    $my_cols->{$_} = { via_collapse => $args->{_parent_info}{collapse_on_idcols} }
-      for keys %{$assumed_from_parent->{columns}};
+    for my $col ( values %{$args->{_parent_info}{rel_condition} || {}} ) {
+      next if exists $my_cols->{$col};
+      $my_cols->{$col} = { via_collapse => $args->{_parent_info}{collapse_on_idcols} };
+      $assumed_from_parent->{columns}{$col}++;
+    }
   }
 
   # get colinfo for everything
@@ -217,6 +217,82 @@ sub _resolve_collapse {
     }
   }
 
+  # Stil don't know how to collapse, and we are the root node. Last ditch
+  # effort in case we are *NOT* premultiplied.
+  # Run through *each multi* all the way down, left or not, and all
+  # *left* singles (a single may become a multi underneath) . When everything
+  # gets back see if all the rels link to us definitively. If this is the
+  # case we are good - either one of them will define us, or if all are NULLs
+  # we know we are "unique" due to the "non-premultiplied" check
+  if (
+    ! $collapse_map->{-idcols_current_node}
+      and
+    ! $args->{premultiplied}
+      and
+    $common_args->{_node_idx} == 1
+  ) {
+    my (@collapse_sets, $uncollapsible_chain);
+
+    for my $rel (keys %$relinfo) {
+
+      # we already looked at these higher up
+      next if ($relinfo->{$rel}{is_single} && $relinfo->{$rel}{is_inner});
+
+      if (my $clps = $relinfo->{$rel}{rsrc}->_resolve_collapse ({
+        as => $rel_cols->{$rel},
+        _rel_chain => [ @{$args->{_rel_chain}}, $rel ],
+        _parent_info => { underdefined => 1 },
+      }, $common_args) ) {
+
+        # for singles use the idcols wholesale (either there or not)
+        if ($relinfo->{$rel}{is_single}) {
+          push @collapse_sets, $clps->{-idcols_current_node};
+        }
+        elsif (! $relinfo->{$rel}{fk_map}) {
+          $uncollapsible_chain = 1;
+          last;
+        }
+        else {
+          my $defined_cols_parent_side;
+
+          for my $fq_col ( grep { /^$rel\.[^\.]+$/ } keys %{$args->{as}} ) {
+            my ($col) = $fq_col =~ /([^\.]+)$/;
+
+            $defined_cols_parent_side->{$_} = $args->{as}{$fq_col} for grep
+              { $relinfo->{$rel}{fk_map}{$_} eq $col }
+              keys %{$relinfo->{$rel}{fk_map}}
+            ;
+          }
+
+          if (my $set = $self->_identifying_column_set([ keys %$defined_cols_parent_side ]) ) {
+            push @collapse_sets, [ sort map { $defined_cols_parent_side->{$_} } @$set ];
+          }
+          else {
+            $uncollapsible_chain = 1;
+            last;
+          }
+        }
+      }
+      else {
+        $uncollapsible_chain = 1;
+        last;
+      }
+    }
+
+    unless ($uncollapsible_chain) {
+      # if we got here - we are good to go, but the construction is tricky
+      # since our children will want to include our collapse criteria - we
+      # don't give them anything (safe, since they are all collapsible on their own)
+      # in addition we record the individual collapse posibilities
+      # of all left children node collapsers, and merge them in the rowparser
+      # coderef later
+      $collapse_map->{-idcols_current_node} = [];
+      $collapse_map->{-root_node_idcol_variants} = [ sort {
+        (scalar @$a) <=> (scalar @$b) or max(@$a) <=> max(@$b)
+      } @collapse_sets ];
+    }
+  }
+
   # stop descending into children if we were called by a parent for first-pass
   # and don't despair if nothing was found (there may be other parallel branches
   # to dive into)
@@ -258,7 +334,7 @@ sub _resolve_collapse {
 
         # if this is a 1:1 our own collapser can be used as a collapse-map
         # (regardless of left or not)
-        collapser_reusable => $relinfo->{$rel}{is_single},
+        collapser_reusable => @{$collapse_map->{-idcols_current_node}} && $relinfo->{$rel}{is_single},
       },
     }, $common_args );
 
@@ -290,7 +366,7 @@ sub _resolve_collapse {
 # For an example of this coderef in action (and to see its guts) look at
 # t/resultset/rowparser_internals.t
 #
-# This is a huge performance win, as we call the same code for # every row
+# This is a huge performance win, as we call the same code for every row
 # returned from the db, thus avoiding repeated method lookups when traversing
 # relationships
 #
@@ -338,6 +414,7 @@ sub _mk_row_parser {
   #
   else {
     my $collapse_map = $self->_resolve_collapse ({
+      premultiplied => $args->{premultiplied},
       # FIXME
       # only consider real columns (not functions) during collapse resolution
       # this check shouldn't really be here, as fucktards are not supposed to
@@ -352,23 +429,48 @@ sub _mk_row_parser {
       }
     });
 
-    my $all_idcols_as_list = join ', ', sort map { @$_ } (
+    my @all_idcols = sort { $a <=> $b } map { @$_ } (
       $collapse_map->{-idcols_current_node},
       $collapse_map->{-idcols_extra_from_children} || (),
     );
 
-    my $top_node_id_path = join ('', map
-      { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
-      @{$collapse_map->{-idcols_current_node}}
-    );
+    my ($top_node_id_path, $top_node_id_cacher, @path_variants);
+    if (scalar @{$collapse_map->{-idcols_current_node}}) {
+      $top_node_id_path = join ('', map
+        { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
+        @{$collapse_map->{-idcols_current_node}}
+      );
+    }
+    elsif( my @variants = @{$collapse_map->{-root_node_idcol_variants}} ) {
+      my @path_parts;
+
+      for (@variants) {
+
+        push @path_variants, sprintf "(join qq(\xFF), '', %s, '')",
+          ( join ', ', map { "'\xFF__VALPOS__${_}__\xFF'" } @$_ )
+        ;
+
+        push @path_parts, sprintf "( %s && %s)",
+          ( join ' && ', map { "( defined '\xFF__VALPOS__${_}__\xFF' )" } @$_ ),
+          $path_variants[-1];
+        ;
+      }
+
+      $top_node_id_cacher = sprintf '$cur_row_ids[%d] = (%s);',
+        $all_idcols[-1] + 1,
+        "\n" . join( "\n  or\n", @path_parts, qq{"\0\$rows_pos\0"} );
+      $top_node_id_path = sprintf '{$cur_row_ids[%d]}', $all_idcols[-1] + 1;
+    }
+    else {
+      $self->throw_exception('Unexpected collapse map contents');
+    }
 
     my $rel_assemblers = __visit_infmap_collapse (
-      $inflate_index, $collapse_map
+      $inflate_index, { %$collapse_map, -custom_node_id => $top_node_id_path },
     );
 
-    $parser_src = sprintf (<<'EOS', $all_idcols_as_list, $top_node_id_path, $rel_assemblers);
+    $parser_src = sprintf (<<'EOS', join(', ', @all_idcols), $top_node_id_path, $top_node_id_cacher||'', $rel_assemblers);
 ### BEGIN LITERAL STRING EVAL
-
   my ($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0,0);
 
   # this loop is a bit arcane - the rationale is that the passed in
@@ -385,14 +487,19 @@ sub _mk_row_parser {
 
     # due to left joins some of the ids may be NULL/undef, and
     # won't play well when used as hash lookups
-    $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\xFF\xFFN\xFFU\xFFL\xFFL\xFF\xFF"
+    # we also need to differentiate NULLs on per-row/per-col basis
+    #(otherwise folding of optional 1:1s will be greatly confused
+    $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
       for (%1$s);
 
+    # maybe(!) cache the top node id calculation
+    %3$s
+
     $is_new_res = ! $collapse_idx[1]%2$s and (
       $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row) and last
     );
 
-    %3$s
+    %4$s
 
     $_[0][$result_pos++] = $collapse_idx[1]%2$s
       if $is_new_res;
@@ -479,7 +586,7 @@ sub __visit_infmap_collapse {
     }
   }
 
-  my $sequenced_node_id = join ('', map
+  my $sequenced_node_id = $collapse_map->{-custom_node_id} || join ('', map
     { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
     @{$collapse_map->{-idcols_current_node}}
   );
@@ -521,7 +628,6 @@ sub __visit_infmap_collapse {
   # DISABLEPRUNE
   #my $known_defined = { %{ $parent_info->{known_defined} || {} } };
   #$known_defined->{$_}++ for @{$collapse_map->{-idcols_current_node}};
-
   for my $rel (sort keys %$rel_cols) {
 
 #    push @src, sprintf(
index 4cfbdfc..f8e89de 100644 (file)
@@ -9,18 +9,18 @@ use DBICTest;
 my $schema = DBICTest->init_schema();
 
 lives_ok(sub {
-  # while cds.* will be selected anyway (prefetch currently forces the result of _resolve_prefetch)
-  # only the requested me.name/me.artistid columns will be fetched.
+  # while cds.* will be selected anyway (prefetch implies it)
+  # only the requested me.name column will be fetched.
 
   # reference sql with select => [...]
-  #   SELECT me.name, cds.title, me.artistid, cds.cdid, cds.artist, cds.title, cds.year, cds.genreid, cds.single_track FROM ...
+  #   SELECT me.name, cds.title, cds.cdid, cds.artist, cds.title, cds.year, cds.genreid, cds.single_track FROM ...
 
   my $rs = $schema->resultset('Artist')->search(
     { 'cds.title' => { '!=', 'Generic Manufactured Singles' } },
     {
       prefetch => [ qw/ cds / ],
       order_by => [ { -desc => 'me.name' }, 'cds.title' ],
-      select => [qw/ me.name cds.title me.artistid / ],
+      select => [qw/ me.name cds.title / ],
     },
   );
 
index 9e6ea91..6914cae 100644 (file)
@@ -184,9 +184,8 @@ is_deeply (
   'W00T, manual prefetch with collapse works'
 );
 
-my $row = $rs->next;
-
 TODO: {
+  my $row = $rs->next;
   local $TODO = 'Something is wrong with filter type rels, they throw on incomplete objects >.<';
 
   lives_ok {
@@ -200,6 +199,7 @@ TODO: {
 
 is ($rs->cursor->next, undef, 'cursor exhausted');
 
+
 TODO: {
 local $TODO = 'this does not work at all, need to promote rsattrs to an object on its own';
 # make sure has_many column redirection does not do weird stuff when collapse is requested
@@ -223,4 +223,245 @@ for my $pref_args (
 }
 }
 
+# multi-has_many with underdefined root, with rather random order
+$rs = $schema->resultset ('CD')->search ({}, {
+  join => [ 'tracks', { single_track => { cd => { artist => { cds => 'tracks' } } } }  ],
+  collapse => 1,
+  columns => [
+    { 'single_track.trackid'                    => 'single_track.trackid' },  # definitive link to root from 1:1:1:1:M:M chain
+    { 'year'                                    => 'me.year' },               # non-unique
+    { 'tracks.cd'                               => 'tracks.cd' },             # \ together both uniqueness for second multirel
+    { 'tracks.title'                            => 'tracks.title' },          # / and definitive link back to root
+    { 'single_track.cd.artist.cds.cdid'         => 'cds.cdid' },              # to give uniquiness to ...tracks.title below
+    { 'single_track.cd.artist.cds.year'         => 'cds.year' },              # non-unique
+    { 'single_track.cd.artist.artistid'         => 'artist.artistid' },       # uniqufies entire parental chain
+    { 'single_track.cd.artist.cds.genreid'      => 'cds.genreid' },           # nullable
+    { 'single_track.cd.artist.cds.tracks.title' => 'tracks_2.title' },        # unique when combined with ...cds.cdid above
+  ],
+});
+
+for (1..3) {
+  $rs->create({ artist => 1, year => 1977, title => "fuzzy_$_" });
+}
+
+my $rs_random = $rs->search({}, { order_by => \ 'RANDOM()' });
+is ($rs_random->count, 6, 'row count matches');
+
+if ($ENV{TEST_VERBOSE}) {
+ my @lines = (
+    [ "What are we actually trying to collapse (Select/As, tests below will see a *DIFFERENT* random order):" ],
+    [ map { my $s = $_; $s =~ s/single_track\./sngl_tr./; $s } @{$rs_random->{_attrs}{select} } ],
+    $rs_random->{_attrs}{as},
+    [ "-" x 159 ],
+    $rs_random->cursor->all,
+  );
+
+  diag join ' # ', map { sprintf '% 15s', (defined $_ ? $_ : 'NULL') } @$_
+    for @lines;
+}
+
+my $queries = 0;
+$schema->storage->debugcb(sub { $queries++ });
+my $orig_debug = $schema->storage->debug;
+$schema->storage->debug (1);
+
+for my $use_next (0, 1) {
+  my @random_cds;
+  if ($use_next) {
+    while (my $o = $rs_random->next) {
+      push @random_cds, $o;
+    }
+  }
+  else {
+    @random_cds = $rs_random->all;
+  }
+
+  is (@random_cds, 6, 'object count matches');
+
+  for my $cd (@random_cds) {
+    if ($cd->year == 1977) {
+      is( scalar $cd->tracks, 0, 'no tracks on 1977 cd' );
+      is( $cd->single_track, undef, 'no single_track on 1977 cd' );
+    }
+    elsif ($cd->year == 1976) {
+      is( scalar $cd->tracks, 2, 'Two tracks on 1976 cd' );
+      like( $_->title, qr/^o\d/, "correct title" )
+        for $cd->tracks;
+      is( $cd->single_track, undef, 'no single_track on 1976 cd' );
+    }
+    elsif ($cd->year == 1981) {
+      is( scalar $cd->tracks, 4, 'Four tracks on 1981 cd' );
+      like( $_->title, qr/^m\d/, "correct title" )
+        for $cd->tracks;
+      is( $cd->single_track, undef, 'no single_track on 1981 cd' );
+    }
+    elsif ($cd->year == 1978) {
+      is( scalar $cd->tracks, 3, 'Three tracks on 1978 cd' );
+      like( $_->title, qr/^e\d/, "correct title" )
+        for $cd->tracks;
+      ok( defined $cd->single_track, 'single track prefetched on 1987 cd' );
+      # FIXME - crap! skipping prefetch also doesn't work, next commit
+      #is( $cd->single_track->cd->artist->id, 1, 'Single_track->cd->artist prefetched on 1978 cd' );
+      #is( scalar $cd->single_track->cd->artist->cds, 6, '6 cds prefetched on artist' );
+    }
+  }
+}
+
+$schema->storage->debugcb(undef);
+$schema->storage->debug($orig_debug);
+is ($queries, 2, "Only two queries for rwo prefetch calls total");
+
+# can't is_deeply a random set - need *some* order
+my @hris = sort { $a->{year} cmp $b->{year} } @{$rs->search({}, {
+  order_by => [ 'tracks_2.title', 'tracks.title', 'cds.cdid', \ 'RANDOM()' ],
+})->all_hri};
+is (@hris, 6, 'hri count matches' );
+
+is_deeply (\@hris, [
+  {
+    single_track => undef,
+    tracks => [
+      {
+        cd => 2,
+        title => "o1"
+      },
+      {
+        cd => 2,
+        title => "o2"
+      }
+    ],
+    year => 1976
+  },
+  {
+    single_track => undef,
+    tracks => [],
+    year => 1977
+  },
+  {
+    single_track => undef,
+    tracks => [],
+    year => 1977
+  },
+  {
+    single_track => undef,
+    tracks => [],
+    year => 1977
+  },
+  {
+    single_track => {
+      cd => {
+        artist => {
+          artistid => 1,
+          cds => [
+            {
+              cdid => 4,
+              genreid => undef,
+              tracks => [],
+              year => 1977
+            },
+            {
+              cdid => 5,
+              genreid => undef,
+              tracks => [],
+              year => 1977
+            },
+            {
+              cdid => 6,
+              genreid => undef,
+              tracks => [],
+              year => 1977
+            },
+            {
+              cdid => 3,
+              genreid => 1,
+              tracks => [
+                {
+                  title => "e1"
+                },
+                {
+                  title => "e2"
+                },
+                {
+                  title => "e3"
+                }
+              ],
+              year => 1978
+            },
+            {
+              cdid => 1,
+              genreid => 1,
+              tracks => [
+                {
+                  title => "m1"
+                },
+                {
+                  title => "m2"
+                },
+                {
+                  title => "m3"
+                },
+                {
+                  title => "m4"
+                }
+              ],
+              year => 1981
+            },
+            {
+              cdid => 2,
+              genreid => undef,
+              tracks => [
+                {
+                  title => "o1"
+                },
+                {
+                  title => "o2"
+                }
+              ],
+              year => 1976
+            }
+          ]
+        }
+      },
+      trackid => 6
+    },
+    tracks => [
+      {
+        cd => 3,
+        title => "e1"
+      },
+      {
+        cd => 3,
+        title => "e2"
+      },
+      {
+        cd => 3,
+        title => "e3"
+      },
+    ],
+    year => 1978
+  },
+  {
+    single_track => undef,
+    tracks => [
+      {
+        cd => 1,
+        title => "m1"
+      },
+      {
+        cd => 1,
+        title => "m2"
+      },
+      {
+        cd => 1,
+        title => "m3"
+      },
+      {
+        cd => 1,
+        title => "m4"
+      },
+    ],
+    year => 1981
+  },
+], 'W00T, multi-has_many manual underdefined root prefetch with collapse works');
+
 done_testing;
index 60bf1ba..80d7cf4 100644 (file)
@@ -127,7 +127,7 @@ is_same_src (
       ( $_[1] and $_[1]->() )
     ) {
 
-      $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\xFF\xFFN\xFFU\xFFL\xFFL\xFF\xFF"
+      $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
         for (0, 3, 4, 5);
 
       # a present cref in $_[1] implies lazy prefetch, implies a supplied stash in $_[2]
@@ -252,7 +252,7 @@ is_same_src (
       ( $_[1] and $_[1]->() )
     ) {
 
-      $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\xFF\xFFN\xFFU\xFFL\xFFL\xFF\xFF"
+      $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
         for (0, 1, 5, 6, 8);
 
       $is_new_res = ! $collapse_idx[1]{$cur_row_ids[1]} and (
@@ -288,6 +288,124 @@ is_same_src (
   'Multiple has_many on multiple branches torture test',
 );
 
+$infmap = [
+  'single_track.trackid',                   # (0) definitive link to root from 1:1:1:1:M:M chain
+  'year',                                   # (1) non-unique
+  'tracks.cd',                              # (2) \ together both uniqueness for second multirel
+  'tracks.title',                           # (3) / and definitive link back to root
+  'single_track.cd.artist.cds.cdid',        # (4) to give uniquiness to ...tracks.title below
+  'single_track.cd.artist.cds.year',        # (5) non-unique
+  'single_track.cd.artist.artistid',        # (6) uniqufies entire parental chain
+  'single_track.cd.artist.cds.genreid',     # (7) nullable
+  'single_track.cd.artist.cds.tracks.title',# (8) unique when combined with ...cds.cdid above
+];
+
+is_deeply (
+  $schema->source('CD')->_resolve_collapse({ as => {map { $infmap->[$_] => $_ } 0 .. $#$infmap} }),
+  {
+    -idcols_current_node => [],
+    -idcols_extra_from_children => [ 0, 2, 3, 4, 8 ],
+    -node_index => 1,
+    -root_node_idcol_variants => [
+      [ 0 ], [ 2 ],
+    ],
+    single_track => {
+      -idcols_current_node => [ 0 ],
+      -idcols_extra_from_children => [ 4, 8 ],
+      -is_optional => 1,
+      -is_single => 1,
+      -node_index => 2,
+      cd => {
+        -idcols_current_node => [ 0 ],
+        -idcols_extra_from_children => [ 4, 8 ],
+        -is_single => 1,
+        -node_index => 3,
+        artist => {
+          -idcols_current_node => [ 0 ],
+          -idcols_extra_from_children => [ 4, 8 ],
+          -is_single => 1,
+          -node_index => 4,
+          cds => {
+            -idcols_current_node => [ 0, 4 ],
+            -idcols_extra_from_children => [ 8 ],
+            -is_optional => 1,
+            -node_index => 5,
+            tracks => {
+              -idcols_current_node => [ 0, 4, 8 ],
+              -is_optional => 1,
+              -node_index => 6,
+            }
+          }
+        }
+      }
+    },
+    tracks => {
+      -idcols_current_node => [ 2, 3 ],
+      -is_optional => 1,
+      -node_index => 7,
+    }
+  },
+  'Correct underdefined root collapse map constructed'
+);
+
+is_same_src (
+  $schema->source ('CD')->_mk_row_parser({
+    inflate_map => $infmap,
+    collapse => 1,
+  }),
+  ' my($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0, 0);
+
+    while ($cur_row = (
+      ( $rows_pos >= 0 and $_[0][$rows_pos++] ) or do { $rows_pos = -1; undef } )
+        ||
+      ( $_[1] and $_[1]->() )
+    ) {
+
+      $cur_row_ids[$_] = defined $$cur_row[$_] ? $$cur_row[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
+        for (0, 2, 3, 4, 8);
+
+      # cache expensive set of ops in a non-existent rowid slot
+      $cur_row_ids[9] = (
+        ( ( defined $cur_row->[0] ) && (join "\xFF", q{}, $cur_row->[0], q{} ))
+          or
+        ( ( defined $cur_row->[2] ) && (join "\xFF", q{}, $cur_row->[2], q{} ))
+          or
+        "\0$rows_pos\0"
+      );
+
+      $is_new_res = ! $collapse_idx[1]{$cur_row_ids[9]} and (
+        $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row) and last
+      );
+
+      $collapse_idx[1]{$cur_row_ids[9]} ||= [{ year => $$cur_row[1] }];
+
+      $collapse_idx[1]{$cur_row_ids[9]}[1]{single_track} ||= ($collapse_idx[2]{$cur_row_ids[0]} ||= [{ trackid => $$cur_row[0] }]);
+
+      $collapse_idx[2]{$cur_row_ids[0]}[1]{cd} ||= $collapse_idx[3]{$cur_row_ids[0]};
+
+      $collapse_idx[3]{$cur_row_ids[0]}[1]{artist} ||= ($collapse_idx[4]{$cur_row_ids[0]} ||= [{ artistid => $$cur_row[6] }]);
+
+      push @{$collapse_idx[4]{$cur_row_ids[0]}[1]{cds}},
+          $collapse_idx[5]{$cur_row_ids[0]}{$cur_row_ids[4]} ||= [{ cdid => $$cur_row[4], genreid => $$cur_row[7], year => $$cur_row[5] }]
+        unless $collapse_idx[5]{$cur_row_ids[0]}{$cur_row_ids[4]};
+
+      push @{$collapse_idx[5]{$cur_row_ids[0]}{$cur_row_ids[4]}[1]{tracks}},
+          $collapse_idx[6]{$cur_row_ids[0]}{$cur_row_ids[4]}{$cur_row_ids[8]} ||= [{ title => $$cur_row[8] }]
+        unless $collapse_idx[6]{$cur_row_ids[0]}{$cur_row_ids[4]}{$cur_row_ids[8]};
+
+      push @{$collapse_idx[1]{$cur_row_ids[9]}[1]{tracks}},
+          $collapse_idx[7]{$cur_row_ids[2]}{$cur_row_ids[3]} ||= [{ cd => $$cur_row[2], title => $$cur_row[3] }]
+        unless $collapse_idx[7]{$cur_row_ids[2]}{$cur_row_ids[3]};
+
+      $_[0][$result_pos++] = $collapse_idx[1]{$cur_row_ids[9]}
+        if $is_new_res;
+    }
+
+    splice @{$_[0]}, $result_pos;
+  ',
+  'Multiple has_many on multiple branches with underdefined root torture test',
+);
+
 done_testing;
 
 my $deparser;