Reduce mount of perlgolf in ResultSet.pm
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 8f9d601..5be8a14 100644 (file)
@@ -2,62 +2,153 @@ package DBIx::Class::ResultSet;
 
 use strict;
 use warnings;
-use overload
-        '0+'     => "count",
-        'bool'   => "_bool",
-        fallback => 1;
+use base qw/DBIx::Class/;
 use Carp::Clan qw/^DBIx::Class/;
+use DBIx::Class::Exception;
 use Data::Page;
 use Storable;
 use DBIx::Class::ResultSetColumn;
 use DBIx::Class::ResultSourceHandle;
 use List::Util ();
-use Scalar::Util ();
-use base qw/DBIx::Class/;
+use Scalar::Util qw/blessed weaken/;
+use Try::Tiny;
+use namespace::clean;
+
+use overload
+        '0+'     => "count",
+        'bool'   => "_bool",
+        fallback => 1;
 
 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
 
 =head1 NAME
 
-DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
+DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
 
 =head1 SYNOPSIS
 
-  my $rs   = $schema->resultset('User')->search({ registered => 1 });
-  my @rows = $schema->resultset('CD')->search({ year => 2005 })->all();
+  my $users_rs   = $schema->resultset('User');
+  while( $user = $users_rs->next) {
+    print $user->username;
+  }
+
+  my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
+  my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
 
 =head1 DESCRIPTION
 
-The resultset is also known as an iterator. It is responsible for handling
-queries that may return an arbitrary number of rows, e.g. via L</search>
-or a C<has_many> relationship.
+A ResultSet is an object which stores a set of conditions representing
+a query. It is the backbone of DBIx::Class (i.e. the really
+important/useful bit).
 
-In the examples below, the following table classes are used:
+No SQL is executed on the database when a ResultSet is created, it
+just stores all the conditions needed to create the query.
 
-  package MyApp::Schema::Artist;
-  use base qw/DBIx::Class/;
-  __PACKAGE__->load_components(qw/Core/);
-  __PACKAGE__->table('artist');
-  __PACKAGE__->add_columns(qw/artistid name/);
-  __PACKAGE__->set_primary_key('artistid');
-  __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
-  1;
+A basic ResultSet representing the data of an entire table is returned
+by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
+L<Source|DBIx::Class::Manual::Glossary/Source> name.
 
-  package MyApp::Schema::CD;
-  use base qw/DBIx::Class/;
-  __PACKAGE__->load_components(qw/Core/);
-  __PACKAGE__->table('cd');
-  __PACKAGE__->add_columns(qw/cdid artist title year/);
-  __PACKAGE__->set_primary_key('cdid');
-  __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
-  1;
+  my $users_rs = $schema->resultset('User');
+
+A new ResultSet is returned from calling L</search> on an existing
+ResultSet. The new one will contain all the conditions of the
+original, plus any new conditions added in the C<search> call.
 
-=head1 OVERLOADING
+A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
+can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
+represents.
+
+The query that the ResultSet represents is B<only> executed against
+the database when these methods are called:
+L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
 
 If a resultset is used in a numeric context it returns the L</count>.
-However, if it is used in a booleand context it is always true.  So if
-you want to check if a resultset has any results use C<if $rs != 0>.
-C<if $rs> will always be true.
+However, if it is used in a boolean context it is B<always> true.  So if
+you want to check if a resultset has any results, you must use C<if $rs
+!= 0>.
+
+=head1 EXAMPLES
+
+=head2 Chaining resultsets
+
+Let's say you've got a query that needs to be run to return some data
+to the user. But, you have an authorization system in place that
+prevents certain users from seeing certain information. So, you want
+to construct the basic query in one method, but add constraints to it in
+another.
+
+  sub get_data {
+    my $self = shift;
+    my $request = $self->get_request; # Get a request object somehow.
+    my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
+
+    my $cd_rs = $schema->resultset('CD')->search({
+      title => $request->param('title'),
+      year => $request->param('year'),
+    });
+
+    $self->apply_security_policy( $cd_rs );
+
+    return $cd_rs->all();
+  }
+
+  sub apply_security_policy {
+    my $self = shift;
+    my ($rs) = @_;
+
+    return $rs->search({
+      subversive => 0,
+    });
+  }
+
+=head3 Resolving conditions and attributes
+
+When a resultset is chained from another resultset, conditions and
+attributes with the same keys need resolving.
+
+L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
+into the existing ones from the original resultset.
+
+The L</where> and L</having> attributes, and any search conditions, are
+merged with an SQL C<AND> to the existing condition from the original
+resultset.
+
+All other attributes are overridden by any new ones supplied in the
+search attributes.
+
+=head2 Multiple queries
+
+Since a resultset just defines a query, you can do all sorts of
+things with it with the same object.
+
+  # Don't hit the DB yet.
+  my $cd_rs = $schema->resultset('CD')->search({
+    title => 'something',
+    year => 2009,
+  });
+
+  # Each of these hits the DB individually.
+  my $count = $cd_rs->count;
+  my $most_recent = $cd_rs->get_column('date_released')->max();
+  my @records = $cd_rs->all;
+
+And it's not just limited to SELECT statements.
+
+  $cd_rs->delete();
+
+This is even cooler:
+
+  $cd_rs->create({ artist => 'Fred' });
+
+Which is the same as:
+
+  $schema->resultset('CD')->create({
+    title => 'something',
+    year => 2009,
+    artist => 'Fred'
+  });
+
+See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
 
 =head1 METHODS
 
@@ -94,7 +185,7 @@ sub new {
   return $class->new_result(@_) if ref $class;
 
   my ($source, $attrs) = @_;
-  $source = $source->handle 
+  $source = $source->handle
     unless $source->isa('DBIx::Class::ResultSourceHandle');
   $attrs = { %{$attrs||{}} };
 
@@ -109,7 +200,6 @@ sub new {
   my $self = {
     _source_handle => $source,
     cond => $attrs->{where},
-    count => undef,
     pager => undef,
     attrs => $attrs
   };
@@ -180,98 +270,98 @@ always return a resultset, even in list context.
 sub search_rs {
   my $self = shift;
 
-  my $attrs = {};
-  $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
-  my $our_attrs = { %{$self->{attrs}} };
-  my $having = delete $our_attrs->{having};
-  my $where = delete $our_attrs->{where};
+  # Special-case handling for (undef, undef).
+  if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
+    @_ = ();
+  }
 
-  my $rows;
+  my $call_attrs = {};
+  $call_attrs = pop(@_) if @_ > 1 and ref $_[-1] eq 'HASH';
 
+  # see if we can keep the cache (no $rs changes)
+  my $cache;
   my %safe = (alias => 1, cache => 1);
+  if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
+    ! defined $_[0]
+      or
+    ref $_[0] eq 'HASH' && ! keys %{$_[0]}
+      or
+    ref $_[0] eq 'ARRAY' && ! @{$_[0]}
+  )) {
+    $cache = $self->get_cache;
+  }
 
