Fix package names in documentation
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 5575a0a..b421944 100644 (file)
@@ -6,14 +6,14 @@ use overload
         '0+'     => \&count,
         'bool'   => sub { 1; },
         fallback => 1;
+use Carp::Clan qw/^DBIx::Class/;
 use Data::Page;
 use Storable;
-use Scalar::Util qw/weaken/;
-
 use DBIx::Class::ResultSetColumn;
+use DBIx::Class::ResultSourceHandle;
 use base qw/DBIx::Class/;
-__PACKAGE__->load_components(qw/AccessorGroup/);
-__PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
+
+__PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
 
 =head1 NAME
 
@@ -83,89 +83,30 @@ will return a CD object, not a ResultSet.
 sub new {
   my $class = shift;
   return $class->new_result(@_) if ref $class;
-  
-  my ($source, $attrs) = @_;
-  weaken $source;
-  $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
-  #use Data::Dumper; warn Dumper($attrs);
-  my $alias = ($attrs->{alias} ||= 'me');
-  
-  $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
-  delete $attrs->{as} if $attrs->{columns};
-  $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
-  $attrs->{select} = [
-    map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
-  ] if $attrs->{columns};
-  $attrs->{as} ||= [
-    map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
-  ];
-  if (my $include = delete $attrs->{include_columns}) {
-    push(@{$attrs->{select}}, @$include);
-    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
-  }
-  #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
-
-  $attrs->{from} ||= [ { $alias => $source->from } ];
-  $attrs->{seen_join} ||= {};
-  my %seen;
-  if (my $join = delete $attrs->{join}) {
-    foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
-      if (ref $j eq 'HASH') {
-        $seen{$_} = 1 foreach keys %$j;
-      } else {
-        $seen{$j} = 1;
-      }
-    }
-    push(@{$attrs->{from}}, $source->resolve_join(
-      $join, $attrs->{alias}, $attrs->{seen_join})
-    );
-  }
-  
-  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
-  $attrs->{order_by} = [ $attrs->{order_by} ] if
-    $attrs->{order_by} and !ref($attrs->{order_by});
-  $attrs->{order_by} ||= [];
 
-  my $collapse = $attrs->{collapse} || {};
-  if (my $prefetch = delete $attrs->{prefetch}) {
-    my @pre_order;
-    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
-      if ( ref $p eq 'HASH' ) {
-        foreach my $key (keys %$p) {
-          push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
-            unless $seen{$key};
-        }
-      } else {
-        push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
-            unless $seen{$p};
-      }
-      my @prefetch = $source->resolve_prefetch(
-           $p, $attrs->{alias}, {}, \@pre_order, $collapse);
-      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
-      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
-    }
-    push(@{$attrs->{order_by}}, @pre_order);
-  }
-  $attrs->{collapse} = $collapse;
-#  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
+  my ($source, $attrs) = @_;
+  $source = $source->handle 
+    unless $source->isa('DBIx::Class::ResultSourceHandle');
+  $attrs = { %{$attrs||{}} };
 
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
-    $attrs->{offset} ||= 0;
-    $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
   }
 
-  bless {
-    result_source => $source,
-    result_class => $attrs->{result_class} || $source->result_class,
+  $attrs->{alias} ||= 'me';
+
+  my $self = {
+    _source_handle => $source,
+    result_class => $attrs->{result_class} || $source->resolve->result_class,
     cond => $attrs->{where},
-    from => $attrs->{from},
-    collapse => $collapse,
     count => undef,
-    page => delete $attrs->{page},
     pager => undef,
     attrs => $attrs
-  }, $class;
+  };
+
+  bless $self, $class;
+
+  return $self;
 }
 
 =head2 search
@@ -192,49 +133,119 @@ call it as C<search(undef, \%attrs)>.
     columns => [qw/name artistid/],
   });
 
+For a list of attributes that can be passed to C<search>, see
+L</ATTRIBUTES>. For more examples of using this function, see
+L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
+documentation for the first argument, see L<SQL::Abstract>.
+
 =cut
 
 sub search {
   my $self = shift;
+  my $rs = $self->search_rs( @_ );
+  return (wantarray ? $rs->all : $rs);
+}
 
-  my $rs;
-  if( @_ ) {
-    
-    my $attrs = { %{$self->{attrs}} };
-    my $having = delete $attrs->{having};
-    $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
+=head2 search_rs
+
+=over 4
+
+=item Arguments: $cond, \%attrs?
+
+=item Return Value: $resultset
+
+=back
 
-    my $where = (@_
-                  ? ((@_ == 1 || ref $_[0] eq "HASH")
+This method does the same exact thing as search() except it will
+always return a resultset, even in list context.
+
+=cut
+
+sub search_rs {
+  my $self = shift;
+
+  my $rows;
+
+  unless (@_) {                 # no search, effectively just a clone
+    $rows = $self->get_cache;
+  }
+
+  my $attrs = {};
+  $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
+  my $our_attrs = { %{$self->{attrs}} };
+  my $having = delete $our_attrs->{having};
+  my $where = delete $our_attrs->{where};
+
+  my $new_attrs = { %{$our_attrs}, %{$attrs} };
+
+  # merge new attrs into inherited
+  foreach my $key (qw/join prefetch/) {
+    next unless exists $attrs->{$key};
+    $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
+  }
+
+  my $cond = (@_
+    ? (
+        (@_ == 1 || ref $_[0] eq "HASH")
+          ? (
+              (ref $_[0] eq 'HASH')
+                ? (
+                    (keys %{ $_[0] }  > 0)
                       ? shift
-                      : ((@_ % 2)
-                          ? $self->throw_exception(
-                              "Odd number of arguments to search")
-                          : {@_}))
-                  : undef());
-    if (defined $where) {
-      $attrs->{where} = (defined $attrs->{where}
-                ? { '-and' =>
-                    [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
-                        $where, $attrs->{where} ] }
-                : $where);
-    }
+                      : undef
+                   )
+                :  shift
+             )
+          : (
+              (@_ % 2)
+                ? $self->throw_exception("Odd number of arguments to search")
+                : {@_}
+             )
+      )
+    : undef
+  );
 
-    if (defined $having) {
-      $attrs->{having} = (defined $attrs->{having}
-                ? { '-and' =>
-                    [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
-                        $having, $attrs->{having} ] }
-                : $having);
-    }
+  if (defined $where) {
+    $new_attrs->{where} = (
+      defined $new_attrs->{where}
+        ? { '-and' => [
+              map {
+                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
+              } $where, $new_attrs->{where}
+            ]
+          }
+        : $where);
+  }
 
-    $rs = (ref $self)->new($self->result_source, $attrs);
+  if (defined $cond) {
+    $new_attrs->{where} = (
+      defined $new_attrs->{where}
+        ? { '-and' => [
+              map {
+                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
+              } $cond, $new_attrs->{where}
+            ]
+          }
+        : $cond);
   }
-  else {
-    $rs = $self;
-    $rs->reset;
+
+  if (defined $having) {
+    $new_attrs->{having} = (
+      defined $new_attrs->{having}
+        ? { '-and' => [
+              map {
+                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
+              } $having, $new_attrs->{having}
+            ]
+          }
+        : $having);
   }
-  return (wantarray ? $rs->all : $rs);
+
+  my $rs = (ref $self)->new($self->result_source, $new_attrs);
+  if ($rows) {
+    $rs->set_cache($rows);
+  }
+  return $rs;
 }
 
 =head2 search_literal
@@ -277,10 +288,12 @@ a row by its primary key:
 
   my $cd = $schema->resultset('CD')->find(5);
 
-You can also find a row by a specific key or unique constraint by specifying
-the C<key> attribute. For example:
+You can also find a row by a specific unique constraint using the C<key>
+attribute. For example:
 
-  my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'artist_title' });
+  my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
+    key => 'cd_artist_title'
+  });
 
 Additionally, you can specify the columns explicitly by name:
 
