fix page-within-page bug
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 55f0098..1bfd6d0 100644 (file)
@@ -9,11 +9,11 @@ use overload
 use Carp::Clan qw/^DBIx::Class/;
 use Data::Page;
 use Storable;
-use Scalar::Util qw/weaken/;
 use DBIx::Class::ResultSetColumn;
+use DBIx::Class::ResultSourceHandle;
 use base qw/DBIx::Class/;
-__PACKAGE__->load_components(qw/AccessorGroup/);
-__PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
+
+__PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
 
 =head1 NAME
 
@@ -85,27 +85,29 @@ sub new {
   return $class->new_result(@_) if ref $class;
 
   my ($source, $attrs) = @_;
-  weaken $source;
+  $source = $source->handle 
+    unless $source->isa('DBIx::Class::ResultSourceHandle');
+  $attrs = { %{$attrs||{}} };
 
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
-    $attrs->{offset} ||= 0;
-    $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
+    $attrs->{offset} ||= ($attrs->{rows} * ($attrs->{page} - 1));
   }
 
   $attrs->{alias} ||= 'me';
-  $attrs->{_orig_alias} ||= $attrs->{alias};
 
-  bless {
-    result_source => $source,
-    result_class => $attrs->{result_class} || $source->result_class,
+  my $self = {
+    _source_handle => $source,
+    result_class => $attrs->{result_class} || $source->resolve->result_class,
     cond => $attrs->{where},
-#    from => $attrs->{from},
-#    collapse => $collapse,
     count => undef,
     pager => undef,
     attrs => $attrs
-  }, $class;
+  };
+
+  bless $self, $class;
+
+  return $self;
 }
 
 =head2 search
@@ -132,6 +134,11 @@ call it as C<search(undef, \%attrs)>.
     columns => [qw/name artistid/],
   });
 
+For a list of attributes that can be passed to C<search>, see
+L</ATTRIBUTES>. For more examples of using this function, see
+L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
+documentation for the first argument, see L<SQL::Abstract>.
+
 =cut
 
 sub search {
@@ -158,49 +165,38 @@ always return a resultset, even in list context.
 sub search_rs {
   my $self = shift;
 
+  my $rows;
+
+  unless (@_) {                 # no search, effectively just a clone
+    $rows = $self->get_cache;
+  }
+
   my $attrs = {};
   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
-  my $our_attrs = exists $attrs->{_parent_attrs}
-    ? { %{delete $attrs->{_parent_attrs}} }
-    : { %{$self->{attrs}} };
+  my $our_attrs = { %{$self->{attrs}} };
   my $having = delete $our_attrs->{having};
+  my $where = delete $our_attrs->{where};
 
-  # XXX should only maintain _live_join_stack and generate _live_join_h from that
-  if ($attrs->{_live_join_stack}) {
-    foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
-      $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
-        ? { $join => $attrs->{_live_join_h} }
-        : $join;
-    }
-  }
+  my $new_attrs = { %{$our_attrs}, %{$attrs} };
 
   # merge new attrs into inherited
   foreach my $key (qw/join prefetch/) {
     next unless exists $attrs->{$key};
-    if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
-      foreach my $join (reverse @{$live_join}) {
-        $attrs->{$key} = { $join => $attrs->{$key} };
-      }
-    }
-
-    $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
-  }
-
-  $our_attrs->{join} = $self->_merge_attr(
-    $our_attrs->{join}, $attrs->{_live_join_h}
-  ) if ($attrs->{_live_join_h});
-
-  if (defined $our_attrs->{prefetch}) {
-    $our_attrs->{join} = $self->_merge_attr(
-      $our_attrs->{join}, $our_attrs->{prefetch}
-    );
+    $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
   }
 
