Solve more prefetch inflation crap
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 05e4c00..f8bed0e 100644 (file)
@@ -513,6 +513,14 @@ sub find {
     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
     $query = $self->_add_alias($unique_query, $alias);
   }
+  elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
+    # This means that we got here after a merger of relationship conditions
+    # in ::Relationship::Base::search_related (the row method), and furthermore
+    # the relationship is of the 'single' type. This means that the condition
+    # provided by the relationship (already attached to $self) is sufficient,
+    # as there can be only one row in the databse that would satisfy the 
+    # relationship
+  }
   else {
     my @unique_queries = $self->_unique_queries($input_query, $attrs);
     $query = @unique_queries
@@ -521,27 +529,14 @@ sub find {
   }
 
   # Run the query
-  if (keys %$attrs) {
-    my $rs = $self->search($query, $attrs);
-    if (keys %{$rs->_resolved_attrs->{collapse}}) {
-      my $row = $rs->next;
-      carp "Query returned more than one row" if $rs->next;
-      return $row;
-    }
-    else {
-      return $rs->single;
-    }
+  my $rs = $self->search ($query, $attrs);
+  if (keys %{$rs->_resolved_attrs->{collapse}}) {
+    my $row = $rs->next;
+    carp "Query returned more than one row" if $rs->next;
+    return $row;
   }
   else {
-    if (keys %{$self->_resolved_attrs->{collapse}}) {
-      my $rs = $self->search($query);
-      my $row = $rs->next;
-      carp "Query returned more than one row" if $rs->next;
-      return $row;
-    }
-    else {
-      return $self->single($query);
-    }
+    return $rs->single;
   }
 }
 
@@ -661,6 +656,7 @@ sub cursor {
   my ($self) = @_;
 
   my $attrs = $self->_resolved_attrs_copy;
+
   return $self->{cursor}
     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
@@ -697,10 +693,14 @@ a warning:
 
   Query returned more than one row
 
-In this case, you should be using L</first> or L</find> instead, or if you really
+In this case, you should be using L</next> or L</find> instead, or if you really
 know what you are doing, use the L</rows> attribute to explicitly limit the size
 of the resultset.
 
+This method will also throw an exception if it is called on a resultset prefetching
+has_many, as such a prefetch implies fetching multiple rows from the database in
+order to assemble the resulting object.
+
 =back
 
 =cut
@@ -712,6 +712,13 @@ sub single {
   }
 
   my $attrs = $self->_resolved_attrs_copy;
+
+  if (keys %{$attrs->{collapse}}) {
+    $self->throw_exception(
+      'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
+    );
+  }
+
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -738,6 +745,7 @@ sub single {
   return (@data ? ($self->_construct_object(@data))[0] : undef);
 }
 
+
 # _is_unique_query
 #
 # Try to determine if the specified query is guaranteed to be unique, based on
@@ -856,10 +864,10 @@ instead. An example conversion is:
 
 sub search_like {
   my $class = shift;
-  carp join ("\n",
-    'search_like() is deprecated and will be removed in 0.09.',
-    'Instead use ->search({ x => { -like => "y%" } })',
-    '(note the outer pair of {}s - they are important!)'
+  carp (
+    'search_like() is deprecated and will be removed in DBIC version 0.09.'
+   .' Instead use ->search({ x => { -like => "y%" } })'
+   .' (note the outer pair of {}s - they are important!)'
   );
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
@@ -949,7 +957,9 @@ sub next {
 
 sub _construct_object {
   my ($self, @row) = @_;
-  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
+
+  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
+    or return ();
   my @new = $self->result_class->inflate_result($self->result_source, @$info);
   @new = $self->{_attrs}{record_filter}->(@new)
     if exists $self->{_attrs}{record_filter};
@@ -959,6 +969,14 @@ sub _construct_object {
 sub _collapse_result {
   my ($self, $as_proto, $row) = @_;
 
+  # if the first row that ever came in is totally empty - this means we got
+  # hit by a smooth^Wempty left-joined resultset. Just noop in that case
+  # instead of producing a {}
+  #
+  # Note the double-defined - $row may be [ 0, '' ]
+  #
+  return undef unless ( defined List::Util::first { defined $_ } (@$row) );
+
   my @copy = @$row;
 
   # 'foo'         => [ undef, 'foo' ]
@@ -1141,90 +1159,219 @@ sub count {
   return $self->search(@_)->count if @_ and defined $_[0];
   return scalar @{ $self->get_cache } if $self->get_cache;
 
-  my @grouped_subq_attrs = qw/prefetch collapse distinct group_by having/;
-  my @subq_attrs = ();
-  
-  my $attrs = $self->_resolved_attrs;
-  # if we are not paged - we are simply asking for a limit
-  if (not $attrs->{page} and not $attrs->{software_limit}) {
-    push @subq_attrs, qw/rows offset/;
+  my $attrs = $self->_resolved_attrs_copy;
+
+  # this is a little optimization - it is faster to do the limit
+  # adjustments in software, instead of a subquery
+  my $rows = delete $attrs->{rows};
+  my $offset = delete $attrs->{offset};
+
+  my $crs;
+  if ($self->_has_resolved_attr (qw/collapse group_by/)) {
+    $crs = $self->_count_subq_rs ($attrs);
   }
+  else {
+    $crs = $self->_count_rs ($attrs);
+  }
+  my $count = $crs->next;
 
-  my $need_subq = $self->_has_attr (@subq_attrs);
-  my $need_group_subq = $self->_has_attr (@grouped_subq_attrs);
+  $count -= $offset if $offset;
+  $count = $rows if $rows and $rows < $count;
+  $count = 0 if ($count < 0);
 
-  return ($need_subq || $need_group_subq)
-    ? $self->_count_subq ($need_group_subq)
-    : $self->_count_simple
+  return $count;
 }
 
-sub _count_subq {
-  my ($self, $add_group_by) = @_;
+=head2 count_rs
 
-  my $attrs = $self->_resolved_attrs_copy;
+=over 4
 
-  # copy for the subquery, we need to do some adjustments to it too
-  my $sub_attrs = { %$attrs };
+=item Arguments: $cond, \%attrs??
 
-  # these can not go in the subquery, and there is no point of ordering it
-  delete $sub_attrs->{$_} for qw/prefetch collapse select +select as +as columns +columns order_by/;
+=item Return Value: $count_rs
 
-  # if needed force a group_by and the same set of columns (most databases require this)
-  if ($add_group_by) {
+=back
 
-    # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
-    # simply deleting group_by suffices, as the code below will re-fill it
-    # Note: we check $attrs, as $sub_attrs has collapse deleted
-    if (ref $attrs->{collapse} and keys %{$attrs->{collapse}} ) { 
-      delete $sub_attrs->{group_by};
-    }
+Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
+This can be very handy for subqueries:
+
+  ->search( { amount => $some_rs->count_rs->as_query } )
+
+As with regular resultsets the SQL query will be executed only after
+the resultset is accessed via L</next> or L</all>. That would return
+the same single value obtainable via L</count>.
+
+=cut
 
-    $sub_attrs->{columns} = $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
+sub count_rs {
+  my $self = shift;
+  return $self->search(@_)->count_rs if @_;
+
+  # this may look like a lack of abstraction (count() does about the same)
+  # but in fact an _rs *must* use a subquery for the limits, as the
+  # software based limiting can not be ported if this $rs is to be used
+  # in a subquery itself (i.e. ->as_query)
+  if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
+    return $self->_count_subq_rs;
   }
+  else {
+    return $self->_count_rs;
+  }
+}
+
+#
+# returns a ResultSetColumn object tied to the count query
+#
+sub _count_rs {
+  my ($self, $attrs) = @_;
+
+  my $rsrc = $self->result_source;
+  $attrs ||= $self->_resolved_attrs;
+
+  my $tmp_attrs = { %$attrs };
+
+  # take off any limits, record_filter is cdbi, and no point of ordering a count 
+  delete $tmp_attrs->{$_} for (qw/select as rows offset order_by record_filter/);
+
+  # overwrite the selector (supplied by the storage)
+  $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $tmp_attrs);
+  $tmp_attrs->{as} = 'count';
+
+  # read the comment on top of the actual function to see what this does
+  $tmp_attrs->{from} = $self->_switch_to_inner_join_if_needed (
+    $tmp_attrs->{from}, $tmp_attrs->{alias}
+  );
+
+  my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
+
+  return $tmp_rs;
+}
+
+#
+# same as above but uses a subquery
+#
+sub _count_subq_rs {
+  my ($self, $attrs) = @_;
+
+  my $rsrc = $self->result_source;
+  $attrs ||= $self->_resolved_attrs_copy;
+
+  my $sub_attrs = { %$attrs };
+
+  # extra selectors do not go in the subquery and there is no point of ordering it
+  delete $sub_attrs->{$_} for qw/collapse prefetch_select select as order_by/;
+
+  # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
+  # clobber old group_by regardless
+  if ( keys %{$attrs->{collapse}} ) {
+    $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->primary_columns) ]
+  }
+
+  $sub_attrs->{select} = $rsrc->storage->_subq_count_select ($rsrc, $sub_attrs);
+
+  # read the comment on top of the actual function to see what this does
+  $sub_attrs->{from} = $self->_switch_to_inner_join_if_needed (
+    $sub_attrs->{from}, $sub_attrs->{alias}
+  );
 
   $attrs->{from} = [{
-    count_subq => (ref $self)->new ($self->result_source, $sub_attrs )->as_query
+    count_subq => $rsrc->resultset_class->new ($rsrc, $sub_attrs )->as_query
   }];
 
   # the subquery replaces this
-  delete $attrs->{$_} for qw/where bind prefetch collapse distinct group_by having having_bind/;
+  delete $attrs->{$_} for qw/where bind collapse group_by having having_bind rows offset/;
 
-  return $self->__count ($attrs);
+  return $self->_count_rs ($attrs);
 }
 
-sub _count_simple {
-  my $self = shift;
 
-  my $count = $self->__count;
-  return 0 unless $count;
+# The DBIC relationship chaining implementation is pretty simple - every
+# new related_relationship is pushed onto the {from} stack, and the {select}
+# window simply slides further in. This means that when we count somewhere
+# in the middle, we got to make sure that everything in the join chain is an
+# actual inner join, otherwise the count will come back with unpredictable
+# results (a resultset may be generated with _some_ rows regardless of if
+# the relation which the $rs currently selects has rows or not). E.g.
+# $artist_rs->cds->count - normally generates:
+# SELECT COUNT( * ) FROM artist me LEFT JOIN cd cds ON cds.artist = me.artistid
+# which actually returns the number of artists * (number of cds || 1)
+#
+# So what we do here is crawl {from}, determine if the current alias is at
+# the top of the stack, and if not - make sure the chain is inner-joined down
+# to the root.
+#
+sub _switch_to_inner_join_if_needed {
+  my ($self, $from, $alias) = @_;
+
+  return $from if (
+    ref $from ne 'ARRAY'
+      ||
+    ref $from->[0] ne 'HASH'
+      ||
+    ! $from->[0]{-alias}
+      ||
+    $from->[0]{-alias} eq $alias
+  );
 
-  # need to take offset from resolved attrs
+  # this would be the case with a subquery - we'll never find
+  # the target as it is not in the parseable part of {from}
+  return $from if @$from == 1;
 
-  my $attrs = $self->_resolved_attrs;
+  my (@switch_idx, $found_target);
 
-  $count -= $attrs->{offset} if $attrs->{offset};
-  $count = $attrs->{rows} if $attrs->{rows} and $attrs->{rows} < $count;
-  $count = 0 if ($count < 0);
-  return $count;
-}
+  JOINSCAN:
+  for my $i (1 .. $#$from) {
 
-sub __count {
-  my ($self, $attrs) = @_;
+    push @switch_idx, $i;
+    my $j = $from->[$i];
+    my $jalias = $j->[0]{-alias};
 
-  $attrs ||= $self->_resolved_attrs_copy;
+    # we found our current target - delete any siblings (same level joins)
+    # and bail out
+    if ($jalias eq $alias) {
+      $found_target++;
 
-  # take off any column specs, any pagers, record_filter is cdbi, and no point of ordering a count
-  delete $attrs->{$_} for (qw/columns +columns select +select as +as rows offset page pager order_by record_filter/); 
+      my $cur_depth = $j->[0]{-relation_chain_depth};
+      # we are -1, so look at -2
+      while (@switch_idx > 1
+              && $from->[$switch_idx[-2]][0]{-relation_chain_depth} == $cur_depth
+      ) {
+        splice @switch_idx, -2, 1;
+      }
 
-  $attrs->{select} = { count => '*' };
-  $attrs->{as} = [qw/count/];
+      last JOINSCAN;
+    }
+  }
 
-  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
-  my ($count) = $tmp_rs->cursor->next;
+  # something else went wrong
+  return $from unless $found_target;
 
-  return $count;
+  # So it looks like we will have to switch some stuff around.
+  # local() is useless here as we will be leaving the scope
+  # anyway, and deep cloning is just too fucking expensive
+  # So replace the inner hashref manually
+  my @new_from;
+  my $sw_idx = { map { $_ => 1 } @switch_idx };
+
+  for my $i (0 .. $#$from) {
+    if ($sw_idx->{$i}) {
+      my %attrs = %{$from->[$i][0]};
+      delete $attrs{-join_type};
+
+      push @new_from, [
+        \%attrs,
+        @{$from->[$i]}[ 1 .. $#{$from->[$i]} ],
+      ];
+    }
+    else {
+      push @new_from, $from->[$i];
+    }
+  }
+
+  return \@new_from;
 }
 
+
 sub _bool {
   return 1;
 }
@@ -1271,13 +1418,12 @@ sub all {
 
   my @obj;
 
-  # TODO: don't call resolve here
   if (keys %{$self->_resolved_attrs->{collapse}}) {
-#  if ($self->{attrs}{prefetch}) {
-      # Using $self->cursor->all is really just an optimisation.
-      # If we're collapsing has_many prefetches it probably makes
-      # very little difference, and this is cleaner than hacking
-      # _construct_object to survive the approach
+    # Using $self->cursor->all is really just an optimisation.
+    # If we're collapsing has_many prefetches it probably makes
+    # very little difference, and this is cleaner than hacking
+    # _construct_object to survive the approach
+    $self->cursor->reset;
     my @row = $self->cursor->next;
     while (@row) {
       push(@obj, $self->_construct_object(@row));
@@ -1290,6 +1436,7 @@ sub all {
   }
 
   $self->set_cache(\@obj) if $self->{attrs}{cache};
+
   return @obj;
 }
 
@@ -1304,6 +1451,8 @@ sub all {
 =back
 
 Resets the resultset's cursor, so you can iterate through the elements again.
+Implicitly resets the storage cursor, so a subsequent L</next> will trigger
+another query.
 
 =cut
 
@@ -1346,15 +1495,15 @@ sub _rs_update_delete {
 
   my $rsrc = $self->result_source;
 
-  my $needs_group_by_subq = $self->_has_attr (qw/prefetch distinct join seen_join group_by/);
-  my $needs_subq = $self->_has_attr (qw/row offset page/);
+  my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
+  my $needs_subq = $self->_has_resolved_attr (qw/row offset/);
 
   if ($needs_group_by_subq or $needs_subq) {
 
     # make a new $rs selecting only the PKs (that's all we really need)
     my $attrs = $self->_resolved_attrs_copy;
 
-    delete $attrs->{$_} for qw/prefetch collapse select +select as +as columns +columns/;
+    delete $attrs->{$_} for qw/collapse select as/;
     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
 
     if ($needs_group_by_subq) {
@@ -1388,7 +1537,7 @@ sub _rs_update_delete {
 
     my $subrs = (ref $self)->new($rsrc, $attrs);
 
-    return $self->result_source->storage->subq_update_delete($subrs, $op, $values);
+    return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
   }
   else {
     return $rsrc->storage->$op(
@@ -1513,7 +1662,7 @@ sub update_all {
 
 =item Arguments: none
 
-=item Return Value: 1
+=item Return Value: $storage_rv
 
 =back
 
@@ -1521,11 +1670,8 @@ Deletes the contents of the resultset from its result source. Note that this
 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
 to run. See also L<DBIx::Class::Row/delete>.
 
-delete may not generate correct SQL for a query with joins or a resultset
-chained from a related resultset.  In this case it will generate a warning:-
-
-In these cases you may find that delete_all is more appropriate, or you
-need to respecify your query in a way that can be expressed without a join.
+Return value will be the amount of rows deleted; exact type of return value
+is storage-dependent.
 
 =cut
 
@@ -1577,8 +1723,9 @@ In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
 to insert the data, as this is a faster method.
 
 Otherwise, each set of data is inserted into the database using
-L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
-objects is returned.
+L<DBIx::Class::ResultSet/create>, and the resulting objects are
+accumulated into an array. The array itself, or an array reference
+is returned depending on scalar or list context.
 
 Example:  Assuming an Artist Class that has many CDs Classes relating:
 
@@ -1644,7 +1791,7 @@ sub populate {
     foreach my $item (@$data) {
       push(@created, $self->create($item));
     }
-    return @created;
+    return wantarray ? @created : \@created;
   } else {
     my ($first, @rest) = @$data;
 
@@ -1654,13 +1801,19 @@ sub populate {
 
     ## do the belongs_to relationships
     foreach my $index (0..$#$data) {
-      if( grep { !defined $data->[$index]->{$_} } @pks ) {
-        my @ret = $self->populate($data);
-        return;
+
+      # delegate to create() for any dataset without primary keys with specified relationships
+      if (grep { !defined $data->[$index]->{$_} } @pks ) {
+        for my $r (@rels) {
+          if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
+            my @ret = $self->populate($data);
+            return;
+          }
+        }
       }
 
       foreach my $rel (@rels) {
-        next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
+        next unless ref $data->[$index]->{$rel} eq "HASH";
         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
         my $related = $result->result_source->_resolve_condition(
@@ -1875,22 +2028,31 @@ sub _is_deterministic_value {
   return 0;
 }
 
-# _has_attr
+# _has_resolved_attr
 #
 # determines if the resultset defines at least one
 # of the attributes supplied
 #
 # used to determine if a subquery is neccessary
+#
+# supports some virtual attributes:
+#   -join
+#     This will scan for any joins being present on the resultset.
+#     It is not a mere key-search but a deep inspection of {from}
+#
 
-sub _has_attr {
+sub _has_resolved_attr {
   my ($self, @attr_names) = @_;
 
   my $attrs = $self->_resolved_attrs;
 
-  my $join_check_req;
+  my %extra_checks;
 
   for my $n (@attr_names) {
-    ++$join_check_req if $n =~ /join/;
+    if (grep { $n eq $_ } (qw/-join/) ) {
+      $extra_checks{$n}++;
+      next;
+    }
 
     my $attr =  $attrs->{$n};
 
@@ -1907,9 +2069,9 @@ sub _has_attr {
     }
   }
 
-  # a join can be expressed as a multi-level from
+  # a resolved join is expressed as a multi-level from
   return 1 if (
-    $join_check_req
+    $extra_checks{-join}
       and
     ref $attrs->{from} eq 'ARRAY'
       and
@@ -1992,7 +2154,22 @@ B<NOTE>: This feature is still experimental.
 
 =cut
 
-sub as_query { return shift->cursor->as_query(@_) }
+sub as_query {
+  my $self = shift;
+
+  my $attrs = $self->_resolved_attrs_copy;
+
+  # For future use:
+  #
+  # in list ctx:
+  # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
+  # $sql also has no wrapping parenthesis in list ctx
+  #
+  my $sqlbind = $self->result_source->storage
+    ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
+
+  return $sqlbind;
+}
 
 =head2 find_or_new
 
@@ -2033,8 +2210,10 @@ sub find_or_new {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->new_result($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->new_result($hash);
 }
 
 =head2 create
@@ -2120,7 +2299,7 @@ sub create {
 =back
 
   $cd->cd_to_producer->find_or_create({ producer => $producer },
-                                      { key => 'primary });
+                                      { key => 'primary' });
 
 Tries to find a record based on its primary key or unique constraints; if none
 is found, creates one and returns that instead.
@@ -2164,8 +2343,10 @@ sub find_or_create {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->create($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->create($hash);
 }
 
 =head2 update_or_create
@@ -2380,14 +2561,14 @@ sub related_resultset {
 
   $self->{related_resultsets} ||= {};
   return $self->{related_resultsets}{$rel} ||= do {
-    my $rel_obj = $self->result_source->relationship_info($rel);
+    my $rel_info = $self->result_source->relationship_info($rel);
 
     $self->throw_exception(
       "search_related: result source '" . $self->result_source->source_name .
         "' has no such relationship $rel")
-      unless $rel_obj;
+      unless $rel_info;
 
-    my ($from,$seen) = $self->_resolve_from($rel);
+    my ($from,$seen) = $self->_chain_relationship($rel);
 
     my $join_count = $seen->{$rel};
     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
@@ -2485,25 +2666,50 @@ sub current_source_alias {
 # in order to properly resolve prefetch aliases (any alias
 # with a relation_chain_depth less than the depth of the
 # current prefetch is not considered)
-sub _resolve_from {
-  my ($self, $extra_join) = @_;
+sub _chain_relationship {
+  my ($self, $rel) = @_;
   my $source = $self->result_source;
   my $attrs = $self->{attrs};
 
-  my $from = $attrs->{from}
-    || [ { $attrs->{alias} => $source->from } ];
+  my $from = [ @{
+      $attrs->{from}
+        ||
+      [{
+        -source_handle => $source->handle,
+        -alias => $attrs->{alias},
+        $attrs->{alias} => $source->from,
+      }]
+  }];
 
-  my $seen = { %{$attrs->{seen_join}||{}} };
+  my $seen = { %{$attrs->{seen_join} || {} } };
 
   # we need to take the prefetch the attrs into account before we
   # ->_resolve_join as otherwise they get lost - captainL
   my $merged = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
 
-  push @$from, $source->_resolve_join($merged, $attrs->{alias}, $seen) if ($merged);
+  my @requested_joins = $source->_resolve_join($merged, $attrs->{alias}, $seen);
+
+  push @$from, @requested_joins;
 
   ++$seen->{-relation_chain_depth};
 
-  push @$from, $source->_resolve_join($extra_join, $attrs->{alias}, $seen);
+  # if $self already had a join/prefetch specified on it, the requested
+  # $rel might very well be already included. What we do in this case
+  # is effectively a no-op (except that we bump up the chain_depth on
+  # the join in question so we could tell it *is* the search_related)
+  my $already_joined;
+
+  # we consider the last one thus reverse
+  for my $j (reverse @requested_joins) {
+    if ($rel eq $j->[0]{-join_path}[-1]) {
+      $j->[0]{-relation_chain_depth}++;
+      $already_joined++;
+      last;
+    }
+  }
+  unless ($already_joined) {
+    push @$from, $source->_resolve_join($rel, $attrs->{alias}, $seen);
+  }
 
   ++$seen->{-relation_chain_depth};
 
@@ -2596,24 +2802,30 @@ sub _resolved_attrs {
     push( @{ $attrs->{as} }, @$adds );
   }
 
-  $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
+  $attrs->{from} ||= [ {
+    -source_handle => $source->handle,
+    -alias => $self->{attrs}{alias},
+    $self->{attrs}{alias} => $source->from,
+  } ];
+
+  if ( $attrs->{join} || $attrs->{prefetch} ) {
+
+    $self->throw_exception ('join/prefetch can not be used with a literal scalarref {from}')
+      if ref $attrs->{from} ne 'ARRAY';
 
-  if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
     my $join = delete $attrs->{join} || {};
 
     if ( defined $attrs->{prefetch} ) {
       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
-
     }
 
     $attrs->{from} =    # have to copy here to avoid corrupting the original
       [
-      @{ $attrs->{from} },
-      $source->_resolve_join(
-        $join, $alias, { %{ $attrs->{seen_join} || {} } }
-      )
+        @{ $attrs->{from} },
+        $source->_resolve_join(
+          $join, $alias, { %{ $attrs->{seen_join} || {} } }
+        )
       ];
-
   }
 
   if ( $attrs->{order_by} ) {
@@ -2623,34 +2835,49 @@ sub _resolved_attrs {
       : [ $attrs->{order_by} ]
     );
   }
-  else {
-    $attrs->{order_by} = [];
+
+  if ($attrs->{group_by} and ! ref $attrs->{group_by}) {
+    $attrs->{group_by} = [ $attrs->{group_by} ];
   }
 
-  my $collapse = $attrs->{collapse} || {};
+  # If the order_by is otherwise empty - we will use this for TOP limit
+  # emulation and the like.
+  # Although this is needed only if the order_by is not defined, it is
+  # actually cheaper to just populate this rather than properly examining
+  # order_by (stuf like [ {} ] and the like)
+  $attrs->{_virtual_order_by} = [ $self->result_source->primary_columns ];
+
+
+  $attrs->{collapse} ||= {};
   if ( my $prefetch = delete $attrs->{prefetch} ) {
     $prefetch = $self->_merge_attr( {}, $prefetch );
-    my @pre_order;
-    foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
-
-      # bring joins back to level of current class
-      my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
-      my @prefetch =
-        $source->_resolve_prefetch( $p, $alias, $join_map, \@pre_order, $collapse );
-      push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
-      push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
-    }
-    push( @{ $attrs->{order_by} }, @pre_order );
+
+    my $prefetch_ordering = [];
+
+    my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
+
+    my @prefetch =
+      $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
+
+    $attrs->{prefetch_select} = [ map { $_->[0] } @prefetch ];
+    push @{ $attrs->{select} }, @{$attrs->{prefetch_select}};
+    push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
+
+    push( @{ $attrs->{order_by} }, @$prefetch_ordering );
+    $attrs->{_collapse_order_by} = \@$prefetch_ordering;
   }
 
+
   if (delete $attrs->{distinct}) {
     $attrs->{group_by} ||= [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
   }
 
-  $attrs->{collapse} = $collapse;
-
-  if ( $attrs->{page} and not defined $attrs->{offset} ) {
-    $attrs->{offset} = ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
+  # if both page and offset are specified, produce a combined offset
+  # even though it doesn't make much sense, this is what pre 081xx has
+  # been doing
+  if (my $page = delete $attrs->{page}) {
+    $attrs->{offset} = ($attrs->{rows} * ($page - 1)) +
+      ($attrs->{offset} || 0);
   }
 
   return $self->{_attrs} = $attrs;
@@ -2669,7 +2896,7 @@ sub _joinpath_aliases {
 
     my $p = $paths;
     $p = $p->{$_} ||= {} for @{$j->[0]{-join_path}};
-    push @{$p->{-join_aliases} }, $j->[0]{-join_alias};
+    push @{$p->{-join_aliases} }, $j->[0]{-alias};
   }
 
   return $paths;
@@ -3119,7 +3346,7 @@ Makes the resultset paged and specifies the page to retrieve. Effectively
 identical to creating a non-pages resultset and then calling ->page($page)
 on it.
 
-If L<rows> attribute is not specified it defualts to 10 rows per page.
+If L<rows> attribute is not specified it defaults to 10 rows per page.
 
 When you have a paged resultset, L</count> will only return the number
 of rows in the page. To get the total, use the L</pager> and call
@@ -3322,9 +3549,21 @@ with a father in the person table, we could explicitly use C<INNER JOIN>:
     # SELECT child.* FROM person child
     # INNER JOIN person father ON child.father_id = father.id
 
-If you need to express really complex joins or you need a subselect, you
+You can select from a subquery by passing a resultset to from as follows.
+
+    $schema->resultset('Artist')->search( 
+        undef, 
+        {   alias => 'artist2',
+            from  => [ { artist2 => $artist_rs->as_query } ],
+        } );
+
+    # and you'll get sql like this..
+    # SELECT artist2.artistid, artist2.name, artist2.rank, artist2.charfield FROM 
+    #   ( SELECT me.artistid, me.name, me.rank, me.charfield FROM artists me ) artist2
+
+If you need to express really complex joins, you
 can supply literal SQL to C<from> via a scalar reference. In this case
-the contents of the scalar will replace the table name asscoiated with the
+the contents of the scalar will replace the table name associated with the
 resultsource.
 
 WARNING: This technique might very well not work as expected on chained