@@ -289,14 +302,17 @@ Additionally, you can specify the columns explicitly by name:
       artist => 'Massive Attack',
       title  => 'Mezzanine',
     },
-    { key => 'artist_title' }
+    { key => 'cd_artist_title' }
   );
 
-If no C<key> is specified and you explicitly name columns, it searches on all
-unique constraints defined on the source, including the primary key.
-
 If the C<key> is specified as C<primary>, it searches only on the primary key.
 
+If no C<key> is specified, it searches on all unique constraints defined on the
+source, including the primary key.
+
+If your table does not have a primary key, you B<must> provide a value for the
+C<key> attribute matching one of the unique constraints on the source.
+
 See also L</find_or_create> and L</update_or_create>. For information on how to
 declare unique constraints, see
 L<DBIx::Class::ResultSource/add_unique_constraint>.
@@ -304,101 +320,140 @@ L<DBIx::Class::ResultSource/add_unique_constraint>.
 =cut
 
 sub find {
-  my ($self, @vals) = @_;
-  my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
-
-  my %unique_constraints = $self->result_source->unique_constraints;
-  $self->throw_exception(
-    "Can't find unless a primary key or unique constraint is defined"
-  ) unless %unique_constraints;
+  my $self = shift;
+  my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
 
+  # Default to the primary key, but allow a specific key
+  my @cols = exists $attrs->{key}
+    ? $self->result_source->unique_constraint_columns($attrs->{key})
+    : $self->result_source->primary_columns;
   $self->throw_exception(
-    "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
-  ) if (exists $attrs->{key} and not exists $unique_constraints{$attrs->{key}});
-
-  # Build a list of queries
-  my @unique_hashes;
-
-  if (ref $vals[0] eq 'HASH') {
-    my @constraint_names = exists $attrs->{key}
-      ? ($attrs->{key})
-      : keys %unique_constraints;
-
-    foreach my $name (@constraint_names) {
-      my @unique_cols = @{ $unique_constraints{$name} };
-      my $unique_hash = $self->_unique_hash($vals[0], \@unique_cols);
+    "Can't find unless a primary key is defined or unique constraint is specified"
+  ) unless @cols;
 
-      # TODO: Check that the ResultSet defines the rest of the query
-      push @unique_hashes, $unique_hash
-        if scalar keys %$unique_hash;# == scalar @unique_cols;
-    }
+  # Parse out a hashref from input
+  my $input_query;
+  if (ref $_[0] eq 'HASH') {
+    $input_query = { %{$_[0]} };
+  }
+  elsif (@_ == @cols) {
+    $input_query = {};
+    @{$input_query}{@cols} = @_;
   }
   else {
-    my @unique_cols = exists $attrs->{key}
-      ? @{ $unique_constraints{$attrs->{key}} }
-      : $self->result_source->primary_columns;
-
-    if (@vals == @unique_cols) {
-      my %unique_hash;
-      @unique_hash{@unique_cols} = @vals;
-
-      push @unique_hashes, \%unique_hash;
-    }
-    else {
-      # Hack for CDBI queries
-      my %hash = @vals;
-      push @unique_hashes, \%hash;
-    }
+    # Compatibility: Allow e.g. find(id => $value)
+    carp "Find by key => value deprecated; please use a hashref instead";
+    $input_query = {@_};
   }
 
-  # Add the ResultSet's alias
-  foreach my $unique_hash (@unique_hashes) {
-    foreach my $key (grep { ! m/\./ } keys %$unique_hash) {
-      $unique_hash->{"$self->{attrs}{alias}.$key"} = delete $unique_hash->{$key};
+  my (%related, $info);
+
+  KEY: foreach my $key (keys %$input_query) {
+    if (ref($input_query->{$key})
+        && ($info = $self->result_source->relationship_info($key))) {
+      my $val = delete $input_query->{$key};
+      next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
+      my $rel_q = $self->result_source->resolve_condition(
+                    $info->{cond}, $val, $key
+                  );
+      die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
+      @related{keys %$rel_q} = values %$rel_q;
     }
   }
+  if (my @keys = keys %related) {
+    @{$input_query}{@keys} = values %related;
+  }
 
-  # Handle cases where the ResultSet already defines the query
-  my $query = @unique_hashes ? \@unique_hashes : undef;
+  my @unique_queries = $self->_unique_queries($input_query, $attrs);
 
+  # Build the final query: Default to the disjunction of the unique queries,
+  # but allow the input query in case the ResultSet defines the query or the
+  # user is abusing find
+  my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
+  my $query = @unique_queries
+    ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
+    : $self->_add_alias($input_query, $alias);
+
+  # Run the query
   if (keys %$attrs) {
     my $rs = $self->search($query, $attrs);
-    return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
+    return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
   }
   else {
-    return keys %{$self->{collapse}}
+    return keys %{$self->_resolved_attrs->{collapse}}
       ? $self->search($query)->next
       : $self->single($query);
   }
 }
 
-# _unique_hash
+# _add_alias
+#
+# Add the specified alias to the specified query hash. A copy is made so the
+# original query is not modified.
+
+sub _add_alias {
+  my ($self, $query, $alias) = @_;
+
+  my %aliased = %$query;
+  foreach my $col (grep { ! m/\./ } keys %aliased) {
+    $aliased{"$alias.$col"} = delete $aliased{$col};
+  }
+
+  return \%aliased;
+}
+
+# _unique_queries
 #
-# Constrain the specified hash based on the specific column names.
+# Build a list of queries which satisfy unique constraints.
 
-sub _unique_hash {
-  my ($self, $hash, $unique_cols) = @_;
+sub _unique_queries {
+  my ($self, $query, $attrs) = @_;
 
-  # Ugh, CDBI lowercases column names
-  if (exists $INC{'DBIx/Class/CDBICompat/ColumnCase.pm'}) {
-    foreach my $key (keys %$hash) {
-      $hash->{lc $key} = delete $hash->{$key};
+  my @constraint_names = exists $attrs->{key}
+    ? ($attrs->{key})
+    : $self->result_source->unique_constraint_names;
+
+  my $where = $self->_collapse_cond($self->{attrs}{where} || {});
+  my $num_where = scalar keys %$where;
+
+  my @unique_queries;
+  foreach my $name (@constraint_names) {
+    my @unique_cols = $self->result_source->unique_constraint_columns($name);
+    my $unique_query = $self->_build_unique_query($query, \@unique_cols);
+
+    my $num_cols = scalar @unique_cols;
+    my $num_query = scalar keys %$unique_query;
+
+    my $total = $num_query + $num_where;
+    if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
+      # The query is either unique on its own or is unique in combination with
+      # the existing where clause
+      push @unique_queries, $unique_query;
     }
   }
 
-  my %unique_hash =
-    map  { $_ => $hash->{$_} }
-    grep { exists $hash->{$_} }
-    @$unique_cols;
+  return @unique_queries;
+}
+
+# _build_unique_query
+#
+# Constrain the specified query hash based on the specified column names.
+
+sub _build_unique_query {
+  my ($self, $query, $unique_cols) = @_;
 
-  return \%unique_hash;
+  return {
+    map  { $_ => $query->{$_} }
+    grep { exists $query->{$_} }
+      @$unique_cols
+  };
 }
 
 =head2 search_related
 
 =over 4
 
-=item Arguments: $cond, \%attrs?
+=item Arguments: $rel, $cond, \%attrs?
 
 =item Return Value: $new_resultset
 
@@ -434,9 +489,10 @@ L<DBIx::Class::Cursor> for more information.
 
 sub cursor {
   my ($self) = @_;
-  my $attrs = { %{$self->{attrs}} };
+
+  my $attrs = { %{$self->_resolved_attrs} };
   return $self->{cursor}
-    ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
+    ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
 }
 
@@ -455,11 +511,15 @@ sub cursor {
 Inflates the first result without creating a cursor if the resultset has
 any records in it; if not returns nothing. Used by L</find> as an optimisation.
 
+Can optionally take an additional condition *only* - this is a fast-code-path
+method; if you need to add extra joins or similar call ->search and then
+->single without a condition on the $rs returned from that.
+
 =cut
 
 sub single {
   my ($self, $where) = @_;
-  my $attrs = { %{$self->{attrs}} };
+  my $attrs = { %{$self->_resolved_attrs} };
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -471,10 +531,86 @@ sub single {
       $attrs->{where} = $where;
     }
   }
+
+#  XXX: Disabled since it doesn't infer uniqueness in all cases
+#  unless ($self->_is_unique_query($attrs->{where})) {
+#    carp "Query not guaranteed to return a single row"
+#      . "; please declare your unique constraints or use search instead";
+#  }
+
   my @data = $self->result_source->storage->select_single(
-          $self->{from}, $attrs->{select},
-          $attrs->{where},$attrs);
-  return (@data ? $self->_construct_object(@data) : ());
+    $attrs->{from}, $attrs->{select},
+    $attrs->{where}, $attrs
+  );
+
+  return (@data ? ($self->_construct_object(@data))[0] : undef);
+}
+
+# _is_unique_query
+#
+# Try to determine if the specified query is guaranteed to be unique, based on
+# the declared unique constraints.
+
+sub _is_unique_query {
+  my ($self, $query) = @_;
+
+  my $collapsed = $self->_collapse_query($query);
+  my $alias = $self->{attrs}{alias};
+
+  foreach my $name ($self->result_source->unique_constraint_names) {
+    my @unique_cols = map {
+      "$alias.$_"
+    } $self->result_source->unique_constraint_columns($name);
+
+    # Count the values for each unique column
+    my %seen = map { $_ => 0 } @unique_cols;
+
+    foreach my $key (keys %$collapsed) {
+      my $aliased = $key =~ /\./ ? $key : "$alias.$key";
+      next unless exists $seen{$aliased};  # Additional constraints are okay
+      $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
+    }
+
+    # If we get 0 or more than 1 value for a column, it's not necessarily unique
+    return 1 unless grep { $_ != 1 } values %seen;
+  }
+
+  return 0;
+}
+
+# _collapse_query
+#
+# Recursively collapse the query, accumulating values for each column.
+
+sub _collapse_query {
+  my ($self, $query, $collapsed) = @_;
+
+  $collapsed ||= {};
+
+  if (ref $query eq 'ARRAY') {
+    foreach my $subquery (@$query) {
+      next unless ref $subquery;  # -or
+#      warn "ARRAY: " . Dumper $subquery;
+      $collapsed = $self->_collapse_query($subquery, $collapsed);
+    }
+  }
+  elsif (ref $query eq 'HASH') {
+    if (keys %$query and (keys %$query)[0] eq '-and') {
+      foreach my $subquery (@{$query->{-and}}) {
+#        warn "HASH: " . Dumper $subquery;
+        $collapsed = $self->_collapse_query($subquery, $collapsed);
+      }
+    }
+    else {
+#      warn "LEAF: " . Dumper $query;
+      foreach my $col (keys %$query) {
+        my $value = $query->{$col};
+        $collapsed->{$col}{$value}++;
+      }
+    }
+  }
+
+  return $collapsed;
 }
 
 =head2 get_column
@@ -489,13 +625,12 @@ sub single {
 
   my $max_length = $rs->get_column('length')->max;
 
-Returns a ResultSetColumn instance for $column based on $self
+Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
 
 =cut
 
 sub get_column {
   my ($self, $column) = @_;
-
   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
   return $new;
 }
@@ -577,7 +712,7 @@ Can be used to efficiently iterate over records in the resultset:
     print $cd->title;
   }
 
-Note that you need to store the resultset object, and call C<next> on it. 
+Note that you need to store the resultset object, and call C<next> on it.
 Calling C<< resultset('Table')->next >> repeatedly will always return the
 first record from the resultset.
 
@@ -585,101 +720,155 @@ first record from the resultset.
 
 sub next {
   my ($self) = @_;
-  if (@{$self->{all_cache} || []}) {
+  if (my $cache = $self->get_cache) {
     $self->{all_cache_position} ||= 0;
-    return $self->{all_cache}->[$self->{all_cache_position}++];
+    return $cache->[$self->{all_cache_position}++];
   }
   if ($self->{attrs}{cache}) {
     $self->{all_cache_position} = 1;
     return ($self->all)[0];
   }
-  my @row = (exists $self->{stashed_row} ?
-               @{delete $self->{stashed_row}} :
-               $self->cursor->next
+  if ($self->{stashed_objects}) {
+    my $obj = shift(@{$self->{stashed_objects}});
+    delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
+    return $obj;
+  }
+  my @row = (
+    exists $self->{stashed_row}
+      ? @{delete $self->{stashed_row}}
+      : $self->cursor->next
   );
-#  warn Dumper(\@row); use Data::Dumper;
-  return unless (@row);
-  return $self->_construct_object(@row);
+  return undef unless (@row);
+  my ($row, @more) = $self->_construct_object(@row);
+  $self->{stashed_objects} = \@more if @more;
+  return $row;
 }
 
 sub _construct_object {
   my ($self, @row) = @_;
-  my @as = @{ $self->{attrs}{as} };
-  
-  my $info = $self->_collapse_result(\@as, \@row);
-  
-  my $new = $self->result_class->inflate_result($self->result_source, @$info);
-  
-  $new = $self->{attrs}{record_filter}->($new)
-    if exists $self->{attrs}{record_filter};
-  return $new;
+  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
+  my @new = $self->result_class->inflate_result($self->result_source, @$info);
+  @new = $self->{_attrs}{record_filter}->(@new)
+    if exists $self->{_attrs}{record_filter};
+  return @new;
 }
 
 sub _collapse_result {
-  my ($self, $as, $row, $prefix) = @_;
-
-  my %const;
+  my ($self, $as_proto, $row) = @_;
 
   my @copy = @$row;
-  foreach my $this_as (@$as) {
-    my $val = shift @copy;
-    if (defined $prefix) {
-      if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
-        my $remain = $1;
-        $remain =~ /^(?:(.*)\.)?([^.]+)$/;
-        $const{$1||''}{$2} = $val;
-      }
-    } else {
-      $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
-      $const{$1||''}{$2} = $val;
-    }
-  }
 
-  my $info = [ {}, {} ];
-  foreach my $key (keys %const) {
-    if (length $key) {
-      my $target = $info;
-      my @parts = split(/\./, $key);
-      foreach my $p (@parts) {
-        $target = $target->[1]->{$p} ||= [];
+  # 'foo'         => [ undef, 'foo' ]
+  # 'foo.bar'     => [ 'foo', 'bar' ]
+  # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
+
+  my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
+
+  my %collapse = %{$self->{_attrs}{collapse}||{}};
+
+  my @pri_index;
+
+  # if we're doing collapsing (has_many prefetch) we need to grab records
+  # until the PK changes, so fill @pri_index. if not, we leave it empty so
+  # we know we don't have to bother.
+
+  # the reason for not using the collapse stuff directly is because if you
+  # had for e.g. two artists in a row with no cds, the collapse info for
+  # both would be NULL (undef) so you'd lose the second artist
+
+  # store just the index so we can check the array positions from the row
+  # without having to contruct the full hash
+
+  if (keys %collapse) {
+    my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
+    foreach my $i (0 .. $#construct_as) {
+      next if defined($construct_as[$i][0]); # only self table
+      if (delete $pri{$construct_as[$i][1]}) {
+        push(@pri_index, $i);
       }
-      $target->[0] = $const{$key};
-    } else {
-      $info->[0] = $const{$key};
+      last unless keys %pri; # short circuit (Johnny Five Is Alive!)
     }
   }
 
-  my @collapse;
-  if (defined $prefix) {
-    @collapse = map {
-        m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
-    } keys %{$self->{collapse}}
-  } else {
-    @collapse = keys %{$self->{collapse}};
-  };
+  # no need to do an if, it'll be empty if @pri_index is empty anyway
+
+  my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
+
+  my @const_rows;
 
-  if (@collapse) {
-    my ($c) = sort { length $a <=> length $b } @collapse;
-    my $target = $info;
-    foreach my $p (split(/\./, $c)) {
-      $target = $target->[1]->{$p} ||= [];
+  do { # no need to check anything at the front, we always want the first row
+
+    my %const;
+  
+    foreach my $this_as (@construct_as) {
+      $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
     }
-    my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
-    my @co_key = @{$self->{collapse}{$c_prefix}};
-    my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
-    my $tree = $self->_collapse_result($as, $row, $c_prefix);
-    my (@final, @raw);
-    while ( !(grep {
-                !defined($tree->[0]->{$_}) ||
-                $co_check{$_} ne $tree->[0]->{$_}
-              } @co_key) ) {
-      push(@final, $tree);
-      last unless (@raw = $self->cursor->next);
-      $row = $self->{stashed_row} = \@raw;
-      $tree = $self->_collapse_result($as, $row, $c_prefix);
-      #warn Data::Dumper::Dumper($tree, $row);
+
+    push(@const_rows, \%const);
+
+  } until ( # no pri_index => no collapse => drop straight out
+      !@pri_index
+    or
+      do { # get another row, stash it, drop out if different PK
+
+        @copy = $self->cursor->next;
+        $self->{stashed_row} = \@copy;
+
+        # last thing in do block, counts as true if anything doesn't match
+
+        # check xor defined first for NULL vs. NOT NULL then if one is
+        # defined the other must be so check string equality
+
+        grep {
+          (defined $pri_vals{$_} ^ defined $copy[$_])
+          || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
+        } @pri_index;
+      }
+  );
+
+  my $alias = $self->{attrs}{alias};
+  my $info = [];
+
+  my %collapse_pos;
+
+  my @const_keys;
+
+  foreach my $const (@const_rows) {
+    scalar @const_keys or do {
+      @const_keys = sort { length($a) <=> length($b) } keys %$const;
+    };
+    foreach my $key (@const_keys) {
+      if (length $key) {
+        my $target = $info;
+        my @parts = split(/\./, $key);
+        my $cur = '';
+        my $data = $const->{$key};
+        foreach my $p (@parts) {
+          $target = $target->[1]->{$p} ||= [];
+          $cur .= ".${p}";
+          if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
+            # collapsing at this point and on final part
+            my $pos = $collapse_pos{$cur};
+            CK: foreach my $ck (@ckey) {
+              if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
+                $collapse_pos{$cur} = $data;
+                delete @collapse_pos{ # clear all positioning for sub-entries
+                  grep { m/^\Q${cur}.\E/ } keys %collapse_pos
+                };
+                push(@$target, []);
+                last CK;
+              }
+            }
+          }
+          if (exists $collapse{$cur}) {
+            $target = $target->[-1];
+          }
+        }
+        $target->[0] = $data;
+      } else {
+        $info->[0] = $const->{$key};
+      }
     }
-    @$target = @final;
   }
 
   return $info;
@@ -698,6 +887,20 @@ sub _collapse_result {
 An accessor for the primary ResultSource object from which this ResultSet
 is derived.
 
+=head2 result_class
+
+=over 4
+
+=item Arguments: $result_class?
+
+=item Return Value: $result_class
+
+=back
+
+An accessor for the class to use when creating row objects. Defaults to 
+C<< result_source->result_class >> - which in most cases is the name of the 
+L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
+
 =cut
 
 
@@ -726,29 +929,33 @@ clause.
 sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ and defined $_[0];
-  return scalar @{ $self->get_cache } if @{ $self->get_cache };
-
+  return scalar @{ $self->get_cache } if $self->get_cache;
   my $count = $self->_count;
   return 0 unless $count;
 
-  $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
+  # need to take offset from resolved attrs
+
+  $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
   $count = $self->{attrs}{rows} if
     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
+  $count = 0 if ($count < 0);
   return $count;
 }
 
 sub _count { # Separated out so pager can get the full count
   my $self = shift;
   my $select = { count => '*' };
-  my $attrs = { %{ $self->{attrs} } };
+
+  my $attrs = { %{$self->_resolved_attrs} };
   if (my $group_by = delete $attrs->{group_by}) {
     delete $attrs->{having};
     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
     # todo: try CONCAT for multi-column pk
     my @pk = $self->result_source->primary_columns;
     if (@pk == 1) {
+      my $alias = $attrs->{alias};
       foreach my $column (@distinct) {
-        if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
+        if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
           @distinct = ($column);
           last;
         }
@@ -756,7 +963,6 @@ sub _count { # Separated out so pager can get the full count
     }
 
     $select = { count => { distinct => \@distinct } };
-    #use Data::Dumper; die Dumper $select;
   }
 
   $attrs->{select} = $select;
@@ -764,8 +970,9 @@ sub _count { # Separated out so pager can get the full count
 
   # offset, order by and page are not needed to count. record_filter is cdbi
   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
-        
-  my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
+
+  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
+  my ($count) = $tmp_rs->cursor->next;
   return $count;
 }
 
@@ -803,16 +1010,17 @@ is returned in list context.
 
 sub all {
   my ($self) = @_;
-  return @{ $self->get_cache } if @{ $self->get_cache };
+  return @{ $self->get_cache } if $self->get_cache;
 
   my @obj;
 
-  if (keys %{$self->{collapse}}) {
+  # TODO: don't call resolve here
+  if (keys %{$self->_resolved_attrs->{collapse}}) {
+#  if ($self->{attrs}{prefetch}) {
       # Using $self->cursor->all is really just an optimisation.
       # If we're collapsing has_many prefetches it probably makes
       # very little difference, and this is cleaner than hacking
       # _construct_object to survive the approach
-    $self->cursor->reset;
     my @row = $self->cursor->next;
     while (@row) {
       push(@obj, $self->_construct_object(@row));
@@ -844,6 +1052,7 @@ Resets the resultset's cursor, so you can iterate through the elements again.
 
 sub reset {
   my ($self) = @_;
+  delete $self->{_attrs} if exists $self->{_attrs};
   $self->{all_cache_position} = 0;
   $self->cursor->reset;
   return $self;
@@ -875,13 +1084,14 @@ sub first {
 # appropriately, returning the new condition.
 
 sub _cond_for_update_delete {
-  my ($self) = @_;
+  my ($self, $full_cond) = @_;
   my $cond = {};
 
-  if (!ref($self->{cond})) {
-    # No-op. No condition, we're updating/deleting everything
-  }
-  elsif (ref $self->{cond} eq 'ARRAY') {
+  $full_cond ||= $self->{cond};
+  # No-op. No condition, we're updating/deleting everything
+  return $cond unless ref $full_cond;
+
+  if (ref $full_cond eq 'ARRAY') {
     $cond = [
       map {
         my %hash;
@@ -890,36 +1100,33 @@ sub _cond_for_update_delete {
           $hash{$1} = $_->{$key};
         }
         \%hash;
-      } @{$self->{cond}}
+      } @{$full_cond}
     ];
   }
-  elsif (ref $self->{cond} eq 'HASH') {
-    if ((keys %{$self->{cond}})[0] eq '-and') {
+  elsif (ref $full_cond eq 'HASH') {
+    if ((keys %{$full_cond})[0] eq '-and') {
       $cond->{-and} = [];
 
-      my @cond = @{$self->{cond}{-and}};
-      for (my $i = 0; $i < @cond - 1; $i++) {
+      my @cond = @{$full_cond->{-and}};
+      for (my $i = 0; $i < @cond; $i++) {
         my $entry = $cond[$i];
 
-        my %hash;
+        my $hash;
         if (ref $entry eq 'HASH') {
-          foreach my $key (keys %{$entry}) {
-            $key =~ /([^.]+)$/;
-            $hash{$1} = $entry->{$key};
-          }
+          $hash = $self->_cond_for_update_delete($entry);
         }
         else {
           $entry =~ /([^.]+)$/;
-          $hash{$entry} = $cond[++$i];
+          $hash->{$1} = $cond[++$i];
         }
 
-        push @{$cond->{-and}}, \%hash;
+        push @{$cond->{-and}}, $hash;
       }
     }
     else {
-      foreach my $key (keys %{$self->{cond}}) {
+      foreach my $key (keys %{$full_cond}) {
         $key =~ /([^.]+)$/;
-        $cond->{$1} = $self->{cond}{$key};
+        $cond->{$1} = $full_cond->{$key};
       }
     }
   }
@@ -955,9 +1162,9 @@ sub update {
     unless ref $values eq 'HASH';
 
   my $cond = $self->_cond_for_update_delete;
-
+   
   return $self->result_source->storage->update(
-    $self->result_source->from, $values, $cond
+    $self->result_source, $values, $cond
   );
 }
 
@@ -998,17 +1205,16 @@ sub update_all {
 
 Deletes the contents of the resultset from its result source. Note that this
 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
-to run.
+to run. See also L<DBIx::Class::Row/delete>.
 
 =cut
 
 sub delete {
   my ($self) = @_;
-  my $del = {};
 
   my $cond = $self->_cond_for_update_delete;
 
-  $self->result_source->storage->delete($self->result_source->from, $cond);
+  $self->result_source->storage->delete($self->result_source, $cond);
   return 1;
 }
 
@@ -1033,6 +1239,133 @@ sub delete_all {
   return 1;
 }
 
+=head2 populate
+
+=over 4
+
+=item Arguments: \@data;
+
+=back
+
+Pass an arrayref of hashrefs. Each hashref should be a structure suitable for
+submitting to a $resultset->create(...) method.
+
+In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
+to insert the data, as this is a faster method.
+
+Otherwise, each set of data is inserted into the database using
+L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
+objects is returned.
+
+Example:  Assuming an Artist Class that has many CDs Classes relating:
+
+  my $Artist_rs = $schema->resultset("Artist");
+  
+  ## Void Context Example 
+  $Artist_rs->populate([
+     { artistid => 4, name => 'Manufactured Crap', cds => [ 
+        { title => 'My First CD', year => 2006 },
+        { title => 'Yet More Tweeny-Pop crap', year => 2007 },
+      ],
+     },
+     { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
+        { title => 'My parents sold me to a record company' ,year => 2005 },
+        { title => 'Why Am I So Ugly?', year => 2006 },
+        { title => 'I Got Surgery and am now Popular', year => 2007 }
+      ],
+     },
+  ]);
+  
+  ## Array Context Example
+  my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
+    { name => "Artist One"},
+    { name => "Artist Two"},
+    { name => "Artist Three", cds=> [
+    { title => "First CD", year => 2007},
+    { title => "Second CD", year => 2008},
+  ]}
+  ]);
+  
+  print $ArtistOne->name; ## response is 'Artist One'
+  print $ArtistThree->cds->count ## reponse is '2'
+
+=cut
+
+sub populate {
+  my ($self, $data) = @_;
+  
+  if(defined wantarray) {
+    my @created;
+    foreach my $item (@$data) {
+      push(@created, $self->create($item));
+    }
+    return @created;
+  } else {
+    my ($first, @rest) = @$data;
+
+    my @names = grep {!ref $first->{$_}} keys %$first;
+    my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
+    my @pks = $self->result_source->primary_columns;  
+
+    ## do the belongs_to relationships  
+    foreach my $index (0..$#$data) {
+      if( grep { !defined $data->[$index]->{$_} } @pks ) {
+        my @ret = $self->populate($data);
+        return;
+      }
+    
+      foreach my $rel (@rels) {
+        next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
+        my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
+        my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
+        my $related = $result->result_source->resolve_condition(
+          $result->result_source->relationship_info($reverse)->{cond},
+          $self,        
+          $result,        
+        );
+
+        delete $data->[$index]->{$rel};
+        $data->[$index] = {%{$data->[$index]}, %$related};
+      
+        push @names, keys %$related if $index == 0;
+      }
+    }
+
+    ## do bulk insert on current row
+    my @values = map { [ @$_{@names} ] } @$data;
+
+    $self->result_source->storage->insert_bulk(
+      $self->result_source, 
+      \@names, 
+      \@values,
+    );
+
+    ## do the has_many relationships
+    foreach my $item (@$data) {
+
+      foreach my $rel (@rels) {
+        next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
+
+        my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
+     || $self->throw_exception('Cannot find the relating object.');
+     
+        my $child = $parent->$rel;
+    
+        my $related = $child->result_source->resolve_condition(
+          $parent->result_source->relationship_info($rel)->{cond},
+          $child,
+          $parent,
+        );
+
+        my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
+        my @populate = map { {%$_, %$related} } @rows_to_add;
+
+        $child->populate( \@populate );
+      }
+    }
+  }
+}
+
 =head2 pager
 
 =over 4
@@ -1052,10 +1385,10 @@ sub pager {
   my ($self) = @_;
   my $attrs = $self->{attrs};
   $self->throw_exception("Can't create pager for non-paged rs")
-    unless $self->{page};
+    unless $self->{attrs}{page};
   $attrs->{rows} ||= 10;
   return $self->{pager} ||= Data::Page->new(
-    $self->_count, $attrs->{rows}, $self->{page});
+    $self->_count, $attrs->{rows}, $self->{attrs}{page});
 }
 
 =head2 page
@@ -1076,9 +1409,7 @@ attribute set on the resultset (10 by default).
 
 sub page {
   my ($self, $page) = @_;
-  my $attrs = { %{$self->{attrs}} };
-  $attrs->{page} = $page;
-  return (ref $self)->new($self->result_source, $attrs);
+  return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
 }
 
 =head2 new_result
@@ -1102,23 +1433,84 @@ sub new_result {
   $self->throw_exception(
     "Can't abstract implicit construct, condition not a hash"
   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
-  my %new = %$values;
+
   my $alias = $self->{attrs}{alias};
-  foreach my $key (keys %{$self->{cond}||{}}) {
-    $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
-  }
-  my $obj = $self->result_class->new(\%new);
-  $obj->result_source($self->result_source) if $obj->can('result_source');
-  return $obj;
-}
+  my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
+  my %new = (
+    %{ $self->_remove_alias($values, $alias) },
+    %{ $self->_remove_alias($collapsed_cond, $alias) },
+    -source_handle => $self->_source_handle,
+    -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
+  );
 
-=head2 find_or_new
+  return $self->result_class->new(\%new);
+}
 
-=over 4
+# _collapse_cond
+#
+# Recursively collapse the condition.
 
-=item Arguments: \%vals, \%attrs?
+sub _collapse_cond {
+  my ($self, $cond, $collapsed) = @_;
 
-=item Return Value: $object
+  $collapsed ||= {};
+
+  if (ref $cond eq 'ARRAY') {
+    foreach my $subcond (@$cond) {
+      next unless ref $subcond;  # -or
+#      warn "ARRAY: " . Dumper $subcond;
+      $collapsed = $self->_collapse_cond($subcond, $collapsed);
+    }
+  }
+  elsif (ref $cond eq 'HASH') {
+    if (keys %$cond and (keys %$cond)[0] eq '-and') {
+      foreach my $subcond (@{$cond->{-and}}) {
+#        warn "HASH: " . Dumper $subcond;
+        $collapsed = $self->_collapse_cond($subcond, $collapsed);
+      }
+    }
+    else {
+#      warn "LEAF: " . Dumper $cond;
+      foreach my $col (keys %$cond) {
+        my $value = $cond->{$col};
+        $collapsed->{$col} = $value;
+      }
+    }
+  }
+
+  return $collapsed;
+}
+
+# _remove_alias
+#
+# Remove the specified alias from the specified query hash. A copy is made so
+# the original query is not modified.
+
+sub _remove_alias {
+  my ($self, $query, $alias) = @_;
+
+  my %orig = %{ $query || {} };
+  my %unaliased;
+
+  foreach my $key (keys %orig) {
+    if ($key !~ /\./) {
+      $unaliased{$key} = $orig{$key};
+      next;
+    }
+    $unaliased{$1} = $orig{$key}
+      if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
+  }
+
+  return \%unaliased;
+}
+
+=head2 find_or_new
+
+=over 4
+
+=item Arguments: \%vals, \%attrs?
+
+=item Return Value: $object
 
 =back
 
@@ -1173,8 +1565,8 @@ sub create {
 
   $class->find_or_create({ key => $val, ... });
 
-Searches for a record matching the search condition; if it doesn't find one,
-creates one and returns that instead.
+Tries to find a record based on its primary key or unique constraint; if none
+is found, creates one and returns that instead.
 
   my $cd = $schema->resultset('CD')->find_or_create({
     cdid   => 5,
@@ -1191,7 +1583,7 @@ constraint. For example:
       artist => 'Massive Attack',
       title  => 'Mezzanine',
     },
-    { key => 'artist_title' }
+    { key => 'cd_artist_title' }
   );
 
 See also L</find> and L</update_or_create>. For information on how to declare
@@ -1234,7 +1626,7 @@ For example:
       title  => 'Mezzanine',
       year   => 1998,
     },
-    { key => 'artist_title' }
+    { key => 'cd_artist_title' }
   );
 
 If no C<key> is specified, it searches on all unique constraints defined on the
@@ -1250,16 +1642,15 @@ unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
 sub update_or_create {
   my $self = shift;
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
-  my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
+  my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
 
-  my $row = $self->find($hash, $attrs);
+  my $row = $self->find($cond, $attrs);
   if (defined $row) {
-    $row->set_columns($hash);
-    $row->update;
+    $row->update($cond);
     return $row;
   }
 
-  return $self->create($hash);
+  return $self->create($cond);
 }
 
 =head2 get_cache
@@ -1277,7 +1668,7 @@ Gets the contents of the cache for the resultset, if the cache is set.
 =cut
 
 sub get_cache {
-  shift->{all_cache} || [];
+  shift->{all_cache};
 }
 
 =head2 set_cache
@@ -1300,13 +1691,7 @@ than re-querying the database even if the cache attr is not set.
 sub set_cache {
   my ( $self, $data ) = @_;
   $self->throw_exception("set_cache requires an arrayref")
-    if ref $data ne 'ARRAY';
-  my $result_class = $self->result_class;
-  foreach( @$data ) {
-    $self->throw_exception(
-      "cannot cache object of type '$_', expected '$result_class'"
-    ) if ref $_ ne $result_class;
-  }
+      if defined($data) && (ref $data ne 'ARRAY');
   $self->{all_cache} = $data;
 }
 
@@ -1325,7 +1710,7 @@ Clears the cache for the resultset.
 =cut
 
 sub clear_cache {
-  shift->set_cache([]);
+  shift->set_cache(undef);
 }
 
 =head2 related_resultset
@@ -1345,32 +1730,306 @@ Returns a related resultset for the supplied relationship name.
 =cut
 
 sub related_resultset {
-  my ( $self, $rel ) = @_;
+  my ($self, $rel) = @_;
+
   $self->{related_resultsets} ||= {};
   return $self->{related_resultsets}{$rel} ||= do {
-      #warn "fetching related resultset for rel '$rel'";
-      my $rel_obj = $self->result_source->relationship_info($rel);
-      $self->throw_exception(
-        "search_related: result source '" . $self->result_source->name .
-        "' has no such relationship ${rel}")
-        unless $rel_obj; #die Dumper $self->{attrs};
-
-      my $rs = $self->search(undef, { join => $rel });
-      my $alias = defined $rs->{attrs}{seen_join}{$rel}
-                    && $rs->{attrs}{seen_join}{$rel} > 1
-                  ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
-                  : $rel;
-
-      $self->result_source->schema->resultset($rel_obj->{class}
-           )->search( undef,
-             { %{$rs->{attrs}},
-               alias => $alias,
-               select => undef,
-               as => undef }
-           );
+    my $rel_obj = $self->result_source->relationship_info($rel);
+
+    $self->throw_exception(
+      "search_related: result source '" . $self->result_source->source_name .
+        "' has no such relationship $rel")
+      unless $rel_obj;
+    
+    my ($from,$seen) = $self->_resolve_from($rel);
+
+    my $join_count = $seen->{$rel};
+    my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
+
+    #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
+    my %attrs = %{$self->{attrs}||{}};
+    delete @attrs{qw(result_class alias)};
+
+    my $new_cache;
+
+    if (my $cache = $self->get_cache) {
+      if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
+        $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
+                        @$cache ];
+      }
+    }
+
+    my $rel_source = $self->result_source->related_source($rel);
+
+    my $new = do {
+
+      # The reason we do this now instead of passing the alias to the
+      # search_rs below is that if you wrap/overload resultset on the
+      # source you need to know what alias it's -going- to have for things
+      # to work sanely (e.g. RestrictWithObject wants to be able to add
+      # extra query restrictions, and these may need to be $alias.)
+
+      my $attrs = $rel_source->resultset_attributes;
+      local $attrs->{alias} = $alias;
+
+      $rel_source->resultset
+                 ->search_rs(
+                     undef, {
+                       %attrs,
+                       join => undef,
+                       prefetch => undef,
+                       select => undef,
+                       as => undef,
+                       where => $self->{cond},
+                       seen_join => $seen,
+                       from => $from,
+                   });
+    };
+    $new->set_cache($new_cache) if $new_cache;
+    $new;
   };
 }
 
+sub _resolve_from {
+  my ($self, $extra_join) = @_;
+  my $source = $self->result_source;
+  my $attrs = $self->{attrs};
+  
+  my $from = $attrs->{from}
+    || [ { $attrs->{alias} => $source->from } ];
+    
+  my $seen = { %{$attrs->{seen_join}||{}} };
+
+  my $join = ($attrs->{join}
+               ? [ $attrs->{join}, $extra_join ]
+               : $extra_join);
+
+  # we need to take the prefetch the attrs into account before we 
+  # ->resolve_join as otherwise they get lost - captainL
+  my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
+
+  $from = [
+    @$from,
+    ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
+  ];
+
+  return ($from,$seen);
+}
+
+sub _resolved_attrs {
+  my $self = shift;
+  return $self->{_attrs} if $self->{_attrs};
+
+  my $attrs = { %{$self->{attrs}||{}} };
+  my $source = $self->result_source;
+  my $alias = $attrs->{alias};
+
+  $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
+  if ($attrs->{columns}) {
+    delete $attrs->{as};
+  } elsif (!$attrs->{select}) {
+    $attrs->{columns} = [ $source->columns ];
+  }
+  $attrs->{select} = 
+    ($attrs->{select}
+      ? (ref $attrs->{select} eq 'ARRAY'
+          ? [ @{$attrs->{select}} ]
+          : [ $attrs->{select} ])
+      : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
+    );
+  $attrs->{as} =
+    ($attrs->{as}
+      ? (ref $attrs->{as} eq 'ARRAY'
+          ? [ @{$attrs->{as}} ]
+          : [ $attrs->{as} ])
+      : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
+    );
+  
+  my $adds;
+  if ($adds = delete $attrs->{include_columns}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{select}}, @$adds);
+    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
+  }
+  if ($adds = delete $attrs->{'+select'}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{select}},
+           map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
+  }
+  if (my $adds = delete $attrs->{'+as'}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{as}}, @$adds);
+  }
+
+  $attrs->{from} ||= [ { 'me' => $source->from } ];
+
+  if (exists $attrs->{join} || exists $attrs->{prefetch}) {
+    my $join = delete $attrs->{join} || {};
+
+    if (defined $attrs->{prefetch}) {
+      $join = $self->_merge_attr(
+        $join, $attrs->{prefetch}
+      );
+      
+    }
+
+    $attrs->{from} =   # have to copy here to avoid corrupting the original
+      [
+        @{$attrs->{from}}, 
+        $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
+      ];
+
+  }
+
+  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
+  if ($attrs->{order_by}) {
+    $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
+                           ? [ @{$attrs->{order_by}} ]
+                           : [ $attrs->{order_by} ]);
+  } else {
+    $attrs->{order_by} = [];    
+  }
+
+  my $collapse = $attrs->{collapse} || {};
+  if (my $prefetch = delete $attrs->{prefetch}) {
+    $prefetch = $self->_merge_attr({}, $prefetch);
+    my @pre_order;
+    my $seen = $attrs->{seen_join} || {};
+    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
+      # bring joins back to level of current class
+      my @prefetch = $source->resolve_prefetch(
+        $p, $alias, $seen, \@pre_order, $collapse
+      );
+      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
+      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+    }
+    push(@{$attrs->{order_by}}, @pre_order);
+  }
+  $attrs->{collapse} = $collapse;
+
+  if ($attrs->{page}) {
+    $attrs->{offset} ||= 0;
+    $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
+  }
+
+  return $self->{_attrs} = $attrs;
+}
+
+sub _rollout_attr {
+  my ($self, $attr) = @_;
+  
+  if (ref $attr eq 'HASH') {
+    return $self->_rollout_hash($attr);
+  } elsif (ref $attr eq 'ARRAY') {
+    return $self->_rollout_array($attr);
+  } else {
+    return [$attr];
+  }
+}
+
+sub _rollout_array {
+  my ($self, $attr) = @_;
+
+  my @rolled_array;
+  foreach my $element (@{$attr}) {
+    if (ref $element eq 'HASH') {
+      push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
+    } elsif (ref $element eq 'ARRAY') {
+      #  XXX - should probably recurse here
+      push( @rolled_array, @{$self->_rollout_array($element)} );
+    } else {
+      push( @rolled_array, $element );
+    }
+  }
+  return \@rolled_array;
+}
+
+sub _rollout_hash {
+  my ($self, $attr) = @_;
+
+  my @rolled_array;
+  foreach my $key (keys %{$attr}) {
+    push( @rolled_array, { $key => $attr->{$key} } );
+  }
+  return \@rolled_array;
+}
+
+sub _calculate_score {
+  my ($self, $a, $b) = @_;
+
+  if (ref $b eq 'HASH') {
+    my ($b_key) = keys %{$b};
+    if (ref $a eq 'HASH') {
+      my ($a_key) = keys %{$a};
+      if ($a_key eq $b_key) {
+        return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
+      } else {
+        return 0;
+      }
+    } else {
+      return ($a eq $b_key) ? 1 : 0;
+    }       
+  } else {
+    if (ref $a eq 'HASH') {
+      my ($a_key) = keys %{$a};
+      return ($b eq $a_key) ? 1 : 0;
+    } else {
+      return ($b eq $a) ? 1 : 0;
+    }
+  }
+}
+
+sub _merge_attr {
+  my ($self, $a, $b) = @_;
+
+  return $b unless defined($a);
+  return $a unless defined($b);
+  
+  $a = $self->_rollout_attr($a);
+  $b = $self->_rollout_attr($b);
+
+  my $seen_keys;
+  foreach my $b_element ( @{$b} ) {
+    # find best candidate from $a to merge $b_element into
+    my $best_candidate = { position => undef, score => 0 }; my $position = 0;
+    foreach my $a_element ( @{$a} ) {
+      my $score = $self->_calculate_score( $a_element, $b_element );
+      if ($score > $best_candidate->{score}) {
+        $best_candidate->{position} = $position;
+        $best_candidate->{score} = $score;
+      }
+      $position++;
+    }
+    my ($b_key) = ( ref $b_element eq 'HASH' ) ? keys %{$b_element} : ($b_element);
+    if ($best_candidate->{score} == 0 || exists $seen_keys->{$b_key}) {
+      push( @{$a}, $b_element );
+    } else {
+      $seen_keys->{$b_key} = 1; # don't merge the same key twice
+      my $a_best = $a->[$best_candidate->{position}];
+      # merge a_best and b_element together and replace original with merged
+      if (ref $a_best ne 'HASH') {
+        $a->[$best_candidate->{position}] = $b_element;
+      } elsif (ref $b_element eq 'HASH') {
+        my ($key) = keys %{$a_best};
+        $a->[$best_candidate->{position}] = { $key => $self->_merge_attr($a_best->{$key}, $b_element->{$key}) };
+      }
+    }
+  }
+
+  return $a;
+}
+
+sub result_source {
+    my $self = shift;
+
+    if (@_) {
+        $self->_source_handle($_[0]->handle);
+    } else {
+        $self->_source_handle->resolve;
+    }
+}
+
 =head2 throw_exception
 
 See L<DBIx::Class::Schema/throw_exception> for details.
@@ -1379,7 +2038,7 @@ See L<DBIx::Class::Schema/throw_exception> for details.
 
 sub throw_exception {
   my $self=shift;
-  $self->result_source->schema->throw_exception(@_);
+  $self->_source_handle->schema->throw_exception(@_);
 }
 
 # XXX: FIXME: Attributes docs need clearing up
@@ -1401,6 +2060,11 @@ Which column(s) to order the results by. This is currently passed
 through directly to SQL, so you can give e.g. C<year DESC> for a
 descending order on the column `year'.
 
+Please note that if you have C<quote_char> enabled (see
+L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
+specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
+so you will need to manually quote things as appropriate.)
+
 =head2 columns
 
 =over 4
@@ -1430,7 +2094,9 @@ Shortcut to include additional columns in the returned results - for example
   });
 
 would return all CDs and include a 'name' column to the information
-passed to object inflation
+passed to object inflation. Note that the 'artist' is the name of the
+column (or relationship) accessor, and 'name' is the name of the column
+accessor in the related table.
 
 =head2 select
 
@@ -1456,6 +2122,23 @@ When you use function/stored procedure names and do not supply an C<as>
 attribute, the column names returned are storage-dependent. E.g. MySQL would
 return a column named C<count(employeeid)> in the above example.
 
+=head2 +select
+
+=over 4
+
+Indicates additional columns to be selected from storage.  Works the same as
+L<select> but adds columns to the selection.
+
+=back
+
+=head2 +as
+
+=over 4
+
+Indicates additional column names for those added via L<+select>.
+
+=back
+
 =head2 as
 
 =over 4
@@ -1464,8 +2147,14 @@ return a column named C<count(employeeid)> in the above example.
 
 =back
 
-Indicates column names for object inflation. This is used in conjunction with
-C<select>, usually when C<select> contains one or more function or stored
+Indicates column names for object inflation. That is, c< as >
+indicates the name that the column can be accessed as via the
+C<get_column> method (or via the object accessor, B<if one already
+exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
+>.
+
+The C< as > attribute is used in conjunction with C<select>,
+usually when C<select> contains one or more function or stored
 procedure names:
 
   $rs = $schema->resultset('Employee')->search(undef, {
@@ -1492,6 +2181,16 @@ use C<get_column> instead:
 You can create your own accessors if required - see
 L<DBIx::Class::Manual::Cookbook> for details.
 
+Please note: This will NOT insert an C<AS employee_count> into the SQL
+statement produced, it is used for internal access only. Thus
+attempting to use the accessor in an C<order_by> clause or similar
+will fail miserably.
+
+To get around this limitation, you can supply literal SQL to your
+C<select> attibute that contains the C<AS alias> text, eg:
+
+  select => [\'myfield AS alias']
+
 =head2 join
 
 =over 4
@@ -1529,6 +2228,19 @@ For example:
     }
   );
 
+You need to use the relationship (not the table) name in  conditions, 
+because they are aliased as such. The current table is aliased as "me", so 
+you need to use me.column_name in order to avoid ambiguity. For example:
+
+  # Get CDs from 1984 with a 'Foo' track 
+  my $rs = $schema->resultset('CD')->search(
+    { 
+      'me.year' => 1984,
+      'tracks.name' => 'Foo'
+    },
+    { join => 'tracks' }
+  );
+  
 If the same join is supplied twice, it will be aliased to <rel>_2 (and
 similarly for a third time). For e.g.
 
@@ -1586,6 +2298,110 @@ C<prefetch> can be used with the following relationship types: C<belongs_to>,
 C<has_one> (or if you're using C<add_relationship>, any relationship declared
 with an accessor type of 'single' or 'filter').
 
+=head2 page
+
+=over 4
+
+=item Value: $page
+
+=back
+
+Makes the resultset paged and specifies the page to retrieve. Effectively
+identical to creating a non-pages resultset and then calling ->page($page)
+on it.
+
+If L<rows> attribute is not specified it defualts to 10 rows per page.
+
+=head2 rows
+
+=over 4
+
+=item Value: $rows
+
+=back
+
+Specifes the maximum number of rows for direct retrieval or the number of
+rows per page if the page attribute or method is used.
+
+=head2 offset
+
+=over 4
+
+=item Value: $offset
+
+=back
+
+Specifies the (zero-based) row number for the  first row to be returned, or the
+of the first row of the first page if paging is used.
+
+=head2 group_by
+
+=over 4
+
+=item Value: \@columns
+
+=back
+
+A arrayref of columns to group by. Can include columns of joined tables.
+
+  group_by => [qw/ column1 column2 ... /]
+
+=head2 having
+
+=over 4
+
+=item Value: $condition
+
+=back
+
+HAVING is a select statement attribute that is applied between GROUP BY and
+ORDER BY. It is applied to the after the grouping calculations have been
+done.
+
+  having => { 'count(employee)' => { '>=', 100 } }
+
+=head2 distinct
+
+=over 4
+
+=item Value: (0 | 1)
+
+=back
+
+Set to 1 to group by all columns.
+
+=head2 where
+
+=over 4
+
+Adds to the WHERE clause.
+
+  # only return rows WHERE deleted IS NULL for all searches
+  __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
+
+Can be overridden by passing C<{ where => undef }> as an attribute
+to a resulset.
+
+=back
+
+=head2 cache
+
+Set to 1 to cache search results. This prevents extra SQL queries if you
+revisit rows in your ResultSet:
+
+  my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
+
+  while( my $artist = $resultset->next ) {
+    ... do stuff ...
+  }
+
+  $rs->first; # without cache, this would issue a query
+
+By default, searches are not cached.
+
+For more examples of using these attributes, see
+L<DBIx::Class::Manual::Cookbook>.
+
 =head2 from
 
 =over 4
@@ -1599,21 +2415,35 @@ statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
 clauses.
 
 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
+
 C<join> will usually do what you need and it is strongly recommended that you
 avoid using C<from> unless you cannot achieve the desired result using C<join>.
+And we really do mean "cannot", not just tried and failed. Attempting to use
+this because you're having problems with C<join> is like trying to use x86
+ASM because you've got a syntax error in your C. Trust us on this.
+
+Now, if you're still really, really sure you need to use this (and if you're
+not 100% sure, ask the mailing list first), here's an explanation of how this
+works.
 
-In simple terms, C<from> works as follows:
+The syntax is as follows -
 
+  [
+    { <alias1> => <table1> },
     [
-        { <alias> => <table>, -join_type => 'inner|left|right' }
-        [] # nested JOIN (optional)
-        { <table.column> => <foreign_table.foreign_key> }
-    ]
+      { <alias2> => <table2>, -join_type => 'inner|left|right' },
+      [], # nested JOIN (optional)
+      { <table1.column1> => <table2.column2>, ... (more conditions) },
+    ],
+    # More of the above [ ] may follow for additional joins
+  ]
 
-    JOIN
-        <alias> <table>
-        [JOIN ...]
-    ON <table.column> = <foreign_table.foreign_key>
+  <table1> <alias1>
+  JOIN
+    <table2> <alias2>
+    [JOIN ...]
+  ON <table1.column1> = <table2.column2>
+  <more joins may follow>
 
 An easy way to follow the examples below is to remember the following:
 
@@ -1679,83 +2509,6 @@ with a father in the person table, we could explicitly use C<INNER JOIN>:
     # SELECT child.* FROM person child
     # INNER JOIN person father ON child.father_id = father.id
 
-=head2 page
-
-=over 4
-
-=item Value: $page
-
-=back
-
-Makes the resultset paged and specifies the page to retrieve. Effectively
-identical to creating a non-pages resultset and then calling ->page($page)
-on it.
-
-=head2 rows
-
-=over 4
-
-=item Value: $rows
-
-=back
-
-Specifes the maximum number of rows for direct retrieval or the number of
-rows per page if the page attribute or method is used.
-
-=head2 group_by
-
-=over 4
-
-=item Value: \@columns
-
-=back
-
-A arrayref of columns to group by. Can include columns of joined tables.
-
-  group_by => [qw/ column1 column2 ... /]
-
-=head2 having
-
-=over 4
-
-=item Value: $condition
-
-=back
-
-HAVING is a select statement attribute that is applied between GROUP BY and
-ORDER BY. It is applied to the after the grouping calculations have been
-done. 
-
-  having => { 'count(employee)' => { '>=', 100 } }
-
-=head2 distinct
-
-=over 4
-
-=item Value: (0 | 1)
-
-=back
-
-Set to 1 to group by all columns.
-
-=head2 cache
-
-Set to 1 to cache search results. This prevents extra SQL queries if you
-revisit rows in your ResultSet:
-
-  my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
-  
-  while( my $artist = $resultset->next ) {
-    ... do stuff ...
-  }
-
-  $rs->first; # without cache, this would issue a query
-
-By default, searches are not cached.
-
-For more examples of using these attributes, see
-L<DBIx::Class::Manual::Cookbook>.
-
 =cut
 
 1;