-  unless (
-    (@_ && defined($_[0])) # @_ == () or (undef)
-    || 
-    (keys %$attrs # empty attrs or only 'safe' attrs
-    && List::Util::first { !$safe{$_} } keys %$attrs)
-  ) {
-    # no search, effectively just a clone
-    $rows = $self->get_cache;
+  my $old_attrs = { %{$self->{attrs}} };
+  my $old_having = delete $old_attrs->{having};
+  my $old_where = delete $old_attrs->{where};
+
+  # reset the selector list
+  if (List::Util::first { exists $call_attrs->{$_} } qw{columns select as}) {
+     delete @{$old_attrs}{qw{select as columns +select +as +columns include_columns}};
   }
 
-  my $new_attrs = { %{$our_attrs}, %{$attrs} };
+  my $new_attrs = { %{$old_attrs}, %{$call_attrs} };
 
   # merge new attrs into inherited
-  foreach my $key (qw/join prefetch +select +as/) {
-    next unless exists $attrs->{$key};
-    $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
+  foreach my $key (qw/join prefetch +select +as +columns include_columns bind/) {
+    next unless exists $call_attrs->{$key};
+    $new_attrs->{$key} = $self->_merge_attr($old_attrs->{$key}, $call_attrs->{$key});
   }
 
-  my $cond = (@_
-    ? (
-        (@_ == 1 || ref $_[0] eq "HASH")
-          ? (
-              (ref $_[0] eq 'HASH')
-                ? (
-                    (keys %{ $_[0] }  > 0)
-                      ? shift
-                      : undef
-                   )
-                :  shift
-             )
-          : (
-              (@_ % 2)
-                ? $self->throw_exception("Odd number of arguments to search")
-                : {@_}
-             )
-      )
-    : undef
-  );
+  # rip apart the rest of @_, parse a condition
+  my $call_cond = do {
 
-  if (defined $where) {
-    $new_attrs->{where} = (
-      defined $new_attrs->{where}
-        ? { '-and' => [
-              map {
-                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
-              } $where, $new_attrs->{where}
-            ]
-          }
-        : $where);
-  }
+    if (ref $_[0] eq 'HASH') {
+      (keys %{$_[0]}) ? $_[0] : undef
+    }
+    elsif (@_ == 1) {
+      $_[0]
+    }
+    elsif (@_ % 2) {
+      $self->throw_exception('Odd number of arguments to search')
+    }
+    else {
+      +{ @_ }
+    }
 
-  if (defined $cond) {
-    $new_attrs->{where} = (
-      defined $new_attrs->{where}
-        ? { '-and' => [
-              map {
-                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
-              } $cond, $new_attrs->{where}
-            ]
-          }
-        : $cond);
+  } if @_;
+
+  for ($old_where, $call_cond) {
+    if (defined $_) {
+      $new_attrs->{where} = $self->_stack_cond (
+        $_, $new_attrs->{where}
+      );
+    }
   }
 
-  if (defined $having) {
-    $new_attrs->{having} = (
-      defined $new_attrs->{having}
-        ? { '-and' => [
-              map {
-                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
-              } $having, $new_attrs->{having}
-            ]
-          }
-        : $having);
+  if (defined $old_having) {
+    $new_attrs->{having} = $self->_stack_cond (
+      $old_having, $new_attrs->{having}
+    )
   }
 
   my $rs = (ref $self)->new($self->result_source, $new_attrs);
-  if ($rows) {
-    $rs->set_cache($rows);
-  }
+
+  $rs->set_cache($cache) if ($cache);
+
   return $rs;
 }
 
+sub _stack_cond {
+  my ($self, $left, $right) = @_;
+  if (defined $left xor defined $right) {
+    return defined $left ? $left : $right;
+  }
+  elsif (defined $left) {
+    return { -and => [ map
+      { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+      ($left, $right)
+    ]};
+  }
+
+  return undef;
+}
+
 =head2 search_literal
 
 =over 4
@@ -289,19 +379,29 @@ Pass a literal chunk of SQL to be added to the conditional part of the
 resultset query.
 
 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
-only be used in that context. There are known problems using C<search_literal>
-in chained queries; it can result in bind values in the wrong order.  See
-L<DBIx::Class::Manual::Cookbook/Searching> and
+only be used in that context. C<search_literal> is a convenience method.
+It is equivalent to calling $schema->search(\[]), but if you want to ensure
+columns are bound correctly, use C<search>.
+
+Example of how to use C<search> instead of C<search_literal>
+
+  my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
+  my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
+
+
+See L<DBIx::Class::Manual::Cookbook/Searching> and
 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
 require C<search_literal>.
 
 =cut
 
 sub search_literal {
-  my ($self, $cond, @vals) = @_;
-  my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
-  $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
-  return $self->search(\$cond, $attrs);
+  my ($self, $sql, @bind) = @_;
+  my $attr;
+  if ( @bind && ref($bind[-1]) eq 'HASH' ) {
+    $attr = pop @bind;
+  }
+  return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
 }
 
 =head2 find
@@ -361,47 +461,49 @@ sub find {
   my $self = shift;
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
 
-  # Default to the primary key, but allow a specific key
-  my @cols = exists $attrs->{key}
-    ? $self->result_source->unique_constraint_columns($attrs->{key})
-    : $self->result_source->primary_columns;
-  $self->throw_exception(
-    "Can't find unless a primary key is defined or unique constraint is specified"
-  ) unless @cols;
-
-  # Parse out a hashref from input
+  # Parse out a query from input
   my $input_query;
   if (ref $_[0] eq 'HASH') {
     $input_query = { %{$_[0]} };
   }
-  elsif (@_ == @cols) {
-    $input_query = {};
-    @{$input_query}{@cols} = @_;
-  }
   else {
-    # Compatibility: Allow e.g. find(id => $value)
-    carp "Find by key => value deprecated; please use a hashref instead";
-    $input_query = {@_};
-  }
+    my $constraint = exists $attrs->{key} ? $attrs->{key} : 'primary';
+    my @c_cols = $self->result_source->unique_constraint_columns($constraint);
+
+    $self->throw_exception(
+      "No constraint columns, maybe a malformed '$constraint' constraint?"
+    ) unless @c_cols;
+
+    $self->throw_exception (
+      'find() expects either a column/value hashref, or a list of values '
+    . "corresponding to the columns of the specified unique constraint '$constraint'"
+    ) unless @c_cols == @_;
 
-  my (%related, $info);
+    $input_query = {};
+    @{$input_query}{@c_cols} = @_;
+  }
 
-  KEY: foreach my $key (keys %$input_query) {
-    if (ref($input_query->{$key})
-        && ($info = $self->result_source->relationship_info($key))) {
+  my %related;
+  for my $key (keys %$input_query) {
+    if (
+      my $keyref = ref($input_query->{$key})
+        and
+      my $relinfo = $self->result_source->relationship_info($key)
+    ) {
       my $val = delete $input_query->{$key};
-      next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
-      my $rel_q = $self->result_source->resolve_condition(
-                    $info->{cond}, $val, $key
-                  );
-      die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
+
+      next if $keyref eq 'ARRAY'; # has_many for multi_create
+
+      my $rel_q = $self->result_source->_resolve_condition(
+        $relinfo->{cond}, $val, $key
+      );
+      die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
       @related{keys %$rel_q} = values %$rel_q;
     }
   }
-  if (my @keys = keys %related) {
-    @{$input_query}{@keys} = values %related;
-  }
 
+  # relationship conditions take precedence (?)
+  @{$input_query}{keys %related} = values %related;
 
   # Build the final query: Default to the disjunction of the unique queries,
   # but allow the input query in case the ResultSet defines the query or the
@@ -413,35 +515,34 @@ sub find {
     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
     $query = $self->_add_alias($unique_query, $alias);
   }
+  elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
+    # This means that we got here after a merger of relationship conditions
+    # in ::Relationship::Base::search_related (the row method), and furthermore
+    # the relationship is of the 'single' type. This means that the condition
+    # provided by the relationship (already attached to $self) is sufficient,
+    # as there can be only one row in the database that would satisfy the
+    # relationship
+  }
   else {
+    # no key was specified - fall down to heuristics mode
+    # get all possible unique queries based on the combination of $query
+    # and the condition available in $self, and then run a search with
+    # each and every possible constraint (as long as it's completely specified)
     my @unique_queries = $self->_unique_queries($input_query, $attrs);
     $query = @unique_queries
       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
       : $self->_add_alias($input_query, $alias);
   }
 
-  # Run the query
-  if (keys %$attrs) {
-    my $rs = $self->search($query, $attrs);
-    if (keys %{$rs->_resolved_attrs->{collapse}}) {
-      my $row = $rs->next;
-      carp "Query returned more than one row" if $rs->next;
-      return $row;
-    }
-    else {
-      return $rs->single;
-    }
+  # Run the query, passing the result_class since it should propagate for find
+  my $rs = $self->search ($query, {result_class => $self->result_class, %$attrs});
+  if (keys %{$rs->_resolved_attrs->{collapse}}) {
+    my $row = $rs->next;
+    carp "Query returned more than one row" if $rs->next;
+    return $row;
   }
   else {
-    if (keys %{$self->_resolved_attrs->{collapse}}) {
-      my $rs = $self->search($query);
-      my $row = $rs->next;
-      carp "Query returned more than one row" if $rs->next;
-      return $row;
-    }
-    else {
-      return $self->single($query);
-    }
+    return $rs->single;
   }
 }
 
@@ -463,7 +564,7 @@ sub _add_alias {
 
 # _unique_queries
 #
-# Build a list of queries which satisfy unique constraints.
+# Build a list of queries which satisfy the unique constraint(s) as per $attrs
 
 sub _unique_queries {
   my ($self, $query, $attrs) = @_;
@@ -475,12 +576,16 @@ sub _unique_queries {
   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
   my $num_where = scalar keys %$where;
 
-  my @unique_queries;
+  my (@unique_queries, %seen_column_combinations);
   foreach my $name (@constraint_names) {
-    my @unique_cols = $self->result_source->unique_constraint_columns($name);
-    my $unique_query = $self->_build_unique_query($query, \@unique_cols);
+    my @constraint_cols = $self->result_source->unique_constraint_columns($name);
+
+    my $constraint_sig = join "\x00", sort @constraint_cols;
+    next if $seen_column_combinations{$constraint_sig}++;
+
+    my $unique_query = $self->_build_unique_query($query, \@constraint_cols);
 
-    my $num_cols = scalar @unique_cols;
+    my $num_cols = scalar @constraint_cols;
     my $num_query = scalar keys %$unique_query;
 
     my $total = $num_query + $num_where;
@@ -534,7 +639,7 @@ sub search_related {
 =head2 search_related_rs
 
 This method works exactly the same as search_related, except that
-it guarantees a restultset, even in list context.
+it guarantees a resultset, even in list context.
 
 =cut
 
@@ -560,7 +665,8 @@ L<DBIx::Class::Cursor> for more information.
 sub cursor {
   my ($self) = @_;
 
-  my $attrs = { %{$self->_resolved_attrs} };
+  my $attrs = $self->_resolved_attrs_copy;
+
   return $self->{cursor}
     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
@@ -572,15 +678,15 @@ sub cursor {
 
 =item Arguments: $cond?
 
-=item Return Value: $row_object?
+=item Return Value: $row_object | undef
 
 =back
 
   my $cd = $schema->resultset('CD')->single({ year => 2001 });
 
 Inflates the first result without creating a cursor if the resultset has
-any records in it; if not returns nothing. Used by L</find> as a lean version of
-L</search>.
+any records in it; if not returns C<undef>. Used by L</find> as a lean version
+of L</search>.
 
 While this method can take an optional search condition (just like L</search>)
 being a fast-code-path it does not recognize search attributes. If you need to
@@ -591,23 +697,38 @@ L<DBIx::Class::ResultSet> returned.
 
 =item B<Note>
 
-As of 0.08100, this method enforces the assumption that the preceeding
+As of 0.08100, this method enforces the assumption that the preceding
 query returns only one row. If more than one row is returned, you will receive
 a warning:
 
   Query returned more than one row
 
-In this case, you should be using L</first> or L</find> instead, or if you really
-know what you are doing, use the L</rows> attribute to explicitly limit the size 
+In this case, you should be using L</next> or L</find> instead, or if you really
+know what you are doing, use the L</rows> attribute to explicitly limit the size
 of the resultset.
 
+This method will also throw an exception if it is called on a resultset prefetching
+has_many, as such a prefetch implies fetching multiple rows from the database in
+order to assemble the resulting object.
+
 =back
 
 =cut
 
 sub single {
   my ($self, $where) = @_;
-  my $attrs = { %{$self->_resolved_attrs} };
+  if(@_ > 2) {
+      $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
+  }
+
+  my $attrs = $self->_resolved_attrs_copy;
+
+  if (keys %{$attrs->{collapse}}) {
+    $self->throw_exception(
+      'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
+    );
+  }
+
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -620,12 +741,6 @@ sub single {
     }
   }
 
-#  XXX: Disabled since it doesn't infer uniqueness in all cases
-#  unless ($self->_is_unique_query($attrs->{where})) {
-#    carp "Query not guaranteed to return a single row"
-#      . "; please declare your unique constraints or use search instead";
-#  }
-
   my @data = $self->result_source->storage->select_single(
     $attrs->{from}, $attrs->{select},
     $attrs->{where}, $attrs
@@ -634,37 +749,6 @@ sub single {
   return (@data ? ($self->_construct_object(@data))[0] : undef);
 }
 
-# _is_unique_query
-#
-# Try to determine if the specified query is guaranteed to be unique, based on
-# the declared unique constraints.
-
-sub _is_unique_query {
-  my ($self, $query) = @_;
-
-  my $collapsed = $self->_collapse_query($query);
-  my $alias = $self->{attrs}{alias};
-
-  foreach my $name ($self->result_source->unique_constraint_names) {
-    my @unique_cols = map {
-      "$alias.$_"
-    } $self->result_source->unique_constraint_columns($name);
-
-    # Count the values for each unique column
-    my %seen = map { $_ => 0 } @unique_cols;
-
-    foreach my $key (keys %$collapsed) {
-      my $aliased = $key =~ /\./ ? $key : "$alias.$key";
-      next unless exists $seen{$aliased};  # Additional constraints are okay
-      $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
-    }
-
-    # If we get 0 or more than 1 value for a column, it's not necessarily unique
-    return 1 unless grep { $_ != 1 } values %seen;
-  }
-
-  return 0;
-}
 
 # _collapse_query
 #
@@ -678,19 +762,16 @@ sub _collapse_query {
   if (ref $query eq 'ARRAY') {
     foreach my $subquery (@$query) {
       next unless ref $subquery;  # -or
-#      warn "ARRAY: " . Dumper $subquery;
       $collapsed = $self->_collapse_query($subquery, $collapsed);
     }
   }
   elsif (ref $query eq 'HASH') {
     if (keys %$query and (keys %$query)[0] eq '-and') {
       foreach my $subquery (@{$query->{-and}}) {
-#        warn "HASH: " . Dumper $subquery;
         $collapsed = $self->_collapse_query($subquery, $collapsed);
       }
     }
     else {
-#      warn "LEAF: " . Dumper $query;
       foreach my $col (keys %$query) {
         my $value = $query->{$col};
         $collapsed->{$col}{$value}++;
@@ -742,10 +823,24 @@ You most likely want to use L</search> with specific operators.
 
 For more information, see L<DBIx::Class::Manual::Cookbook>.
 
+This method is deprecated and will be removed in 0.09. Use L</search()>
+instead. An example conversion is:
+
+  ->search_like({ foo => 'bar' });
+
+  # Becomes
+
+  ->search({ foo => { like => 'bar' } });
+
 =cut
 
 sub search_like {
   my $class = shift;
+  carp (
+    'search_like() is deprecated and will be removed in DBIC version 0.09.'
+   .' Instead use ->search({ x => { -like => "y%" } })'
+   .' (note the outer pair of {}s - they are important!)'
+  );
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
@@ -776,7 +871,7 @@ sub slice {
   $attrs->{offset} = $self->{attrs}{offset} || 0;
   $attrs->{offset} += $min;
   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
-  return $self->search(undef(), $attrs);
+  return $self->search(undef, $attrs);
   #my $slice = (ref $self)->new($self->result_source, $attrs);
   #return (wantarray ? $slice->all : $slice);
 }
@@ -787,7 +882,7 @@ sub slice {
 
 =item Arguments: none
 
-=item Return Value: $result?
+=item Return Value: $result | undef
 
 =back
 
@@ -813,6 +908,7 @@ sub next {
     return $cache->[$self->{all_cache_position}++];
   }
   if ($self->{attrs}{cache}) {
+    delete $self->{pager};
     $self->{all_cache_position} = 1;
     return ($self->all)[0];
   }
@@ -834,7 +930,9 @@ sub next {
 
 sub _construct_object {
   my ($self, @row) = @_;
-  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
+
+  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
+    or return ();
   my @new = $self->result_class->inflate_result($self->result_source, @$info);
   @new = $self->{_attrs}{record_filter}->(@new)
     if exists $self->{_attrs}{record_filter};
@@ -868,7 +966,7 @@ sub _collapse_result {
   # without having to contruct the full hash
 
   if (keys %collapse) {
-    my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
+    my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
     foreach my $i (0 .. $#construct_as) {
       next if defined($construct_as[$i][0]); # only self table
       if (delete $pri{$construct_as[$i][1]}) {
@@ -887,7 +985,7 @@ sub _collapse_result {
   do { # no need to check anything at the front, we always want the first row
 
     my %const;
-  
+
     foreach my $this_as (@construct_as) {
       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
     }
@@ -934,7 +1032,7 @@ sub _collapse_result {
         foreach my $p (@parts) {
           $target = $target->[1]->{$p} ||= [];
           $cur .= ".${p}";
-          if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
+          if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
             # collapsing at this point and on final part
             my $pos = $collapse_pos{$cur};
             CK: foreach my $ck (@ckey) {
@@ -985,17 +1083,28 @@ is derived.
 
 =back
 
-An accessor for the class to use when creating row objects. Defaults to 
-C<< result_source->result_class >> - which in most cases is the name of the 
+An accessor for the class to use when creating row objects. Defaults to
+C<< result_source->result_class >> - which in most cases is the name of the
 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
 
+Note that changing the result_class will also remove any components
+that were originally loaded in the source class via
+L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
+in the original source class will not run.
+
 =cut
 
 sub result_class {
   my ($self, $result_class) = @_;
   if ($result_class) {
-    $self->ensure_class_loaded($result_class);
+    unless (ref $result_class) { # don't fire this for an object
+      $self->ensure_class_loaded($result_class);
+    }
     $self->_result_class($result_class);
+    # THIS LINE WOULD BE A BUG - this accessor specifically exists to
+    # permit the user to set result class on one result set only; it only
+    # chains if provided to search()
+    #$self->{attrs}{result_class} = $result_class if ref $self;
   }
   $self->_result_class;
 }
@@ -1011,14 +1120,8 @@ sub result_class {
 =back
 
 Performs an SQL C<COUNT> with the same query as the resultset was built
-with to find the number of elements. If passed arguments, does a search
-on the resultset and counts the results of that.
-
-Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
-using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
-not support C<DISTINCT> with multiple columns. If you are using such a
-database, you should only use columns from the main table in your C<group_by>
-clause.
+with to find the number of elements. Passing arguments is equivalent to
+C<< $rs->search ($cond, \%attrs)->count >>
 
 =cut
 
@@ -1026,50 +1129,144 @@ sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ and defined $_[0];
   return scalar @{ $self->get_cache } if $self->get_cache;
-  my $count = $self->_count;
-  return 0 unless $count;
 
-  # need to take offset from resolved attrs
+  my $attrs = $self->_resolved_attrs_copy;
+
+  # this is a little optimization - it is faster to do the limit
+  # adjustments in software, instead of a subquery
+  my $rows = delete $attrs->{rows};
+  my $offset = delete $attrs->{offset};
+
+  my $crs;
+  if ($self->_has_resolved_attr (qw/collapse group_by/)) {
+    $crs = $self->_count_subq_rs ($attrs);
+  }
+  else {
+    $crs = $self->_count_rs ($attrs);
+  }
+  my $count = $crs->next;
 
-  $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
-  $count = $self->{attrs}{rows} if
-    $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
+  $count -= $offset if $offset;
+  $count = $rows if $rows and $rows < $count;
   $count = 0 if ($count < 0);
+
   return $count;
 }
 
-sub _count { # Separated out so pager can get the full count
+=head2 count_rs
+
+=over 4
+
+=item Arguments: $cond, \%attrs??
+
+=item Return Value: $count_rs
+
+=back
+
+Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
+This can be very handy for subqueries:
+
+  ->search( { amount => $some_rs->count_rs->as_query } )
+
+As with regular resultsets the SQL query will be executed only after
+the resultset is accessed via L</next> or L</all>. That would return
+the same single value obtainable via L</count>.
+
+=cut
+
+sub count_rs {
   my $self = shift;
-  my $select = { count => '*' };
-
-  my $attrs = { %{$self->_resolved_attrs} };
-  if (my $group_by = delete $attrs->{group_by}) {
-    delete $attrs->{having};
-    my @distinct = (ref $group_by ?  @$group_by : ($group_by));
-    # todo: try CONCAT for multi-column pk
-    my @pk = $self->result_source->primary_columns;
-    if (@pk == 1) {
-      my $alias = $attrs->{alias};
-      foreach my $column (@distinct) {
-        if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
-          @distinct = ($column);
-          last;
-        }
-      }
-    }
+  return $self->search(@_)->count_rs if @_;
+
+  # this may look like a lack of abstraction (count() does about the same)
+  # but in fact an _rs *must* use a subquery for the limits, as the
+  # software based limiting can not be ported if this $rs is to be used
+  # in a subquery itself (i.e. ->as_query)
+  if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
+    return $self->_count_subq_rs;
+  }
+  else {
+    return $self->_count_rs;
+  }
+}
+
+#
+# returns a ResultSetColumn object tied to the count query
+#
+sub _count_rs {
+  my ($self, $attrs) = @_;
+
+  my $rsrc = $self->result_source;
+  $attrs ||= $self->_resolved_attrs;
+
+  my $tmp_attrs = { %$attrs };
+  # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
+  delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
+
+  # overwrite the selector (supplied by the storage)
+  $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
+  $tmp_attrs->{as} = 'count';
+
+  my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
+
+  return $tmp_rs;
+}
+
+#
+# same as above but uses a subquery
+#
+sub _count_subq_rs {
+  my ($self, $attrs) = @_;
+
+  my $rsrc = $self->result_source;
+  $attrs ||= $self->_resolved_attrs;
+
+  my $sub_attrs = { %$attrs };
+  # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
+  delete @{$sub_attrs}{qw/collapse select _prefetch_select as order_by for/};
 
-    $select = { count => { distinct => \@distinct } };
+  # if we multi-prefetch we group_by primary keys only as this is what we would
+  # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
+  if ( keys %{$attrs->{collapse}}  ) {
+    $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
   }
 
-  $attrs->{select} = $select;
-  $attrs->{as} = [qw/count/];
+  # Calculate subquery selector
+  if (my $g = $sub_attrs->{group_by}) {
 
-  # offset, order by and page are not needed to count. record_filter is cdbi
-  delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
+    my $sql_maker = $rsrc->storage->sql_maker;
 
-  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
-  my ($count) = $tmp_rs->cursor->next;
-  return $count;
+    # necessary as the group_by may refer to aliased functions
+    my $sel_index;
+    for my $sel (@{$attrs->{select}}) {
+      $sel_index->{$sel->{-as}} = $sel
+        if (ref $sel eq 'HASH' and $sel->{-as});
+    }
+
+    for my $g_part (@$g) {
+      my $colpiece = $sel_index->{$g_part} || $g_part;
+
+      # disqualify join-based group_by's. Arcane but possible query
+      # also horrible horrible hack to alias a column (not a func.)
+      # (probably need to introduce SQLA syntax)
+      if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
+        my $as = $colpiece;
+        $as =~ s/\./__/;
+        $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
+      }
+      push @{$sub_attrs->{select}}, $colpiece;
+    }
+  }
+  else {
+    my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
+    $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
+  }
+
+  return $rsrc->resultset_class
+               ->new ($rsrc, $sub_attrs)
+                ->as_subselect_rs
+                 ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
+                  ->get_column ('count');
 }
 
 sub _bool {
@@ -1109,18 +1306,21 @@ is returned in list context.
 =cut
 
 sub all {
-  my ($self) = @_;
+  my $self = shift;
+  if(@_) {
+      $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
+  }
+
   return @{ $self->get_cache } if $self->get_cache;
 
   my @obj;
 
-  # TODO: don't call resolve here
   if (keys %{$self->_resolved_attrs->{collapse}}) {
-#  if ($self->{attrs}{prefetch}) {
-      # Using $self->cursor->all is really just an optimisation.
-      # If we're collapsing has_many prefetches it probably makes
-      # very little difference, and this is cleaner than hacking
-      # _construct_object to survive the approach
+    # Using $self->cursor->all is really just an optimisation.
+    # If we're collapsing has_many prefetches it probably makes
+    # very little difference, and this is cleaner than hacking
+    # _construct_object to survive the approach
+    $self->cursor->reset;
     my @row = $self->cursor->next;
     while (@row) {
       push(@obj, $self->_construct_object(@row));
@@ -1133,6 +1333,7 @@ sub all {
   }
 
   $self->set_cache(\@obj) if $self->{attrs}{cache};
+
   return @obj;
 }
 
@@ -1147,6 +1348,8 @@ sub all {
 =back
 
 Resets the resultset's cursor, so you can iterate through the elements again.
+Implicitly resets the storage cursor, so a subsequent L</next> will trigger
+another query.
 
 =cut
 
@@ -1164,12 +1367,12 @@ sub reset {
 
 =item Arguments: none
 
-=item Return Value: $object?
+=item Return Value: $object | undef
 
 =back
 
-Resets the resultset and returns an object for the first result (if the
-resultset returns anything).
+Resets the resultset and returns an object for the first result (or C<undef>
+if the resultset is empty).
 
 =cut
 
@@ -1177,69 +1380,76 @@ sub first {
   return $_[0]->reset->next;
 }
 
-# _cond_for_update_delete
-#
-# update/delete require the condition to be modified to handle
-# the differing SQL syntax available.  This transforms the $self->{cond}
-# appropriately, returning the new condition.
-
-sub _cond_for_update_delete {
-  my ($self, $full_cond) = @_;
-  my $cond = {};
-
-  $full_cond ||= $self->{cond};
-  # No-op. No condition, we're updating/deleting everything
-  return $cond unless ref $full_cond;
-
-  if (ref $full_cond eq 'ARRAY') {
-    $cond = [
-      map {
-        my %hash;
-        foreach my $key (keys %{$_}) {
-          $key =~ /([^.]+)$/;
-          $hash{$1} = $_->{$key};
-        }
-        \%hash;
-      } @{$full_cond}
-    ];
-  }
-  elsif (ref $full_cond eq 'HASH') {
-    if ((keys %{$full_cond})[0] eq '-and') {
-      $cond->{-and} = [];
-
-      my @cond = @{$full_cond->{-and}};
-      for (my $i = 0; $i < @cond; $i++) {
-        my $entry = $cond[$i];
 
-        my $hash;
-        if (ref $entry eq 'HASH') {
-          $hash = $self->_cond_for_update_delete($entry);
-        }
-        else {
-          $entry =~ /([^.]+)$/;
-          $hash->{$1} = $cond[++$i];
+# _rs_update_delete
+#
+# Determines whether and what type of subquery is required for the $rs operation.
+# If grouping is necessary either supplies its own, or verifies the current one
+# After all is done delegates to the proper storage method.
+
+sub _rs_update_delete {
+  my ($self, $op, $values) = @_;
+
+  my $rsrc = $self->result_source;
+
+  # if a condition exists we need to strip all table qualifiers
+  # if this is not possible we'll force a subquery below
+  my $cond = $rsrc->schema->storage->_strip_cond_qualifiers ($self->{cond});
+
+  my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
+  my $needs_subq = $needs_group_by_subq || (not defined $cond) || $self->_has_resolved_attr(qw/rows offset/);
+
+  if ($needs_group_by_subq or $needs_subq) {
+
+    # make a new $rs selecting only the PKs (that's all we really need)
+    my $attrs = $self->_resolved_attrs_copy;
+
+
+    delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_select as/;
+    $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
+
+    if ($needs_group_by_subq) {
+      # make sure no group_by was supplied, or if there is one - make sure it matches
+      # the columns compiled above perfectly. Anything else can not be sanely executed
+      # on most databases so croak right then and there
+
+      if (my $g = $attrs->{group_by}) {
+        my @current_group_by = map
+          { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
+          @$g
+        ;
+
+        if (
+          join ("\x00", sort @current_group_by)
+            ne
+          join ("\x00", sort @{$attrs->{columns}} )
+        ) {
+          $self->throw_exception (
+            "You have just attempted a $op operation on a resultset which does group_by"
+            . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
+            . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
+            . ' kind of queries. Please retry the operation with a modified group_by or'
+            . ' without using one at all.'
+          );
         }
-
-        push @{$cond->{-and}}, $hash;
       }
-    }
-    else {
-      foreach my $key (keys %{$full_cond}) {
-        $key =~ /([^.]+)$/;
-        $cond->{$1} = $full_cond->{$key};
+      else {
+        $attrs->{group_by} = $attrs->{columns};
       }
     }
+
+    my $subrs = (ref $self)->new($rsrc, $attrs);
+    return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
   }
   else {
-    $self->throw_exception(
-      "Can't update/delete on resultset with condition unless hash or array"
+    return $rsrc->storage->$op(
+      $rsrc,
+      $op eq 'update' ? $values : (),
+      $cond,
     );
   }
-
-  return $cond;
 }
 
-
 =head2 update
 
 =over 4
@@ -1251,21 +1461,25 @@ sub _cond_for_update_delete {
 =back
 
 Sets the specified columns in the resultset to the supplied values in a
-single query. Return value will be true if the update succeeded or false
-if no records were updated; exact type of success value is storage-dependent.
+single query. Note that this will not run any accessor/set_column/update
+triggers, nor will it update any row object instances derived from this
+resultset (this includes the contents of the L<resultset cache|/set_cache>
+if any). See L</update_all> if you need to execute any on-update
+triggers or cascades defined either by you or a
+L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
+
+The return value is a pass through of what the underlying
+storage backend returned, and may vary. See L<DBI/execute> for the most
+common case.
 
 =cut
 
 sub update {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash")
+  $self->throw_exception('Values for update must be a hash')
     unless ref $values eq 'HASH';
 
-  my $cond = $self->_cond_for_update_delete;
-   
-  return $self->result_source->storage->update(
-    $self->result_source, $values, $cond
-  );
+  return $self->_rs_update_delete ('update', $values);
 }
 
 =head2 update_all
@@ -1278,18 +1492,20 @@ sub update {
 
 =back
 
-Fetches all objects and updates them one at a time. Note that C<update_all>
-will run DBIC cascade triggers, while L</update> will not.
+Fetches all objects and updates them one at a time via
+L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
+triggers, while L</update> will not.
 
 =cut
 
 sub update_all {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash")
+  $self->throw_exception('Values for update_all must be a hash')
     unless ref $values eq 'HASH';
-  foreach my $obj ($self->all) {
-    $obj->set_columns($values)->update;
-  }
+
+  my $guard = $self->result_source->schema->txn_scope_guard;
+  $_->update($values) for $self->all;
+  $guard->commit;
   return 1;
 }
 
@@ -1299,38 +1515,29 @@ sub update_all {
 
 =item Arguments: none
 
-=item Return Value: 1
+=item Return Value: $storage_rv
 
 =back
 
-Deletes the contents of the resultset from its result source. Note that this
-will not run DBIC cascade triggers. See L</delete_all> if you need triggers
-to run. See also L<DBIx::Class::Row/delete>.
-
-delete may not generate correct SQL for a query with joins or a resultset
-chained from a related resultset.  In this case it will generate a warning:-
-
-  WARNING! Currently $rs->delete() does not generate proper SQL on
-  joined resultsets, and may delete rows well outside of the contents
-  of $rs. Use at your own risk
+Deletes the rows matching this resultset in a single query. Note that this
+will not run any delete triggers, nor will it alter the
+L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
+derived from this resultset (this includes the contents of the
+L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
+execute any on-delete triggers or cascades defined either by you or a
+L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
 
-In these cases you may find that delete_all is more appropriate, or you
-need to respecify your query in a way that can be expressed without a join.
+The return value is a pass through of what the underlying storage backend
+returned, and may vary. See L<DBI/execute> for the most common case.
 
 =cut
 
 sub delete {
-  my ($self) = @_;
-  $self->throw_exception("Delete should not be passed any arguments")
-    if $_[1];
-  carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
-        . ' on joined resultsets, and may delete rows well outside of the'
-        . ' contents of $rs. Use at your own risk' )
-    if ( $self->{attrs}{seen_join} );
-  my $cond = $self->_cond_for_update_delete;
-
-  $self->result_source->storage->delete($self->result_source, $cond);
-  return 1;
+  my $self = shift;
+  $self->throw_exception('delete does not accept any arguments')
+    if @_;
+
+  return $self->_rs_update_delete ('delete');
 }
 
 =head2 delete_all
@@ -1343,14 +1550,20 @@ sub delete {
 
 =back
 
-Fetches all objects and deletes them one at a time. Note that C<delete_all>
-will run DBIC cascade triggers, while L</delete> will not.
+Fetches all objects and deletes them one at a time via
+L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
+triggers, while L</delete> will not.
 
 =cut
 
 sub delete_all {
-  my ($self) = @_;
+  my $self = shift;
+  $self->throw_exception('delete_all does not accept any arguments')
+    if @_;
+
+  my $guard = $self->result_source->schema->txn_scope_guard;
   $_->delete for $self->all;
+  $guard->commit;
   return 1;
 }
 
@@ -1367,31 +1580,32 @@ For the arrayref of hashrefs style each hashref should be a structure suitable
 forsubmitting to a $resultset->create(...) method.
 
 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
-to insert the data, as this is a faster method.  
+to insert the data, as this is a faster method.
 
 Otherwise, each set of data is inserted into the database using
-L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
-objects is returned.
+L<DBIx::Class::ResultSet/create>, and the resulting objects are
+accumulated into an array. The array itself, or an array reference
+is returned depending on scalar or list context.
 
 Example:  Assuming an Artist Class that has many CDs Classes relating:
 
   my $Artist_rs = $schema->resultset("Artist");
-  
-  ## Void Context Example 
+
+  ## Void Context Example
   $Artist_rs->populate([
-     { artistid => 4, name => 'Manufactured Crap', cds => [ 
+     { artistid => 4, name => 'Manufactured Crap', cds => [
         { title => 'My First CD', year => 2006 },
         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
       ],
      },
      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
-        { title => 'My parents sold me to a record company' ,year => 2005 },
+        { title => 'My parents sold me to a record company', year => 2005 },
         { title => 'Why Am I So Ugly?', year => 2006 },
         { title => 'I Got Surgery and am now Popular', year => 2007 }
       ],
      },
   ]);
-  
+
   ## Array Context Example
   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
     { name => "Artist One"},
@@ -1401,7 +1615,7 @@ Example:  Assuming an Artist Class that has many CDs Classes relating:
     { title => "Second CD", year => 2008},
   ]}
   ]);
-  
+
   print $ArtistOne->name; ## response is 'Artist One'
   print $ArtistThree->cds->count ## reponse is '2'
 
@@ -1413,69 +1627,88 @@ example:
     [qw/artistid name/],
     [100, 'A Formally Unknown Singer'],
     [101, 'A singer that jumped the shark two albums ago'],
-    [102, 'An actually cool singer.'],
+    [102, 'An actually cool singer'],
   ]);
 
 Please note an important effect on your data when choosing between void and
-wantarray context. Since void context goes straight to C<insert_bulk> in 
+wantarray context. Since void context goes straight to C<insert_bulk> in
 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
-c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
-create primary keys for you, you will find that your PKs are empty.  In this 
-case you will have to use the wantarray context in order to create those 
+C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
+create primary keys for you, you will find that your PKs are empty.  In this
+case you will have to use the wantarray context in order to create those
 values.
 
 =cut
 
 sub populate {
-  my $self = shift @_;
-  my $data = ref $_[0][0] eq 'HASH'
-    ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
-    $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
-  
+  my $self = shift;
+
+  # cruft placed in standalone method
+  my $data = $self->_normalize_populate_args(@_);
+
   if(defined wantarray) {
     my @created;
     foreach my $item (@$data) {
       push(@created, $self->create($item));
     }
-    return @created;
+    return wantarray ? @created : \@created;
   } else {
-    my ($first, @rest) = @$data;
+    my $first = $data->[0];
+
+    # if a column is a registered relationship, and is a non-blessed hash/array, consider
+    # it relationship data
+    my (@rels, @columns);
+    for (keys %$first) {
+      my $ref = ref $first->{$_};
+      $self->result_source->has_relationship($_) && ($ref eq 'ARRAY' or $ref eq 'HASH')
+        ? push @rels, $_
+        : push @columns, $_
+      ;
+    }
 
-    my @names = grep {!ref $first->{$_}} keys %$first;
-    my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
-    my @pks = $self->result_source->primary_columns;  
+    my @pks = $self->result_source->primary_columns;
 
-    ## do the belongs_to relationships  
+    ## do the belongs_to relationships
     foreach my $index (0..$#$data) {
-      if( grep { !defined $data->[$index]->{$_} } @pks ) {
-        my @ret = $self->populate($data);
-        return;
+
+      # delegate to create() for any dataset without primary keys with specified relationships
+      if (grep { !defined $data->[$index]->{$_} } @pks ) {
+        for my $r (@rels) {
+          if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
+            my @ret = $self->populate($data);
+            return;
+          }
+        }
       }
-    
+
       foreach my $rel (@rels) {
-        next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
+        next unless ref $data->[$index]->{$rel} eq "HASH";
         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
-        my $related = $result->result_source->resolve_condition(
+        my $related = $result->result_source->_resolve_condition(
           $result->result_source->relationship_info($reverse)->{cond},
-          $self,        
-          $result,        
+          $self,
+          $result,
         );
 
         delete $data->[$index]->{$rel};
         $data->[$index] = {%{$data->[$index]}, %$related};
-      
-        push @names, keys %$related if $index == 0;
+
+        push @columns, keys %$related if $index == 0;
       }
     }
 
-    ## do bulk insert on current row
-    my @values = map { [ @$_{@names} ] } @$data;
+    ## inherit the data locked in the conditions of the resultset
+    my ($rs_data) = $self->_merge_cond_with_data({});
+    delete @{$rs_data}{@columns};
+    my @inherit_cols = keys %$rs_data;
+    my @inherit_data = values %$rs_data;
 
+    ## do bulk insert on current row
     $self->result_source->storage->insert_bulk(
-      $self->result_source, 
-      \@names, 
-      \@values,
+      $self->result_source,
+      [@columns, @inherit_cols],
+      [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
     );
 
     ## do the has_many relationships
@@ -1484,12 +1717,12 @@ sub populate {
       foreach my $rel (@rels) {
         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
 
-        my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
+        my $parent = $self->find({map { $_ => $item->{$_} } @pks})
      || $self->throw_exception('Cannot find the relating object.');
-     
+
         my $child = $parent->$rel;
-    
-        my $related = $child->result_source->resolve_condition(
+
+        my $related = $child->result_source->_resolve_condition(
           $parent->result_source->relationship_info($rel)->{cond},
           $child,
           $parent,
@@ -1504,26 +1737,27 @@ sub populate {
   }
 }
 
-=head2 _normalize_populate_args ($args)
-
-Private method used by L</populate> to normalize it's incoming arguments.  Factored
-out in case you want to subclass and accept new argument structures to the
-L</populate> method.
-
-=cut
 
+# populate() argumnets went over several incarnations
+# What we ultimately support is AoH
 sub _normalize_populate_args {
-  my ($self, $data) = @_;
-  my @names = @{shift(@$data)};
-  my @results_to_create;
-  foreach my $datum (@$data) {
-    my %result_to_create;
-    foreach my $index (0..$#names) {
-      $result_to_create{$names[$index]} = $$datum[$index];
+  my ($self, $arg) = @_;
+
+  if (ref $arg eq 'ARRAY') {
+    if (ref $arg->[0] eq 'HASH') {
+      return $arg;
+    }
+    elsif (ref $arg->[0] eq 'ARRAY') {
+      my @ret;
+      my @colnames = @{$arg->[0]};
+      foreach my $values (@{$arg}[1 .. $#$arg]) {
+        push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
+      }
+      return \@ret;
     }
-    push @results_to_create, \%result_to_create;    
   }
-  return \@results_to_create;
+
+  $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
 }
 
 =head2 pager
@@ -1539,19 +1773,195 @@ sub _normalize_populate_args {
 Return Value a L<Data::Page> object for the current resultset. Only makes
 sense for queries with a C<page> attribute.
 
+To get the full count of entries for a paged resultset, call
+C<total_entries> on the L<Data::Page> object.
+
 =cut
 
-sub pager {
-  my ($self) = @_;
-  my $attrs = $self->{attrs};
-  $self->throw_exception("Can't create pager for non-paged rs")
-    unless $self->{attrs}{page};
-  $attrs->{rows} ||= 10;
-  return $self->{pager} ||= Data::Page->new(
-    $self->_count, $attrs->{rows}, $self->{attrs}{page});
-}
+# make a wizard good for both a scalar and a hashref
+my $mk_lazy_count_wizard = sub {
+  require Variable::Magic;
 
-=head2 page
+  my $stash = { total_rs => shift };
+  my $slot = shift; # only used by the hashref magic
+
+  my $magic = Variable::Magic::wizard (
+    data => sub { $stash },
+
+    (!$slot)
+    ? (
+      # the scalar magic
+      get => sub {
+        # set value lazily, and dispell for good
+        ${$_[0]} = $_[1]{total_rs}->count;
+        Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
+        return 1;
+      },
+      set => sub {
+        # an explicit set implies dispell as well
+        # the unless() is to work around "fun and giggles" below
+        Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
+          unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
+        return 1;
+      },
+    )
+    : (
+      # the uvar magic
+      fetch => sub {
+        if ($_[2] eq $slot and !$_[1]{inactive}) {
+          my $cnt = $_[1]{total_rs}->count;
+          $_[0]->{$slot} = $cnt;
+
+          # attempting to dispell in a fetch handle (works in store), seems
+          # to invariable segfault on 5.10, 5.12, 5.13 :(
+          # so use an inactivator instead
+          #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
+          $_[1]{inactive}++;
+        }
+        return 1;
+      },
+      store => sub {
+        if (! $_[1]{inactive} and $_[2] eq $slot) {
+          #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
+          $_[1]{inactive}++
+            unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
+        }
+        return 1;
+      },
+    ),
+  );
+
+  $stash->{magic_selfref} = $magic;
+  weaken ($stash->{magic_selfref}); # this fails on 5.8.1
+
+  return $magic;
+};
+
+# the tie class for 5.8.1
+{
+  package DBIx::Class::__DBIC_LAZY_RS_COUNT__;
+  use base qw/Tie::Hash/;
+
+  sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
+  sub NEXTKEY  { each %{$_[0]{data}} }
+  sub EXISTS   { exists $_[0]{data}{$_[1]} }
+  sub DELETE   { delete $_[0]{data}{$_[1]} }
+  sub CLEAR    { %{$_[0]{data}} = () }
+  sub SCALAR   { scalar %{$_[0]{data}} }
+
+  sub TIEHASH {
+    $_[1]{data} = {%{$_[1]{selfref}}};
+    %{$_[1]{selfref}} = ();
+    Scalar::Util::weaken ($_[1]{selfref});
+    return bless ($_[1], $_[0]);
+  };
+
+  sub FETCH {
+    if ($_[1] eq $_[0]{slot}) {
+      my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
+      untie %{$_[0]{selfref}};
+      %{$_[0]{selfref}} = %{$_[0]{data}};
+      return $cnt;
+    }
+    else {
+      $_[0]{data}{$_[1]};
+    }
+  }
+
+  sub STORE {
+    $_[0]{data}{$_[1]} = $_[2];
+    if ($_[1] eq $_[0]{slot}) {
+      untie %{$_[0]{selfref}};
+      %{$_[0]{selfref}} = %{$_[0]{data}};
+    }
+    $_[2];
+  }
+}
+
+sub pager {
+  my ($self) = @_;
+
+  return $self->{pager} if $self->{pager};
+
+  if ($self->get_cache) {
+    $self->throw_exception ('Pagers on cached resultsets are not supported');
+  }
+
+  my $attrs = $self->{attrs};
+  $self->throw_exception("Can't create pager for non-paged rs")
+    unless $self->{attrs}{page};
+  $attrs->{rows} ||= 10;
+
+  # throw away the paging flags and re-run the count (possibly
+  # with a subselect) to get the real total count
+  my $count_attrs = { %$attrs };
+  delete $count_attrs->{$_} for qw/rows offset page pager/;
+  my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
+
+
+### the following may seem awkward and dirty, but it's a thought-experiment
+### necessary for future development of DBIx::DS. Do *NOT* change this code
+### before talking to ribasushi/mst
+
+  my $pager = Data::Page->new(
+    0,  #start with an empty set
+    $attrs->{rows},
+    $self->{attrs}{page},
+  );
+
+  my $data_slot = 'total_entries';
+
+  # Since we are interested in a cached value (once it's set - it's set), every
+  # technique will detach from the magic-host once the time comes to fire the
+  # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
+
+  if ($] < 5.008003) {
+    # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
+    # to weakref the magic container :(
+    # tested on 5.8.1
+    tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
+      { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
+    );
+  }
+  elsif ($] < 5.010) {
+    # We can use magic on the hash value slot. It's interesting that the magic is
+    # attached to the hash-slot, and does *not* stop working once I do the dummy
+    # assignments after the cast()
+    # tested on 5.8.3 and 5.8.9
+    my $magic = $mk_lazy_count_wizard->($total_rs);
+    Variable::Magic::cast ( $pager->{$data_slot}, $magic );
+
+    # this is for fun and giggles
+    $pager->{$data_slot} = -1;
+    $pager->{$data_slot} = 0;
+
+    # this does not work for scalars, but works with
+    # uvar magic below
+    #my %vals = %$pager;
+    #%$pager = ();
+    #%{$pager} = %vals;
+  }
+  else {
+    # And the uvar magic
+    # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
+    # however see the wizard maker for more notes
+    my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
+    Variable::Magic::cast ( %$pager, $magic );
+
+    # still works
+    $pager->{$data_slot} = -1;
+    $pager->{$data_slot} = 0;
+
+    # this now works
+    my %vals = %$pager;
+    %$pager = ();
+    %{$pager} = %vals;
+  }
+
+  return $self->{pager} = $pager;
+}
+
+=head2 page
 
 =over 4
 
@@ -1596,50 +2006,71 @@ sub new_result {
   $self->throw_exception( "new_result needs a hash" )
     unless (ref $values eq 'HASH');
 
-  my %new;
+  my ($merged_cond, $cols_from_relations) = $self->_merge_cond_with_data($values);
+
+  my %new = (
+    %$merged_cond,
+    @$cols_from_relations
+      ? (-cols_from_relations => $cols_from_relations)
+      : (),
+    -source_handle => $self->_source_handle,
+    -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
+  );
+
+  return $self->result_class->new(\%new);
+}
+
+# _merge_cond_with_data
+#
+# Takes a simple hash of K/V data and returns its copy merged with the
+# condition already present on the resultset. Additionally returns an
+# arrayref of value/condition names, which were inferred from related
+# objects (this is needed for in-memory related objects)
+sub _merge_cond_with_data {
+  my ($self, $data) = @_;
+
+  my (%new_data, @cols_from_relations);
+
   my $alias = $self->{attrs}{alias};
 
-  if (
-    defined $self->{cond}
-    && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
-  ) {
-    %new = %{$self->{attrs}{related_objects}};
-  } else {
+  if (! defined $self->{cond}) {
+    # just massage $data below
+  }
+  elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
+    %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
+    @cols_from_relations = keys %new_data;
+  }
+  elsif (ref $self->{cond} ne 'HASH') {
     $self->throw_exception(
-      "Can't abstract implicit construct, condition not a hash"
-    ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
-  
-    my $collapsed_cond = (
-      $self->{cond}
-        ? $self->_collapse_cond($self->{cond})
-        : {}
+      "Can't abstract implicit construct, resultset condition not a hash"
     );
-  
+  }
+  else {
     # precendence must be given to passed values over values inherited from
     # the cond, so the order here is important.
-    my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
-    while( my($col,$value) = each %implied ){
-      if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
-        $new{$col} = $value->{'='};
+    my $collapsed_cond = $self->_collapse_cond($self->{cond});
+    my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
+
+    while ( my($col, $value) = each %implied ) {
+      if (ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '=') {
+        $new_data{$col} = $value->{'='};
         next;
       }
-      $new{$col} = $value if $self->_is_deterministic_value($value);
+      $new_data{$col} = $value if $self->_is_deterministic_value($value);
     }
   }
 
-  %new = (
-    %new,
-    %{ $self->_remove_alias($values, $alias) },
-    -source_handle => $self->_source_handle,
-    -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
+  %new_data = (
+    %new_data,
+    %{ $self->_remove_alias($data, $alias) },
   );
 
-  return $self->result_class->new(\%new);
+  return (\%new_data, \@cols_from_relations);
 }
 
 # _is_deterministic_value
 #
-# Make an effor to strip non-deterministic values from the condition, 
+# Make an effor to strip non-deterministic values from the condition,
 # to make sure new_result chokes less
 
 sub _is_deterministic_value {
@@ -1647,7 +2078,60 @@ sub _is_deterministic_value {
   my $value = shift;
   my $ref_type = ref $value;
   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
-  return 1 if Scalar::Util::blessed($value);
+  return 1 if blessed $value;
+  return 0;
+}
+
+# _has_resolved_attr
+#
+# determines if the resultset defines at least one
+# of the attributes supplied
+#
+# used to determine if a subquery is neccessary
+#
+# supports some virtual attributes:
+#   -join
+#     This will scan for any joins being present on the resultset.
+#     It is not a mere key-search but a deep inspection of {from}
+#
+
+sub _has_resolved_attr {
+  my ($self, @attr_names) = @_;
+
+  my $attrs = $self->_resolved_attrs;
+
+  my %extra_checks;
+
+  for my $n (@attr_names) {
+    if (grep { $n eq $_ } (qw/-join/) ) {
+      $extra_checks{$n}++;
+      next;
+    }
+
+    my $attr =  $attrs->{$n};
+
+    next if not defined $attr;
+
+    if (ref $attr eq 'HASH') {
+      return 1 if keys %$attr;
+    }
+    elsif (ref $attr eq 'ARRAY') {
+      return 1 if @$attr;
+    }
+    else {
+      return 1 if $attr;
+    }
+  }
+
+  # a resolved join is expressed as a multi-level from
+  return 1 if (
+    $extra_checks{-join}
+      and
+    ref $attrs->{from} eq 'ARRAY'
+      and
+    @{$attrs->{from}} > 1
+  );
+
   return 0;
 }
 
@@ -1663,19 +2147,16 @@ sub _collapse_cond {
   if (ref $cond eq 'ARRAY') {
     foreach my $subcond (@$cond) {
       next unless ref $subcond;  # -or
-#      warn "ARRAY: " . Dumper $subcond;
       $collapsed = $self->_collapse_cond($subcond, $collapsed);
     }
   }
   elsif (ref $cond eq 'HASH') {
     if (keys %$cond and (keys %$cond)[0] eq '-and') {
       foreach my $subcond (@{$cond->{-and}}) {
-#        warn "HASH: " . Dumper $subcond;
         $collapsed = $self->_collapse_cond($subcond, $collapsed);
       }
     }
     else {
-#      warn "LEAF: " . Dumper $cond;
       foreach my $col (keys %$cond) {
         my $value = $cond->{$col};
         $collapsed->{$col} = $value;
@@ -1709,6 +2190,39 @@ sub _remove_alias {
   return \%unaliased;
 }
 
+=head2 as_query
+
+=over 4
+
+=item Arguments: none
+
+=item Return Value: \[ $sql, @bind ]
+
+=back
+
+Returns the SQL query and bind vars associated with the invocant.
+
+This is generally used as the RHS for a subquery.
+
+=cut
+
+sub as_query {
+  my $self = shift;
+
+  my $attrs = $self->_resolved_attrs_copy;
+
+  # For future use:
+  #
+  # in list ctx:
+  # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
+  # $sql also has no wrapping parenthesis in list ctx
+  #
+  my $sqlbind = $self->result_source->storage
+    ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
+
+  return $sqlbind;
+}
+
 =head2 find_or_new
 
 =over 4
@@ -1725,7 +2239,7 @@ sub _remove_alias {
   $cd->cd_to_producer->find_or_new({ producer => $producer },
                                    { key => 'primary });
 
-Find an existing record from this resultset, based on it's primary
+Find an existing record from this resultset, based on its primary
 key, or a unique constraint. If none exists, instantiate a new result
 object and return it. The object will not be saved into your storage
 until you call L<DBIx::Class::Row/insert> on it.
@@ -1734,13 +2248,14 @@ You most likely want this method when looking for existing rows using
 a unique constraint that is not the primary key, or looking for
 related rows.
 
-If you want objects to be saved immediately, use L</find_or_create> instead.
+If you want objects to be saved immediately, use L</find_or_create>
+instead.
 
-B<Note>: C<find_or_new> is probably not what you want when creating a
-new row in a table that uses primary keys supplied by the
-database. Passing in a primary key column with a value of I<undef>
-will cause L</find> to attempt to search for a row with a value of
-I<NULL>.
+B<Note>: Take care when using C<find_or_new> with a table having
+columns with default values that you intend to be automatically
+supplied by the database (e.g. an auto_increment primary key column).
+In normal usage, the value of such columns should NOT be included at
+all in the call to C<find_or_new>, even when set to C<undef>.
 
 =cut
 
@@ -1748,8 +2263,10 @@ sub find_or_new {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->new_result($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->new_result($hash);
 }
 
 =head2 create
@@ -1771,14 +2288,17 @@ To create one row for this resultset, pass a hashref of key/value
 pairs representing the columns of the table and the values you wish to
 store. If the appropriate relationships are set up, foreign key fields
 can also be passed an object representing the foreign row, and the
-value will be set to it's primary key.
+value will be set to its primary key.
+
+To create related objects, pass a hashref of related-object column values
+B<keyed on the relationship name>. If the relationship is of type C<multi>
+(L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
+The process will correctly identify columns holding foreign keys, and will
+transparently populate them from the keys of the corresponding relation.
+This can be applied recursively, and will work correctly for a structure
+with an arbitrary depth and width, as long as the relationships actually
+exists and the correct column data has been supplied.
 
-To create related objects, pass a hashref for the value if the related
-item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
-and use the name of the relationship as the key. (NOT the name of the field,
-necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
-of hashrefs containing the data for each of the rows to create in the foreign
-tables, again using the relationship name as the key.
 
 Instead of hashrefs of plain related data (key/value pairs), you may
 also pass new or inserted objects. New objects (not inserted yet, see
@@ -1790,14 +2310,14 @@ Example of creating a new row.
 
   $person_rs->create({
     name=>"Some Person",
-       email=>"somebody@someplace.com"
+    email=>"somebody@someplace.com"
   });
-  
+
 Example of creating a new row and also creating rows in a related C<has_many>
 or C<has_one> resultset.  Note Arrayref.
 
   $artist_rs->create(
-     { artistid => 4, name => 'Manufactured Crap', cds => [ 
+     { artistid => 4, name => 'Manufactured Crap', cds => [
         { title => 'My First CD', year => 2006 },
         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
       ],
@@ -1805,16 +2325,29 @@ or C<has_one> resultset.  Note Arrayref.
   );
 
 Example of creating a new row and also creating a row in a related
-C<belongs_to>resultset. Note Hashref.
+C<belongs_to> resultset. Note Hashref.
 
   $cd_rs->create({
     title=>"Music for Silly Walks",
-       year=>2000,
-       artist => {
-         name=>"Silly Musician",
-       }
+    year=>2000,
+    artist => {
+      name=>"Silly Musician",
+    }
   });
 
+=over
+
+=item WARNING
+
+When subclassing ResultSet never attempt to override this method. Since
+it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
+lot of the internals simply never call it, so your override will be
+bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
+or L<insert|DBIx::Class::Row/insert> depending on how early in the
+L</create> process you need to intervene.
+
+=back
+
 =cut
 
 sub create {
@@ -1835,7 +2368,7 @@ sub create {
 =back
 
   $cd->cd_to_producer->find_or_create({ producer => $producer },
-                                      { key => 'primary });
+                                      { key => 'primary' });
 
 Tries to find a record based on its primary key or unique constraints; if none
 is found, creates one and returns that instead.
@@ -1864,11 +2397,11 @@ condition. Another process could create a record in the table after
 the find has completed and before the create has started. To avoid
 this problem, use find_or_create() inside a transaction.
 
-B<Note>: C<find_or_create> is probably not what you want when creating
-a new row in a table that uses primary keys supplied by the
-database. Passing in a primary key column with a value of I<undef>
-will cause L</find> to attempt to search for a row with a value of
-I<NULL>.
+B<Note>: Take care when using C<find_or_create> with a table having
+columns with default values that you intend to be automatically
+supplied by the database (e.g. an auto_increment primary key column).
+In normal usage, the value of such columns should NOT be included at
+all in the call to C<find_or_create>, even when set to C<undef>.
 
 See also L</find> and L</update_or_create>. For information on how to declare
 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
@@ -1879,8 +2412,10 @@ sub find_or_create {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
-  my $exists   = $self->find($hash, $attrs);
-  return defined $exists ? $exists : $self->create($hash);
+  if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
+    return $row;
+  }
+  return $self->create($hash);
 }
 
 =head2 update_or_create
@@ -1913,11 +2448,11 @@ For example:
     { key => 'cd_artist_title' }
   );
 
-  $cd->cd_to_producer->update_or_create({ 
-    producer => $producer, 
+  $cd->cd_to_producer->update_or_create({
+    producer => $producer,
     name => 'harry',
-  }, { 
-    key => 'primary,
+  }, {
+    key => 'primary',
   });
 
 
@@ -1929,11 +2464,11 @@ If the C<key> is specified as C<primary>, it searches only on the primary key.
 See also L</find> and L</find_or_create>. For information on how to declare
 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
 
-B<Note>: C<update_or_create> is probably not what you want when
-looking for a row in a table that uses primary keys supplied by the
-database, unless you actually have a key value. Passing in a primary
-key column with a value of I<undef> will cause L</find> to attempt to
-search for a row with a value of I<NULL>.
+B<Note>: Take care when using C<update_or_create> with a table having
+columns with default values that you intend to be automatically
+supplied by the database (e.g. an auto_increment primary key column).
+In normal usage, the value of such columns should NOT be included at
+all in the call to C<update_or_create>, even when set to C<undef>.
 
 =cut
 
@@ -1951,13 +2486,76 @@ sub update_or_create {
   return $self->create($cond);
 }
 
+=head2 update_or_new
+
+=over 4
+
+=item Arguments: \%col_values, { key => $unique_constraint }?
+
+=item Return Value: $rowobject
+
+=back
+
+  $resultset->update_or_new({ col => $val, ... });
+
+First, searches for an existing row matching one of the unique constraints
+(including the primary key) on the source of this resultset. If a row is
+found, updates it with the other given column values. Otherwise, instantiate
+a new result object and return it. The object will not be saved into your storage
+until you call L<DBIx::Class::Row/insert> on it.
+
+Takes an optional C<key> attribute to search on a specific unique constraint.
+For example:
+
+  # In your application
+  my $cd = $schema->resultset('CD')->update_or_new(
+    {
+      artist => 'Massive Attack',
+      title  => 'Mezzanine',
+      year   => 1998,
+    },
+    { key => 'cd_artist_title' }
+  );
+
+  if ($cd->in_storage) {
+      # the cd was updated
+  }
+  else {
+      # the cd is not yet in the database, let's insert it
+      $cd->insert;
+  }
+
+B<Note>: Take care when using C<update_or_new> with a table having
+columns with default values that you intend to be automatically
+supplied by the database (e.g. an auto_increment primary key column).
+In normal usage, the value of such columns should NOT be included at
+all in the call to C<update_or_new>, even when set to C<undef>.
+
+See also L</find>, L</find_or_create> and L</find_or_new>.
+
+=cut
+
+sub update_or_new {
+    my $self  = shift;
+    my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
+    my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
+
+    my $row = $self->find( $cond, $attrs );
+    if ( defined $row ) {
+        $row->update($cond);
+        return $row;
+    }
+
+    return $self->new_result($cond);
+}
+
 =head2 get_cache
 
 =over 4
 
 =item Arguments: none
 
-=item Return Value: \@cache_objects?
+=item Return Value: \@cache_objects | undef
 
 =back
 
@@ -2005,7 +2603,7 @@ sub set_cache {
 
 =item Arguments: none
 
-=item Return Value: []
+=item Return Value: undef
 
 =back
 
@@ -2017,6 +2615,40 @@ sub clear_cache {
   shift->set_cache(undef);
 }
 
+=head2 is_paged
+
+=over 4
+
+=item Arguments: none
+
+=item Return Value: true, if the resultset has been paginated
+
+=back
+
+=cut
+
+sub is_paged {
+  my ($self) = @_;
+  return !!$self->{attrs}{page};
+}
+
+=head2 is_ordered
+
+=over 4
+
+=item Arguments: none
+
+=item Return Value: true, if the resultset has been ordered with C<order_by>.
+
+=back
+
+=cut
+
+sub is_ordered {
+  my ($self) = @_;
+  return scalar $self->result_source->storage->_extract_order_columns($self->{attrs}{order_by});
+}
+
 =head2 related_resultset
 
 =over 4
@@ -2038,21 +2670,30 @@ sub related_resultset {
 
   $self->{related_resultsets} ||= {};
   return $self->{related_resultsets}{$rel} ||= do {
-    my $rel_obj = $self->result_source->relationship_info($rel);
+    my $rsrc = $self->result_source;
+    my $rel_info = $rsrc->relationship_info($rel);
 
     $self->throw_exception(
-      "search_related: result source '" . $self->result_source->source_name .
+      "search_related: result source '" . $rsrc->source_name .
         "' has no such relationship $rel")
-      unless $rel_obj;
-    
-    my ($from,$seen) = $self->_resolve_from($rel);
+      unless $rel_info;
+
+    my $attrs = $self->_chain_relationship($rel);
+
+    my $join_count = $attrs->{seen_join}{$rel};
+
+    my $alias = $self->result_source->storage
+        ->relname_to_table_alias($rel, $join_count);
+
+    # since this is search_related, and we already slid the select window inwards
+    # (the select/as attrs were deleted in the beginning), we need to flip all
+    # left joins to inner, so we get the expected results
+    # read the comment on top of the actual function to see what this does
+    $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
 
-    my $join_count = $seen->{$rel};
-    my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
 
     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
-    my %attrs = %{$self->{attrs}||{}};
-    delete @attrs{qw(result_class alias)};
+    delete @{$attrs}{qw(result_class alias)};
 
     my $new_cache;
 
@@ -2063,7 +2704,7 @@ sub related_resultset {
       }
     }
 
-    my $rel_source = $self->result_source->related_source($rel);
+    my $rel_source = $rsrc->related_source($rel);
 
     my $new = do {
 
@@ -2073,20 +2714,14 @@ sub related_resultset {
       # to work sanely (e.g. RestrictWithObject wants to be able to add
       # extra query restrictions, and these may need to be $alias.)
 
-      my $attrs = $rel_source->resultset_attributes;
-      local $attrs->{alias} = $alias;
+      my $rel_attrs = $rel_source->resultset_attributes;
+      local $rel_attrs->{alias} = $alias;
 
       $rel_source->resultset
                  ->search_rs(
                      undef, {
-                       %attrs,
-                       join => undef,
-                       prefetch => undef,
-                       select => undef,
-                       as => undef,
-                       where => $self->{cond},
-                       seen_join => $seen,
-                       from => $from,
+                       %$attrs,
+                       where => $attrs->{where},
                    });
     };
     $new->set_cache($new_cache) if $new_cache;
@@ -2104,9 +2739,8 @@ sub related_resultset {
 
 =back
 
-Returns the current alias of the result source that corrensponds to the result
-set (this alias will eventually be used as the SQL table alias in the SQL
-query). Usually it is C<me>.
+Returns the current table alias for the result source this resultset is built
+on, that will be used in the SQL query. Usually it is C<me>.
 
 Currently the source alias that refers to the result set returned by a
 L</search>/L</find> family method depends on how you got to the resultset: it's
@@ -2138,127 +2772,422 @@ sub current_source_alias {
   return ($self->{attrs} || {})->{alias} || 'me';
 }
 
-sub _resolve_from {
-  my ($self, $extra_join) = @_;
+=head2 as_subselect_rs
+
+=over 4
+
+=item Arguments: none
+
+=item Return Value: $resultset
+
+=back
+
+Act as a barrier to SQL symbols.  The resultset provided will be made into a
+"virtual view" by including it as a subquery within the from clause.  From this
+point on, any joined tables are inaccessible to ->search on the resultset (as if
+it were simply where-filtered without joins).  For example:
+
+ my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
+
+ # 'x' now pollutes the query namespace
+
+ # So the following works as expected
+ my $ok_rs = $rs->search({'x.other' => 1});
+
+ # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
+ # def) we look for one row with contradictory terms and join in another table
+ # (aliased 'x_2') which we never use
+ my $broken_rs = $rs->search({'x.name' => 'def'});
+
+ my $rs2 = $rs->as_subselect_rs;
+
+ # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
+ my $not_joined_rs = $rs2->search({'x.other' => 1});
+
+ # works as expected: finds a 'table' row related to two x rows (abc and def)
+ my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
+
+Another example of when one might use this would be to select a subset of
+columns in a group by clause:
+
+ my $rs = $schema->resultset('Bar')->search(undef, {
+   group_by => [qw{ id foo_id baz_id }],
+ })->as_subselect_rs->search(undef, {
+   columns => [qw{ id foo_id }]
+ });
+
+In the above example normally columns would have to be equal to the group by,
+but because we isolated the group by into a subselect the above works.
+
+=cut
+
+sub as_subselect_rs {
+  my $self = shift;
+
+  my $attrs = $self->_resolved_attrs;
+
+  my $fresh_rs = (ref $self)->new (
+    $self->result_source
+  );
+
+  # these pieces will be locked in the subquery
+  delete $fresh_rs->{cond};
+  delete @{$fresh_rs->{attrs}}{qw/where bind/};
+
+  return $fresh_rs->search( {}, {
+    from => [{
+      $attrs->{alias} => $self->as_query,
+      -alias         => $attrs->{alias},
+      -source_handle => $self->result_source->handle,
+    }],
+    alias => $attrs->{alias},
+  });
+}
+
+# This code is called by search_related, and makes sure there
+# is clear separation between the joins before, during, and
+# after the relationship. This information is needed later
+# in order to properly resolve prefetch aliases (any alias
+# with a relation_chain_depth less than the depth of the
+# current prefetch is not considered)
+#
+# The increments happen twice per join. An even number means a
+# relationship specified via a search_related, whereas an odd
+# number indicates a join/prefetch added via attributes
+#
+# Also this code will wrap the current resultset (the one we
+# chain to) in a subselect IFF it contains limiting attributes
+sub _chain_relationship {
+  my ($self, $rel) = @_;
   my $source = $self->result_source;
-  my $attrs = $self->{attrs};
-  
-  my $from = $attrs->{from}
-    || [ { $attrs->{alias} => $source->from } ];
-    
-  my $seen = { %{$attrs->{seen_join}||{}} };
+  my $attrs = { %{$self->{attrs}||{}} };
+
+  # we need to take the prefetch the attrs into account before we
+  # ->_resolve_join as otherwise they get lost - captainL
+  my $join = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
+
+  delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
 
-  my $join = ($attrs->{join}
-               ? [ $attrs->{join}, $extra_join ]
-               : $extra_join);
+  my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
 
-  # we need to take the prefetch the attrs into account before we 
-  # ->resolve_join as otherwise they get lost - captainL
-  my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
+  my $from;
+  my @force_subq_attrs = qw/offset rows group_by having/;
 
-  $from = [
-    @$from,
-    ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
-  ];
+  if (
+    ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
+      ||
+    $self->_has_resolved_attr (@force_subq_attrs)
+  ) {
+    # Nuke the prefetch (if any) before the new $rs attrs
+    # are resolved (prefetch is useless - we are wrapping
+    # a subquery anyway).
+    my $rs_copy = $self->search;
+    $rs_copy->{attrs}{join} = $self->_merge_attr (
+      $rs_copy->{attrs}{join},
+      delete $rs_copy->{attrs}{prefetch},
+    );
 
-  return ($from,$seen);
+    $from = [{
+      -source_handle => $source->handle,
+      -alias => $attrs->{alias},
+      $attrs->{alias} => $rs_copy->as_query,
+    }];
+    delete @{$attrs}{@force_subq_attrs, qw/where bind/};
+    $seen->{-relation_chain_depth} = 0;
+  }
+  elsif ($attrs->{from}) {  #shallow copy suffices
+    $from = [ @{$attrs->{from}} ];
+  }
+  else {
+    $from = [{
+      -source_handle => $source->handle,
+      -alias => $attrs->{alias},
+      $attrs->{alias} => $source->from,
+    }];
+  }
+
+  my $jpath = ($seen->{-relation_chain_depth})
+    ? $from->[-1][0]{-join_path}
+    : [];
+
+  my @requested_joins = $source->_resolve_join(
+    $join,
+    $attrs->{alias},
+    $seen,
+    $jpath,
+  );
+
+  push @$from, @requested_joins;
+
+  $seen->{-relation_chain_depth}++;
+
+  # if $self already had a join/prefetch specified on it, the requested
+  # $rel might very well be already included. What we do in this case
+  # is effectively a no-op (except that we bump up the chain_depth on
+  # the join in question so we could tell it *is* the search_related)
+  my $already_joined;
+
+  # we consider the last one thus reverse
+  for my $j (reverse @requested_joins) {
+    my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
+    if ($rel eq $last_j) {
+      $j->[0]{-relation_chain_depth}++;
+      $already_joined++;
+      last;
+    }
+  }
+
+  unless ($already_joined) {
+    push @$from, $source->_resolve_join(
+      $rel,
+      $attrs->{alias},
+      $seen,
+      $jpath,
+    );
+  }
+
+  $seen->{-relation_chain_depth}++;
+
+  return {%$attrs, from => $from, seen_join => $seen};
+}
+
+# too many times we have to do $attrs = { %{$self->_resolved_attrs} }
+sub _resolved_attrs_copy {
+  my $self = shift;
+  return { %{$self->_resolved_attrs (@_)} };
 }
 
 sub _resolved_attrs {
   my $self = shift;
   return $self->{_attrs} if $self->{_attrs};
 
-  my $attrs = { %{$self->{attrs}||{}} };
+  my $attrs  = { %{ $self->{attrs} || {} } };
   my $source = $self->result_source;
-  my $alias = $attrs->{alias};
+  my $alias  = $attrs->{alias};
 
   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
-  if ($attrs->{columns}) {
-    delete $attrs->{as};
-  } elsif (!$attrs->{select}) {
-    $attrs->{columns} = [ $source->columns ];
-  }
-  $attrs->{select} = 
-    ($attrs->{select}
-      ? (ref $attrs->{select} eq 'ARRAY'
-          ? [ @{$attrs->{select}} ]
-          : [ $attrs->{select} ])
-      : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
-    );
-  $attrs->{as} =
-    ($attrs->{as}
-      ? (ref $attrs->{as} eq 'ARRAY'
-          ? [ @{$attrs->{as}} ]
-          : [ $attrs->{as} ])
-      : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
-    );
-  
-  my $adds;
-  if ($adds = delete $attrs->{include_columns}) {
-    $adds = [$adds] unless ref $adds eq 'ARRAY';
-    push(@{$attrs->{select}}, @$adds);
-    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
+  my @colbits;
+
+  # build columns (as long as select isn't set) into a set of as/select hashes
+  unless ( $attrs->{select} ) {
+
+    my @cols;
+    if ( ref $attrs->{columns} eq 'ARRAY' ) {
+      @cols = @{ delete $attrs->{columns}}
+    } elsif ( defined $attrs->{columns} ) {
+      @cols = delete $attrs->{columns}
+    } else {
+      @cols = $source->columns
+    }
+
+    for (@cols) {
+      if ( ref $_ eq 'HASH' ) {
+        push @colbits, $_
+      } else {
+        my $key = /^\Q${alias}.\E(.+)$/
+          ? "$1"
+          : "$_";
+        my $value = /\./
+          ? "$_"
+          : "${alias}.$_";
+        push @colbits, { $key => $value };
+      }
+    }
   }
-  if ($adds = delete $attrs->{'+select'}) {
+
+  # add the additional columns on
+  foreach (qw{include_columns +columns}) {
+    if ( $attrs->{$_} ) {
+      my @list = ( ref($attrs->{$_}) eq 'ARRAY' )
+        ? @{ delete $attrs->{$_} }
+        : delete $attrs->{$_};
+      for (@list) {
+        if ( ref($_) eq 'HASH' ) {
+          push @colbits, $_
+        } else {
+          my $key = ( split /\./, $_ )[-1];
+          my $value = ( /\./ ? $_ : "$alias.$_" );
+          push @colbits, { $key => $value };
+        }
+      }
+    }
+  }
+
+  # start with initial select items
+  if ( $attrs->{select} ) {
+    $attrs->{select} =
+        ( ref $attrs->{select} eq 'ARRAY' )
+      ? [ @{ $attrs->{select} } ]
+      : [ $attrs->{select} ];
+
+    if ( $attrs->{as} ) {
+      $attrs->{as} =
+        (
+          ref $attrs->{as} eq 'ARRAY'
+            ? [ @{ $attrs->{as} } ]
+            : [ $attrs->{as} ]
+        )
+    } else {
+      $attrs->{as} = [ map {
+         m/^\Q${alias}.\E(.+)$/
+           ? $1
+           : $_
+         } @{ $attrs->{select} }
+      ]
+    }
+  }
+  else {
+
+    # otherwise we intialise select & as to empty
+    $attrs->{select} = [];
+    $attrs->{as}     = [];
+  }
+
+  # now add colbits to select/as
+  push @{ $attrs->{select} }, map values %{$_}, @colbits;
+  push @{ $attrs->{as}     }, map keys   %{$_}, @colbits;
+
+  if ( my $adds = delete $attrs->{'+select'} ) {
     $adds = [$adds] unless ref $adds eq 'ARRAY';
-    push(@{$attrs->{select}},
-           map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
+    push @{ $attrs->{select} },
+      map { /\./ || ref $_ ? $_ : "$alias.$_" } @$adds;
   }
-  if (my $adds = delete $attrs->{'+as'}) {
+  if ( my $adds = delete $attrs->{'+as'} ) {
     $adds = [$adds] unless ref $adds eq 'ARRAY';
-    push(@{$attrs->{as}}, @$adds);
+    push @{ $attrs->{as} }, @$adds;
   }
 
-  $attrs->{from} ||= [ { 'me' => $source->from } ];
+  $attrs->{from} ||= [{
+    -source_handle => $source->handle,
+    -alias => $self->{attrs}{alias},
+    $self->{attrs}{alias} => $source->from,
+  }];
+
+  if ( $attrs->{join} || $attrs->{prefetch} ) {
+
+    $self->throw_exception ('join/prefetch can not be used with a custom {from}')
+      if ref $attrs->{from} ne 'ARRAY';
 
-  if (exists $attrs->{join} || exists $attrs->{prefetch}) {
     my $join = delete $attrs->{join} || {};
 
-    if (defined $attrs->{prefetch}) {
-      $join = $self->_merge_attr(
-        $join, $attrs->{prefetch}
-      );
-      
+    if ( defined $attrs->{prefetch} ) {
+      $join = $self->_merge_attr( $join, $attrs->{prefetch} );
     }
 
-    $attrs->{from} =   # have to copy here to avoid corrupting the original
+    $attrs->{from} =    # have to copy here to avoid corrupting the original
       [
-        @{$attrs->{from}}, 
-        $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
+        @{ $attrs->{from} },
+        $source->_resolve_join(
+          $join,
+          $alias,
+          { %{ $attrs->{seen_join} || {} } },
+          ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
+            ? $attrs->{from}[-1][0]{-join_path}
+            : []
+          ,
+        )
       ];
+  }
 
+  if ( defined $attrs->{order_by} ) {
+    $attrs->{order_by} = (
+      ref( $attrs->{order_by} ) eq 'ARRAY'
+      ? [ @{ $attrs->{order_by} } ]
+      : [ $attrs->{order_by} || () ]
+    );
   }
 
-  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
-  if ($attrs->{order_by}) {
-    $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
-                           ? [ @{$attrs->{order_by}} ]
-                           : [ $attrs->{order_by} ]);
-  } else {
-    $attrs->{order_by} = [];    
-  }
-
-  my $collapse = $attrs->{collapse} || {};
-  if (my $prefetch = delete $attrs->{prefetch}) {
-    $prefetch = $self->_merge_attr({}, $prefetch);
-    my @pre_order;
-    my $seen = $attrs->{seen_join} || {};
-    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
-      # bring joins back to level of current class
-      my @prefetch = $source->resolve_prefetch(
-        $p, $alias, $seen, \@pre_order, $collapse
-      );
-      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
-      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+  if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
+    $attrs->{group_by} = [ $attrs->{group_by} ];
+  }
+
+  # generate the distinct induced group_by early, as prefetch will be carried via a
+  # subquery (since a group_by is present)
+  if (delete $attrs->{distinct}) {
+    if ($attrs->{group_by}) {
+      carp ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
+    }
+    else {
+      my $storage = $self->result_source->schema->storage;
+      my $rs_column_list = $storage->_resolve_column_info ($attrs->{from});
+
+      my $group_spec = $attrs->{group_by} = [];
+      my %group_index;
+
+      for (@{$attrs->{select}}) {
+        if (! ref($_) or ref ($_) ne 'HASH' ) {
+          push @$group_spec, $_;
+          $group_index{$_}++;
+          if ($rs_column_list->{$_} and $_ !~ /\./ ) {
+            # add a fully qualified version as well
+            $group_index{"$rs_column_list->{$_}{-source_alias}.$_"}++;
+          }
+        }
+      }
+      # add any order_by parts that are not already present in the group_by
+      # we need to be careful not to add any named functions/aggregates
+      # i.e. select => [ ... { count => 'foo', -as 'foocount' } ... ]
+      for my $chunk ($storage->_extract_order_columns($attrs->{order_by})) {
+
+        # only consider real columns (for functions the user got to do an explicit group_by)
+        my $colinfo = $rs_column_list->{$chunk}
+          or next;
+
+        $chunk = "$colinfo->{-source_alias}.$chunk" if $chunk !~ /\./;
+        push @$group_spec, $chunk unless $group_index{$chunk}++;
+      }
     }
-    push(@{$attrs->{order_by}}, @pre_order);
   }
-  $attrs->{collapse} = $collapse;
 
-  if ($attrs->{page}) {
-    $attrs->{offset} ||= 0;
-    $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
+  $attrs->{collapse} ||= {};
+  if ( my $prefetch = delete $attrs->{prefetch} ) {
+    $prefetch = $self->_merge_attr( {}, $prefetch );
+
+    my $prefetch_ordering = [];
+
+    # this is a separate structure (we don't look in {from} directly)
+    # as the resolver needs to shift things off the lists to work
+    # properly (identical-prefetches on different branches)
+    my $join_map = {};
+    if (ref $attrs->{from} eq 'ARRAY') {
+
+      my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
+
+      for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
+        next unless $j->[0]{-alias};
+        next unless $j->[0]{-join_path};
+        next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
+
+        my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
+
+        my $p = $join_map;
+        $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
+        push @{$p->{-join_aliases} }, $j->[0]{-alias};
+      }
+    }
+
+    my @prefetch =
+      $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
+
+    # we need to somehow mark which columns came from prefetch
+    $attrs->{_prefetch_select} = [ map { $_->[0] } @prefetch ];
+
+    push @{ $attrs->{select} }, @{$attrs->{_prefetch_select}};
+    push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
+
+    push( @{$attrs->{order_by}}, @$prefetch_ordering );
+    $attrs->{_collapse_order_by} = \@$prefetch_ordering;
+  }
+
+  # if both page and offset are specified, produce a combined offset
+  # even though it doesn't make much sense, this is what pre 081xx has
+  # been doing
+  if (my $page = delete $attrs->{page}) {
+    $attrs->{offset} =
+      ($attrs->{rows} * ($page - 1))
+            +
+      ($attrs->{offset} || 0)
+    ;
   }
 
   return $self->{_attrs} = $attrs;
@@ -2266,7 +3195,7 @@ sub _resolved_attrs {
 
 sub _rollout_attr {
   my ($self, $attr) = @_;
-  
+
   if (ref $attr eq 'HASH') {
     return $self->_rollout_hash($attr);
   } elsif (ref $attr eq 'ARRAY') {
@@ -2306,6 +3235,13 @@ sub _rollout_hash {
 sub _calculate_score {
   my ($self, $a, $b) = @_;
 
+  if (defined $a xor defined $b) {
+    return 0;
+  }
+  elsif (not defined $a) {
+    return 1;
+  }
+
   if (ref $b eq 'HASH') {
     my ($b_key) = keys %{$b};
     if (ref $a eq 'HASH') {
@@ -2317,7 +3253,7 @@ sub _calculate_score {
       }
     } else {
       return ($a eq $b_key) ? 1 : 0;
-    }       
+    }
   } else {
     if (ref $a eq 'HASH') {
       my ($a_key) = keys %{$a};
@@ -2333,7 +3269,7 @@ sub _merge_attr {
 
   return $import unless defined($orig);
   return $orig unless defined($import);
-  
+
   $orig = $self->_rollout_attr($orig);
   $import = $self->_rollout_attr($import);
 
@@ -2387,41 +3323,55 @@ See L<DBIx::Class::Schema/throw_exception> for details.
 
 sub throw_exception {
   my $self=shift;
+
   if (ref $self && $self->_source_handle->schema) {
     $self->_source_handle->schema->throw_exception(@_)
-  } else {
-    croak(@_);
   }
-
+  else {
+    DBIx::Class::Exception->throw(@_);
+  }
 }
 
 # XXX: FIXME: Attributes docs need clearing up
 
 =head1 ATTRIBUTES
 
-The resultset takes various attributes that modify its behavior. Here's an
-overview of them:
+Attributes are used to refine a ResultSet in various ways when
+searching for data. They can be passed to any method which takes an
+C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
+L</count>.
+
+These are in no particular order:
 
 =head2 order_by
 
 =over 4
 
-=item Value: ($order_by | \@order_by)
+=item Value: ( $order_by | \@order_by | \%order_by )
 
 =back
 
-Which column(s) to order the results by. This is currently passed
-through directly to SQL, so you can give e.g. C<year DESC> for a
-descending order on the column `year'.
+Which column(s) to order the results by.
+
+[The full list of suitable values is documented in
+L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
+common options.]
+
+If a single column name, or an arrayref of names is supplied, the
+argument is passed through directly to SQL. The hashref syntax allows
+for connection-agnostic specification of ordering direction:
 
-Please note that if you have C<quote_char> enabled (see
-L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
-specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
-so you will need to manually quote things as appropriate.)
+ For descending order:
 
-If your L<SQL::Abstract> version supports it (>=1.50), you can also use
-C<{-desc => 'year'}>, which takes care of the quoting for you. This is the
-recommended syntax.
+  order_by => { -desc => [qw/col1 col2 col3/] }
+
+ For explicit ascending order:
+
+  order_by => { -asc => 'col' }
+
+The old scalarref syntax (i.e. order_by => \'year DESC') is still
+supported, although you are strongly encouraged to use the hashref
+syntax as outlined above.
 
 =head2 columns
 
@@ -2431,12 +3381,24 @@ recommended syntax.
 
 =back
 
-Shortcut to request a particular set of columns to be retrieved.  Adds
-C<me.> onto the start of any column without a C<.> in it and sets C<select>
-from that, then auto-populates C<as> from C<select> as normal. (You may also
-use the C<cols> attribute, as in earlier versions of DBIC.)
+Shortcut to request a particular set of columns to be retrieved. Each
+column spec may be a string (a table column name), or a hash (in which
+case the key is the C<as> value, and the value is used as the C<select>
+expression). Adds C<me.> onto the start of any column without a C<.> in
+it and sets C<select> from that, then auto-populates C<as> from
+C<select> as normal. (You may also use the C<cols> attribute, as in
+earlier versions of DBIC.)
 
-=head2 include_columns
+Essentially C<columns> does the same as L</select> and L</as>.
+
+    columns => [ 'foo', { bar => 'baz' } ]
+
+is the same as
+
+    select => [qw/foo baz/],
+    as => [qw/foo bar/]
+
+=head2 +columns
 
 =over 4
 
@@ -2444,10 +3406,13 @@ use the C<cols> attribute, as in earlier versions of DBIC.)
 
 =back
 
-Shortcut to include additional columns in the returned results - for example
+Indicates additional columns to be selected from storage. Works the same
+as L</columns> but adds columns to the selection. (You may also use the
+C<include_columns> attribute, as in earlier versions of DBIC). For
+example:-
 
   $schema->resultset('CD')->search(undef, {
-    include_columns => ['artist.name'],
+    '+columns' => ['artist.name'],
     join => ['artist']
   });
 
@@ -2456,6 +3421,16 @@ passed to object inflation. Note that the 'artist' is the name of the
 column (or relationship) accessor, and 'name' is the name of the column
 accessor in the related table.
 
+=head2 include_columns
+
+=over 4
+
+=item Value: \@columns
+
+=back
+
+Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
+
 =head2 select
 
 =over 4
@@ -2472,20 +3447,27 @@ names:
     select => [
       'name',
       { count => 'employeeid' },
-      { sum => 'salary' }
+      { max => { length => 'name' }, -as => 'longest_name' }
     ]
   });
 
-When you use function/stored procedure names and do not supply an C<as>
-attribute, the column names returned are storage-dependent. E.g. MySQL would
-return a column named C<count(employeeid)> in the above example.
+  # Equivalent SQL
+  SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
+
+B<NOTE:> You will almost always need a corresponding L</as> attribute when you
+use L</select>, to instruct DBIx::Class how to store the result of the column.
+Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
+identifier aliasing. You can however alias a function, so you can use it in
+e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
+attribute> supplied as shown in the example above.
 
 =head2 +select
 
 =over 4
 
 Indicates additional columns to be selected from storage.  Works the same as
-L</select> but adds columns to the selection.
+L</select> but adds columns to the default selection, instead of specifying
+an explicit list.
 
 =back
 
@@ -2493,7 +3475,7 @@ L</select> but adds columns to the selection.
 
 =over 4
 
-Indicates additional column names for those added via L</+select>.
+Indicates additional column names for those added via L</+select>. See L</as>.
 
 =back
 
@@ -2505,25 +3487,26 @@ Indicates additional column names for those added via L</+select>.
 
 =back
 
-Indicates column names for object inflation. That is, C<as>
-indicates the name that the column can be accessed as via the
-C<get_column> method (or via the object accessor, B<if one already
-exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
-
-The C<as> attribute is used in conjunction with C<select>,
-usually when C<select> contains one or more function or stored
-procedure names:
+Indicates column names for object inflation. That is L</as> indicates the
+slot name in which the column value will be stored within the
+L<Row|DBIx::Class::Row> object. The value will then be accessible via this
+identifier by the C<get_column> method (or via the object accessor B<if one
+with the same name already exists>) as shown below. The L</as> attribute has
+B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
 
   $rs = $schema->resultset('Employee')->search(undef, {
     select => [
       'name',
-      { count => 'employeeid' }
+      { count => 'employeeid' },
+      { max => { length => 'name' }, -as => 'longest_name' }
     ],
-    as => ['name', 'employee_count'],
+    as => [qw/
+      name
+      employee_count
+      max_name_length
+    /],
   });
 
-  my $employee = $rs->first(); # get the first Employee
-
 If the object against which the search is performed already has an accessor
 matching a column name specified in C<as>, the value can be retrieved using
 the accessor as normal:
@@ -2538,16 +3521,6 @@ use C<get_column> instead:
 You can create your own accessors if required - see
 L<DBIx::Class::Manual::Cookbook> for details.
 
-Please note: This will NOT insert an C<AS employee_count> into the SQL
-statement produced, it is used for internal access only. Thus
-attempting to use the accessor in an C<order_by> clause or similar
-will fail miserably.
-
-To get around this limitation, you can supply literal SQL to your
-C<select> attibute that contains the C<AS alias> text, eg:
-
-  select => [\'myfield AS alias']
-
 =head2 join
 
 =over 4
@@ -2585,19 +3558,19 @@ For example:
     }
   );
 
-You need to use the relationship (not the table) name in  conditions, 
-because they are aliased as such. The current table is aliased as "me", so 
+You need to use the relationship (not the table) name in  conditions,
+because they are aliased as such. The current table is aliased as "me", so
 you need to use me.column_name in order to avoid ambiguity. For example:
 
-  # Get CDs from 1984 with a 'Foo' track 
+  # Get CDs from 1984 with a 'Foo' track
   my $rs = $schema->resultset('CD')->search(
-    { 
+    {
       'me.year' => 1984,
       'tracks.name' => 'Foo'
     },
     { join => 'tracks' }
   );
-  
+
 If the same join is supplied twice, it will be aliased to <rel>_2 (and
 similarly for a third time). For e.g.
 
@@ -2650,12 +3623,12 @@ C<cd> or C<artist> relationships, which saves us two SQL statements in this
 case.
 
 Simple prefetches will be joined automatically, so there is no need
-for a C<join> attribute in the above search. 
+for a C<join> attribute in the above search.
 
 C<prefetch> can be used with the following relationship types: C<belongs_to>,
 C<has_one> (or if you're using C<add_relationship>, any relationship declared
 with an accessor type of 'single' or 'filter'). A more complex example that
-prefetches an artists cds, the tracks on those cds, and the tags associted 
+prefetches an artists cds, the tracks on those cds, and the tags associated
 with that artist is given below (assuming many-to-many from artists to tags):
 
  my $rs = $schema->resultset('Artist')->search(
@@ -2667,11 +3640,47 @@ with that artist is given below (assuming many-to-many from artists to tags):
      ]
    }
  );
+
 
 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
 attributes will be ignored.
 
+B<CAVEATs>: Prefetch does a lot of deep magic. As such, it may not behave
+exactly as you might expect.
+
+=over 4
+
+=item *
+
+Prefetch uses the L</cache> to populate the prefetched relationships. This
+may or may not be what you want.
+
+=item *
+
+If you specify a condition on a prefetched relationship, ONLY those
+rows that match the prefetched condition will be fetched into that relationship.
+This means that adding prefetch to a search() B<may alter> what is returned by
+traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
+
+  my $artist_rs = $schema->resultset('Artist')->search({
+      'cds.year' => 2008,
+  }, {
+      join => 'cds',
+  });
+
+  my $count = $artist_rs->first->cds->count;
+
+  my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
+
+  my $prefetch_count = $artist_rs_prefetch->first->cds->count;
+
+  cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
+
+that cmp_ok() may or may not pass depending on the datasets involved. This
+behavior may or may not survive the 0.09 transition.
+
+=back
+
 =head2 page
 
 =over 4
@@ -2684,7 +3693,11 @@ Makes the resultset paged and specifies the page to retrieve. Effectively
 identical to creating a non-pages resultset and then calling ->page($page)
 on it.
 
-If L<rows> attribute is not specified it defualts to 10 rows per page.
+If L<rows> attribute is not specified it defaults to 10 rows per page.
+
+When you have a paged resultset, L</count> will only return the number
+of rows in the page. To get the total, use the L</pager> and call
+C<total_entries> on it.
 
 =head2 rows
 
@@ -2694,7 +3707,7 @@ If L<rows> attribute is not specified it defualts to 10 rows per page.
 
 =back
 
-Specifes the maximum number of rows for direct retrieval or the number of
+Specifies the maximum number of rows for direct retrieval or the number of
 rows per page if the page attribute or method is used.
 
 =head2 offset
@@ -2742,7 +3755,8 @@ done.
 
 =back
 
-Set to 1 to group by all columns.
+Set to 1 to group by all columns. If the resultset already has a group_by
+attribute, this setting is ignored and an appropriate warning is issued.
 
 =head2 where
 
@@ -2753,8 +3767,8 @@ Adds to the WHERE clause.
   # only return rows WHERE deleted IS NULL for all searches
   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
 
-Can be overridden by passing C<{ where => undef }> as an attribute
-to a resulset.
+Can be overridden by passing C<< { where => undef } >> as an attribute
+to a resultset.
 
 =back
 
@@ -2776,165 +3790,6 @@ By default, searches are not cached.
 For more examples of using these attributes, see
 L<DBIx::Class::Manual::Cookbook>.
 
-=head2 from
-
-=over 4
-
-=item Value: \@from_clause
-
-=back
-
-The C<from> attribute gives you manual control over the C<FROM> clause of SQL
-statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
-clauses.
-
-NOTE: Use this on your own risk.  This allows you to shoot off your foot!
-
-C<join> will usually do what you need and it is strongly recommended that you
-avoid using C<from> unless you cannot achieve the desired result using C<join>.
-And we really do mean "cannot", not just tried and failed. Attempting to use
-this because you're having problems with C<join> is like trying to use x86
-ASM because you've got a syntax error in your C. Trust us on this.
-
-Now, if you're still really, really sure you need to use this (and if you're
-not 100% sure, ask the mailing list first), here's an explanation of how this
-works.
-
-The syntax is as follows -
-
-  [
-    { <alias1> => <table1> },
-    [
-      { <alias2> => <table2>, -join_type => 'inner|left|right' },
-      [], # nested JOIN (optional)
-      { <table1.column1> => <table2.column2>, ... (more conditions) },
-    ],
-    # More of the above [ ] may follow for additional joins
-  ]
-
-  <table1> <alias1>
-  JOIN
-    <table2> <alias2>
-    [JOIN ...]
-  ON <table1.column1> = <table2.column2>
-  <more joins may follow>
-
-An easy way to follow the examples below is to remember the following:
-
-    Anything inside "[]" is a JOIN
-    Anything inside "{}" is a condition for the enclosing JOIN
-
-The following examples utilize a "person" table in a family tree application.
-In order to express parent->child relationships, this table is self-joined:
-
-    # Person->belongs_to('father' => 'Person');
-    # Person->belongs_to('mother' => 'Person');
-
-C<from> can be used to nest joins. Here we return all children with a father,
-then search against all mothers of those children:
-
-  $rs = $schema->resultset('Person')->search(
-      undef,
-      {
-          alias => 'mother', # alias columns in accordance with "from"
-          from => [
-              { mother => 'person' },
-              [
-                  [
-                      { child => 'person' },
-                      [
-                          { father => 'person' },
-                          { 'father.person_id' => 'child.father_id' }
-                      ]
-                  ],
-                  { 'mother.person_id' => 'child.mother_id' }
-              ],
-          ]
-      },
-  );
-
-  # Equivalent SQL:
-  # SELECT mother.* FROM person mother
-  # JOIN (
-  #   person child
-  #   JOIN person father
-  #   ON ( father.person_id = child.father_id )
-  # )
-  # ON ( mother.person_id = child.mother_id )
-
-The type of any join can be controlled manually. To search against only people
-with a father in the person table, we could explicitly use C<INNER JOIN>:
-
-    $rs = $schema->resultset('Person')->search(
-        undef,
-        {
-            alias => 'child', # alias columns in accordance with "from"
-            from => [
-                { child => 'person' },
-                [
-                    { father => 'person', -join_type => 'inner' },
-                    { 'father.id' => 'child.father_id' }
-                ],
-            ]
-        },
-    );
-
-    # Equivalent SQL:
-    # SELECT child.* FROM person child
-    # INNER JOIN person father ON child.father_id = father.id
-
-If you need to express really complex joins or you need a subselect, you
-can supply literal SQL to C<from> via a scalar reference. In this case
-the contents of the scalar will replace the table name asscoiated with the
-resultsource.
-
-WARNING: This technique might very well not work as expected on chained
-searches - you have been warned.
-
-    # Assuming the Event resultsource is defined as:
-
-        MySchema::Event->add_columns (
-            sequence => {
-                data_type => 'INT',
-                is_auto_increment => 1,
-            },
-            location => {
-                data_type => 'INT',
-            },
-            type => {
-                data_type => 'INT',
-            },
-        );
-        MySchema::Event->set_primary_key ('sequence');
-
-    # This will get back the latest event for every location. The column
-    # selector is still provided by DBIC, all we do is add a JOIN/WHERE
-    # combo to limit the resultset
-
-    $rs = $schema->resultset('Event');
-    $table = $rs->result_source->name;
-    $latest = $rs->search (
-        undef,
-        { from => \ " 
-            (SELECT e1.* FROM $table e1 
-                JOIN $table e2 
-                    ON e1.location = e2.location 
-                    AND e1.sequence < e2.sequence 
-                WHERE e2.sequence is NULL 
-            ) me",
-        },
-    );
-
-    # Equivalent SQL (with the DBIC chunks added):
-
-    SELECT me.sequence, me.location, me.type FROM
-       (SELECT e1.* FROM events e1
-           JOIN events e2
-               ON e1.location = e2.location
-               AND e1.sequence < e2.sequence
-           WHERE e2.sequence is NULL
-       ) me;
-
 =head2 for
 
 =over 4