Fix possible regression with prefetch select resolution
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index e21feeb..ed2d144 100644 (file)
@@ -46,29 +46,15 @@ A new ResultSet is returned from calling L</search> on an existing
 ResultSet. The new one will contain all the conditions of the
 original, plus any new conditions added in the C<search> call.
 
-A ResultSet is also an iterator. L</next> is used to return all the
-L<DBIx::Class::Row>s the ResultSet represents.
+A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
+can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
+represents.
 
 The query that the ResultSet represents is B<only> executed against
 the database when these methods are called:
+L</find> L</next> L</all> L</first> L</single> L</count>
 
-=over
-
-=item L</find>
-
-=item L</next>
-
-=item L</all>
-
-=item L</count>
-
-=item L</single>
-
-=item L</first>
-
-=back
-
-=head1 EXAMPLES 
+=head1 EXAMPLES
 
 =head2 Chaining resultsets
 
@@ -193,7 +179,7 @@ sub new {
   return $class->new_result(@_) if ref $class;
 
   my ($source, $attrs) = @_;
-  $source = $source->handle 
+  $source = $source->handle
     unless $source->isa('DBIx::Class::ResultSourceHandle');
   $attrs = { %{$attrs||{}} };
 
@@ -296,7 +282,7 @@ sub search_rs {
 
   unless (
     (@_ && defined($_[0])) # @_ == () or (undef)
-    || 
+    ||
     (keys %$attrs # empty attrs or only 'safe' attrs
     && List::Util::first { !$safe{$_} } keys %$attrs)
   ) {
@@ -393,7 +379,7 @@ Pass a literal chunk of SQL to be added to the conditional part of the
 resultset query.
 
 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
-only be used in that context. C<search_literal> is a convenience method. 
+only be used in that context. C<search_literal> is a convenience method.
 It is equivalent to calling $schema->search(\[]), but if you want to ensure
 columns are bound correctly, use C<search>.
 
@@ -403,14 +389,14 @@ Example of how to use C<search> instead of C<search_literal>
   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
 
 
-See L<DBIx::Class::Manual::Cookbook/Searching> and 
+See L<DBIx::Class::Manual::Cookbook/Searching> and
 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
 require C<search_literal>.
 
 =cut
 
 sub search_literal {
-  my ($self, $sql, @bind) = @_; 
+  my ($self, $sql, @bind) = @_;
   my $attr;
   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
     $attr = pop @bind;
@@ -505,7 +491,7 @@ sub find {
         && ($info = $self->result_source->relationship_info($key))) {
       my $val = delete $input_query->{$key};
       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
-      my $rel_q = $self->result_source->resolve_condition(
+      my $rel_q = $self->result_source->_resolve_condition(
                     $info->{cond}, $val, $key
                   );
       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
@@ -674,7 +660,8 @@ L<DBIx::Class::Cursor> for more information.
 sub cursor {
   my ($self) = @_;
 
-  my $attrs = { %{$self->_resolved_attrs} };
+  my $attrs = $self->_resolved_attrs_copy;
+
   return $self->{cursor}
     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
@@ -712,7 +699,7 @@ a warning:
   Query returned more than one row
 
 In this case, you should be using L</first> or L</find> instead, or if you really
-know what you are doing, use the L</rows> attribute to explicitly limit the size 
+know what you are doing, use the L</rows> attribute to explicitly limit the size
 of the resultset.
 
 =back
@@ -725,7 +712,8 @@ sub single {
       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
   }
 
-  my $attrs = { %{$self->_resolved_attrs} };
+  my $attrs = $self->_resolved_attrs_copy;
+
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -752,6 +740,7 @@ sub single {
   return (@data ? ($self->_construct_object(@data))[0] : undef);
 }
 
+
 # _is_unique_query
 #
 # Try to determine if the specified query is guaranteed to be unique, based on
@@ -870,10 +859,10 @@ instead. An example conversion is:
 
 sub search_like {
   my $class = shift;
-  carp join ("\n",
-    'search_like() is deprecated and will be removed in 0.09.',
-    'Instead use ->search({ x => { -like => "y%" } })',
-    '(note the outer pair of {}s - they are important!)'
+  carp (
+    'search_like() is deprecated and will be removed in DBIC version 0.09.'
+   .' Instead use ->search({ x => { -like => "y%" } })'
+   .' (note the outer pair of {}s - they are important!)'
   );
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
@@ -1016,7 +1005,7 @@ sub _collapse_result {
   do { # no need to check anything at the front, we always want the first row
 
     my %const;
-  
+
     foreach my $this_as (@construct_as) {
       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
     }
@@ -1063,7 +1052,7 @@ sub _collapse_result {
         foreach my $p (@parts) {
           $target = $target->[1]->{$p} ||= [];
           $cur .= ".${p}";
-          if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
+          if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
             # collapsing at this point and on final part
             my $pos = $collapse_pos{$cur};
             CK: foreach my $ck (@ckey) {
@@ -1114,8 +1103,8 @@ is derived.
 
 =back
 
-An accessor for the class to use when creating row objects. Defaults to 
-C<< result_source->result_class >> - which in most cases is the name of the 
+An accessor for the class to use when creating row objects. Defaults to
+C<< result_source->result_class >> - which in most cases is the name of the
 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
 
 Note that changing the result_class will also remove any components
@@ -1145,8 +1134,8 @@ sub result_class {
 =back
 
 Performs an SQL C<COUNT> with the same query as the resultset was built
-with to find the number of elements. If passed arguments, does a search
-on the resultset and counts the results of that.
+with to find the number of elements. Passing arguments is equivalent to
+C<< $rs->search ($cond, \%attrs)->count >>
 
 =cut
 
@@ -1154,39 +1143,16 @@ sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ and defined $_[0];
   return scalar @{ $self->get_cache } if $self->get_cache;
-  my $count = $self->_count;
-  return 0 unless $count;
 
-  # need to take offset from resolved attrs
-
-  $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
-  $count = $self->{attrs}{rows} if
-    $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
-  $count = 0 if ($count < 0);
-  return $count;
-}
-
-sub _count { # Separated out so pager can get the full count
-  my $self = shift;
-  my $attrs = { %{$self->_resolved_attrs} };
+  my $meth = $self->_has_attr (qw/prefetch collapse distinct group_by/)
+    ? 'count_grouped'
+    : 'count'
+  ;
 
-  if (my $group_by = $attrs->{group_by}) {
-    delete $attrs->{order_by};
+  my $attrs = $self->_resolved_attrs_copy;
+  my $rsrc = $self->result_source;
 
-    $attrs->{select} = $group_by; 
-    $attrs->{from} = [ { 'mesub' => (ref $self)->new($self->result_source, $attrs)->cursor->as_query } ];
-    delete $attrs->{where};
-  }
-
-  $attrs->{select} = { count => '*' };
-  $attrs->{as} = [qw/count/];
-
-  # offset, order by, group by, where and page are not needed to count. record_filter is cdbi
-  delete $attrs->{$_} for qw/rows offset order_by group_by page pager record_filter/;
-
-  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
-  my ($count) = $tmp_rs->cursor->next;
-  return $count;
+  return $rsrc->storage->$meth ($rsrc, $attrs);
 }
 
 sub _bool {
@@ -1298,6 +1264,72 @@ sub first {
   return $_[0]->reset->next;
 }
 
+
+# _rs_update_delete
+#
+# Determines whether and what type of subquery is required for the $rs operation.
+# If grouping is necessary either supplies its own, or verifies the current one
+# After all is done delegates to the proper storage method.
+
+sub _rs_update_delete {
+  my ($self, $op, $values) = @_;
+
+  my $rsrc = $self->result_source;
+
+  my $needs_group_by_subq = $self->_has_attr (qw/prefetch distinct join seen_join group_by/);
+  my $needs_subq = $self->_has_attr (qw/row offset page/);
+
+  if ($needs_group_by_subq or $needs_subq) {
+
+    # make a new $rs selecting only the PKs (that's all we really need)
+    my $attrs = $self->_resolved_attrs_copy;
+
+    delete $attrs->{$_} for qw/prefetch collapse select +select as +as columns +columns/;
+    $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
+
+    if ($needs_group_by_subq) {
+      # make sure no group_by was supplied, or if there is one - make sure it matches
+      # the columns compiled above perfectly. Anything else can not be sanely executed
+      # on most databases so croak right then and there
+
+      if (my $g = $attrs->{group_by}) {
+        my @current_group_by = map
+          { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
+          (ref $g eq 'ARRAY' ? @$g : $g );
+
+        if (
+          join ("\x00", sort @current_group_by)
+            ne
+          join ("\x00", sort @{$attrs->{columns}} )
+        ) {
+          $self->throw_exception (
+            "You have just attempted a $op operation on a resultset which does group_by"
+            . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
+            . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
+            . ' kind of queries. Please retry the operation with a modified group_by or'
+            . ' without using one at all.'
+          );
+        }
+      }
+      else {
+        $attrs->{group_by} = $attrs->{columns};
+      }
+    }
+
+    my $subrs = (ref $self)->new($rsrc, $attrs);
+
+    return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
+  }
+  else {
+    return $rsrc->storage->$op(
+      $rsrc,
+      $op eq 'update' ? $values : (),
+      $self->_cond_for_update_delete,
+    );
+  }
+}
+
+
 # _cond_for_update_delete
 #
 # update/delete require the condition to be modified to handle
@@ -1312,18 +1344,6 @@ sub _cond_for_update_delete {
   # No-op. No condition, we're updating/deleting everything
   return $cond unless ref $full_cond;
 
-  # Some attributes when present require a subquery
-  # This might not work on some database (mysql), but...
-  # it won't work without the subquery either so who cares
-  if (grep { defined $self->{attrs}{$_} } qw/join seen_join from rows group_by/) {
-
-    foreach my $pk ($self->result_source->primary_columns) {
-      $cond->{$pk} = { IN => $self->get_column($pk)->as_query };
-    }
-
-    return $cond;
-  }
-
   if (ref $full_cond eq 'ARRAY') {
     $cond = [
       map {
@@ -1363,7 +1383,7 @@ sub _cond_for_update_delete {
   else {
     $self->throw_exception("Can't update/delete on resultset with condition unless hash or array");
   }
+
   return $cond;
 }
 
@@ -1386,14 +1406,10 @@ if no records were updated; exact type of success value is storage-dependent.
 
 sub update {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash")
+  $self->throw_exception('Values for update must be a hash')
     unless ref $values eq 'HASH';
 
-  my $cond = $self->_cond_for_update_delete;
-  
-  return $self->result_source->storage->update(
-    $self->result_source, $values, $cond
-  );
+  return $self->_rs_update_delete ('update', $values);
 }
 
 =head2 update_all
@@ -1413,7 +1429,7 @@ will run DBIC cascade triggers, while L</update> will not.
 
 sub update_all {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash")
+  $self->throw_exception('Values for update_all must be a hash')
     unless ref $values eq 'HASH';
   foreach my $obj ($self->all) {
     $obj->set_columns($values)->update;
@@ -1427,7 +1443,7 @@ sub update_all {
 
 =item Arguments: none
 
-=item Return Value: 1
+=item Return Value: $storage_rv
 
 =back
 
@@ -1435,23 +1451,17 @@ Deletes the contents of the resultset from its result source. Note that this
 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
 to run. See also L<DBIx::Class::Row/delete>.
 
-delete may not generate correct SQL for a query with joins or a resultset
-chained from a related resultset.  In this case it will generate a warning:-
-
-In these cases you may find that delete_all is more appropriate, or you
-need to respecify your query in a way that can be expressed without a join.
+Return value will be the amount of rows deleted; exact type of return value
+is storage-dependent.
 
 =cut
 
 sub delete {
-  my ($self) = @_;
-  $self->throw_exception("Delete should not be passed any arguments")
-    if $_[1];
-
-  my $cond = $self->_cond_for_update_delete;
+  my $self = shift;
+  $self->throw_exception('delete does not accept any arguments')
+    if @_;
 
-  $self->result_source->storage->delete($self->result_source, $cond);
-  return 1;
+  return $self->_rs_update_delete ('delete');
 }
 
 =head2 delete_all
@@ -1470,7 +1480,10 @@ will run DBIC cascade triggers, while L</delete> will not.
 =cut
 
 sub delete_all {
-  my ($self) = @_;
+  my $self = shift;
+  $self->throw_exception('delete_all does not accept any arguments')
+    if @_;
+
   $_->delete for $self->all;
   return 1;
 }
@@ -1488,7 +1501,7 @@ For the arrayref of hashrefs style each hashref should be a structure suitable
 forsubmitting to a $resultset->create(...) method.
 
 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
-to insert the data, as this is a faster method.  
+to insert the data, as this is a faster method.
 
 Otherwise, each set of data is inserted into the database using
 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
@@ -1497,10 +1510,10 @@ objects is returned.
 Example:  Assuming an Artist Class that has many CDs Classes relating:
 
   my $Artist_rs = $schema->resultset("Artist");
-  
-  ## Void Context Example 
+
+  ## Void Context Example
   $Artist_rs->populate([
-     { artistid => 4, name => 'Manufactured Crap', cds => [ 
+     { artistid => 4, name => 'Manufactured Crap', cds => [
         { title => 'My First CD', year => 2006 },
         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
       ],
@@ -1512,7 +1525,7 @@ Example:  Assuming an Artist Class that has many CDs Classes relating:
       ],
      },
   ]);
-  
+
   ## Array Context Example
   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
     { name => "Artist One"},
@@ -1522,7 +1535,7 @@ Example:  Assuming an Artist Class that has many CDs Classes relating:
     { title => "Second CD", year => 2008},
   ]}
   ]);
-  
+
   print $ArtistOne->name; ## response is 'Artist One'
   print $ArtistThree->cds->count ## reponse is '2'
 
@@ -1538,11 +1551,11 @@ example:
   ]);
 
 Please note an important effect on your data when choosing between void and
-wantarray context. Since void context goes straight to C<insert_bulk> in 
+wantarray context. Since void context goes straight to C<insert_bulk> in
 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
-c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
-create primary keys for you, you will find that your PKs are empty.  In this 
-case you will have to use the wantarray context in order to create those 
+C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
+create primary keys for you, you will find that your PKs are empty.  In this
+case you will have to use the wantarray context in order to create those
 values.
 
 =cut
@@ -1552,7 +1565,7 @@ sub populate {
   my $data = ref $_[0][0] eq 'HASH'
     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
-  
+
   if(defined wantarray) {
     my @created;
     foreach my $item (@$data) {
@@ -1564,28 +1577,34 @@ sub populate {
 
     my @names = grep {!ref $first->{$_}} keys %$first;
     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
-    my @pks = $self->result_source->primary_columns;  
+    my @pks = $self->result_source->primary_columns;
 
-    ## do the belongs_to relationships  
+    ## do the belongs_to relationships
     foreach my $index (0..$#$data) {
-      if( grep { !defined $data->[$index]->{$_} } @pks ) {
-        my @ret = $self->populate($data);
-        return;
+
+      # delegate to create() for any dataset without primary keys with specified relationships
+      if (grep { !defined $data->[$index]->{$_} } @pks ) {
+        for my $r (@rels) {
+          if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
+            my @ret = $self->populate($data);
+            return;
+          }
+        }
       }
-    
+
       foreach my $rel (@rels) {
-        next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
+        next unless ref $data->[$index]->{$rel} eq "HASH";
         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
-        my $related = $result->result_source->resolve_condition(
+        my $related = $result->result_source->_resolve_condition(
           $result->result_source->relationship_info($reverse)->{cond},
-          $self,        
-          $result,        
+          $self,
+          $result,
         );
 
         delete $data->[$index]->{$rel};
         $data->[$index] = {%{$data->[$index]}, %$related};
-      
+
         push @names, keys %$related if $index == 0;
       }
     }
@@ -1594,8 +1613,8 @@ sub populate {
     my @values = map { [ @$_{@names} ] } @$data;
 
     $self->result_source->storage->insert_bulk(
-      $self->result_source, 
-      \@names, 
+      $self->result_source,
+      \@names,
       \@values,
     );
 
@@ -1605,12 +1624,12 @@ sub populate {
       foreach my $rel (@rels) {
         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
 
-        my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
+        my $parent = $self->find(map {{$_=>$item->{$_}} } @pks)
      || $self->throw_exception('Cannot find the relating object.');
-     
+
         my $child = $parent->$rel;
-    
-        my $related = $child->result_source->resolve_condition(
+
+        my $related = $child->result_source->_resolve_condition(
           $parent->result_source->relationship_info($rel)->{cond},
           $child,
           $parent,
@@ -1642,7 +1661,7 @@ sub _normalize_populate_args {
     foreach my $index (0..$#names) {
       $result_to_create{$names[$index]} = $$datum[$index];
     }
-    push @results_to_create, \%result_to_create;    
+    push @results_to_create, \%result_to_create;
   }
   return \@results_to_create;
 }
@@ -1667,12 +1686,25 @@ C<total_entries> on the L<Data::Page> object.
 
 sub pager {
   my ($self) = @_;
+
+  return $self->{pager} if $self->{pager};
+
   my $attrs = $self->{attrs};
   $self->throw_exception("Can't create pager for non-paged rs")
     unless $self->{attrs}{page};
   $attrs->{rows} ||= 10;
-  return $self->{pager} ||= Data::Page->new(
-    $self->_count, $attrs->{rows}, $self->{attrs}{page});
+
+  # throw away the paging flags and re-run the count (possibly
+  # with a subselect) to get the real total count
+  my $count_attrs = { %$attrs };
+  delete $count_attrs->{$_} for qw/rows offset page pager/;
+  my $total_count = (ref $self)->new($self->result_source, $count_attrs)->count;
+
+  return $self->{pager} = Data::Page->new(
+    $total_count,
+    $attrs->{rows},
+    $self->{attrs}{page}
+  );
 }
 
 =head2 page
@@ -1733,13 +1765,13 @@ sub new_result {
     $self->throw_exception(
       "Can't abstract implicit construct, condition not a hash"
     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
-  
+
     my $collapsed_cond = (
       $self->{cond}
         ? $self->_collapse_cond($self->{cond})
         : {}
     );
-  
+
     # precendence must be given to passed values over values inherited from
     # the cond, so the order here is important.
     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
@@ -1764,7 +1796,7 @@ sub new_result {
 
 # _is_deterministic_value
 #
-# Make an effor to strip non-deterministic values from the condition, 
+# Make an effor to strip non-deterministic values from the condition,
 # to make sure new_result chokes less
 
 sub _is_deterministic_value {
@@ -1776,6 +1808,50 @@ sub _is_deterministic_value {
   return 0;
 }
 
+# _has_attr
+#
+# determines if the resultset defines at least one
+# of the attributes supplied
+#
+# used to determine if a subquery is neccessary
+
+sub _has_attr {
+  my ($self, @attr_names) = @_;
+
+  my $attrs = $self->_resolved_attrs;
+
+  my $join_check_req;
+
+  for my $n (@attr_names) {
+    ++$join_check_req if $n =~ /join/;
+
+    my $attr =  $attrs->{$n};
+
+    next if not defined $attr;
+
+    if (ref $attr eq 'HASH') {
+      return 1 if keys %$attr;
+    }
+    elsif (ref $attr eq 'ARRAY') {
+      return 1 if @$attr;
+    }
+    else {
+      return 1 if $attr;
+    }
+  }
+
+  # a join can be expressed as a multi-level from
+  return 1 if (
+    $join_check_req
+      and
+    ref $attrs->{from} eq 'ARRAY'
+      and
+    @{$attrs->{from}} > 1
+  );
+
+  return 0;
+}
+
 # _collapse_cond
 #
 # Recursively collapse the condition.
@@ -1849,7 +1925,22 @@ B<NOTE>: This feature is still experimental.
 
 =cut
 
-sub as_query { return shift->cursor->as_query(@_) }
+sub as_query {
+  my $self = shift;
+
+  my $attrs = $self->_resolved_attrs_copy;
+
+  # For future use:
+  #
+  # in list ctx:
+  # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
+  # $sql also has no wrapping parenthesis in list ctx
+  #
+  my $sqlbind = $self->result_source->storage
+    ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
+
+  return $sqlbind;
+}
 
 =head2 find_or_new
 
@@ -1890,8 +1981,10 @@ sub find_or_new {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->new_result($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->new_result($hash);
 }
 
 =head2 create
@@ -1934,12 +2027,12 @@ Example of creating a new row.
     name=>"Some Person",
     email=>"somebody@someplace.com"
   });
-  
+
 Example of creating a new row and also creating rows in a related C<has_many>
 or C<has_one> resultset.  Note Arrayref.
 
   $artist_rs->create(
-     { artistid => 4, name => 'Manufactured Crap', cds => [ 
+     { artistid => 4, name => 'Manufactured Crap', cds => [
         { title => 'My First CD', year => 2006 },
         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
       ],
@@ -2021,8 +2114,10 @@ sub find_or_create {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->create($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->create($hash);
 }
 
 =head2 update_or_create
@@ -2055,10 +2150,10 @@ For example:
     { key => 'cd_artist_title' }
   );
 
-  $cd->cd_to_producer->update_or_create({ 
-    producer => $producer, 
+  $cd->cd_to_producer->update_or_create({
+    producer => $producer,
     name => 'harry',
-  }, { 
+  }, {
     key => 'primary,
   });
 
@@ -2243,7 +2338,7 @@ sub related_resultset {
       "search_related: result source '" . $self->result_source->source_name .
         "' has no such relationship $rel")
       unless $rel_obj;
-    
+
     my ($from,$seen) = $self->_resolve_from($rel);
 
     my $join_count = $seen->{$rel};
@@ -2336,32 +2431,50 @@ sub current_source_alias {
   return ($self->{attrs} || {})->{alias} || 'me';
 }
 
+# This code is called by search_related, and makes sure there
+# is clear separation between the joins before, during, and
+# after the relationship. This information is needed later
+# in order to properly resolve prefetch aliases (any alias
+# with a relation_chain_depth less than the depth of the
+# current prefetch is not considered)
 sub _resolve_from {
   my ($self, $extra_join) = @_;
   my $source = $self->result_source;
   my $attrs = $self->{attrs};
-  
-  my $from = $attrs->{from}
-    || [ { $attrs->{alias} => $source->from } ];
-    
-  my $seen = { %{$attrs->{seen_join}||{}} };
 
-  my $join = ($attrs->{join}
-               ? [ $attrs->{join}, $extra_join ]
-               : $extra_join);
+  my $from = [ @{
+      $attrs->{from}
+        ||
+      [{
+        -source_handle => $source->handle,
+        -alias => $attrs->{alias},
+        $attrs->{alias} => $source->from,
+      }]
+  }];
+
+  my $seen = { %{$attrs->{seen_join} || {} } };
 
-  # we need to take the prefetch the attrs into account before we 
-  # ->resolve_join as otherwise they get lost - captainL
-  my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
+  # we need to take the prefetch the attrs into account before we
+  # ->_resolve_join as otherwise they get lost - captainL
+  my $merged = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
 
-  $from = [
-    @$from,
-    ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
-  ];
+  push @$from, $source->_resolve_join($merged, $attrs->{alias}, $seen) if ($merged);
+
+  ++$seen->{-relation_chain_depth};
+
+  push @$from, $source->_resolve_join($extra_join, $attrs->{alias}, $seen);
+
+  ++$seen->{-relation_chain_depth};
 
   return ($from,$seen);
 }
 
+# too many times we have to do $attrs = { %{$self->_resolved_attrs} }
+sub _resolved_attrs_copy {
+  my $self = shift;
+  return { %{$self->_resolved_attrs (@_)} };
+}
+
 sub _resolved_attrs {
   my $self = shift;
   return $self->{_attrs} if $self->{_attrs};
@@ -2380,14 +2493,14 @@ sub _resolved_attrs {
               ? $_
               : {
                   (
-                    /^\Q${alias}.\E(.+)$/ 
+                    /^\Q${alias}.\E(.+)$/
                       ? "$1"
                       : "$_"
                   )
-                => 
+                =>
                   (
-                    /\./ 
-                      ? "$_" 
+                    /\./
+                      ? "$_"
                       : "${alias}.$_"
                   )
             }
@@ -2442,7 +2555,11 @@ sub _resolved_attrs {
     push( @{ $attrs->{as} }, @$adds );
   }
 
-  $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
+  $attrs->{from} ||= [ {
+    -source_handle => $source->handle,
+    -alias => $self->{attrs}{alias},
+    $self->{attrs}{alias} => $source->from,
+  } ];
 
   if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
     my $join = delete $attrs->{join} || {};
@@ -2455,15 +2572,13 @@ sub _resolved_attrs {
     $attrs->{from} =    # have to copy here to avoid corrupting the original
       [
       @{ $attrs->{from} },
-      $source->resolve_join(
+      $source->_resolve_join(
         $join, $alias, { %{ $attrs->{seen_join} || {} } }
       )
       ];
 
   }
 
-  $attrs->{group_by} ||= $attrs->{select}
-    if delete $attrs->{distinct};
   if ( $attrs->{order_by} ) {
     $attrs->{order_by} = (
       ref( $attrs->{order_by} ) eq 'ARRAY'
@@ -2475,34 +2590,67 @@ sub _resolved_attrs {
     $attrs->{order_by} = [];
   }
 
+  # If the order_by is otherwise empty - we will use this for TOP limit
+  # emulation and the like.
+  # Although this is needed only if the order_by is not defined, it is
+  # actually cheaper to just populate this rather than properly examining
+  # order_by (stuf like [ {} ] and the like)
+  $attrs->{_virtual_order_by} = [ $self->result_source->primary_columns ];
+
+
   my $collapse = $attrs->{collapse} || {};
+
   if ( my $prefetch = delete $attrs->{prefetch} ) {
     $prefetch = $self->_merge_attr( {}, $prefetch );
-    my @pre_order;
-    my $seen = { %{ $attrs->{seen_join} || {} } };
-    foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
-
-      # bring joins back to level of current class
-      my @prefetch =
-        $source->resolve_prefetch( $p, $alias, $seen, \@pre_order, $collapse );
-      push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
-      push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
-    }
-    push( @{ $attrs->{order_by} }, @pre_order );
+
+    my $prefetch_ordering = [];
+
+    my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
+
+    my @prefetch =
+      $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $collapse );
+
+    push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
+    push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
+
+    push( @{ $attrs->{order_by} }, @$prefetch_ordering );
+  }
+
+  if (delete $attrs->{distinct}) {
+    $attrs->{group_by} ||= [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
   }
+
   $attrs->{collapse} = $collapse;
 
-  if ( $attrs->{page} ) {
-    $attrs->{offset} ||= 0;
-    $attrs->{offset} += ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
+  if ( $attrs->{page} and not defined $attrs->{offset} ) {
+    $attrs->{offset} = ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
   }
 
   return $self->{_attrs} = $attrs;
 }
 
+sub _joinpath_aliases {
+  my ($self, $fromspec, $seen) = @_;
+
+  my $paths = {};
+  return $paths unless ref $fromspec eq 'ARRAY';
+
+  for my $j (@$fromspec) {
+
+    next if ref $j ne 'ARRAY';
+    next if $j->[0]{-relation_chain_depth} < ( $seen->{-relation_chain_depth} || 0);
+
+    my $p = $paths;
+    $p = $p->{$_} ||= {} for @{$j->[0]{-join_path}};
+    push @{$p->{-join_aliases} }, $j->[0]{-alias};
+  }
+
+  return $paths;
+}
+
 sub _rollout_attr {
   my ($self, $attr) = @_;
-  
+
   if (ref $attr eq 'HASH') {
     return $self->_rollout_hash($attr);
   } elsif (ref $attr eq 'ARRAY') {
@@ -2553,7 +2701,7 @@ sub _calculate_score {
       }
     } else {
       return ($a eq $b_key) ? 1 : 0;
-    }       
+    }
   } else {
     if (ref $a eq 'HASH') {
       my ($a_key) = keys %{$a};
@@ -2569,7 +2717,7 @@ sub _merge_attr {
 
   return $import unless defined($orig);
   return $orig unless defined($import);
-  
+
   $orig = $self->_rollout_attr($orig);
   $import = $self->_rollout_attr($import);
 
@@ -2845,19 +2993,19 @@ For example:
     }
   );
 
-You need to use the relationship (not the table) name in  conditions, 
-because they are aliased as such. The current table is aliased as "me", so 
+You need to use the relationship (not the table) name in  conditions,
+because they are aliased as such. The current table is aliased as "me", so
 you need to use me.column_name in order to avoid ambiguity. For example:
 
-  # Get CDs from 1984 with a 'Foo' track 
+  # Get CDs from 1984 with a 'Foo' track
   my $rs = $schema->resultset('CD')->search(
-    { 
+    {
       'me.year' => 1984,
       'tracks.name' => 'Foo'
     },
     { join => 'tracks' }
   );
-  
+
 If the same join is supplied twice, it will be aliased to <rel>_2 (and
 similarly for a third time). For e.g.
 
@@ -2910,12 +3058,12 @@ C<cd> or C<artist> relationships, which saves us two SQL statements in this
 case.
 
 Simple prefetches will be joined automatically, so there is no need
-for a C<join> attribute in the above search. 
+for a C<join> attribute in the above search.
 
 C<prefetch> can be used with the following relationship types: C<belongs_to>,
 C<has_one> (or if you're using C<add_relationship>, any relationship declared
 with an accessor type of 'single' or 'filter'). A more complex example that
-prefetches an artists cds, the tracks on those cds, and the tags associted 
+prefetches an artists cds, the tracks on those cds, and the tags associted
 with that artist is given below (assuming many-to-many from artists to tags):
 
  my $rs = $schema->resultset('Artist')->search(
@@ -2927,7 +3075,7 @@ with that artist is given below (assuming many-to-many from artists to tags):
      ]
    }
  );
+
 
 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
 attributes will be ignored.
@@ -3147,9 +3295,21 @@ with a father in the person table, we could explicitly use C<INNER JOIN>:
     # SELECT child.* FROM person child
     # INNER JOIN person father ON child.father_id = father.id
 
-If you need to express really complex joins or you need a subselect, you
+You can select from a subquery by passing a resultset to from as follows.
+
+    $schema->resultset('Artist')->search( 
+        undef, 
+        {   alias => 'artist2',
+            from  => [ { artist2 => $artist_rs->as_query } ],
+        } );
+
+    # and you'll get sql like this..
+    # SELECT artist2.artistid, artist2.name, artist2.rank, artist2.charfield FROM 
+    #   ( SELECT me.artistid, me.name, me.rank, me.charfield FROM artists me ) artist2
+
+If you need to express really complex joins, you
 can supply literal SQL to C<from> via a scalar reference. In this case
-the contents of the scalar will replace the table name asscoiated with the
+the contents of the scalar will replace the table name associated with the
 resultsource.
 
 WARNING: This technique might very well not work as expected on chained
@@ -3179,12 +3339,12 @@ searches - you have been warned.
     $table = $rs->result_source->name;
     $latest = $rs->search (
         undef,
-        { from => \ " 
-            (SELECT e1.* FROM $table e1 
-                JOIN $table e2 
-                    ON e1.location = e2.location 
-                    AND e1.sequence < e2.sequence 
-                WHERE e2.sequence is NULL 
+        { from => \ "
+            (SELECT e1.* FROM $table e1
+                JOIN $table e2
+                    ON e1.location = e2.location
+                    AND e1.sequence < e2.sequence
+                WHERE e2.sequence is NULL
             ) me",
         },
     );