-  my $new_attrs = { %{$our_attrs}, %{$attrs} };
-  my $where = (@_
+  my $cond = (@_
     ? (
         (@_ == 1 || ref $_[0] eq "HASH")
-          ? shift
+          ? (
+              (ref $_[0] eq 'HASH')
+                ? (
+                    (keys %{ $_[0] }  > 0)
+                      ? shift
+                      : undef
+                   )
+                :  shift
+             )
           : (
               (@_ % 2)
                 ? $self->throw_exception("Odd number of arguments to search")
@@ -222,6 +218,18 @@ sub search_rs {
         : $where);
   }
 
+  if (defined $cond) {
+    $new_attrs->{where} = (
+      defined $new_attrs->{where}
+        ? { '-and' => [
+              map {
+                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
+              } $cond, $new_attrs->{where}
+            ]
+          }
+        : $cond);
+  }
+
   if (defined $having) {
     $new_attrs->{having} = (
       defined $new_attrs->{having}
@@ -235,13 +243,8 @@ sub search_rs {
   }
 
   my $rs = (ref $self)->new($self->result_source, $new_attrs);
-  $rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
-
-  unless (@_) {                 # no search, effectively just a clone
-    my $rows = $self->get_cache;
-    if ($rows) {
-      $rs->set_cache($rows);
-    }
+  if ($rows) {
+    $rs->set_cache($rows);
   }
   return $rs;
 }
@@ -308,6 +311,9 @@ If the C<key> is specified as C<primary>, it searches only on the primary key.
 If no C<key> is specified, it searches on all unique constraints defined on the
 source, including the primary key.
 
+If your table does not have a primary key, you B<must> provide a value for the
+C<key> attribute matching one of the unique constraints on the source.
+
 See also L</find_or_create> and L</update_or_create>. For information on how to
 declare unique constraints, see
 L<DBIx::Class::ResultSource/add_unique_constraint>.
@@ -323,7 +329,7 @@ sub find {
     ? $self->result_source->unique_constraint_columns($attrs->{key})
     : $self->result_source->primary_columns;
   $self->throw_exception(
-    "Can't find unless a primary key or unique constraint is defined"
+    "Can't find unless a primary key is defined or unique constraint is specified"
   ) unless @cols;
 
   # Parse out a hashref from input
@@ -341,11 +347,33 @@ sub find {
     $input_query = {@_};
   }
 
+  my (%related, $info);
+
+  KEY: foreach my $key (keys %$input_query) {
+    if (ref($input_query->{$key})
+        && ($info = $self->result_source->relationship_info($key))) {
+      my $val = delete $input_query->{$key};
+      next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
+      my $rel_q = $self->result_source->resolve_condition(
+                    $info->{cond}, $val, $key
+                  );
+      die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
+      @related{keys %$rel_q} = values %$rel_q;
+    }
+  }
+  if (my @keys = keys %related) {
+    @{$input_query}{@keys} = values %related;
+  }
+
   my @unique_queries = $self->_unique_queries($input_query, $attrs);
 
-  # Handle cases where the ResultSet defines the query, or where the user is
-  # abusing find
-  my $query = @unique_queries ? \@unique_queries : $input_query;
+  # Build the final query: Default to the disjunction of the unique queries,
+  # but allow the input query in case the ResultSet defines the query or the
+  # user is abusing find
+  my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
+  my $query = @unique_queries
+    ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
+    : $self->_add_alias($input_query, $alias);
 
   # Run the query
   if (keys %$attrs) {
@@ -359,6 +387,22 @@ sub find {
   }
 }
 
+# _add_alias
+#
+# Add the specified alias to the specified query hash. A copy is made so the
+# original query is not modified.
+
+sub _add_alias {
+  my ($self, $query, $alias) = @_;
+
+  my %aliased = %$query;
+  foreach my $col (grep { ! m/\./ } keys %aliased) {
+    $aliased{"$alias.$col"} = delete $aliased{$col};
+  }
+
+  return \%aliased;
+}
+
 # _unique_queries
 #
 # Build a list of queries which satisfy unique constraints.
@@ -370,20 +414,23 @@ sub _unique_queries {
     ? ($attrs->{key})
     : $self->result_source->unique_constraint_names;
 
+  my $where = $self->_collapse_cond($self->{attrs}{where} || {});
+  my $num_where = scalar keys %$where;
+
   my @unique_queries;
   foreach my $name (@constraint_names) {
     my @unique_cols = $self->result_source->unique_constraint_columns($name);
     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
 
-    next unless scalar keys %$unique_query;
+    my $num_cols = scalar @unique_cols;
+    my $num_query = scalar keys %$unique_query;
 
-    # Add the ResultSet's alias
-    my $alias = $self->{attrs}{alias};
-    foreach my $key (grep { ! m/\./ } keys %$unique_query) {
-      $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
+    my $total = $num_query + $num_where;
+    if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
+      # The query is either unique on its own or is unique in combination with
+      # the existing where clause
+      push @unique_queries, $unique_query;
     }
-
-    push @unique_queries, $unique_query;
   }
 
   return @unique_queries;
@@ -486,17 +533,18 @@ sub single {
     }
   }
 
-  unless ($self->_is_unique_query($attrs->{where})) {
-    carp "Query not guaranteed to return a single row"
-      . "; please declare your unique constraints or use search instead";
-  }
+#  XXX: Disabled since it doesn't infer uniqueness in all cases
+#  unless ($self->_is_unique_query($attrs->{where})) {
+#    carp "Query not guaranteed to return a single row"
+#      . "; please declare your unique constraints or use search instead";
+#  }
 
   my @data = $self->result_source->storage->select_single(
     $attrs->{from}, $attrs->{select},
     $attrs->{where}, $attrs
   );
 
-  return (@data ? $self->_construct_object(@data) : ());
+  return (@data ? ($self->_construct_object(@data))[0] : undef);
 }
 
 # _is_unique_query
@@ -521,7 +569,7 @@ sub _is_unique_query {
     foreach my $key (keys %$collapsed) {
       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
       next unless exists $seen{$aliased};  # Additional constraints are okay
-      $seen{$aliased} = scalar @{ $collapsed->{$key} };
+      $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
     }
 
     # If we get 0 or more than 1 value for a column, it's not necessarily unique
@@ -556,8 +604,9 @@ sub _collapse_query {
     }
     else {
 #      warn "LEAF: " . Dumper $query;
-      foreach my $key (keys %$query) {
-        push @{$collapsed->{$key}}, $query->{$key};
+      foreach my $col (keys %$query) {
+        my $value = $query->{$col};
+        $collapsed->{$col}{$value}++;
       }
     }
   }
@@ -577,7 +626,7 @@ sub _collapse_query {
 
   my $max_length = $rs->get_column('length')->max;
 
-Returns a ResultSetColumn instance for $column based on $self
+Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
 
 =cut
 
@@ -680,262 +729,151 @@ sub next {
     $self->{all_cache_position} = 1;
     return ($self->all)[0];
   }
+  if ($self->{stashed_objects}) {
+    my $obj = shift(@{$self->{stashed_objects}});
+    delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
+    return $obj;
+  }
   my @row = (
     exists $self->{stashed_row}
       ? @{delete $self->{stashed_row}}
       : $self->cursor->next
   );
-  return unless (@row);
-  return $self->_construct_object(@row);
+  return undef unless (@row);
+  my ($row, @more) = $self->_construct_object(@row);
+  $self->{stashed_objects} = \@more if @more;
+  return $row;
 }
 
-sub _resolved_attrs {
-  my $self = shift;
-  return $self->{_attrs} if $self->{_attrs};
+sub _construct_object {
+  my ($self, @row) = @_;
+  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
+  my @new = $self->result_class->inflate_result($self->result_source, @$info);
+  @new = $self->{_attrs}{record_filter}->(@new)
+    if exists $self->{_attrs}{record_filter};
+  return @new;
+}
 
-  my $attrs = $self->{attrs};
-  my $source = $self->{_parent_rs} || $self->{result_source};
-  my $alias = $attrs->{_orig_alias};
+sub _collapse_result {
+  my ($self, $as_proto, $row) = @_;
 
-  # XXX - lose storable dclone
-  my $record_filter = delete $attrs->{record_filter};
-  $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
-  $attrs->{record_filter} = $record_filter if $record_filter;
+  my @copy = @$row;
 
-  $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
-  if ($attrs->{columns}) {
-    delete $attrs->{as};
-  } elsif (!$attrs->{select}) {
-    $attrs->{columns} = [ $self->{result_source}->columns ];
-  }
-  
-  my $select_alias = $self->{attrs}{alias};
-  $attrs->{select} ||= [
-    map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
-  ];
-  $attrs->{as} ||= [
-    map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
-  ];
-  
-  my $adds;
-  if ($adds = delete $attrs->{include_columns}) {
-    $adds = [$adds] unless ref $adds eq 'ARRAY';
-    push(@{$attrs->{select}}, @$adds);
-    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
-  }
-  if ($adds = delete $attrs->{'+select'}) {
-    $adds = [$adds] unless ref $adds eq 'ARRAY';
-    push(@{$attrs->{select}}, map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
-  }
-  if (my $adds = delete $attrs->{'+as'}) {
-    $adds = [$adds] unless ref $adds eq 'ARRAY';
-    push(@{$attrs->{as}}, @$adds);
-  }
+  # 'foo'         => [ undef, 'foo' ]
+  # 'foo.bar'     => [ 'foo', 'bar' ]
+  # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
 
-  $attrs->{from} ||= [ { $alias => $source->from } ];
-  $attrs->{seen_join} ||= {};
-  my %seen;
-  if (my $join = delete $attrs->{join}) {
-    foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
-      if (ref $j eq 'HASH') {
-        $seen{$_} = 1 foreach keys %$j;
-      } else {
-        $seen{$j} = 1;
-      }
-    }
-    push(@{$attrs->{from}},
-      $source->resolve_join($join, $alias, $attrs->{seen_join})
-    );
-  }
+  my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
 
-  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
-  if ($attrs->{order_by}) {
-    $attrs->{order_by} = [ $attrs->{order_by} ] unless ref $attrs->{order_by};    
-  } else {
-    $attrs->{order_by} ||= [];    
-  }
+  my %collapse = %{$self->{_attrs}{collapse}||{}};
 
-  my $collapse = $attrs->{collapse} || {};
-  if (my $prefetch = delete $attrs->{prefetch}) {
-    my @pre_order;
-    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
-      if ( ref $p eq 'HASH' ) {
-        foreach my $key (keys %$p) {
-          push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
-            unless $seen{$key};
-        }
-      } else {
-        push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
-          unless $seen{$p};
-      }
-      # bring joins back to level of current class
-      $p = $self->_reduce_joins($p, $attrs) if $attrs->{_live_join_stack};
-      if ($p) {
-        my @prefetch = $self->result_source->resolve_prefetch(
-          $p, $alias, {}, \@pre_order, $collapse
-        );
-        push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
-        push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+  my @pri_index;
+
+  # if we're doing collapsing (has_many prefetch) we need to grab records
+  # until the PK changes, so fill @pri_index. if not, we leave it empty so
+  # we know we don't have to bother.
+
+  # the reason for not using the collapse stuff directly is because if you
+  # had for e.g. two artists in a row with no cds, the collapse info for
+  # both would be NULL (undef) so you'd lose the second artist
+
+  # store just the index so we can check the array positions from the row
+  # without having to contruct the full hash
+
+  if (keys %collapse) {
+    my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
+    foreach my $i (0 .. $#construct_as) {
+      next if defined($construct_as[$i][0]); # only self table
+      if (delete $pri{$construct_as[$i][1]}) {
+        push(@pri_index, $i);
       }
+      last unless keys %pri; # short circuit (Johnny Five Is Alive!)
     }
-    push(@{$attrs->{order_by}}, @pre_order);
   }
-  $attrs->{collapse} = $collapse;
 
-  return $self->{_attrs} = $attrs;
-}
+  # no need to do an if, it'll be empty if @pri_index is empty anyway
 
-sub _merge_attr {
-  my ($self, $a, $b) = @_;
-  return $b unless $a;
+  my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
+
+  my @const_rows;
+
+  do { # no need to check anything at the front, we always want the first row
+
+    my %const;
   
-  if (ref $b eq 'HASH' && ref $a eq 'HASH') {
-    foreach my $key (keys %{$b}) {
-      if (exists $a->{$key}) {
-        $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
-      } else {
-        $a->{$key} = $b->{$key};
-      }
+    foreach my $this_as (@construct_as) {
+      $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
     }
-    return $a;
-  } else {
-    $a = [$a] unless ref $a eq 'ARRAY';
-    $b = [$b] unless ref $b eq 'ARRAY';
 
-    my $hash = {};
-    my @array;
-    foreach my $x ($a, $b) {
-      foreach my $element (@{$x}) {
-        if (ref $element eq 'HASH') {
-          $hash = $self->_merge_attr($hash, $element);
-        } elsif (ref $element eq 'ARRAY') {
-          push(@array, @{$element});
-        } else {
-          push(@array, $element) unless $b == $x
-            && grep { $_ eq $element } @array;
-        }
-      }
-    }
-    
-    @array = grep { !exists $hash->{$_} } @array;
+    push(@const_rows, \%const);
 
-    return keys %{$hash}
-      ? ( scalar(@array)
-            ? [$hash, @array]
-            : $hash
-        )
-      : \@array;
-  }
-}
+  } until ( # no pri_index => no collapse => drop straight out
+      !@pri_index
+    or
+      do { # get another row, stash it, drop out if different PK
 
-# bring the joins (which are from the original class) to the level
-# of the current class so that we can resolve them properly
-sub _reduce_joins {
-  my ($self, $p, $attrs) = @_;
-
-  STACK:
-  foreach my $join (@{$attrs->{_live_join_stack}}) {
-    if (ref $p eq 'HASH') {
-      return undef unless exists $p->{$join};
-      $p = $p->{$join};
-    } elsif (ref $p eq 'ARRAY') {
-      foreach my $pe (@{$p}) {
-        return undef if $pe eq $join;
-        if (ref $pe eq 'HASH' && exists $pe->{$join}) {
-          $p = $pe->{$join};
-          next STACK;
-        }
-      }
-      return undef;
-    } else {
-      return undef;
-    }
-  }
-  return $p;
-}
+        @copy = $self->cursor->next;
+        $self->{stashed_row} = \@copy;
 
-sub _construct_object {
-  my ($self, @row) = @_;
-  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
-  my $new = $self->result_class->inflate_result($self->result_source, @$info);
-  $new = $self->{_attrs}{record_filter}->($new)
-    if exists $self->{_attrs}{record_filter};
-  return $new;
-}
+        # last thing in do block, counts as true if anything doesn't match
 
-sub _collapse_result {
-  my ($self, $as, $row, $prefix) = @_;
+        # check xor defined first for NULL vs. NOT NULL then if one is
+        # defined the other must be so check string equality
 
-  my %const;
-  my @copy = @$row;
-  
-  foreach my $this_as (@$as) {
-    my $val = shift @copy;
-    if (defined $prefix) {
-      if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
-        my $remain = $1;
-        $remain =~ /^(?:(.*)\.)?([^.]+)$/;
-        $const{$1||''}{$2} = $val;
+        grep {
+          (defined $pri_vals{$_} ^ defined $copy[$_])
+          || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
+        } @pri_index;
       }
-    } else {
-      $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
-      $const{$1||''}{$2} = $val;
-    }
-  }
+  );
 
   my $alias = $self->{attrs}{alias};
-  my $info = [ {}, {} ];
-  foreach my $key (keys %const) {
-    if (length $key && $key ne $alias) {
-      my $target = $info;
-      my @parts = split(/\./, $key);
-      foreach my $p (@parts) {
-        $target = $target->[1]->{$p} ||= [];
+  my $info = [];
+
+  my %collapse_pos;
+
+  my @const_keys;
+
+  use Data::Dumper;
+
+  foreach my $const (@const_rows) {
+    scalar @const_keys or do {
+      @const_keys = sort { length($a) <=> length($b) } keys %$const;
+    };
+    foreach my $key (@const_keys) {
+      if (length $key) {
+        my $target = $info;
+        my @parts = split(/\./, $key);
+        my $cur = '';
+        my $data = $const->{$key};
+        foreach my $p (@parts) {
+          $target = $target->[1]->{$p} ||= [];
+          $cur .= ".${p}";
+          if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
+            # collapsing at this point and on final part
+            my $pos = $collapse_pos{$cur};
+            CK: foreach my $ck (@ckey) {
+              if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
+                $collapse_pos{$cur} = $data;
+                delete @collapse_pos{ # clear all positioning for sub-entries
+                  grep { m/^\Q${cur}.\E/ } keys %collapse_pos
+                };
+                push(@$target, []);
+                last CK;
+              }
+            }
+          }
+          if (exists $collapse{$cur}) {
+            $target = $target->[-1];
+          }
+        }
+        $target->[0] = $data;
+      } else {
+        $info->[0] = $const->{$key};
       }
-      $target->[0] = $const{$key};
-    } else {
-      $info->[0] = $const{$key};
-    }
-  }
-  
-  my @collapse;
-  if (defined $prefix) {
-    @collapse = map {
-        m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
-    } keys %{$self->{_attrs}{collapse}}
-  } else {
-    @collapse = keys %{$self->{_attrs}{collapse}};
-  };
-
-  if (@collapse) {
-    my ($c) = sort { length $a <=> length $b } @collapse;
-    my $target = $info;
-    foreach my $p (split(/\./, $c)) {
-      $target = $target->[1]->{$p} ||= [];
     }
-    my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
-    my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
-    my $tree = $self->_collapse_result($as, $row, $c_prefix);
-    my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
-    my (@final, @raw);
-
-    while (
-      !(
-        grep {
-          !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
-        } @co_key
-        )
-    ) {
-      push(@final, $tree);
-      last unless (@raw = $self->cursor->next);
-      $row = $self->{stashed_row} = \@raw;
-      $tree = $self->_collapse_result($as, $row, $c_prefix);
-    }
-    @$target = (@final ? @final : [ {}, {} ]);
-      # single empty result to indicate an empty prefetched has_many
   }
 
-  #print "final info: " . Dumper($info);
   return $info;
 }
 
@@ -952,6 +890,20 @@ sub _collapse_result {
 An accessor for the primary ResultSource object from which this ResultSet
 is derived.
 
+=head2 result_class
+
+=over 4
+
+=item Arguments: $result_class?
+
+=item Return Value: $result_class
+
+=back
+
+An accessor for the class to use when creating row objects. Defaults to 
+C<< result_source->result_class >> - which in most cases is the name of the 
+L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
+
 =cut
 
 
@@ -1001,7 +953,7 @@ sub _count { # Separated out so pager can get the full count
     # todo: try CONCAT for multi-column pk
     my @pk = $self->result_source->primary_columns;
     if (@pk == 1) {
-      my $alias = $attrs->{_orig_alias};
+      my $alias = $attrs->{alias};
       foreach my $column (@distinct) {
         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
           @distinct = ($column);
@@ -1018,10 +970,8 @@ sub _count { # Separated out so pager can get the full count
 
   # offset, order by and page are not needed to count. record_filter is cdbi
   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
-  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
-  $tmp_rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
-       #XXX - hack to pass through parent of related resultsets
 
+  my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
   my ($count) = $tmp_rs->cursor->next;
   return $count;
 }
@@ -1134,13 +1084,14 @@ sub first {
 # appropriately, returning the new condition.
 
 sub _cond_for_update_delete {
-  my ($self) = @_;
+  my ($self, $full_cond) = @_;
   my $cond = {};
 
+  $full_cond ||= $self->{cond};
   # No-op. No condition, we're updating/deleting everything
-  return $cond unless ref $self->{cond};
+  return $cond unless ref $full_cond;
 
-  if (ref $self->{cond} eq 'ARRAY') {
+  if (ref $full_cond eq 'ARRAY') {
     $cond = [
       map {
         my %hash;
@@ -1149,36 +1100,33 @@ sub _cond_for_update_delete {
           $hash{$1} = $_->{$key};
         }
         \%hash;
-      } @{$self->{cond}}
+      } @{$full_cond}
     ];
   }
-  elsif (ref $self->{cond} eq 'HASH') {
-    if ((keys %{$self->{cond}})[0] eq '-and') {
+  elsif (ref $full_cond eq 'HASH') {
+    if ((keys %{$full_cond})[0] eq '-and') {
       $cond->{-and} = [];
 
-      my @cond = @{$self->{cond}{-and}};
+      my @cond = @{$full_cond->{-and}};
       for (my $i = 0; $i < @cond; $i++) {
         my $entry = $cond[$i];
 
-        my %hash;
+        my $hash;
         if (ref $entry eq 'HASH') {
-          foreach my $key (keys %{$entry}) {
-            $key =~ /([^.]+)$/;
-            $hash{$1} = $entry->{$key};
-          }
+          $hash = $self->_cond_for_update_delete($entry);
         }
         else {
           $entry =~ /([^.]+)$/;
-          $hash{$1} = $cond[++$i];
+          $hash->{$1} = $cond[++$i];
         }
 
-        push @{$cond->{-and}}, \%hash;
+        push @{$cond->{-and}}, $hash;
       }
     }
     else {
-      foreach my $key (keys %{$self->{cond}}) {
+      foreach my $key (keys %{$full_cond}) {
         $key =~ /([^.]+)$/;
-        $cond->{$1} = $self->{cond}{$key};
+        $cond->{$1} = $full_cond->{$key};
       }
     }
   }
@@ -1214,9 +1162,9 @@ sub update {
     unless ref $values eq 'HASH';
 
   my $cond = $self->_cond_for_update_delete;
-
+   
   return $self->result_source->storage->update(
-    $self->result_source->from, $values, $cond
+    $self->result_source, $values, $cond
   );
 }
 
@@ -1257,7 +1205,7 @@ sub update_all {
 
 Deletes the contents of the resultset from its result source. Note that this
 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
-to run.
+to run. See also L<DBIx::Class::Row/delete>.
 
 =cut
 
@@ -1266,7 +1214,7 @@ sub delete {
 
   my $cond = $self->_cond_for_update_delete;
 
-  $self->result_source->storage->delete($self->result_source->from, $cond);
+  $self->result_source->storage->delete($self->result_source, $cond);
   return 1;
 }
 
@@ -1291,6 +1239,137 @@ sub delete_all {
   return 1;
 }
 
+=head2 populate
+
+=over 4
+
+=item Arguments: \@data;
+
+=back
+
+Pass an arrayref of hashrefs. Each hashref should be a structure suitable for
+submitting to a $resultset->create(...) method.
+
+In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
+to insert the data, as this is a faster method.
+
+Otherwise, each set of data is inserted into the database using
+L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
+objects is returned.
+
+Example:  Assuming an Artist Class that has many CDs Classes relating:
+
+  my $Artist_rs = $schema->resultset("Artist");
+  
+  ## Void Context Example 
+  $Artist_rs->populate([
+     { artistid => 4, name => 'Manufactured Crap', cds => [ 
+        { title => 'My First CD', year => 2006 },
+        { title => 'Yet More Tweeny-Pop crap', year => 2007 },
+      ],
+     },
+     { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
+        { title => 'My parents sold me to a record company' ,year => 2005 },
+        { title => 'Why Am I So Ugly?', year => 2006 },
+        { title => 'I Got Surgery and am now Popular', year => 2007 }
+      ],
+     },
+  ]);
+  
+  ## Array Context Example
+  my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
+    { name => "Artist One"},
+    { name => "Artist Two"},
+    { name => "Artist Three", cds=> [
+    { title => "First CD", year => 2007},
+    { title => "Second CD", year => 2008},
+  ]}
+  ]);
+  
+  print $ArtistOne->name; ## response is 'Artist One'
+  print $ArtistThree->cds->count ## reponse is '2'
+
+=cut
+
+sub populate {
+  my ($self, $data) = @_;
+  
+  if(defined wantarray) {
+    my @created;
+    foreach my $item (@$data) {
+      push(@created, $self->create($item));
+    }
+    return @created;
+  } else {
+    my ($first, @rest) = @$data;
+
+    my @names = grep {!ref $first->{$_}} keys %$first;
+    my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
+    my @pks = $self->result_source->primary_columns;  
+
+    ## do the belongs_to relationships  
+    foreach my $index (0..$#$data) {
+      if( grep { !defined $data->[$index]->{$_} } @pks ) {
+        my @ret = $self->populate($data);
+        return;
+      }
+    
+      foreach my $rel (@rels) {
+        next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
+        my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
+        my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
+        my $related = $result->result_source->resolve_condition(
+          $result->result_source->relationship_info($reverse)->{cond},
+          $self,        
+          $result,        
+        );
+
+        delete $data->[$index]->{$rel};
+        $data->[$index] = {%{$data->[$index]}, %$related};
+      
+        push @names, keys %$related if $index == 0;
+      }
+    }
+
+    ## do bulk insert on current row
+    my @values = map {
+      [ map {
+         defined $_ ? $_ : $self->throw_exception("Undefined value for column!")
+      } @$_{@names} ]
+    } @$data;
+
+    $self->result_source->storage->insert_bulk(
+      $self->result_source, 
+      \@names, 
+      \@values,
+    );
+
+    ## do the has_many relationships
+    foreach my $item (@$data) {
+
+      foreach my $rel (@rels) {
+        next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
+
+        my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
+     || $self->throw_exception('Cannot find the relating object.');
+     
+        my $child = $parent->$rel;
+    
+        my $related = $child->result_source->resolve_condition(
+          $parent->result_source->relationship_info($rel)->{cond},
+          $child,
+          $parent,
+        );
+
+        my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
+        my @populate = map { {%$_, %$related} } @rows_to_add;
+
+        $child->populate( \@populate );
+      }
+    }
+  }
+}
+
 =head2 pager
 
 =over 4
@@ -1334,7 +1413,7 @@ attribute set on the resultset (10 by default).
 
 sub page {
   my ($self, $page) = @_;
-  return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
+  return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
 }
 
 =head2 new_result
@@ -1358,14 +1437,75 @@ sub new_result {
   $self->throw_exception(
     "Can't abstract implicit construct, condition not a hash"
   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
-  my %new = %$values;
-  my $alias = $self->{attrs}{_orig_alias};
-  foreach my $key (keys %{$self->{cond}||{}}) {
-    $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
+
+  my $alias = $self->{attrs}{alias};
+  my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
+  my %new = (
+    %{ $self->_remove_alias($values, $alias) },
+    %{ $self->_remove_alias($collapsed_cond, $alias) },
+    -source_handle => $self->_source_handle,
+    -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
+  );
+
+  return $self->result_class->new(\%new);
+}
+
+# _collapse_cond
+#
+# Recursively collapse the condition.
+
+sub _collapse_cond {
+  my ($self, $cond, $collapsed) = @_;
+
+  $collapsed ||= {};
+
+  if (ref $cond eq 'ARRAY') {
+    foreach my $subcond (@$cond) {
+      next unless ref $subcond;  # -or
+#      warn "ARRAY: " . Dumper $subcond;
+      $collapsed = $self->_collapse_cond($subcond, $collapsed);
+    }
+  }
+  elsif (ref $cond eq 'HASH') {
+    if (keys %$cond and (keys %$cond)[0] eq '-and') {
+      foreach my $subcond (@{$cond->{-and}}) {
+#        warn "HASH: " . Dumper $subcond;
+        $collapsed = $self->_collapse_cond($subcond, $collapsed);
+      }
+    }
+    else {
+#      warn "LEAF: " . Dumper $cond;
+      foreach my $col (keys %$cond) {
+        my $value = $cond->{$col};
+        $collapsed->{$col} = $value;
+      }
+    }
+  }
+
+  return $collapsed;
+}
+
+# _remove_alias
+#
+# Remove the specified alias from the specified query hash. A copy is made so
+# the original query is not modified.
+
+sub _remove_alias {
+  my ($self, $query, $alias) = @_;
+
+  my %orig = %{ $query || {} };
+  my %unaliased;
+
+  foreach my $key (keys %orig) {
+    if ($key !~ /\./) {
+      $unaliased{$key} = $orig{$key};
+      next;
+    }
+    $unaliased{$1} = $orig{$key}
+      if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
   }
-  my $obj = $self->result_class->new(\%new);
-  $obj->result_source($self->result_source) if $obj->can('result_source');
-  return $obj;
+
+  return \%unaliased;
 }
 
 =head2 find_or_new
@@ -1508,7 +1648,7 @@ sub update_or_create {
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
 
-  my $row = $self->find($cond);
+  my $row = $self->find($cond, $attrs);
   if (defined $row) {
     $row->update($cond);
     return $row;
@@ -1601,31 +1741,216 @@ sub related_resultset {
     my $rel_obj = $self->result_source->relationship_info($rel);
 
     $self->throw_exception(
-      "search_related: result source '" . $self->result_source->name .
+      "search_related: result source '" . $self->_source_handle->source_moniker .
         "' has no such relationship $rel")
       unless $rel_obj;
     
-    my $join_count = $self->_resolved_attrs->{seen_join}{$rel};
-    my $alias = $join_count ? join('_', $rel, $join_count+1) : $rel;
-
-    my @live_join_stack = (@{$self->{attrs}{_live_join_stack}||[]}, $rel);
-
-    my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
-      undef, {
-        select => undef,
-        as => undef,
-        alias => $alias,
-        _live_join_stack => \@live_join_stack,
-        _parent_attrs => $self->{attrs}}
-    );
+    my ($from,$seen) = $self->_resolve_from($rel);
+
+    my $join_count = $seen->{$rel};
+    my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
 
-    # keep reference of the original resultset
-    $rs->{_parent_rs} = $self->{_parent_rs} || $self->result_source;
+    #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
+    my %attrs = %{$self->{attrs}||{}};
+    delete $attrs{result_class};
+
+    my $new_cache;
+
+    if (my $cache = $self->get_cache) {
+      if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
+        $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
+                        @$cache ];
+      }
+    }
 
-    return $rs;
+    my $new = $self->_source_handle
+                   ->schema
+                   ->resultset($rel_obj->{class})
+                   ->search_rs(
+                       undef, {
+                         %attrs,
+                         join => undef,
+                         prefetch => undef,
+                         select => undef,
+                         as => undef,
+                         alias => $alias,
+                         where => $self->{cond},
+                         seen_join => $seen,
+                         from => $from,
+                     });
+    $new->set_cache($new_cache) if $new_cache;
+    $new;
   };
 }
 
+sub _resolve_from {
+  my ($self, $extra_join) = @_;
+  my $source = $self->result_source;
+  my $attrs = $self->{attrs};
+  
+  my $from = $attrs->{from}
+    || [ { $attrs->{alias} => $source->from } ];
+    
+  my $seen = { %{$attrs->{seen_join}||{}} };
+
+  my $join = ($attrs->{join}
+               ? [ $attrs->{join}, $extra_join ]
+               : $extra_join);
+  $from = [
+    @$from,
+    ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
+  ];
+
+  return ($from,$seen);
+}
+
+sub _resolved_attrs {
+  my $self = shift;
+  return $self->{_attrs} if $self->{_attrs};
+
+  my $attrs = { %{$self->{attrs}||{}} };
+  my $source = $self->result_source;
+  my $alias = $attrs->{alias};
+
+  $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
+  if ($attrs->{columns}) {
+    delete $attrs->{as};
+  } elsif (!$attrs->{select}) {
+    $attrs->{columns} = [ $source->columns ];
+  }
+  $attrs->{select} = 
+    ($attrs->{select}
+      ? (ref $attrs->{select} eq 'ARRAY'
+          ? [ @{$attrs->{select}} ]
+          : [ $attrs->{select} ])
+      : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
+    );
+  $attrs->{as} =
+    ($attrs->{as}
+      ? (ref $attrs->{as} eq 'ARRAY'
+          ? [ @{$attrs->{as}} ]
+          : [ $attrs->{as} ])
+      : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
+    );
+  
+  my $adds;
+  if ($adds = delete $attrs->{include_columns}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{select}}, @$adds);
+    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
+  }
+  if ($adds = delete $attrs->{'+select'}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{select}},
+           map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
+  }
+  if (my $adds = delete $attrs->{'+as'}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{as}}, @$adds);
+  }
+
+  $attrs->{from} ||= [ { 'me' => $source->from } ];
+
+  if (exists $attrs->{join} || exists $attrs->{prefetch}) {
+    my $join = delete $attrs->{join} || {};
+
+    if (defined $attrs->{prefetch}) {
+      $join = $self->_merge_attr(
+        $join, $attrs->{prefetch}
+      );
+    }
+
+    $attrs->{from} =   # have to copy here to avoid corrupting the original
+      [
+        @{$attrs->{from}}, 
+        $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
+      ];
+  }
+
+  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
+  if ($attrs->{order_by}) {
+    $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
+                           ? [ @{$attrs->{order_by}} ]
+                           : [ $attrs->{order_by} ]);
+  } else {
+    $attrs->{order_by} = [];    
+  }
+
+  my $collapse = $attrs->{collapse} || {};
+  if (my $prefetch = delete $attrs->{prefetch}) {
+    $prefetch = $self->_merge_attr({}, $prefetch);
+    my @pre_order;
+    my $seen = $attrs->{seen_join} || {};
+    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
+      # bring joins back to level of current class
+      my @prefetch = $source->resolve_prefetch(
+        $p, $alias, $seen, \@pre_order, $collapse
+      );
+      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
+      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+    }
+    push(@{$attrs->{order_by}}, @pre_order);
+  }
+  $attrs->{collapse} = $collapse;
+
+  return $self->{_attrs} = $attrs;
+}
+
+sub _merge_attr {
+  my ($self, $a, $b) = @_;
+  return $b unless defined($a);
+  return $a unless defined($b);
+  
+  if (ref $b eq 'HASH' && ref $a eq 'HASH') {
+    foreach my $key (keys %{$b}) {
+      if (exists $a->{$key}) {
+        $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
+      } else {
+        $a->{$key} = $b->{$key};
+      }
+    }
+    return $a;
+  } else {
+    $a = [$a] unless ref $a eq 'ARRAY';
+    $b = [$b] unless ref $b eq 'ARRAY';
+
+    my $hash = {};
+    my @array;
+    foreach my $x ($a, $b) {
+      foreach my $element (@{$x}) {
+        if (ref $element eq 'HASH') {
+          $hash = $self->_merge_attr($hash, $element);
+        } elsif (ref $element eq 'ARRAY') {
+          push(@array, @{$element});
+        } else {
+          push(@array, $element) unless $b == $x
+            && grep { $_ eq $element } @array;
+        }
+      }
+    }
+    
+    @array = grep { !exists $hash->{$_} } @array;
+
+    return keys %{$hash}
+      ? ( scalar(@array)
+            ? [$hash, @array]
+            : $hash
+        )
+      : \@array;
+  }
+}
+
+sub result_source {
+    my $self = shift;
+
+    if (@_) {
+        $self->_source_handle($_[0]->handle);
+    } else {
+        $self->_source_handle->resolve;
+    }
+}
+
 =head2 throw_exception
 
 See L<DBIx::Class::Schema/throw_exception> for details.
@@ -1634,7 +1959,7 @@ See L<DBIx::Class::Schema/throw_exception> for details.
 
 sub throw_exception {
   my $self=shift;
-  $self->result_source->schema->throw_exception(@_);
+  $self->_source_handle->schema->throw_exception(@_);
 }
 
 # XXX: FIXME: Attributes docs need clearing up
@@ -1656,8 +1981,8 @@ Which column(s) to order the results by. This is currently passed
 through directly to SQL, so you can give e.g. C<year DESC> for a
 descending order on the column `year'.
 
-Please note that if you have quoting enabled (see
-L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
+Please note that if you have C<quote_char> enabled (see
+L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
 so you will need to manually quote things as appropriate.)
 
@@ -1690,7 +2015,9 @@ Shortcut to include additional columns in the returned results - for example
   });
 
 would return all CDs and include a 'name' column to the information
-passed to object inflation
+passed to object inflation. Note that the 'artist' is the name of the
+column (or relationship) accessor, and 'name' is the name of the column
+accessor in the related table.
 
 =head2 select
 
@@ -1741,8 +2068,14 @@ Indicates additional column names for those added via L<+select>.
 
 =back
 
-Indicates column names for object inflation. This is used in conjunction with
-C<select>, usually when C<select> contains one or more function or stored
+Indicates column names for object inflation. That is, c< as >
+indicates the name that the column can be accessed as via the
+C<get_column> method (or via the object accessor, B<if one already
+exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
+>.
+
+The C< as > attribute is used in conjunction with C<select>,
+usually when C<select> contains one or more function or stored
 procedure names:
 
   $rs = $schema->resultset('Employee')->search(undef, {
@@ -1769,9 +2102,15 @@ use C<get_column> instead:
 You can create your own accessors if required - see
 L<DBIx::Class::Manual::Cookbook> for details.
 
-Please note: This will NOT insert an C<AS employee_count> into the SQL statement
-produced, it is used for internal access only. Thus attempting to use the accessor
-in an C<order_by> clause or similar will fail misrably.
+Please note: This will NOT insert an C<AS employee_count> into the SQL
+statement produced, it is used for internal access only. Thus
+attempting to use the accessor in an C<order_by> clause or similar
+will fail miserably.
+
+To get around this limitation, you can supply literal SQL to your
+C<select> attibute that contains the C<AS alias> text, eg:
+
+  select => [\'myfield AS alias']
 
 =head2 join
 
@@ -1810,6 +2149,19 @@ For example:
     }
   );
 
+You need to use the relationship (not the table) name in  conditions, 
+because they are aliased as such. The current table is aliased as "me", so 
+you need to use me.column_name in order to avoid ambiguity. For example:
+
+  # Get CDs from 1984 with a 'Foo' track 
+  my $rs = $schema->resultset('CD')->search(
+    { 
+      'me.year' => 1984,
+      'tracks.name' => 'Foo'
+    },
+    { join => 'tracks' }
+  );
+  
 If the same join is supplied twice, it will be aliased to <rel>_2 (and
 similarly for a third time). For e.g.