Simplify collapse map contents, shuffle things around, rename most vars
Peter Rabbitson [Sat, 9 Feb 2013 11:18:15 +0000 (12:18 +0100)]
There are almost no functional changes, just better organize what goes where

lib/DBIx/Class/ResultSource/RowParser.pm
lib/DBIx/Class/ResultSource/RowParser/Util.pm [new file with mode: 0644]
t/55namespaces_cleaned.t
t/resultset/rowparser_internals.t

index 56db86f..c0e0d97 100644 (file)
@@ -4,13 +4,18 @@ package # hide from the pauses
 use strict;
 use warnings;
 
+use base 'DBIx::Class';
+
 use Try::Tiny;
 use List::Util qw(first max);
 use B 'perlstring';
 
-use namespace::clean;
+use DBIx::Class::ResultSource::RowParser::Util qw(
+  assemble_simple_parser
+  assemble_collapsing_parser
+);
 
-use base 'DBIx::Class';
+use namespace::clean;
 
 # Accepts one or more relationships for the current source and returns an
 # array of column names for each of those relationships. Column names are
@@ -63,6 +68,68 @@ sub _resolve_prefetch {
   }
 }
 
+# Takes an arrayref of {as} dbic column aliases and the collapse and select
+# attributes from the same $rs (the selector requirement is a temporary
+# workaround... I hope), and returns a coderef capable of:
+# my $me_pref_clps = $coderef->([$rs->cursor->next/all])
+# Where the $me_pref_clps arrayref is the future argument to inflate_result()
+#
+# For an example of this coderef in action (and to see its guts) look at
+# t/resultset/rowparser_internals.t
+#
+# This is a huge performance win, as we call the same code for every row
+# returned from the db, thus avoiding repeated method lookups when traversing
+# relationships
+#
+# Also since the coderef is completely stateless (the returned structure is
+# always fresh on every new invocation) this is a very good opportunity for
+# memoization if further speed improvements are needed
+#
+# The way we construct this coderef is somewhat fugly, although the result is
+# really worth it. The final coderef does not perform any kind of recursion -
+# the entire nested structure constructor is rolled out into a single scope.
+#
+# In any case - the output of this thing is meticulously micro-tested, so
+# any sort of adjustment/rewrite should be relatively easy (fsvo relatively)
+#
+sub _mk_row_parser {
+  my ($self, $args) = @_;
+
+  my $val_index = { map
+    { $args->{inflate_map}[$_] => $_ }
+    ( 0 .. $#{$args->{inflate_map}} )
+  };
+
+  if (! $args->{collapse} ) {
+    return assemble_simple_parser({
+      val_index => $val_index,
+    });
+  }
+  else {
+    my $collapse_map = $self->_resolve_collapse ({
+      premultiplied => $args->{premultiplied},
+      # FIXME
+      # only consider real columns (not functions) during collapse resolution
+      # this check shouldn't really be here, as fucktards are not supposed to
+      # alias random crap to existing column names anyway, but still - just in
+      # case
+      # FIXME !!!! - this does not yet deal with unbalanced selectors correctly
+      # (it is now trivial as the attrs specify where things go out of sync
+      # needs MOAR tests)
+      as => { map
+        { ref $args->{selection}[$val_index->{$_}] ? () : ( $_ => $val_index->{$_} ) }
+        keys %$val_index
+      }
+    });
+
+    return assemble_collapsing_parser({
+      val_index => $val_index,
+      collapse_map => $collapse_map,
+    });
+  }
+}
+
+
 # Takes an arrayref selection list and generates a collapse-map representing
 # row-object fold-points. Every relationship is assigned a set of unique,
 # non-nullable columns (which may *not even be* from the same resultset)
@@ -75,10 +142,10 @@ sub _resolve_collapse {
   # for comprehensible error messages put ourselves at the head of the relationship chain
   $args->{_rel_chain} ||= [ $self->source_name ];
 
-  # record top-level fully-qualified column index, start nodecount
-  $common_args ||= {
-    _as_fq_idx => { %{$args->{as}} },
-    _node_idx => 1, # this is *deliberately* not 0-based
+  # record top-level fully-qualified column index, signify toplevelness
+  unless ($common_args->{_as_fq_idx}) {
+    $common_args->{_as_fq_idx} = { %{$args->{as}} };
+    $args->{_is_top_level} = 1;
   };
 
   my ($my_cols, $rel_cols);
@@ -156,15 +223,15 @@ sub _resolve_collapse {
 
   # first try to reuse the parent's collapser (i.e. reuse collapser over 1:1)
   # (makes for a leaner coderef later)
-  unless ($collapse_map->{-idcols_current_node}) {
-    $collapse_map->{-idcols_current_node} = $args->{_parent_info}{collapse_on_idcols}
+  unless ($collapse_map->{-identifying_columns}) {
+    $collapse_map->{-identifying_columns} = $args->{_parent_info}{collapse_on_idcols}
       if $args->{_parent_info}{collapser_reusable};
   }
 
 
   # Still dont know how to collapse - try to resolve based on our columns (plus already inserted FK bridges)
   if (
-    ! $collapse_map->{-idcols_current_node}
+    ! $collapse_map->{-identifying_columns}
       and
     $my_cols
       and
@@ -174,7 +241,7 @@ sub _resolve_collapse {
     # and fix stuff up if this is the case
     my @reduced_set = grep { ! $assumed_from_parent->{columns}{$_} } @$idset;
 
-    $collapse_map->{-idcols_current_node} = [ __unique_numlist(
+    $collapse_map->{-identifying_columns} = [ __unique_numlist(
       @{ $args->{_parent_info}{collapse_on_idcols}||[] },
 
       (map
@@ -194,7 +261,7 @@ sub _resolve_collapse {
   # Stil don't know how to collapse - keep descending down 1:1 chains - if
   # a related non-LEFT 1:1 is resolvable - its condition will collapse us
   # too
-  unless ($collapse_map->{-idcols_current_node}) {
+  unless ($collapse_map->{-identifying_columns}) {
     my @candidates;
 
     for my $rel (keys %$relinfo) {
@@ -205,7 +272,7 @@ sub _resolve_collapse {
         _rel_chain => [ @{$args->{_rel_chain}}, $rel ],
         _parent_info => { underdefined => 1 },
       }, $common_args)) {
-        push @candidates, $rel_collapse->{-idcols_current_node};
+        push @candidates, $rel_collapse->{-identifying_columns};
       }
     }
 
@@ -213,7 +280,7 @@ sub _resolve_collapse {
     # FIXME - maybe need to implement a data type order as well (i.e. prefer several ints
     # to a single varchar)
     if (@candidates) {
-      ($collapse_map->{-idcols_current_node}) = sort { scalar @$a <=> scalar @$b } (@candidates);
+      ($collapse_map->{-identifying_columns}) = sort { scalar @$a <=> scalar @$b } (@candidates);
     }
   }
 
@@ -225,11 +292,11 @@ sub _resolve_collapse {
   # case we are good - either one of them will define us, or if all are NULLs
   # we know we are "unique" due to the "non-premultiplied" check
   if (
-    ! $collapse_map->{-idcols_current_node}
+    ! $collapse_map->{-identifying_columns}
       and
     ! $args->{premultiplied}
       and
-    $common_args->{_node_idx} == 1
+    $args->{_is_top_level}
   ) {
     my (@collapse_sets, $uncollapsible_chain);
 
@@ -246,7 +313,7 @@ sub _resolve_collapse {
 
         # for singles use the idcols wholesale (either there or not)
         if ($relinfo->{$rel}{is_single}) {
-          push @collapse_sets, $clps->{-idcols_current_node};
+          push @collapse_sets, $clps->{-identifying_columns};
         }
         elsif (! $relinfo->{$rel}{fk_map}) {
           $uncollapsible_chain = 1;
@@ -286,8 +353,8 @@ sub _resolve_collapse {
       # in addition we record the individual collapse posibilities
       # of all left children node collapsers, and merge them in the rowparser
       # coderef later
-      $collapse_map->{-idcols_current_node} = [];
-      $collapse_map->{-root_node_idcol_variants} = [ sort {
+      $collapse_map->{-identifying_columns} = [];
+      $collapse_map->{-identifying_columns_variants} = [ sort {
         (scalar @$a) <=> (scalar @$b) or max(@$a) <=> max(@$b)
       } @collapse_sets ];
     }
@@ -297,10 +364,10 @@ sub _resolve_collapse {
   # and don't despair if nothing was found (there may be other parallel branches
   # to dive into)
   if ($args->{_parent_info}{underdefined}) {
-    return $collapse_map->{-idcols_current_node} ? $collapse_map : undef
+    return $collapse_map->{-identifying_columns} ? $collapse_map : undef
   }
   # nothing down the chain resolved - can't calculate a collapse-map
-  elsif (! $collapse_map->{-idcols_current_node}) {
+  elsif (! $collapse_map->{-identifying_columns}) {
     $self->throw_exception ( sprintf
       "Unable to calculate a definitive collapse column set for %s%s: fetch more unique non-nullable columns",
       $self->source_name,
@@ -315,7 +382,6 @@ sub _resolve_collapse {
   # a second time, and fill in the rest
 
   $collapse_map->{-is_optional} = 1 if $args->{_parent_info}{is_optional};
-  $collapse_map->{-node_index} = $common_args->{_node_idx}++;
 
 
   my @id_sets;
@@ -326,7 +392,7 @@ sub _resolve_collapse {
       _rel_chain => [ @{$args->{_rel_chain}}, $rel],
       _parent_info => {
         # shallow copy
-        collapse_on_idcols => [ @{$collapse_map->{-idcols_current_node}} ],
+        collapse_on_idcols => [ @{$collapse_map->{-identifying_columns}} ],
 
         rel_condition => $relinfo->{$rel}{fk_map},
 
@@ -334,354 +400,20 @@ sub _resolve_collapse {
 
         # if this is a 1:1 our own collapser can be used as a collapse-map
         # (regardless of left or not)
-        collapser_reusable => @{$collapse_map->{-idcols_current_node}} && $relinfo->{$rel}{is_single},
+        collapser_reusable => @{$collapse_map->{-identifying_columns}} && $relinfo->{$rel}{is_single},
       },
     }, $common_args );
 
     $collapse_map->{$rel}{-is_single} = 1 if $relinfo->{$rel}{is_single};
     $collapse_map->{$rel}{-is_optional} ||= 1 unless $relinfo->{$rel}{is_inner};
-    push @id_sets, ( map { @$_ } (
-      $collapse_map->{$rel}{-idcols_current_node},
-      $collapse_map->{$rel}{-idcols_extra_from_children} || (),
-    ));
-  }
-
-  if (@id_sets) {
-    my $cur_nodeid_hash = { map { $_ => 1 } @{$collapse_map->{-idcols_current_node}} };
-    $collapse_map->{-idcols_extra_from_children} = [ grep
-      { ! $cur_nodeid_hash->{$_} }
-      __unique_numlist( @id_sets )
-    ];
   }
 
   return $collapse_map;
 }
 
-# Takes an arrayref of {as} dbic column aliases and the collapse and select
-# attributes from the same $rs (the selector requirement is a temporary
-# workaround... I hope), and returns a coderef capable of:
-# my $me_pref_clps = $coderef->([$rs->cursor->next/all])
-# Where the $me_pref_clps arrayref is the future argument to inflate_result()
-#
-# For an example of this coderef in action (and to see its guts) look at
-# t/resultset/rowparser_internals.t
-#
-# This is a huge performance win, as we call the same code for every row
-# returned from the db, thus avoiding repeated method lookups when traversing
-# relationships
-#
-# Also since the coderef is completely stateless (the returned structure is
-# always fresh on every new invocation) this is a very good opportunity for
-# memoization if further speed improvements are needed
-#
-# The way we construct this coderef is somewhat fugly, although the result is
-# really worth it. The final coderef does not perform any kind of recursion -
-# the entire nested structure constructor is rolled out into a single scope.
-#
-# In any case - the output of this thing is meticulously micro-tested, so
-# any sort of adjustment/rewrite should be relatively easy (fsvo relatively)
-#
-sub _mk_row_parser {
-  my ($self, $args) = @_;
-
-  my $inflate_index = { map
-    { $args->{inflate_map}[$_] => $_ }
-    ( 0 .. $#{$args->{inflate_map}} )
-  };
-
-  my $parser_src;
-
-  # the non-collapsing assembler is easy
-  # FIXME SUBOPTIMAL there could be a yet faster way to do things here, but
-  # need to try an actual implementation and benchmark it:
-  #
-  # <timbunce_> First setup the nested data structure you want for each row
-  #   Then call bind_col() to alias the row fields into the right place in
-  #   the data structure, then to fetch the data do:
-  # push @rows, dclone($row_data_struct) while ($sth->fetchrow);
-  #
-  if (!$args->{collapse}) {
-    $parser_src = sprintf('$_ = %s for @{$_[0]}', __visit_infmap_simple(
-      $inflate_index,
-    ));
-
-    # change the quoted placeholders to unquoted alias-references
-    $parser_src =~ s/ \' \xFF__VALPOS__(\d+)__\xFF \' /"\$_->[$1]"/gex;
-  }
-
-  # the collapsing parser is more complicated - it needs to keep a lot of state
-  #
-  else {
-    my $collapse_map = $self->_resolve_collapse ({
-      premultiplied => $args->{premultiplied},
-      # FIXME
-      # only consider real columns (not functions) during collapse resolution
-      # this check shouldn't really be here, as fucktards are not supposed to
-      # alias random crap to existing column names anyway, but still - just in
-      # case
-      # FIXME !!!! - this does not yet deal with unbalanced selectors correctly
-      # (it is now trivial as the attrs specify where things go out of sync
-      # needs MOAR tests)
-      as => { map
-        { ref $args->{selection}[$inflate_index->{$_}] ? () : ( $_ => $inflate_index->{$_} ) }
-        keys %$inflate_index
-      }
-    });
-
-    my @all_idcols = sort { $a <=> $b } map { @$_ } (
-      $collapse_map->{-idcols_current_node},
-      $collapse_map->{-idcols_extra_from_children} || (),
-    );
-
-    my ($top_node_id_path, $top_node_id_cacher, @path_variants);
-    if (scalar @{$collapse_map->{-idcols_current_node}}) {
-      $top_node_id_path = join ('', map
-        { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
-        @{$collapse_map->{-idcols_current_node}}
-      );
-    }
-    elsif( my @variants = @{$collapse_map->{-root_node_idcol_variants}} ) {
-      my @path_parts;
-
-      for (@variants) {
-
-        push @path_variants, sprintf "(join qq(\xFF), '', %s, '')",
-          ( join ', ', map { "'\xFF__VALPOS__${_}__\xFF'" } @$_ )
-        ;
-
-        push @path_parts, sprintf "( %s && %s)",
-          ( join ' && ', map { "( defined '\xFF__VALPOS__${_}__\xFF' )" } @$_ ),
-          $path_variants[-1];
-        ;
-      }
-
-      $top_node_id_cacher = sprintf '$cur_row_ids[%d] = (%s);',
-        $all_idcols[-1] + 1,
-        "\n" . join( "\n  or\n", @path_parts, qq{"\0\$rows_pos\0"} );
-      $top_node_id_path = sprintf '{$cur_row_ids[%d]}', $all_idcols[-1] + 1;
-    }
-    else {
-      $self->throw_exception('Unexpected collapse map contents');
-    }
-
-    my $rel_assemblers = __visit_infmap_collapse (
-      $inflate_index, { %$collapse_map, -custom_node_id => $top_node_id_path },
-    );
-
-    $parser_src = sprintf (<<'EOS', join(', ', @all_idcols), $top_node_id_path, $top_node_id_cacher||'', $rel_assemblers);
-### BEGIN LITERAL STRING EVAL
-  my ($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0,0);
-
-  # this loop is a bit arcane - the rationale is that the passed in
-  # $_[0] will either have only one row (->next) or will have all
-  # rows already pulled in (->all and/or unordered). Given that the
-  # result can be rather large - we reuse the same already allocated
-  # array, since the collapsed prefetch is smaller by definition.
-  # At the end we cut the leftovers away and move on.
-  while ($cur_row =
-    ( ( $rows_pos >= 0 and $_[0][$rows_pos++] ) or do { $rows_pos = -1; undef } )
-      ||
-    ($_[1] and $_[1]->())
-  ) {
-
-    # due to left joins some of the ids may be NULL/undef, and
-    # won't play well when used as hash lookups
-    # we also need to differentiate NULLs on per-row/per-col basis
-    #(otherwise folding of optional 1:1s will be greatly confused
-    $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
-      for (%1$s);
-
-    # maybe(!) cache the top node id calculation
-    %3$s
-
-    $is_new_res = ! $collapse_idx[1]%2$s and (
-      $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row) and last
-    );
-
-    %4$s
-
-    $_[0][$result_pos++] = $collapse_idx[1]%2$s
-      if $is_new_res;
-  }
-
-  splice @{$_[0]}, $result_pos; # truncate the passed in array for cases of collapsing ->all()
-### END LITERAL STRING EVAL
-EOS
-
-    # !!! note - different var than the one above
-    # change the quoted placeholders to unquoted alias-references
-    $parser_src =~ s/ \' \xFF__VALPOS__(\d+)__\xFF \' /"\$cur_row->[$1]"/gex;
-    $parser_src =~ s/ \' \xFF__IDVALPOS__(\d+)__\xFF \' /"\$cur_row_ids[$1]"/gex;
-  }
-
-  $parser_src;
-}
-
-# the simple non-collapsing nested structure recursor
-sub __visit_infmap_simple {
-  my ($val_idx, $args) = @_;
-
-  my $my_cols = {};
-  my $rel_cols;
-  for (keys %$val_idx) {
-    if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
-      $rel_cols->{$1}{$2} = $val_idx->{$_};
-    }
-    else {
-      $my_cols->{$_} = $val_idx->{$_};
-    }
-  }
-  my @relperl;
-  for my $rel (sort keys %$rel_cols) {
-
-    # DISABLEPRUNE
-    #my $optional = $args->{is_optional};
-    #$optional ||= ($args->{rsrc}->relationship_info($rel)->{attrs}{join_type} || '') =~ /^left/i;
-
-    push @relperl, join ' => ', perlstring($rel), __visit_infmap_simple($rel_cols->{$rel}, {
-      # DISABLEPRUNE
-      #non_top => 1,
-      #is_optional => $optional,
-    });
-
-    # FIXME SUBOPTIMAL DISABLEPRUNE - disabled to satisfy t/resultset/inflate_result_api.t
-    #if ($optional and my @branch_null_checks = map
-    #  { "(! defined '\xFF__VALPOS__${_}__\xFF')" }
-    #  sort { $a <=> $b } values %{$rel_cols->{$rel}}
-    #) {
-    #  $relperl[-1] = sprintf ( '(%s) ? ( %s => [] ) : ( %s )',
-    #    join (' && ', @branch_null_checks ),
-    #    perlstring($rel),
-    #    $relperl[-1],
-    #  );
-    #}
-  }
-
-  my $me_struct = keys %$my_cols
-    ? __visit_dump({ map { $_ => "\xFF__VALPOS__$my_cols->{$_}__\xFF" } (keys %$my_cols) })
-    : 'undef'
-  ;
-
-  return sprintf '[%s]', join (',',
-    $me_struct,
-    @relperl ? sprintf ('{ %s }', join (',', @relperl)) : (),
-  );
-}
-
-# the collapsing nested structure recursor
-sub __visit_infmap_collapse {
-
-  my ($val_idx, $collapse_map, $parent_info) = @_;
-
-  my $my_cols = {};
-  my $rel_cols;
-  for (keys %$val_idx) {
-    if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
-      $rel_cols->{$1}{$2} = $val_idx->{$_};
-    }
-    else {
-      $my_cols->{$_} = $val_idx->{$_};
-    }
-  }
-
-  my $sequenced_node_id = $collapse_map->{-custom_node_id} || join ('', map
-    { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
-    @{$collapse_map->{-idcols_current_node}}
-  );
-
-  my $me_struct = keys %$my_cols
-    ? __visit_dump([{ map { $_ => "\xFF__VALPOS__$my_cols->{$_}__\xFF" } (keys %$my_cols) }])
-    : undef
-  ;
-  my $node_idx_ref = sprintf '$collapse_idx[%d]%s', $collapse_map->{-node_index}, $sequenced_node_id;
-
-  my $parent_idx_ref = sprintf( '$collapse_idx[%d]%s[1]{%s}',
-    @{$parent_info}{qw/node_idx sequenced_node_id/},
-    perlstring($parent_info->{relname}),
-  ) if $parent_info;
-
-  my @src;
-  if ($collapse_map->{-node_index} == 1) {
-    push @src, sprintf( '%s ||= %s;',
-      $node_idx_ref,
-      $me_struct,
-    ) if $me_struct;
-  }
-  elsif ($collapse_map->{-is_single}) {
-    push @src, sprintf ( '%s ||= %s%s;',
-      $parent_idx_ref,
-      $node_idx_ref,
-      $me_struct ? " ||= $me_struct" : '',
-    );
-  }
-  else {
-    push @src, sprintf('push @{%s}, %s%s unless %s;',
-      $parent_idx_ref,
-      $node_idx_ref,
-      $me_struct ? " ||= $me_struct" : '',
-      $node_idx_ref,
-    );
-  }
-
-  # DISABLEPRUNE
-  #my $known_defined = { %{ $parent_info->{known_defined} || {} } };
-  #$known_defined->{$_}++ for @{$collapse_map->{-idcols_current_node}};
-  for my $rel (sort keys %$rel_cols) {
-
-#    push @src, sprintf(
-#      '%s[1]{%s} ||= [];', $node_idx_ref, perlstring($rel)
-#    ) unless $collapse_map->{$rel}{-is_single};
-
-    push @src,  __visit_infmap_collapse($rel_cols->{$rel}, $collapse_map->{$rel}, {
-      node_idx => $collapse_map->{-node_index},
-      sequenced_node_id => $sequenced_node_id,
-      relname => $rel,
-      # DISABLEPRUNE
-      #known_defined => $known_defined,
-    });
-
-    # FIXME SUBOPTIMAL DISABLEPRUNE - disabled to satisfy t/resultset/inflate_result_api.t
-    #if ($collapse_map->{$rel}{-is_optional} and my @null_checks = map
-    #  { "(! defined '\xFF__IDVALPOS__${_}__\xFF')" }
-    #  sort { $a <=> $b } grep
-    #    { ! $known_defined->{$_} }
-    #    @{$collapse_map->{$rel}{-idcols_current_node}}
-    #) {
-    #  $src[-1] = sprintf( '(%s) or %s',
-    #    join (' || ', @null_checks ),
-    #    $src[-1],
-    #  );
-    #}
-  }
-
-  join "\n", @src;
-}
-
 # adding a dep on MoreUtils *just* for this is retarded
 sub __unique_numlist {
   sort { $a <=> $b } keys %{ {map { $_ => 1 } @_ }}
 }
 
-# keep our own DD object around so we don't have to fitz with quoting
-my $dumper_obj;
-sub __visit_dump {
-  # we actually will be producing functional perl code here,
-  # thus no second-guessing of what these globals might have
-  # been set to. DO NOT CHANGE!
-  ($dumper_obj ||= do {
-    require Data::Dumper;
-    Data::Dumper->new([])
-      ->Useperl (0)
-      ->Purity (1)
-      ->Pad ('')
-      ->Useqq (0)
-      ->Terse (1)
-      ->Quotekeys (1)
-      ->Deepcopy (0)
-      ->Deparse (0)
-      ->Maxdepth (0)
-      ->Indent (0)  # faster but harder to read, perhaps leave at 1 ?
-  })->Values ([$_[0]])->Dump;
-}
-
 1;
diff --git a/lib/DBIx/Class/ResultSource/RowParser/Util.pm b/lib/DBIx/Class/ResultSource/RowParser/Util.pm
new file mode 100644 (file)
index 0000000..a4e2eb5
--- /dev/null
@@ -0,0 +1,299 @@
+package # hide from the pauses
+  DBIx::Class::ResultSource::RowParser::Util;
+
+use strict;
+use warnings;
+
+use B 'perlstring';
+
+use base 'Exporter';
+our @EXPORT_OK = qw(
+  assemble_simple_parser
+  assemble_collapsing_parser
+);
+
+sub assemble_simple_parser {
+  #my ($args) = @_;
+
+  # the non-collapsing assembler is easy
+  # FIXME SUBOPTIMAL there could be a yet faster way to do things here, but
+  # need to try an actual implementation and benchmark it:
+  #
+  # <timbunce_> First setup the nested data structure you want for each row
+  #   Then call bind_col() to alias the row fields into the right place in
+  #   the data structure, then to fetch the data do:
+  # push @rows, dclone($row_data_struct) while ($sth->fetchrow);
+  #
+  my $parser_src = sprintf('$_ = %s for @{$_[0]}', __visit_infmap_simple($_[0]) );
+
+  # change the quoted placeholders to unquoted alias-references
+  $parser_src =~ s/ \' \xFF__VALPOS__(\d+)__\xFF \' /"\$_->[$1]"/gex;
+
+  return $parser_src;
+}
+
+# the simple non-collapsing nested structure recursor
+sub __visit_infmap_simple {
+  my $args = shift;
+
+  my $my_cols = {};
+  my $rel_cols;
+  for (keys %{$args->{val_index}}) {
+    if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
+      $rel_cols->{$1}{$2} = $args->{val_index}{$_};
+    }
+    else {
+      $my_cols->{$_} = $args->{val_index}{$_};
+    }
+  }
+
+  my @relperl;
+  for my $rel (sort keys %$rel_cols) {
+
+    # DISABLEPRUNE
+    #my $optional = $args->{is_optional};
+    #$optional ||= ($args->{rsrc}->relationship_info($rel)->{attrs}{join_type} || '') =~ /^left/i;
+
+    push @relperl, join ' => ', perlstring($rel), __visit_infmap_simple({ %$args,
+      val_index => $rel_cols->{$rel},
+      # DISABLEPRUNE
+      #non_top => 1,
+      #is_optional => $optional,
+    });
+
+    # FIXME SUBOPTIMAL DISABLEPRUNE - disabled to satisfy t/resultset/inflate_result_api.t
+    #if ($optional and my @branch_null_checks = map
+    #  { "(! defined '\xFF__VALPOS__${_}__\xFF')" }
+    #  sort { $a <=> $b } values %{$rel_cols->{$rel}}
+    #) {
+    #  $relperl[-1] = sprintf ( '(%s) ? ( %s => [] ) : ( %s )',
+    #    join (' && ', @branch_null_checks ),
+    #    perlstring($rel),
+    #    $relperl[-1],
+    #  );
+    #}
+  }
+
+  my $me_struct = keys %$my_cols
+    ? __visit_dump({ map { $_ => "\xFF__VALPOS__$my_cols->{$_}__\xFF" } (keys %$my_cols) })
+    : 'undef'
+  ;
+
+  return sprintf '[%s]', join (',',
+    $me_struct,
+    @relperl ? sprintf ('{ %s }', join (',', @relperl)) : (),
+  );
+}
+
+sub assemble_collapsing_parser {
+  my $args = shift;
+
+  my ($top_node_key, $top_node_key_assembler);
+
+  if (scalar @{$args->{collapse_map}{-identifying_columns}}) {
+    $top_node_key = join ('', map
+      { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
+      @{$args->{collapse_map}{-identifying_columns}}
+    );
+  }
+  elsif( my @variants = @{$args->{collapse_map}{-identifying_columns_variants}} ) {
+
+    my @path_parts = map { sprintf
+      "( ( defined '\xFF__VALPOS__%d__\xFF' ) && (join qq(\xFF), '', %s, '') )",
+      $_->[0],  # checking just first is enough - one defined, all defined
+      ( join ', ', map { "'\xFF__VALPOS__${_}__\xFF'" } @$_ ),
+    } @variants;
+
+    my $virtual_column_idx = (scalar keys %{$args->{val_index}} ) + 1;
+
+    $top_node_key_assembler = sprintf '$cur_row_ids{%d} = (%s);',
+      $virtual_column_idx,
+      "\n" . join( "\n  or\n", @path_parts, qq{"\0\$rows_pos\0"} );
+
+    $top_node_key = sprintf '{$cur_row_ids{%d}}', $virtual_column_idx;
+
+    $args->{collapse_map} = {
+      %{$args->{collapse_map}},
+      -custom_node_key => $top_node_key,
+    };
+
+  }
+  else {
+    die('Unexpected collapse map contents');
+  }
+
+  my ($data_assemblers, $stats) = __visit_infmap_collapse ($args);
+
+  my $list_of_idcols = join(', ', sort { $a <=> $b } keys %{ $stats->{idcols_seen} } );
+
+  my $parser_src = sprintf (<<'EOS', $list_of_idcols, $top_node_key, $top_node_key_assembler||'', $data_assemblers);
+### BEGIN LITERAL STRING EVAL
+  my ($rows_pos, $result_pos, $cur_row_data, %%cur_row_ids, @collapse_idx, $is_new_res) = (0,0);
+
+  # this loop is a bit arcane - the rationale is that the passed in
+  # $_[0] will either have only one row (->next) or will have all
+  # rows already pulled in (->all and/or unordered). Given that the
+  # result can be rather large - we reuse the same already allocated
+  # array, since the collapsed prefetch is smaller by definition.
+  # At the end we cut the leftovers away and move on.
+  while ($cur_row_data =
+    ( ( $rows_pos >= 0 and $_[0][$rows_pos++] ) or do { $rows_pos = -1; undef } )
+      ||
+    ($_[1] and $_[1]->())
+  ) {
+
+    # due to left joins some of the ids may be NULL/undef, and
+    # won't play well when used as hash lookups
+    # we also need to differentiate NULLs on per-row/per-col basis
+    #(otherwise folding of optional 1:1s will be greatly confused
+    $cur_row_ids{$_} = defined $cur_row_data->[$_] ? $cur_row_data->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
+      for (%1$s);
+
+    # maybe(!) cache the top node id calculation
+    %3$s
+
+    $is_new_res = ! $collapse_idx[0]%2$s and (
+      $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row_data) and last
+    );
+
+    # the rel assemblers
+    %4$s
+
+    $_[0][$result_pos++] = $collapse_idx[0]%2$s
+      if $is_new_res;
+  }
+
+  splice @{$_[0]}, $result_pos; # truncate the passed in array for cases of collapsing ->all()
+### END LITERAL STRING EVAL
+EOS
+
+  # !!! note - different var than the one above
+  # change the quoted placeholders to unquoted alias-references
+  $parser_src =~ s/ \' \xFF__VALPOS__(\d+)__\xFF \' /"\$cur_row_data->[$1]"/gex;
+  $parser_src =~ s/ \' \xFF__IDVALPOS__(\d+)__\xFF \' /"\$cur_row_ids{$1}"/gex;
+
+  $parser_src;
+}
+
+
+# the collapsing nested structure recursor
+sub __visit_infmap_collapse {
+  my $args = {%{ shift() }};
+
+  my $cur_node_idx = ${ $args->{-node_idx_counter} ||= \do { my $x = 0} }++;
+
+  my ($my_cols, $rel_cols);
+  for ( keys %{$args->{val_index}} ) {
+    if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
+      $rel_cols->{$1}{$2} = $args->{val_index}{$_};
+    }
+    else {
+      $my_cols->{$_} = $args->{val_index}{$_};
+    }
+  }
+
+  my $node_key = $args->{collapse_map}->{-custom_node_key} || join ('', map
+    { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
+    @{$args->{collapse_map}->{-identifying_columns}}
+  );
+
+  my $me_struct = $my_cols
+    ? __visit_dump([{ map { $_ => "\xFF__VALPOS__$my_cols->{$_}__\xFF" } (keys %$my_cols) }])
+    : undef
+  ;
+  my $node_idx_slot = sprintf '$collapse_idx[%d]%s', $cur_node_idx, $node_key;
+
+  my $parent_attach_slot = sprintf( '$collapse_idx[%d]%s[1]{%s}',
+    @{$args}{qw/-parent_node_idx -parent_node_key/},
+    perlstring($args->{-node_relname}),
+  ) if $args->{-node_relname};
+
+  my @src;
+  if ($cur_node_idx == 0) {
+    push @src, sprintf( '%s ||= %s;',
+      $node_idx_slot,
+      $me_struct,
+    ) if $me_struct;
+  }
+  elsif ($args->{collapse_map}->{-is_single}) {
+    push @src, sprintf ( '%s ||= %s%s;',
+      $parent_attach_slot,
+      $node_idx_slot,
+      $me_struct ? " ||= $me_struct" : '',
+    );
+  }
+  else {
+    push @src, sprintf('push @{%s}, %s%s unless %s;',
+      $parent_attach_slot,
+      $node_idx_slot,
+      $me_struct ? " ||= $me_struct" : '',
+      $node_idx_slot,
+    );
+  }
+
+  # DISABLEPRUNE
+  #my $known_defined = { %{ $parent_info->{known_defined} || {} } };
+  #$known_defined->{$_}++ for @{$args->{collapse_map}->{-identifying_columns}};
+  my $stats;
+  for my $rel (sort keys %$rel_cols) {
+
+#    push @src, sprintf(
+#      '%s[1]{%s} ||= [];', $node_idx_slot, perlstring($rel)
+#    ) unless $args->{collapse_map}->{$rel}{-is_single};
+
+    ($src[$#src + 1], $stats->{$rel}) = __visit_infmap_collapse({ %$args,
+      val_index => $rel_cols->{$rel},
+      collapse_map => $args->{collapse_map}{$rel},
+      -parent_node_idx => $cur_node_idx,
+      -parent_node_key => $node_key,
+      -node_relname => $rel,
+    });
+
+    # FIXME SUBOPTIMAL DISABLEPRUNE - disabled to satisfy t/resultset/inflate_result_api.t
+    #if ($args->{collapse_map}->{$rel}{-is_optional} and my @null_checks = map
+    #  { "(! defined '\xFF__IDVALPOS__${_}__\xFF')" }
+    #  sort { $a <=> $b } grep
+    #    { ! $known_defined->{$_} }
+    #    @{$args->{collapse_map}->{$rel}{-identifying_columns}}
+    #) {
+    #  $src[-1] = sprintf( '(%s) or %s',
+    #    join (' || ', @null_checks ),
+    #    $src[-1],
+    #  );
+    #}
+  }
+
+  return (
+    join("\n", @src),
+    {
+      idcols_seen => {
+        ( map { %{ $_->{idcols_seen} } } values %$stats ),
+        ( map { $_ => 1 } @{$args->{collapse_map}->{-identifying_columns}} ),
+      }
+    }
+  );
+}
+
+# keep our own DD object around so we don't have to fitz with quoting
+my $dumper_obj;
+sub __visit_dump {
+  # we actually will be producing functional perl code here,
+  # thus no second-guessing of what these globals might have
+  # been set to. DO NOT CHANGE!
+  ($dumper_obj ||= do {
+    require Data::Dumper;
+    Data::Dumper->new([])
+      ->Useperl (0)
+      ->Purity (1)
+      ->Pad ('')
+      ->Useqq (0)
+      ->Terse (1)
+      ->Quotekeys (1)
+      ->Deepcopy (0)
+      ->Deparse (0)
+      ->Maxdepth (0)
+      ->Indent (0)  # faster but harder to read, perhaps leave at 1 ?
+  })->Values ([$_[0]])->Dump;
+}
+
+1;
index e87cab7..30795a7 100644 (file)
@@ -79,6 +79,9 @@ my $skip_idx = { map { $_ => 1 } (
   # this subclass is expected to inherit whatever crap comes
   # from the parent
   'DBIx::Class::ResultSet::Pager',
+
+  # a utility class, not part of the inheritance chain
+  'DBIx::Class::ResultSource::RowParser::Util',
 ) };
 
 my $has_cmop = eval { require Class::MOP };
index 80d7cf4..fec5656 100644 (file)
@@ -72,38 +72,27 @@ is_same_src (
 is_deeply (
   ($schema->source('CD')->_resolve_collapse({ as => {map { $infmap->[$_] => $_ } 0 .. $#$infmap} })),
   {
-    -node_index => 1,
-    -idcols_current_node => [ 4, 5 ],
-    -idcols_extra_from_children => [ 0, 3 ],
+    -identifying_columns => [ 4, 5 ],
 
     single_track => {
-      -node_index => 2,
-      -idcols_current_node => [ 4, 5 ],
-      -idcols_extra_from_children => [ 0, 3 ],
+      -identifying_columns => [ 4, 5 ],
       -is_optional => 1,
       -is_single => 1,
 
       cd => {
-        -node_index => 3,
-        -idcols_current_node => [ 4, 5 ],
-        -idcols_extra_from_children => [ 0, 3 ],
+        -identifying_columns => [ 4, 5 ],
         -is_single => 1,
 
         artist => {
-          -node_index => 4,
-          -idcols_current_node => [ 4, 5 ],
-          -idcols_extra_from_children => [ 0, 3 ],
+          -identifying_columns => [ 4, 5 ],
           -is_single => 1,
 
           cds => {
-            -node_index => 5,
-            -idcols_current_node => [ 3, 4, 5 ],
-            -idcols_extra_from_children => [ 0 ],
+            -identifying_columns => [ 3, 4, 5 ],
             -is_optional => 1,
 
             tracks => {
-              -node_index => 6,
-              -idcols_current_node => [ 0, 3, 4, 5 ],
+              -identifying_columns => [ 0, 3, 4, 5 ],
               -is_optional => 1,
             },
           },
@@ -119,42 +108,42 @@ is_same_src (
     inflate_map => $infmap,
     collapse => 1,
   }),
-  ' my($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0, 0);
+  ' my($rows_pos, $result_pos, $cur_row_data, %cur_row_ids, @collapse_idx, $is_new_res) = (0, 0);
 
-    while ($cur_row = (
+    while ($cur_row_data = (
       ( $rows_pos >= 0 and $_[0][$rows_pos++] ) or do { $rows_pos = -1; undef } )
         ||
       ( $_[1] and $_[1]->() )
     ) {
 
-      $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
+      $cur_row_ids{$_} = defined $cur_row_data->[$_] ? $cur_row_data->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
         for (0, 3, 4, 5);
 
       # a present cref in $_[1] implies lazy prefetch, implies a supplied stash in $_[2]
-      $_[1] and $result_pos and unshift(@{$_[2]}, $cur_row) and last
-        if $is_new_res = ! $collapse_idx[1]{$cur_row_ids[4]}{$cur_row_ids[5]};
+      $_[1] and $result_pos and unshift(@{$_[2]}, $cur_row_data) and last
+        if ( $is_new_res = ! $collapse_idx[0]{$cur_row_ids{4}}{$cur_row_ids{5}} );
 
       # the rowdata itself for root node
-      $collapse_idx[1]{$cur_row_ids[4]}{$cur_row_ids[5]} ||= [{ artist => $cur_row->[5], title => $cur_row->[4], year => $cur_row->[2] }];
+      $collapse_idx[0]{$cur_row_ids{4}}{$cur_row_ids{5}} ||= [{ artist => $cur_row_data->[5], title => $cur_row_data->[4], year => $cur_row_data->[2] }];
 
       # prefetch data of single_track (placed in root)
-      $collapse_idx[1]{$cur_row_ids[4]}{$cur_row_ids[5]}[1]{single_track} ||= $collapse_idx[2]{$cur_row_ids[4]}{$cur_row_ids[5]};
+      $collapse_idx[0]{$cur_row_ids{4}}{$cur_row_ids{5}}[1]{single_track} ||= $collapse_idx[1]{$cur_row_ids{4}}{$cur_row_ids{5}};
 
       # prefetch data of cd (placed in single_track)
-      $collapse_idx[2]{$cur_row_ids[4]}{$cur_row_ids[5]}[1]{cd} ||= $collapse_idx[3]{$cur_row_ids[4]}{$cur_row_ids[5]};
+      $collapse_idx[1]{$cur_row_ids{4}}{$cur_row_ids{5}}[1]{cd} ||= $collapse_idx[2]{$cur_row_ids{4}}{$cur_row_ids{5}};
 
       # prefetch data of artist ( placed in single_track->cd)
-      $collapse_idx[3]{$cur_row_ids[4]}{$cur_row_ids[5]}[1]{artist} ||= $collapse_idx[4]{$cur_row_ids[4]}{$cur_row_ids[5]} ||= [{ artistid => $cur_row->[1] }];
+      $collapse_idx[2]{$cur_row_ids{4}}{$cur_row_ids{5}}[1]{artist} ||= $collapse_idx[3]{$cur_row_ids{4}}{$cur_row_ids{5}} ||= [{ artistid => $cur_row_data->[1] }];
 
       # prefetch data of cds (if available)
-      push @{$collapse_idx[4]{$cur_row_ids[4]}{$cur_row_ids[5]}[1]{cds}}, $collapse_idx[5]{$cur_row_ids[3]}{$cur_row_ids[4]}{$cur_row_ids[5]} ||= [{ cdid => $cur_row->[3] }]
-        unless $collapse_idx[5]{$cur_row_ids[3]}{$cur_row_ids[4]}{$cur_row_ids[5]};
+      push @{$collapse_idx[3]{$cur_row_ids{4}}{$cur_row_ids{5}}[1]{cds}}, $collapse_idx[4]{$cur_row_ids{3}}{$cur_row_ids{4}}{$cur_row_ids{5}} ||= [{ cdid => $cur_row_data->[3] }]
+        unless $collapse_idx[4]{$cur_row_ids{3}}{$cur_row_ids{4}}{$cur_row_ids{5}};
 
       # prefetch data of tracks (if available)
-      push @{$collapse_idx[5]{$cur_row_ids[3]}{$cur_row_ids[4]}{$cur_row_ids[5]}[1]{tracks}}, $collapse_idx[6]{$cur_row_ids[0]}{$cur_row_ids[3]}{$cur_row_ids[4]}{$cur_row_ids[5]} ||= [{ title => $cur_row->[0] }]
-        unless $collapse_idx[6]{$cur_row_ids[0]}{$cur_row_ids[3]}{$cur_row_ids[4]}{$cur_row_ids[5]};
+      push @{$collapse_idx[4]{$cur_row_ids{3}}{$cur_row_ids{4}}{$cur_row_ids{5}}[1]{tracks}}, $collapse_idx[5]{$cur_row_ids{0}}{$cur_row_ids{3}}{$cur_row_ids{4}}{$cur_row_ids{5}} ||= [{ title => $cur_row_data->[0] }]
+        unless $collapse_idx[5]{$cur_row_ids{0}}{$cur_row_ids{3}}{$cur_row_ids{4}}{$cur_row_ids{5}};
 
-      $_[0][$result_pos++] = $collapse_idx[1]{$cur_row_ids[4]}{$cur_row_ids[5]}
+      $_[0][$result_pos++] = $collapse_idx[0]{$cur_row_ids{4}}{$cur_row_ids{5}}
         if $is_new_res;
     }
     splice @{$_[0]}, $result_pos;
@@ -178,37 +167,26 @@ $infmap = [qw/
 is_deeply (
   $schema->source('CD')->_resolve_collapse({ as => {map { $infmap->[$_] => $_ } 0 .. $#$infmap} }),
   {
-    -node_index => 1,
-    -idcols_current_node => [ 1 ], # existing_single_track.cd.artist.artistid
-    -idcols_extra_from_children => [ 0, 5, 6, 8 ],
+    -identifying_columns => [ 1 ], # existing_single_track.cd.artist.artistid
 
     existing_single_track => {
-      -node_index => 2,
-      -idcols_current_node => [ 1 ], # existing_single_track.cd.artist.artistid
-      -idcols_extra_from_children => [ 6, 8 ],
+      -identifying_columns => [ 1 ], # existing_single_track.cd.artist.artistid
       -is_single => 1,
 
       cd => {
-        -node_index => 3,
-        -idcols_current_node => [ 1 ], # existing_single_track.cd.artist.artistid
-        -idcols_extra_from_children => [ 6, 8 ],
+        -identifying_columns => [ 1 ], # existing_single_track.cd.artist.artistid
         -is_single => 1,
 
         artist => {
-          -node_index => 4,
-          -idcols_current_node => [ 1 ], # existing_single_track.cd.artist.artistid
-          -idcols_extra_from_children => [ 6, 8 ],
+          -identifying_columns => [ 1 ], # existing_single_track.cd.artist.artistid
           -is_single => 1,
 
           cds => {
-            -node_index => 5,
-            -idcols_current_node => [ 1, 6 ], # existing_single_track.cd.artist.cds.cdid
-            -idcols_extra_from_children => [ 8 ],
+            -identifying_columns => [ 1, 6 ], # existing_single_track.cd.artist.cds.cdid
             -is_optional => 1,
 
             tracks => {
-              -node_index => 6,
-              -idcols_current_node => [ 1, 6, 8 ], # existing_single_track.cd.artist.cds.cdid, existing_single_track.cd.artist.cds.tracks.title
+              -identifying_columns => [ 1, 6, 8 ], # existing_single_track.cd.artist.cds.cdid, existing_single_track.cd.artist.cds.tracks.title
               -is_optional => 1,
             }
           }
@@ -216,21 +194,16 @@ is_deeply (
       }
     },
     tracks => {
-      -node_index => 7,
-      -idcols_current_node => [ 1, 5 ], # existing_single_track.cd.artist.artistid, tracks.title
-      -idcols_extra_from_children => [ 0 ],
+      -identifying_columns => [ 1, 5 ], # existing_single_track.cd.artist.artistid, tracks.title
       -is_optional => 1,
 
       lyrics => {
-        -node_index => 8,
-        -idcols_current_node => [ 1, 5 ], # existing_single_track.cd.artist.artistid, tracks.title
-        -idcols_extra_from_children => [ 0 ],
+        -identifying_columns => [ 1, 5 ], # existing_single_track.cd.artist.artistid, tracks.title
         -is_single => 1,
         -is_optional => 1,
 
         lyric_versions => {
-          -node_index => 9,
-          -idcols_current_node => [ 0, 1, 5 ], # tracks.lyrics.lyric_versions.text, existing_single_track.cd.artist.artistid, tracks.title
+          -identifying_columns => [ 0, 1, 5 ], # tracks.lyrics.lyric_versions.text, existing_single_track.cd.artist.artistid, tracks.title
           -is_optional => 1,
         },
       },
@@ -244,42 +217,42 @@ is_same_src (
     inflate_map => $infmap,
     collapse => 1,
   }),
-  ' my ($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0,0);
+  ' my ($rows_pos, $result_pos, $cur_row_data, %cur_row_ids, @collapse_idx, $is_new_res) = (0,0);
 
-    while ($cur_row = (
+    while ($cur_row_data = (
       ( $rows_pos >= 0 and $_[0][$rows_pos++] ) or do { $rows_pos = -1; undef } )
         ||
       ( $_[1] and $_[1]->() )
     ) {
 
-      $cur_row_ids[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
+      $cur_row_ids{$_} = defined $cur_row_data->[$_] ? $cur_row_data->[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
         for (0, 1, 5, 6, 8);
 
-      $is_new_res = ! $collapse_idx[1]{$cur_row_ids[1]} and (
-        $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row) and last
-      );
+      # a present cref in $_[1] implies lazy prefetch, implies a supplied stash in $_[2]
+      $_[1] and $result_pos and unshift(@{$_[2]}, $cur_row_data) and last
+        if ( $is_new_res = ! $collapse_idx[0]{$cur_row_ids{1}} );
 
-      $collapse_idx[1]{$cur_row_ids[1]} ||= [{ genreid => $cur_row->[4], latest_cd => $cur_row->[7], year => $cur_row->[3] }];
+      $collapse_idx[0]{$cur_row_ids{1}} ||= [{ genreid => $cur_row_data->[4], latest_cd => $cur_row_data->[7], year => $cur_row_data->[3] }];
 
-      $collapse_idx[1]{$cur_row_ids[1]}[1]{existing_single_track} ||= $collapse_idx[2]{$cur_row_ids[1]};
-      $collapse_idx[2]{$cur_row_ids[1]}[1]{cd} ||= $collapse_idx[3]{$cur_row_ids[1]};
-      $collapse_idx[3]{$cur_row_ids[1]}[1]{artist} ||= $collapse_idx[4]{$cur_row_ids[1]} ||= [{ artistid => $cur_row->[1] }];
+      $collapse_idx[0]{$cur_row_ids{1}}[1]{existing_single_track} ||= $collapse_idx[1]{$cur_row_ids{1}};
+      $collapse_idx[1]{$cur_row_ids{1}}[1]{cd} ||= $collapse_idx[2]{$cur_row_ids{1}};
+      $collapse_idx[2]{$cur_row_ids{1}}[1]{artist} ||= $collapse_idx[3]{$cur_row_ids{1}} ||= [{ artistid => $cur_row_data->[1] }];
 
-      push @{ $collapse_idx[4]{$cur_row_ids[1]}[1]{cds} }, $collapse_idx[5]{$cur_row_ids[1]}{$cur_row_ids[6]} ||= [{ cdid => $cur_row->[6], genreid => $cur_row->[9], year => $cur_row->[2] }]
-        unless $collapse_idx[5]{$cur_row_ids[1]}{$cur_row_ids[6]};
+      push @{ $collapse_idx[3]{$cur_row_ids{1}}[1]{cds} }, $collapse_idx[4]{$cur_row_ids{1}}{$cur_row_ids{6}} ||= [{ cdid => $cur_row_data->[6], genreid => $cur_row_data->[9], year => $cur_row_data->[2] }]
+        unless $collapse_idx[4]{$cur_row_ids{1}}{$cur_row_ids{6}};
 
-      push @{ $collapse_idx[5]{$cur_row_ids[1]}{$cur_row_ids[6]}[1]{tracks} }, $collapse_idx[6]{$cur_row_ids[1]}{$cur_row_ids[6]}{$cur_row_ids[8]} ||= [{ title => $cur_row->[8] }]
-        unless $collapse_idx[6]{$cur_row_ids[1]}{$cur_row_ids[6]}{$cur_row_ids[8]};
+      push @{ $collapse_idx[4]{$cur_row_ids{1}}{$cur_row_ids{6}}[1]{tracks} }, $collapse_idx[5]{$cur_row_ids{1}}{$cur_row_ids{6}}{$cur_row_ids{8}} ||= [{ title => $cur_row_data->[8] }]
+        unless $collapse_idx[5]{$cur_row_ids{1}}{$cur_row_ids{6}}{$cur_row_ids{8}};
 
-      push @{ $collapse_idx[1]{$cur_row_ids[1]}[1]{tracks} }, $collapse_idx[7]{$cur_row_ids[1]}{$cur_row_ids[5]} ||= [{ title => $cur_row->[5] }]
-        unless $collapse_idx[7]{$cur_row_ids[1]}{$cur_row_ids[5]};
+      push @{ $collapse_idx[0]{$cur_row_ids{1}}[1]{tracks} }, $collapse_idx[6]{$cur_row_ids{1}}{$cur_row_ids{5}} ||= [{ title => $cur_row_data->[5] }]
+        unless $collapse_idx[6]{$cur_row_ids{1}}{$cur_row_ids{5}};
 
-      $collapse_idx[7]{$cur_row_ids[1]}{$cur_row_ids[5]}[1]{lyrics} ||= $collapse_idx[8]{$cur_row_ids[1]}{$cur_row_ids[5] };
+      $collapse_idx[6]{$cur_row_ids{1}}{$cur_row_ids{5}}[1]{lyrics} ||= $collapse_idx[7]{$cur_row_ids{1}}{$cur_row_ids{5} };
 
-      push @{ $collapse_idx[8]{$cur_row_ids[1]}{$cur_row_ids[5]}[1]{lyric_versions} }, $collapse_idx[9]{$cur_row_ids[0]}{$cur_row_ids[1]}{$cur_row_ids[5]} ||= [{ text => $cur_row->[0] }]
-        unless $collapse_idx[9]{$cur_row_ids[0]}{$cur_row_ids[1]}{$cur_row_ids[5]};
+      push @{ $collapse_idx[7]{$cur_row_ids{1}}{$cur_row_ids{5}}[1]{lyric_versions} }, $collapse_idx[8]{$cur_row_ids{0}}{$cur_row_ids{1}}{$cur_row_ids{5}} ||= [{ text => $cur_row_data->[0] }]
+        unless $collapse_idx[8]{$cur_row_ids{0}}{$cur_row_ids{1}}{$cur_row_ids{5}};
 
-      $_[0][$result_pos++] = $collapse_idx[1]{$cur_row_ids[1]}
+      $_[0][$result_pos++] = $collapse_idx[0]{$cur_row_ids{1}}
         if $is_new_res;
     }
 
@@ -303,46 +276,34 @@ $infmap = [
 is_deeply (
   $schema->source('CD')->_resolve_collapse({ as => {map { $infmap->[$_] => $_ } 0 .. $#$infmap} }),
   {
-    -idcols_current_node => [],
-    -idcols_extra_from_children => [ 0, 2, 3, 4, 8 ],
-    -node_index => 1,
-    -root_node_idcol_variants => [
+    -identifying_columns => [],
+    -identifying_columns_variants => [
       [ 0 ], [ 2 ],
     ],
     single_track => {
-      -idcols_current_node => [ 0 ],
-      -idcols_extra_from_children => [ 4, 8 ],
+      -identifying_columns => [ 0 ],
       -is_optional => 1,
       -is_single => 1,
-      -node_index => 2,
       cd => {
-        -idcols_current_node => [ 0 ],
-        -idcols_extra_from_children => [ 4, 8 ],
+        -identifying_columns => [ 0 ],
         -is_single => 1,
-        -node_index => 3,
         artist => {
-          -idcols_current_node => [ 0 ],
-          -idcols_extra_from_children => [ 4, 8 ],
+          -identifying_columns => [ 0 ],
           -is_single => 1,
-          -node_index => 4,
           cds => {
-            -idcols_current_node => [ 0, 4 ],
-            -idcols_extra_from_children => [ 8 ],
+            -identifying_columns => [ 0, 4 ],
             -is_optional => 1,
-            -node_index => 5,
             tracks => {
-              -idcols_current_node => [ 0, 4, 8 ],
+              -identifying_columns => [ 0, 4, 8 ],
               -is_optional => 1,
-              -node_index => 6,
             }
           }
         }
       }
     },
     tracks => {
-      -idcols_current_node => [ 2, 3 ],
+      -identifying_columns => [ 2, 3 ],
       -is_optional => 1,
-      -node_index => 7,
     }
   },
   'Correct underdefined root collapse map constructed'
@@ -353,51 +314,51 @@ is_same_src (
     inflate_map => $infmap,
     collapse => 1,
   }),
-  ' my($rows_pos, $result_pos, $cur_row, @cur_row_ids, @collapse_idx, $is_new_res) = (0, 0);
+  ' my($rows_pos, $result_pos, $cur_row_data, %cur_row_ids, @collapse_idx, $is_new_res) = (0, 0);
 
-    while ($cur_row = (
+    while ($cur_row_data = (
       ( $rows_pos >= 0 and $_[0][$rows_pos++] ) or do { $rows_pos = -1; undef } )
         ||
       ( $_[1] and $_[1]->() )
     ) {
 
-      $cur_row_ids[$_] = defined $$cur_row[$_] ? $$cur_row[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
+      $cur_row_ids{$_} = defined $$cur_row_data[$_] ? $$cur_row_data[$_] : "\0NULL\xFF$rows_pos\xFF$_\0"
         for (0, 2, 3, 4, 8);
 
       # cache expensive set of ops in a non-existent rowid slot
-      $cur_row_ids[9] = (
-        ( ( defined $cur_row->[0] ) && (join "\xFF", q{}, $cur_row->[0], q{} ))
+      $cur_row_ids{10} = (
+        ( ( defined $cur_row_data->[0] ) && (join "\xFF", q{}, $cur_row_data->[0], q{} ))
           or
-        ( ( defined $cur_row->[2] ) && (join "\xFF", q{}, $cur_row->[2], q{} ))
+        ( ( defined $cur_row_data->[2] ) && (join "\xFF", q{}, $cur_row_data->[2], q{} ))
           or
         "\0$rows_pos\0"
       );
 
-      $is_new_res = ! $collapse_idx[1]{$cur_row_ids[9]} and (
-        $_[1] and $result_pos and (unshift @{$_[2]}, $cur_row) and last
-      );
+      # a present cref in $_[1] implies lazy prefetch, implies a supplied stash in $_[2]
+      $_[1] and $result_pos and unshift(@{$_[2]}, $cur_row_data) and last
+        if ( $is_new_res = ! $collapse_idx[0]{$cur_row_ids{10}} );
 
-      $collapse_idx[1]{$cur_row_ids[9]} ||= [{ year => $$cur_row[1] }];
+      $collapse_idx[0]{$cur_row_ids{10}} ||= [{ year => $$cur_row_data[1] }];
 
-      $collapse_idx[1]{$cur_row_ids[9]}[1]{single_track} ||= ($collapse_idx[2]{$cur_row_ids[0]} ||= [{ trackid => $$cur_row[0] }]);
+      $collapse_idx[0]{$cur_row_ids{10}}[1]{single_track} ||= ($collapse_idx[1]{$cur_row_ids{0}} ||= [{ trackid => $$cur_row_data[0] }]);
 
-      $collapse_idx[2]{$cur_row_ids[0]}[1]{cd} ||= $collapse_idx[3]{$cur_row_ids[0]};
+      $collapse_idx[1]{$cur_row_ids{0}}[1]{cd} ||= $collapse_idx[2]{$cur_row_ids{0}};
 
-      $collapse_idx[3]{$cur_row_ids[0]}[1]{artist} ||= ($collapse_idx[4]{$cur_row_ids[0]} ||= [{ artistid => $$cur_row[6] }]);
+      $collapse_idx[2]{$cur_row_ids{0}}[1]{artist} ||= ($collapse_idx[3]{$cur_row_ids{0}} ||= [{ artistid => $$cur_row_data[6] }]);
 
-      push @{$collapse_idx[4]{$cur_row_ids[0]}[1]{cds}},
-          $collapse_idx[5]{$cur_row_ids[0]}{$cur_row_ids[4]} ||= [{ cdid => $$cur_row[4], genreid => $$cur_row[7], year => $$cur_row[5] }]
-        unless $collapse_idx[5]{$cur_row_ids[0]}{$cur_row_ids[4]};
+      push @{$collapse_idx[3]{$cur_row_ids{0}}[1]{cds}},
+          $collapse_idx[4]{$cur_row_ids{0}}{$cur_row_ids{4}} ||= [{ cdid => $$cur_row_data[4], genreid => $$cur_row_data[7], year => $$cur_row_data[5] }]
+        unless $collapse_idx[4]{$cur_row_ids{0}}{$cur_row_ids{4}};
 
-      push @{$collapse_idx[5]{$cur_row_ids[0]}{$cur_row_ids[4]}[1]{tracks}},
-          $collapse_idx[6]{$cur_row_ids[0]}{$cur_row_ids[4]}{$cur_row_ids[8]} ||= [{ title => $$cur_row[8] }]
-        unless $collapse_idx[6]{$cur_row_ids[0]}{$cur_row_ids[4]}{$cur_row_ids[8]};
+      push @{$collapse_idx[4]{$cur_row_ids{0}}{$cur_row_ids{4}}[1]{tracks}},
+          $collapse_idx[5]{$cur_row_ids{0}}{$cur_row_ids{4}}{$cur_row_ids{8}} ||= [{ title => $$cur_row_data[8] }]
+        unless $collapse_idx[5]{$cur_row_ids{0}}{$cur_row_ids{4}}{$cur_row_ids{8}};
 
-      push @{$collapse_idx[1]{$cur_row_ids[9]}[1]{tracks}},
-          $collapse_idx[7]{$cur_row_ids[2]}{$cur_row_ids[3]} ||= [{ cd => $$cur_row[2], title => $$cur_row[3] }]
-        unless $collapse_idx[7]{$cur_row_ids[2]}{$cur_row_ids[3]};
+      push @{$collapse_idx[0]{$cur_row_ids{10}}[1]{tracks}},
+          $collapse_idx[6]{$cur_row_ids{2}}{$cur_row_ids{3}} ||= [{ cd => $$cur_row_data[2], title => $$cur_row_data[3] }]
+        unless $collapse_idx[6]{$cur_row_ids{2}}{$cur_row_ids{3}};
 
-      $_[0][$result_pos++] = $collapse_idx[1]{$cur_row_ids[9]}
+      $_[0][$result_pos++] = $collapse_idx[0]{$cur_row_ids{10}}
         if $is_new_res;
     }